一般接触新东西都会有三问(what/why/how),我们也从这几个角度看下golang context。

WHAT IS CONTEXT

Goroutine和Channel是go语言的特色,并发编程会使用协程来处理工作,通道用来协程之间的通信。主协程创建子协程,子协程再创建子协程,就会形成一个协程树状结构,协程之间可以采用通道来进行通信。
每个Goroutine在执行之前,都要先知道程序当前的执行状态,通常将这些执行状态都会封装Context变量中,传递给要执行的Goroutine中。比如在网络编程中,处理一个网络Request时,我们可能需要开启不同的Goroutine来获取数据与逻辑处理。而这些Goroutine可能需要共享Request的一些信息;同时当Request异常、超时或者被取消的时候,所有从这个Request创建的所有Goroutine都应该被结束。

WHY CONTEXT

上面我们说过,由于异常、超时或者被取消的情况,都应该结束该Request创建的所有Goroutine。假如不用context,我们看怎么去控制协程树。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package services

import (
"fmt"
"time"
)

func StartWork() {
data := make(chan int, 10)
done := make(chan struct{}) // 使用一个chan来控制协程的退出

defer close(data)
// consumer
go func() {
for {
select {
case <-done:
fmt.Println("child process interrupt...")
return
default:
fmt.Printf("send message: %d\n", <-data)
time.Sleep(time.Second * 1)
}
}
}()

// producer
for i := 0; i < 10; i++ {
data <- i
}
time.Sleep(5 * time.Second)
// 退出协程
close(done)
time.Sleep(1 * time.Second)
fmt.Println("main process exit!")
}

子协程运行着任务,如果主协程需要在某个时刻发送消息通知子协程中断任务退出,那么就可以让子协程监听这个done channel,一旦主协程关闭done channel,那么子协程就可以推出了,这样就实现了主协程通知子协程的需求。这种方式运行良好,但是这也是有限的。
考虑这种情况:如果主协程中有多个任务1, 2, …n,主协程对这些任务有时间限制;而任务1又有多个子任务1, 2, …,k, 任务1对这些子任务也有自己的时间控制,那么这些子任务既要感知主协程的取消信号,也需要感知任务1的取消信号。
如果使用done channel的用法,这个时候需要定义两个done channel,子任务们需要同时监听这两个done channel,这样也可以正常work。但是如果层级更深,如果这些子任务还有子任务,那么使用done channel的方式将会变得非常繁琐且混乱。
这个时候 context出场了。

context interface

1
2
3
4
5
6
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}

context包含四个方法

  • Deadline 返回绑定当前context的任务执行的截止时间,如果没有截止时间,则ok==false
  • Done 该函数返回一个只读的chan 数据是空结构体,类似我们上面的done,控制协程的退出
  • Err 如果协程退出,该函数返回退出的原因,被取消或者执行超时等错误
  • Value 获取context中传递的值

可以看到Done方法返回的chan正是用来传递结束信号以抢占并中断当前任务的;Deadline方法用来表示gorotine是否在指定时间被取消;Err方法是用来解释goroutine被取消的原因;而Value则用于获取特定于当前任务的数据。
而context所包含的额外信息键值对是如何存储的呢?其实可以想象一颗树,树的每个节点可能携带一组键值对,如果当前节点上无法找到key所对应的值,就会向上去父节点里找,直到根节点,具体后面会说到。

下面我们看下context的两个比较常用的实现,valueCtx/cancelCtx

valueCtx

1
2
3
4
type valueCtx struct {
Context
key, val interface{}
}

结构比较简单就是增加了key/val字段存储键值对
WithValue用以向其中存储键值对

1
2
3
4
5
6
7
8
9
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}

这里添加键值对不是直接在原context结构体上直接添加,而是重新创建一个新的valueCtx节点,作为原ctx的子节点,并将键值对添加在子节点上,由此形成一条链表。获取value就是从这条链表上尾部向前搜寻(代码如下):

1
2
3
4
5
6
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

valueCtx的增加值与获取值的过程如下:
With就是往链表的尾部增加节点,value就是从尾部开始获取对应的值,找到就退出;否则找到头部返回空.

enter image description here

cancelCtx

1
2
3
4
5
6
7
8
9
10
11
12
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}

type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}

cancelCtx是一个可以取消的 Context,实现了 canceler 接口。它直接将接口 Context 作为它的一个匿名字段,这样,它就可以被看成一个 Context。done是一个空结构体类型的channel,用来传递关闭信号,在协程中一般结合select来监听父协程退出信号;children是一个map,存储了所有可取消context的子节点,这样任意层级的context取消,都会给所有子context发送取消信号;err用于存储错误信息表示任务结束的原因。
先看下Done方法:

1
2
3
4
5
6
7
8
9
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}

