Go学习笔记08-Channel

什么是信道?

信道是Go协程之间通信的管道,通过使用信道,数据可以从一端发送,在另一端接收。

信道的声明

  • 所有信道都关联了一个类型,信道只能运输关联类型的数据。
  • chan T表示T类型的信道。
  • 信道的零值为nil,信道的零值没有什么用,如map和slice,用make来定义信道。
  • 定义信道的简洁有效的方法:ch := make(chan int)

通过信道进行发送和接收

通过信道进行发送和接收数据的语法(通过信道旁的箭头方向来判断是读还是写):

a <- data   // 写入信道a
data := <-a // 读取信道a

发送与接收默认是阻塞的

  • 当把数据发送到信道时,程序控制会在发送数据的语句处发生阻塞,直到有其它goroutine从信道读取数据,才会解除阻塞。
  • 同理,当读取信道的数据时,如果没有其它的goroutine把数据写入到这个信道,那么读取过程就会一直阻塞着。

Note:信道的这种特性能够帮助goroutine之间进行高效的通信,不需要用到其它编程语言常见的显式锁或条件变量。

信道的代码示例

示例程序会计算一个数中每一位的平方和与立方和,然后把平方和与立方和相加并打印结果。例如,如果输入是123,该程序会作如下计算:

squares = (1*1) + (2*2) + (3*3)
cubes = (1*1*1) + (2*2*2) + (3*3*3)
output = squares + cubes = 50

我们可以这样构建程序:在一个单独的goroutine里计算平方和,在另一个goroutine里计算立方和,最后在主协程里把它们的结果相加。

package main

import "fmt"

func calcSquares(number int, squareop chan int) {
    sum := 0
    for number != 0 {
        digit := number % 10
        sum += digit * digit
        number /= 10
    }
    squareop <- sum
}

func calcCubes(number int, cubeop chan int) {
    sum := 0
    for number != 0 {
        digit := number % 10
        sum += digit * digit * digit
        number /= 10
    }
    cubeop <- sum
}

func main() {
    number := 589
    sqrch := make(chan int)
    cubech := make(chan int)
    go calcSquares(number, sqrch)
    go calcCubes(number, cubech)
    squares, cubes := <-sqrch, <-cubech
    fmt.Println("Final output:", squares+cubes) // Final output: 1536
}

死锁

使用信道需要考虑的一个重点是死锁。

  • 当goroutine给一个信道发送数据时,照理说会有其它的goroutine来接收数据,如果没有的话,程序就会在运行时触发panic,形成死锁。
  • 同理,当有goroutine等着从一个信道接收数据时,我们期望其它的goroutine会向该信道写入数据,要不然程序就会触发panic。

单向信道

在双向信道中,既能发送数据,又能接收数据。其实也可以创建单向信道,这种信道只能发送或者接收数据。

那么单向信道的意义何在呢?

先要了解信道转换:把一个双向信道转换成唯送信道(chan<-)或者唯收信道(<-chan)都是行得通的,但是反过来就不行。所以说,真正使用过程中,一般我们需要在外部申请一个双向信道,然后传递给函数后,转换为单向信道,这主要是方便函数内部做访问控制。举个例子方便理解:

package main

import "fmt"

// 我们的函数只需要发送数据,通过参数里指定唯送信道,
// 表达明确,也避免函数内部通过信道去接收数据的操作。
func sendData(sendch chan<- int) {
    sendch <- 10
}

func main() {
    cha1 := make(chan int)
    go sendData(cha1)
    fmt.Println(<-cha1) // 10
}

关闭信道和使用for range遍历信道

数据发送方可以关闭信道,通知接收方这个信道不再有数据发送过来。当从信道接收数据时,接收方可以多用一个变量来检查信道是否已经关闭:v, ok := <-ch。上述语句,如果成功接收信道里的数据,那么ok等于true,而如果ok等于false,说明我们试图读取一个关闭的信道。从关闭的信道读取到的值会是该信道类型的零值。

通过一个例子来理解关闭信道的意义:

package main

import "fmt"

func producer(ch chan int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)
}

func main() {
    ch := make(chan int)
    go producer(ch)
    for {
        v, ok := <-ch
        if ok == false {
            break
        }
        // 0-true 1-true 2-true 3-true 4-true 5-true 6-true 7-true 8-true 9-true
        fmt.Printf("%v-%v ", v, ok)
    }
}

