跳到主要内容

channel

channel一个类型管道,通过它可以在goroutine之间发送和接收消息。它是Golang在语言层面提供的goroutine间的通信方式。Go依赖于成为CSP的并发模型,通过Channel实现这种同步模式。Golang并发的核心哲学是不要通过共享内存进行通信。

在地铁站、食堂、洗手间等公共场所人很多的情况下,大家养成了排队的习惯,目的也是避免拥挤、插队导致的低效的资源使用和交换过程。代码与数据也是如此,多个 goroutine 为了争抢数据,势必造成执行的低效率,使用队列的方式是最高效的,channel 就是一种队列一样的结构。

Go语言中的通道(channel)是一种特殊的类型。在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。goroutine 间通过通道就可以通信。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。

5.1、声明创建通道

通道本身需要一个类型进行修饰,就像切片类型需要标识元素类型。通道的元素类型就是在其内部传输的数据类型,声明如下:

var 通道变量 chan 通道类型

通道是引用类型,需要使用 make 进行创建,格式如下:

通道实例 := make(chan 数据类型) // 不带缓冲的chan
通道实例 := make(chan 数据类型,数量) // 带缓冲的chan

5.2、channel基本操作

ch := make(chan 数据类型)
// 向通道发送数据
ch <-
// 从通道接收数据
<- ch

示例:

package main

import "fmt"

type Stu struct {
Name string
age int
}

func main() {
// 案例1
ch := make(chan int, 10)
fmt.Println(len(ch), cap(ch))

ch <- 1
ch <- 2
ch <- 3

fmt.Println(<-ch) // FIFO
fmt.Println(<-ch)
fmt.Println(<-ch)

// 案例2
ch2 := make(chan interface{}, 3)
ch2 <- 100
ch2 <- "hello"
ch2 <- Stu{"yuan", 22}

fmt.Println(<-ch2)
fmt.Println(<-ch2)
fmt.Println(<-ch2)
// s := <-ch2
//fmt.Println(s.Name)
// fmt.Println(s.(Stu).Name)

// 案例3
ch3 := make(chan int, 3)
x := 10
ch3 <- x // 值拷贝
x = 20
fmt.Println(<-ch3)

ch4 := make(chan *int, 3)
y := 20
ch4 <- &y
y = 30
p := <-ch4
fmt.Println(*p)
}

5.3、chan是引用类型

通道的结构hchan,源码再src/runtime/chan.go下:

   type hchan struct {
qcount uint // total data in the queue 当前队列里还剩余元素个数
dataqsiz uint // size of the circular queue 环形队列长度,即缓冲区的大小,即make(chan T,N) 中的N
buf unsafe.Pointer // points to an array of dataqsiz elements 环形队列指针
elemsize uint16 //每个元素的大小
closed uint32 //标识当前通道是否处于关闭状态,创建通道后,该字段设置0,即打开通道;通道调用close将其设置为1,通道关闭
elemtype *_type // element type 元素类型,用于数据传递过程中的赋值
sendx uint // send index 环形缓冲区的状态字段,它只是缓冲区的当前索引-支持数组,它可以从中发送数据
recvx uint // receive index 环形缓冲区的状态字段,它只是缓冲区当前索引-支持数组,它可以从中接受数据
recvq waitq // list of recv waiters 等待读消息的goroutine队列
sendq waitq // list of send waiters 等待写消息的goroutine队列

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //互斥锁,为每个读写操作锁定通道,因为发送和接受必须是互斥操作
}

// sudog 代表goroutine
type waitq struct {
first *sudog
last *sudog
}

img

han内部实现了一个环形队列作为其缓冲区,队列的长度是创建chan时指定的。

环形队列

下图展示了一个可缓存6个元素的channel示意图:

image-20220304120254478

  • dataqsiz指示了队列长度为6,即可缓存6个元素;

  • buf指向队列的内存,队列中还剩余两个元素;

  • qcount表示队列中还有两个元素(len(chan)可查询chan的队列元素个数);

  • sendx指示后续写入的数据存储的位置,取值[0, 6);

  • recvx指示从该位置读取数据, 取值[0, 6);

package main

import "fmt"

func foo(c chan int) {
c <- 50
}