c.done懒汉式创建,调用Done方法才会创建。返回一个只读的chan,一般结合select监听。一旦取消,立马可读。
重点看下cancel方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// 必须要传 err
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // 已经被其他协程取消
}
// 给 err 字段赋值
c.err = err
// 关闭 channel,通知其他协程
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}

// 遍历它的所有子节点
for child := range c.children {
// 递归地取消所有子节点
child.cancel(false, err)
}
// 将子节点置空
c.children = nil
c.mu.Unlock()

if removeFromParent {
// 从父节点中移除自己
removeChild(c.Context, c)
}
}

总体来看,cancel() 方法的功能就是关闭 channel:c.done;递归地取消所有的子节点;将自己从父节点树中摘掉。通过关闭 channel,goroutine 这边接收取消信号的方式就是 select 语句中的读 ctx.Done 可读,执行相应的退出函数。
这里有个比较巧妙的地方,调用子节点 cancel 方法的时候,传入的第一个参数是 false,最终子节点是没有调用removeChild,把自己从父节点移除。
移除的操作很简单,找到最近的可取消的祖先节点,将自己从其map中删除。最关键的一行:delete(p.children, child)。

1
2
3
4
5
6
7
8
9
10
11
12
13
func removeChild(parent Context, child canceler) {
// 查找最近的可取消节点
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
// 从最近的祖先节点的map中移除自己
delete(p.children, child)
}
p.mu.Unlock()
}

那什么时候需要移除,什么时候不需要移除呢?
我们先来看下创建一个可取消的context。
WithCancel函数创建一个可取消的context,即cancelCtx类型的context,传入一个父context,返回context和CancelFunc,调用CancelFunc即可触发cancel操作。直接看源码:

1
2
3
4
5
6
7
8
9
10
11
type CancelFunc func()

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) } // 这里cancel传入了true
}

func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}

可以看到,只有在使用WithCancel创建context的时候,返回的cancelFunc会传入true。这样当调用cancelFunc 时,会将这个 可取消的context从它的父节点里移除,因为父节点可能有很多子节点,取消之后要和父节点断绝关系,对其他没影响。而对于该context的所有子节点都会因为该节点的cancelFunc调用c.children = nil而化为灰烬,没有必要再一个一个移除。

enter image description here

如上左图,代表一棵 context 树,当然也可以看做是一个协程树。当调用左图子context的cancel 方法后,该 context会依次调用children中的cancel的方法,此时子context不会移除自己;该context将自己从它的父 context中去除掉了:从children中delete,实线箭头变成了虚线。且虚线圈框出来的 context 都被取消了,圈内的 context间的父子关系都被取消掉了。
再重点看propagateCancel():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func propagateCancel(parent Context, child canceler) {
// 父节点是个空节点
if parent.Done() == nil {
return // parent is never canceled
}
// 找到可以取消的父 context
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// 父节点已经被取消了,本节点(子节点)也要取消
child.cancel(false, p.err)
} else {
// 父节点未取消
if p.children == nil {
p.children = make(map[canceler]struct{})
}
// "挂到"父节点上
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 如果没有找到可取消的父 context。新启动一个协程监控父节点或子节点取消信号
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}

上面这段代码的功能就是向上寻找可以“依靠”的“可取消”的context,并且“挂靠”上去。这样,调用上层cancel方法的时候,就可以层层传递,将那些挂靠的子context同时“取消”。
这里也有一个比较巧妙的设计,就是else的情况。起初我一直不理解,怎么可能会有else的情况发生,parent.Done()不为空,怎么会找不到可取消的父节点。这里要翻看parentCancelCtx的源码了。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}

这里只会找三种Context类型:cancelCtx,timerCtx,*valueCtx。若是把Context内嵌到自定义类型里,就识别不出来了,就会走default了。为避免这种情况父节点取消传递不到当前节点之后的节点,重新启动了一个协程来传递这种情况,所以使用可取消的context的时候,尽量避免将ctx塞入自定义的结构里,不然会多一个协程来处理。
另一个巧妙的地方就是select

1
2
3
4
5
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}

同时监听了父节点是否退出,也监听当前节点是否退出。这二者缺一不可。

第一个case 好理解,上层几点取消要继续传递下去,就监听了上层是否被取消。

第二个case 如果子节点自己退出了,那就不需要这个协程了,他的作用就是为了连接上层节点与自己。但如果去掉这个case,上层协程一直不取消,那这个goroutine就泄漏了。

总结

context主要用于处理父子协程之间的同步取消信号,本质上是一种协程同步调用协调的方式。
在使用context的时候,有两点要注意:

  • 父协程只是使用context通知所有的子协程,我已经不需要你了,但不会直接干涉和中断下游任务的执行,具体的操作是由子协程决定操作,因此子协程要使用select来监听ctx.Done。
  • context是线程安全的,可以放心地在多个协程中传递使用。