Go还给我们提供了一种更方便的方式:for range循环用于在一个信道关闭之前,从信道接收数据

package main

import "fmt"

func producer(ch chan int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)
}

func main() {
    ch := make(chan int)
    go producer(ch)
    
    // for range 循环从信道 ch 接收数据,直到该信道关闭,一旦关闭了 ch,
    // 循环会自动结束。这里需要注意一点,假设 producer 里关闭了信道,但是
    // 这里还没读取完毕,会继续读取直到信道里的数据都取出然后自动退出循环。
    for v := range ch {
        fmt.Printf("%v ", v) // 0 1 2 3 4 5 6 7 8 9
    }
}

使用for range来改进前面的平方和例子,编写更加高并发,更地道的Go!

package main

import "fmt"

func digit(number int, dchnl chan int) {
    for number != 0 {
        digit := number % 10
        dchnl <- digit
        number /= 10
    }
    close(dchnl)
}

func calcSquares(number int, squareop chan int) {
    sum := 0
    dch := make(chan int)
    go digit(number, dch)
    for digit := range dch {
        sum += digit * digit
    }
    squareop <- sum
}

func calcCubes(number int, cubeop chan int) {
    sum := 0
    dch := make(chan int)
    go digit(number, dch)
    for digit := range dch {
        sum += digit * digit * digit
    }
    cubeop <- sum
}

func main() {
    number := 589
    sqrch := make(chan int)
    cubech := make(chan int)
    go calcSquares(number, sqrch)
    go calcCubes(number, cubech)
    squares, cubes := <-sqrch, <-cubech
    fmt.Println("Final output:", squares+cubes) // Final output: 1536
}

缓冲信道

无缓冲信道的发送和接收过程是阻塞的。我们还可以创建一个有缓冲的信道。只有在缓冲已满的情况,才会阻塞向缓冲信道发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。

通过向make函数再传递一个表示容量的参数(指定缓冲的大小)ch := make(chan type, capacity),可以创建缓冲信道。

要让一个信道有缓冲,上面语法中的capacity应该大于0。无缓冲信道的容量默认为0。

长度、容量

  • 缓冲信道的容量是指信道可以存储的值的数量。
  • 缓冲信道的长度是指信道中当前排队的元素个数。

WaitGroup

WaitGroup用于等待一批Go协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。

WaitGroup使用计数器来工作,通过Add()添加计数,通过Done()减小计数,Wait()方法会阻塞调用它的Go协程,直到计数器变为0后才会停止阻塞。

另外需要注意,作为参数时,需要将WaitGroup的指针传递给函数。

工作池的实现

缓冲信道的重要应用之一就是实现工作池。一般而言,工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。

下面是一个用缓冲信道实现工作池的例子,工作池的任务是计算所输入数字的每一位的和,例如,输入234,结果就是9(2+3+4),向工作池输入的是一系列伪随机数。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 表示作业
type Job struct {
    id       int // 作业ID
    randomno int // 待计算的数
}

// 表示结果
type Result struct {
    job         Job // 对应的作业
    sumofdigits int // 计算的结果
}

var jobs = make(chan Job, 10)       // 接收作业的缓冲信道
var results = make(chan Result, 10) // 写入结果的缓冲信道

// 计算整数的每一位之和,返回结果。
func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

// 工作协程:监听缓冲信道jobs里更新的作业,一旦完成了计算任务,
// 其结果会写入缓冲信道results。
func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

// 创建Go协程的工作池,参数是需要创建的工作协程的数量。创建了指定数量
// 的工作协程后,调用wg.Wait(),等待所有goroutine执行完毕。所有协程
// 完成执行后,关闭results信道。因为所有协程都已执行完毕,于是不再需要
// 向results信道写入数据了。
func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

// 分配作业,当写入所有job后,关闭jobs信道。
func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

// results信道读取结果,当读取了所有结果,done写入true。
func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n",
            result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs) // 添加作业
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10 // 可以通过增加工作者的数量来节省时间
    createWorkerPool(noOfWorkers) // 创建工作池
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

reference:

https://studygolang.com/articles/12402
https://studygolang.com/articles/12512