func main() {

// 引用类型
var ch5 = make(chan int, 3)
var ch6 = ch5
ch5 <- 100
ch5 <- 200
fmt.Println(<-ch6)
fmt.Println(<-ch5)

var ch7 = make(chan int, 3)
foo(ch7)
fmt.Println(<-ch7)
}

5.4、管道的关闭与循环

当向通道中发送完数据时,我们可以通过close函数来关闭通道。关闭 channel 非常简单,直接使用Go语言内置的 close() 函数即可,关闭后的通道只可读不可写。

ch3 := make(chan int, 10)
ch3 <- 1
ch3 <- 2
ch3 <- 3

close(ch3)
fmt.Println(<-ch3)
ch3 <- 4

如果不close掉channel是会发生死锁的,原因是当for循环读完channel后会继续尝试读取下一个,而由于channel没有写入的协程且没关闭,会一直阻塞形成死锁。

package main

import (
"fmt"
"time"
)

func main() {

ch := make(chan int, 10)
ch <- 1
ch <- 2
ch <- 3

// 方式1
go func() {
time.Sleep(time.Second * 10)
ch <- 4
}()
for v := range ch {
fmt.Println(v, len(ch))
// 读取完所有值后,ch的sendq中没有groutine
if len(ch) == 0 { // 如果现有数据量为0,跳出循环
break
}
}

close(ch)
for i := range ch {
fmt.Println(i)
}
}

在介绍了如何关闭 channel 之后,我们就多了一个问题:如何判断一个 channel 是否已经被关闭?我们可以在读取的时候使用多重返回值的方式:

x, ok := <-ch

这个用法与 map 中的按键获取 value 的过程比较类似,只需要看第二个 bool 返回值即可,如果返回值是 false 则表示 ch 已经被关闭。

生产者消费者模型

package main

import (
"fmt"
"sync"
"time"
)

func producer(ch chan int) {

for i := 1; i < 11; i++ {

ch <- i
fmt.Println("插入值",i)
}
wg.Done()
}

func consumer(ch chan int) {

for i := 1; i < 11; i++ {
time.Sleep(time.Second)
fmt.Println("取出值",<-ch)
}
wg.Done()
}

var wg sync.WaitGroup
func main() {
ch := make(chan int, 100)

wg.Add(2)
go producer(ch)
go consumer(ch)

wg.Wait()
fmt.Println("process end")
}

5.5、缓冲通道

  • 无缓冲的通道是指在接收前没有能力保存任何值的通道

  • 有缓冲的通道是一种在被接收前能存储一个或者多个值的通道

无缓冲的通道又称为阻塞的通道。我们来看一下下面的代码:

func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
.../src/github.com/Q1mi/studygo/day06/channel02/main.go:8 +0x54

为什么会出现deadlock错误呢?

因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。简单来说就是无缓冲的通道必须有接收才能发送。

func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用goroutine从通道接收值
ch <- 10
fmt.Println("发送成功")
}

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道

提示

have a try:生产者消费者模型案例改为无缓冲通道的运行结果?

5.6、死锁(deadlock)

案例1

当程序一直在等待从信道里读取数据,而此时并没有人会往信道中写入数据。此时程序就会陷入死循环,造成死锁。

package main

import (
"fmt"
"sync"
"time"
)

var wg sync.WaitGroup

func main() {
pipline := make(chan int)
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
pipline <- i
}
close(pipline) // 关闭chan,循环chan的协程就可以退出循环了,否则因为chan的sendq为空陷入deadlock
}()

go func() {
defer wg.Done()
for v := range pipline {
fmt.Println("v:", v)
}
}()
wg.Wait()
}

解决方法很简单,只要在发送完数据后,手动关闭信道,告诉 range 信道已经关闭,无需等待就行。

案例2

package main

import (
"fmt"
"sync"
"time"
)

var wg sync.WaitGroup

func recv(c chan interface{}) {
defer wg.Done()
for true {
time.Sleep(time.Second)
ret := <-c
if ret == "exit"{
close(c)
break
}
fmt.Println("接收成功", ret)
}
}

func send(c chan interface{}) {
defer wg.Done()
for i:=0;i<10;i++{
c<- i
}
c<- "exit"

// time.Sleep(time.Second*10) // 写的协程结束,导致死锁
}

