什么是信道?
信道是Go协程之间通信的管道,通过使用信道,数据可以从一端发送,在另一端接收。
信道的声明
- 所有信道都关联了一个类型,信道只能运输关联类型的数据。
chan T
表示T
类型的信道。- 信道的零值为
nil
,信道的零值没有什么用,如map和slice,用make
来定义信道。 - 定义信道的简洁有效的方法:
ch := make(chan int)
。
通过信道进行发送和接收
通过信道进行发送和接收数据的语法(通过信道旁的箭头方向来判断是读还是写):
a <- data // 写入信道a
data := <-a // 读取信道a
发送与接收默认是阻塞的
- 当把数据发送到信道时,程序控制会在发送数据的语句处发生阻塞,直到有其它goroutine从信道读取数据,才会解除阻塞。
- 同理,当读取信道的数据时,如果没有其它的goroutine把数据写入到这个信道,那么读取过程就会一直阻塞着。
Note:信道的这种特性能够帮助goroutine之间进行高效的通信,不需要用到其它编程语言常见的显式锁或条件变量。
信道的代码示例
示例程序会计算一个数中每一位的平方和与立方和,然后把平方和与立方和相加并打印结果。例如,如果输入是123,该程序会作如下计算:
squares = (1*1) + (2*2) + (3*3)
cubes = (1*1*1) + (2*2*2) + (3*3*3)
output = squares + cubes = 50
我们可以这样构建程序:在一个单独的goroutine里计算平方和,在另一个goroutine里计算立方和,最后在主协程里把它们的结果相加。
package main
import "fmt"
func calcSquares(number int, squareop chan int) {
sum := 0
for number != 0 {
digit := number % 10
sum += digit * digit
number /= 10
}
squareop <- sum
}
func calcCubes(number int, cubeop chan int) {
sum := 0
for number != 0 {
digit := number % 10
sum += digit * digit * digit
number /= 10
}
cubeop <- sum
}
func main() {
number := 589
sqrch := make(chan int)
cubech := make(chan int)
go calcSquares(number, sqrch)
go calcCubes(number, cubech)
squares, cubes := <-sqrch, <-cubech
fmt.Println("Final output:", squares+cubes) // Final output: 1536
}
死锁
使用信道需要考虑的一个重点是死锁。
- 当goroutine给一个信道发送数据时,照理说会有其它的goroutine来接收数据,如果没有的话,程序就会在运行时触发panic,形成死锁。
- 同理,当有goroutine等着从一个信道接收数据时,我们期望其它的goroutine会向该信道写入数据,要不然程序就会触发panic。
单向信道
在双向信道中,既能发送数据,又能接收数据。其实也可以创建单向信道,这种信道只能发送或者接收数据。
那么单向信道的意义何在呢?
先要了解信道转换:把一个双向信道转换成唯送信道(chan<-
)或者唯收信道(<-chan
)都是行得通的,但是反过来就不行。所以说,真正使用过程中,一般我们需要在外部申请一个双向信道,然后传递给函数后,转换为单向信道,这主要是方便函数内部做访问控制。举个例子方便理解:
package main
import "fmt"
// 我们的函数只需要发送数据,通过参数里指定唯送信道,
// 表达明确,也避免函数内部通过信道去接收数据的操作。
func sendData(sendch chan<- int) {
sendch <- 10
}
func main() {
cha1 := make(chan int)
go sendData(cha1)
fmt.Println(<-cha1) // 10
}
关闭信道和使用for range遍历信道
数据发送方可以关闭信道,通知接收方这个信道不再有数据发送过来。当从信道接收数据时,接收方可以多用一个变量来检查信道是否已经关闭:v, ok := <-ch
。上述语句,如果成功接收信道里的数据,那么ok
等于true,而如果ok
等于false,说明我们试图读取一个关闭的信道。从关闭的信道读取到的值会是该信道类型的零值。
通过一个例子来理解关闭信道的意义:
package main
import "fmt"
func producer(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
func main() {
ch := make(chan int)
go producer(ch)
for {
v, ok := <-ch
if ok == false {
break
}
// 0-true 1-true 2-true 3-true 4-true 5-true 6-true 7-true 8-true 9-true
fmt.Printf("%v-%v ", v, ok)
}
}
Go还给我们提供了一种更方便的方式:for range循环用于在一个信道关闭之前,从信道接收数据。
package main
import "fmt"
func producer(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
func main() {
ch := make(chan int)
go producer(ch)
// for range 循环从信道 ch 接收数据,直到该信道关闭,一旦关闭了 ch,
// 循环会自动结束。这里需要注意一点,假设 producer 里关闭了信道,但是
// 这里还没读取完毕,会继续读取直到信道里的数据都取出然后自动退出循环。
for v := range ch {
fmt.Printf("%v ", v) // 0 1 2 3 4 5 6 7 8 9
}
}
使用for range来改进前面的平方和例子,编写更加高并发,更地道的Go!
package main
import "fmt"
func digit(number int, dchnl chan int) {
for number != 0 {
digit := number % 10
dchnl <- digit
number /= 10
}
close(dchnl)
}
func calcSquares(number int, squareop chan int) {
sum := 0
dch := make(chan int)
go digit(number, dch)
for digit := range dch {
sum += digit * digit
}
squareop <- sum
}
func calcCubes(number int, cubeop chan int) {
sum := 0
dch := make(chan int)
go digit(number, dch)
for digit := range dch {
sum += digit * digit * digit
}
cubeop <- sum
}
func main() {
number := 589
sqrch := make(chan int)
cubech := make(chan int)
go calcSquares(number, sqrch)
go calcCubes(number, cubech)
squares, cubes := <-sqrch, <-cubech
fmt.Println("Final output:", squares+cubes) // Final output: 1536
}
缓冲信道
无缓冲信道的发送和接收过程是阻塞的。我们还可以创建一个有缓冲的信道。只有在缓冲已满的情况,才会阻塞向缓冲信道发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。
通过向make
函数再传递一个表示容量的参数(指定缓冲的大小)ch := make(chan type, capacity)
,可以创建缓冲信道。
要让一个信道有缓冲,上面语法中的capacity
应该大于0。无缓冲信道的容量默认为0。
长度、容量
- 缓冲信道的容量是指信道可以存储的值的数量。
- 缓冲信道的长度是指信道中当前排队的元素个数。
WaitGroup
WaitGroup
用于等待一批Go协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。
WaitGroup
使用计数器来工作,通过Add()
添加计数,通过Done()
减小计数,Wait()
方法会阻塞调用它的Go协程,直到计数器变为0后才会停止阻塞。
另外需要注意,作为参数时,需要将WaitGroup
的指针传递给函数。
工作池的实现
缓冲信道的重要应用之一就是实现工作池。一般而言,工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。
下面是一个用缓冲信道实现工作池的例子,工作池的任务是计算所输入数字的每一位的和,例如,输入234,结果就是9(2+3+4),向工作池输入的是一系列伪随机数。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 表示作业
type Job struct {
id int // 作业ID
randomno int // 待计算的数
}
// 表示结果
type Result struct {
job Job // 对应的作业
sumofdigits int // 计算的结果
}
var jobs = make(chan Job, 10) // 接收作业的缓冲信道
var results = make(chan Result, 10) // 写入结果的缓冲信道
// 计算整数的每一位之和,返回结果。
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
// 工作协程:监听缓冲信道jobs里更新的作业,一旦完成了计算任务,
// 其结果会写入缓冲信道results。
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
// 创建Go协程的工作池,参数是需要创建的工作协程的数量。创建了指定数量
// 的工作协程后,调用wg.Wait(),等待所有goroutine执行完毕。所有协程
// 完成执行后,关闭results信道。因为所有协程都已执行完毕,于是不再需要
// 向results信道写入数据了。
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
// 分配作业,当写入所有job后,关闭jobs信道。
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
// results信道读取结果,当读取了所有结果,done写入true。
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n",
result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs) // 添加作业
done := make(chan bool)
go result(done)
noOfWorkers := 10 // 可以通过增加工作者的数量来节省时间
createWorkerPool(noOfWorkers) // 创建工作池
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
reference:
https://studygolang.com/articles/12402
https://studygolang.com/articles/12512