func main() {
ch := make(chan interface{})
wg.Add(2)
go recv(ch) // 启用goroutine从通道接收值
go send(ch) // 启用goroutine从通道接收值

wg.Wait()
fmt.Println("end")
}

5.7、单向通道

Go语言的类型系统提供了单方向的 channel 类型,顾名思义,单向 channel 就是只能用于写入或者只能用于读取数据。当然 channel 本身必然是同时支持读写的,否则根本没法用。

我们在将一个 channel 变量传递到一个函数时,可以通过将其指定为单向 channel 变量,从而限制该函数中可以对此 channel 的操作,比如只能往这个 channel 中写入数据,或者只能从这个 channel 读取数据。

单向 channel 变量的声明非常简单,只能写入数据的通道类型为chan<-,只能读取数据的通道类型为<-chan,格式如下:

var 通道实例 chan<- 元素类型    // 只能写入数据的通道
var 通道实例 <-chan 元素类型 // 只能读取数据的通道
package main

import (
"fmt"
"sync"
"time"
)

func producer(ch chan<- int) {

for i := 1; i < 11; i++ {
ch <- i
fmt.Println("插入值", i)
}
wg.Done()
}

func consumer(ch <-chan int) {

for i := 1; i < 11; i++ {
time.Sleep(time.Second)
fmt.Println("取出值", <-ch)
}
wg.Done()
}

var wg sync.WaitGroup

func main() {
ch := make(chan int)

wg.Add(2)
go producer(ch)
go consumer(ch)

wg.Wait()
fmt.Println("end")
}

5.8、select语句

golang中的select语句格式如下

select {
case <-ch1:
// 如果从 ch1 信道成功接收数据,则执行该分支代码
case ch2 <- 1:
// 如果成功向 ch2 信道成功发送数据,则执行该分支代码
default:
// 如果上面都没有成功,则进入 default 分支处理流程
}

可以看到select的语法结构有点类似于switch,但又有些不同。select里的case后面并不带判断条件,而是一个信道的操作,不同于switch里的case,对于从其它语言转过来的开发者来说有些需要特别注意的地方。golang 的 select 就是监听 IO 操作,当 IO 操作发生时,触发相应的动作每个case语句里必须是一个IO操作,确切的说,应该是一个面向channel的IO操作。

提示

注:Go 语言的 select 语句借鉴自 Unix 的 select() 函数,在 Unix 中,可以通过调用 select() 函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了 IO 动作,该 select() 调用就会被返回(C 语言中就是这么做的),后来该机制也被用于实现高并发的 Socket 服务器程序。Go 语言直接在语言级别支持 select关键字,用于处理并发编程中通道之间异步 IO 通信问题。

注意:如果 ch1 或者 ch2 信道都阻塞的话,就会立即进入 default 分支,并不会阻塞。但是如果没有 default 语句,则会阻塞直到某个信道操作成功为止。

(1)select语句只能用于信道的读写操作

package main

import "fmt"

func main() {
size := 10
ch1 := make(chan int, size)
for i := 0; i < size; i++ {
ch1 <- 1
}

ch2 := make(chan int, size+1)
for i := 0; i < size; i++ {
ch2 <- 2
}

// select中的case语句是随机执行的
select {
case a := <-ch1:
fmt.Println("a", a)
case b := <-ch2:
fmt.Println("b", b)
case ch2 <- 200:
fmt.Println("插值成功")
default: // 如果 ch1 和 ch2 信道都阻塞的话,就会立即进入default分支
fmt.Println("default")
}
}

(2)超时用法

package main

import (
"fmt"
"time"
)

func main() {
ch := make(chan int)
go func(c chan int) {
// 修改时间后,再查看执行结果
time.Sleep(time.Second * 3)
ch <- 1
}(ch)

select {
case v := <-ch:
fmt.Print(v)
case <-time.After(2 * time.Second): // 等待 2s
fmt.Println("no case ok")
}
}

(3)空select

select指的是内部不包含任何case,例如:

select{

}

空的 select 语句会直接阻塞当前的goroutine,使得该goroutine进入无法被唤醒的永久休眠状态。