Golang 内核体系架构

初始化

1
2
3
4
5
/usr/local/go/src/runtime/runtime1.go

/usr/local/go/src/runtime/os_linux.go

/usr/local/go/src/runtime/proc.go

runtime1.go

函数args整理命令行参数

1
2
3
4
5
func args(c int32, v **byte) {
argc = c
argv = v
sysargs(c, v)
}

os_linux.go

osinit确定cpu core数量

1
2
3
func osinit() {
ncpu = getproccount()
}

proc.go

最关键的schedinit这里,几乎要关注的所有运行时环境初始化构造都在这里被调用

内存分配器、垃圾回收器、并发调度器的初始化细节需要涉猎很多专属特征

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// The bootstrap sequence is:
//
// call osinit
// call schedinit
// make & queue new G
// call runtime·mstart
//
// The new G calls runtime·main.
func schedinit() {
_g_ := getg()
if raceenabled {
_g_.racectx, raceprocctx0 = raceinit()
}


// 系统最大线程数量限制
sched.maxmcount = 10000

//栈、内存分配器、调度器相关初始化
tracebackinit()
moduledataverify()
stackinit()
mallocinit()
mcommoninit(_g_.m)
cpuinit() // must run before alginit
alginit() // maps must not be used before this call
modulesinit() // provides activeModules
typelinksinit() // uses maps, activeModules
itabsinit() // uses activeModules

msigsave(_g_.m)
initSigmask = _g_.m.sigmask

// 垃圾回收初始化
goargs()
goenvs()
parsedebugvars()
gcinit()


// 通过CPU core和GOMAXPROCS环境变量确定P数量
sched.lastpoll = uint64(nanotime())
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}

// 调整P数量
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}

// For cgocheck > 1, we turn on the write barrier at all times
// and check all pointer writes. We can't do this until after
// procresize because the write barrier needs a P.
if debug.cgocheck > 1 {
writeBarrier.cgo = true
writeBarrier.enabled = true
for _, p := range allp {
p.wbBuf.reset()
}
}

if buildVersion == "" {
// Condition should never trigger. This code just serves
// to ensure runtime·buildVersion is kept in the resulting binary.
buildVersion = "unknown"
}
}

接下来要执行runtime.main而不是用户逻辑入口函数main.main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// The main goroutine.
func main() {
g := getg()

// Racectx of m0->g0 is used only as the parent of the main goroutine.
// It must not be used for anything else.
g.m.g0.racectx = 0

// 执行栈的最大限制: 1GB on 64-bit, 250MB on 32-bit
if sys.PtrSize == 8 {
maxstacksize = 1000000000
} else {
maxstacksize = 250000000
}

// Allow newproc to start new Ms.
mainStarted = true


// 启动系统后台监控(定时垃圾回收,以及并发任务调度相关的信息)
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {
newm(sysmon, nil)
})
}

// Lock the main goroutine onto this, the main OS thread,
// during initialization. Most programs won't care, but a few
// do require certain calls to be made by the main thread.
// Those can arrange for main.main to run in the main thread
// by calling runtime.LockOSThread during initialization
// to preserve the lock.
lockOSThread()

if g.m != &m0 {
throw("runtime.main not on m0")
}


// 执行runtime包内所有的初始化函数 init
runtime_init() // must be before defer
if nanotime() == 0 {
throw("nanotime returning zero")
}

// Defer unlock so that runtime.Goexit during init does the unlock too.
needUnlock := true
defer func() {
if needUnlock {
unlockOSThread()
}
}()


runtimeInitTime = nanotime()


// 启动垃圾回收器后台操作
gcenable()

main_init_done = make(chan bool)
if iscgo {
if _cgo_thread_start == nil {
throw("_cgo_thread_start missing")
}
if GOOS != "windows" {
if _cgo_setenv == nil {
throw("_cgo_setenv missing")
}
if _cgo_unsetenv == nil {
throw("_cgo_unsetenv missing")
}
}
if _cgo_notify_runtime_init_done == nil {
throw("_cgo_notify_runtime_init_done missing")
}
// Start the template thread in case we enter Go from
// a C-created thread and need to create a new thread.
startTemplateThread()
cgocall(_cgo_notify_runtime_init_done, nil)
}



// 执行所有用户包的初始化函数init
fn := main_init // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
fn()
close(main_init_done)

needUnlock = false
unlockOSThread()



// 执行用户逻辑入口 main.main函数
if isarchive || islibrary {
// A program compiled with -buildmode=c-archive or c-shared
// has a main, but it is not executed.
return
}
fn = main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
fn()
if raceenabled {
racefini()
}

// Make racy client program work: if panicking on
// another goroutine at the same time as main returns,
// let the other goroutine finish printing the panic trace.
// Once it does, it will exit. See issues 3934 and 20018.
if atomic.Load(&runningPanicDefers) != 0 {
// Running deferred functions should not take long.
for c := 0; c < 1000; c++ {
if atomic.Load(&runningPanicDefers) == 0 {
break
}
Gosched()
}
}
if atomic.Load(&panicking) != 0 {
gopark(nil, nil, waitReasonPanicWait, traceEvGoStop, 1)
}

exit(0)
for {
var x *int32
*x = 0
}
}
  • 所有init函数都在同一个goroutine内执行

  • 所有init函数结束后才会执行main.main函数




内存分配

在深入内存分配算法细节前,需了解基本概念

  • 每次从操作系统申请一大块内存,减少系统调用
  • 将申请到的大块内存按特定大小预先切分成小块,构成链表
  • 为对象分配内存时,只须从大小合适的链表提取一个小块即可
  • 回收对象内存时,将该小块内存重新归还原链表以供复用
  • 如闲置内存过多,则归还部分内存给操作系统,降低整体开销

基础概念

申请到的内存块被分配了三个区域,在X64上分别是512MB,16GB,512GB大小。

arena

arena区域就是我们所谓的堆区,Go动态分配的内存都是在这个区域,它把内存分割成8KB大小的页,一些页组合起来称为mspan。

bitmap

bitmap区域标识arena区域哪些地址保存了对象,并且用4bit标志位表示对象是否包含指针、GC标记信息。bitmap中一个byte大小的内存对应arena区域中4个指针大小(指针大小为 8B )的内存,所以bitmap区域的大小是512GB/(4*8B)=16GB

可以看到bitmap的高地址部分指向arena区域的低地址部分,也就是说bitmap的地址是由高地址向低地址增长的

spans

spans区域存放mspan(也就是一些arena分割的页组合起来的内存管理基本单元,后文会再讲)的指针

每个指针对应一页,所以spans区域的大小就是512GB/8KB*8B=512MB

除以8KB是计算arena区域的页数,而最后乘以8是计算spans区域所有指针的大小。

创建mspan的时候,按页填充对应的spans区域,在回收object时,根据地址很容易就能找到它所属的mspan




内存管理单元

内存块

  • span: 是一个双端链表的形式,里面存储了它的一些位置信息组成的大块内存。

    通过一个基地址+(页号*页大小),就可以定位到这个MSpan的实际内存空间。

  • object: 将span按特定大小切分成多个小块,每个小块可存储一个对象
1
2
3
4
5
6
7
8
9
mspan:Go中内存管理的基本单元,是由一片连续的8KB的页组成的大块内存。

注意这里的页和操作系统本身的页并不是一回事,它一般是操作系统页大小的几倍。
一句话概括:mspan是一个包含起始地址、mspan规格、页的数量等内容的双端链表。

每个mspan按照它自身的属性Size Class的大小分割成若干个object,每个object可存储一个对象。并且会使用一个位图来标记其尚未使用的object。
属性Size Class决定object大小,而mspan只会分配给和object尺寸大小接近的对象,当然,对象的大小要小于object大小。

还有一个概念:Span Class,它和Size Class的含义差不多

Go1.9.2里mspan的Size Class共有67种,每种mspan分割的object大小是8*2n的倍数,这个是写死在代码里的:

1
2
3
4
5
// path: /usr/local/go/src/runtime/sizeclasses.go

const _NumSizeClasses = 67

var class_to_size = [_NumSizeClasses]uint16{0, 8, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 256, 288, 320, 352, 384, 416, 448, 480, 512, 576, 640, 704, 768, 896, 1024, 1152, 1280, 1408, 1536,1792, 2048, 2304, 2688, 3072, 3200, 3456, 4096, 4864, 5376, 6144, 6528, 6784, 6912, 8192, 9472, 9728, 10240, 10880, 12288, 13568, 14336, 16384, 18432, 19072, 20480, 21760, 24576, 27264, 28672, 32768}

根据mspan的Size Class可以得到它划分的object大小。

比如Size Class等于3,object大小就是32B。

32B大小的object可以存储对象大小范围在17B~32B的对象。

而对于微小对象(小于16B),分配器会将其进行合并,将几个对象分配到同一个object中。

数组里最大的数是32768,也就是32KB,超过此大小就是大对象了,它会被特别对待这个稍后会再介绍。

顺便提一句类型Size Class为0表示大对象,它实际上直接由堆内存分配,而小对象都要通过mspan来分配。


对于mspan来说,它的Size Class会决定它所能分到的页数,这也是写死在代码里的:

1
2
3
const _NumSizeClasses = 67

var class_to_allocnpages = [_NumSizeClasses]uint8{0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 2, 1, 2, 1, 3, 2, 3, 1, 3, 2, 3, 4, 5, 6, 1, 7, 6, 5, 4, 3, 5, 7, 2, 9, 7, 5, 8, 3, 10, 7, 4}

比如当我们要申请一个object大小为32B的mspan的时候,在class_to_size里对应的索引是3,而索引3在class_to_allocnpages数组里对应的页数就是1。

mspan结构体定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// path: /usr/local/go/src/runtime/mheap.go

type mspan struct {
//链表前向指针,用于将span链接起来
next *mspan
//链表前向指针,用于将span链接起来
prev *mspan
// 起始地址,也即所管理页的地址
startAddr uintptr
// 管理的页数
npages uintptr
// 块个数,表示有多少个块可供分配
nelems uintptr

//分配位图,每一位代表一个块是否已分配
allocBits *gcBits

// 已分配块的个数
allocCount uint16
// class表中的class ID,和Size Classs相关
spanclass spanClass

// class表中的对象大小,也即块大小
elemsize uintptr
}

上图可以看到有两个S指向了同一个mspan,因为这两个S指向的P是同属一个mspan的。
所以通过arena上的地址可以快速找到指向它的S,通过S就能找到mspan,回忆一下前面我们说的mspan区域的每个指针对应一页。

1
2
3
4
5
假设最左边第一个mspan的Size Class等于10,根据前面的class_to_size数组
得出这个msapn分割的object大小是144B,算出可分配的对象个数是8KB/144B=56.89个,取整56个
所以会有一些内存浪费掉了,Go的源码里有所有Size Class的mspan浪费的内存的大小;
再根据class_to_allocnpages数组,得到这个mspan只由1个page组成;
假设这个mspan是分配给无指针对象的,那么spanClass等于20。
  • startAddr直接指向arena区域的某个位置,表示这个mspan的起始地址
  • allocBits指向一个位图,每位代表一个块是否被分配了对象
  • allocCount则表示总共已分配的对象个数




内存管理组件

  • cache: 每个运行期工作线程都会绑定一个cache,用于无所object分配
  • central: 为所有cache提供切分好的后备span资源
  • heap: 分配堆,主要是负责向系统申请大块的内存,为下层MCentral和MCache提供内存服务。他管理的基本单位是MSpan(若干连续内存页的数据结构)

Cache

mcache:每个工作线程都会绑定一个mcache,本地缓存可用的mspan资源,这样就可以直接给Goroutine分配,因为不存在多个Goroutine竞争的情况,所以不会消耗锁资源。

mcache的结构体定义:

1
2
3
4
5
6
7
//path: /usr/local/go/src/runtime/mcache.go

type mcache struct {
alloc [numSpanClasses]*mspan
}

numSpanClasses = _NumSizeClasses << 1

mcacheSpan Classes作为索引管理多个用于分配的mspan,它包含所有规格的mspan

它是_NumSizeClasses的2倍,也就是67*2=134

为什么有一个两倍的关系,前面我们提到过:为了加速之后内存回收的速度,数组里一半的mspan中分配的对象不包含指针,另一半则包含指针。

对于无指针对象的mspan在进行垃圾回收的时候无需进一步扫描它是否引用了其他活跃的对象。

mcache在初始化的时候是没有任何mspan资源的,在使用过程中会动态地从mcentral申请,之后会缓存下来。

当对象小于等于32KB大小时,使用mcache的相应规格的mspan进行分配。


Central

mcentral:为所有mcache提供切分好的mspan资源。

每个central保存一种特定大小的全局mspan列表,包括已分配出去的和未分配出去的。

每个mcentral对应一种mspan,而mspan的种类导致它分割的object大小不同。

当工作线程的mcache中没有合适(也就是特定大小的)的mspan时就会从mcentral获取。

mcentral被所有的工作线程共同享有,存在多个Goroutine竞争的情况,因此会消耗锁资源。

结构体定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//path: /usr/local/go/src/runtime/mcentral.go

type mcentral struct {
// 互斥锁
lock mutex
// 规格
sizeclass int32
// 尚有空闲object的mspan链表
nonempty mSpanList
// 没有空闲object的mspan链表,或者是已被mcache取走的msapn链表
empty mSpanList
// 已累计分配的对象个数
nmalloc uint64
}

empty表示这条链表里的mspan都被分配了object,或者是已经被cache取走了的mspan

这个mspan就被那个工作线程独占了。而nonempty则表示有空闲对象的mspan列表。每个central结构体都在mheap中维护。

简单说下mcachemcentral获取和归还mspan的流程:

  • 获取 加锁:从nonempty链表找到一个可用的mspan;并将其从nonempty链表删除;将取出的mspan加入到empty链表;将mspan返回给工作线程;解锁。
  • 归还 加锁:将mspanempty链表删除;将mspan加入到nonempty链表;解锁。


Heap

mheap:代表Go程序持有的所有堆空间,Go程序使用一个mheap的全局对象_mheap来管理堆内存。

mcentral没有空闲的mspan时,会向mheap申请。

mheap没有资源时,会向操作系统申请新内存。

mheap主要用于大对象的内存分配,以及管理未切割的mspan,用于给mcentral切割成小对象。

同时我们也看到,mheap中含有所有规格的mcentral,所以当一个mcachemcentral申请mspan

只需要在独立的mcentral中使用锁,并不会影响申请其他规格的mspan

mheap结构体定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//path: /usr/local/go/src/runtime/mheap.go

type mheap struct {
lock mutex
// spans: 指向mspans区域,用于映射mspan和page的关系
spans []*mspan
// 指向bitmap首地址,bitmap是从高地址向低地址增长的
bitmap uintptr

// 指示arena区首地址
arena_start uintptr
// 指示arena区已使用地址位置
arena_used uintptr
// 指示arena区末地址
arena_end uintptr

central [67*2]struct {
mcentral mcentral
pad [sys.CacheLineSize - unsafe.Sizeof(mcentral{})%sys.CacheLineSize]byte
}
}

分配流程

Go的内存分配器在分配对象时,根据对象的大小分成三类:小对象(小于等于16B)、一般对象(大于16B,小于等于32KB)、大对象(大于32KB)。

大体上的分配流程:

  • 32KB 的对象,直接从mheap上分配;
  • <=16B 的对象使用mcache的tiny分配器分配;
  • (16B,32KB] 的对象首先计算对象的规格大小,然后使用mcache中相应规格大小的mspan分配;
  • 如果mcache没有相应规格大小的mspan,则向mcentral申请
  • 如果mcentral没有相应规格大小的mspan,则向mheap申请
  • 如果mheap中也没有合适大小的mspan,则向操作系统申请




并发调度

P、M、G关系

用户空间线程和内核空间线程之间的映射关系有:N:1、1:1和M:N

  • N:1是说,多个(N)用户线程始终在一个内核线程上跑,context上下文切换确实很快,但是无法真正的利用多核。
  • 1:1是说,一个用户线程就只在一个内核线程上跑,这时可以利用多核,但是上下文switch很慢。
  • M:N是说,多个goroutine在多个内核线程上跑,这个看似可以集齐上面两者的优势,但是无疑增加了调度的难度。

Go的调度器内部有三个重要的结构:M,P,G
M:代表真正的内核OS线程,和POSIX里的thread差不多,真正干活的人
G:代表一个goroutine,它有自己的栈,instruction pointer和其他信息(正在等待的channel等等),用于调度。
P:代表调度的上下文,可以把它看做一个局部的调度器,使go代码在一个线程上跑,它是实现从N:1到N:M映射的关键。

图中看,有2个物理线程M,每一个M都拥有一个context(P),每一个也都有一个正在运行的goroutine。
P的数量可以通过runtime.GOMAXPROCS()来设置,它其实也就代表了真正的并发度,即有多少个goroutine可以同时运行。
图中灰色的那些goroutine并没有运行,而是出于ready的就绪态,正在等待被调度。P维护着这个队列(称之为runqueue),
Go语言里,启动一个goroutine很容易:go function 就行,所以每有一个go语句被执行,runqueue队列就在其末尾加入一个
goroutine,在下一个调度点,就从runqueue中取出(如何决定取哪个goroutine?)一个goroutine执行。


为何要维护多个上下文P?

因为当一个OS线程被阻塞时,P可以转而投奔另一个OS线程!
图中看到,当一个OS线程M0陷入阻塞时,P转而在OS线程M1上运行。调度器保证有足够的线程来运行所有的context P。

图中的M1可能是被创建,或者从线程缓存中取出。

当MO返回时它必须尝试取得一个context P来运行goroutine

一般情况下它会从其他的OS线程那里steal偷一个context过来,

如果没有偷到的话,它就把goroutine放在一个global runqueue里然后自己就去睡大觉了(放入线程缓存里)。

Contexts们也会周期性的检查global runqueue,否则global runqueue上的goroutine永远无法执行。


另一种情况是P所分配的任务G很快就执行完了(分配不均),这就导致了一个上下文P闲着没事儿干而系统却任然忙碌。

但是如果global runqueue没有任务G了,那么P就不得不从其他的上下文P那里拿一些G来执行。

一般来说如果上下文P从其他的上下文P那里要偷一个任务的话,一般就‘偷’runqueue的一半,这就确保了每个OS线程都能充分的使用。

调度流程简述

Go语言是原生支持语言级并发的,这个并发的最小逻辑单元就是goroutine。

goroutine就类似于Go语言提供的一种“用户态线程”

当然这种“用户态线程”是跑在内核级线程之上的。

当我们创建了很多的goroutine,并且它们都是跑在同一个内核线程之上的时候,就需要一个调度器来维护这些goroutine,确保所有的goroutine都使用CPU并且是尽可能公平的使用CPU资源。

这个调度器的原理以及实现值得我们去深入研究一下。

支撑整个调度器的主要有4个重要结构,分别是P、M、G、Sched前三个定义在runtime.h中,Sched定义在proc.c中。

  • Sched结构就是调度器,它维护有存储M和G的队列以及调度器的一些状态信息等。
  • M代表内核级线程,一个M就是一个线程,goroutine就是跑在M之上的;M是一个很大的结构,里面维护小对象内存cache(mcache)、当前执行的goroutine、随机数发生器等等非常多的信息。
  • P全称是Processor,处理器,它的主要用途就是用来执行goroutine的,所以它也维护了一个goroutine队列,里面存储了所有需要它来执行的goroutine,这个P的角色可能有一点让人迷惑,一开始容易和M冲突,后面重点聊一下它们的关系。
  • G就是goroutine实现的核心结构了,G维护了goroutine需要的栈、程序计数器以及它所在的M等信息。

理解M、P、G三者的关系对理解整个调度器非常重要,我从网络上找了一个图来说明其三者关系:

地鼠用小车运着一堆待加工的砖。

M就可以看作图中的地鼠,P就是小车,G就是小车里装的砖。

一图胜千言啊,弄清楚了它们三者的关系,下面我们就开始重点聊地鼠是如何在搬运砖块的。


启动过程

在关心绝大多数程序的内部原理的时候,我们都试图去弄明白其启动初始化过程,弄明白这个过程对后续的深入分析至关重要。

在asm_amd64.s文件中的汇编代码_rt0_amd64就是整个启动过程核心过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CALL	runtime.args(SB)
CALL runtime.osinit(SB)
CALL runtime.hashinit(SB)
CALL runtime.schedinit(SB)

// create a new goroutine to start program
PUSHQ $runtime.main.f(SB) // entry
PUSHQ $0 // arg size
CALL runtime.newproc(SB)
POPQ AX
POPQ AX

// start this M
CALL runtime.mstart(SB)

启动过程做了调度器初始化runtime.schedinit

1
2
3
4
5
启动过程中的调度器初始化runtime.schedinit函数主要根据用户设置的GOMAXPROCS值来创建一批小车(P)

不管GOMAXPROCS设置为多大,最多也只能创建256个小车(P)。

这些小车(p)初始创建好后都是闲置状态,也就是还没开始使用,所以它们都放置在调度器结构(Sched)的 pidle字段维护的链表中存储起来了,以备后续之需。

再调用runtime.newproc创建出第一个goroutine

1
2
3
这个goroutine将执行的函数是runtime.main,这第一个goroutine也就是所谓的主goroutine。

我们写的最简单的Go程序`”hello,world”`就是完全跑在这个goroutine里,当然任何一个Go程序的入口都是从这个goroutine开始的。

最后调用的runtime.mstart就是内核线程M真正的执行上一步创建的主goroutine。


查看runtime.main函数可以了解到主goroutine开始执行后,做的第一件事情是创建了一个新的内核级线程(地鼠M)

不过这个线程是一个特殊线程,它在整个运行期专门负责做特定的事情——系统监控(sysmon)。

接下来就是进入Go程序的main函数开始Go程序的执行。

至此,Go程序就被启动起来开始运行了。

一个真正干活的Go程序,一定创建有不少的goroutine,所以在Go程序开始运行后,就会向调度器添加goroutine,调度器就要负责维护好这些goroutine的正常执行。


创建goroutine(G)

在Go程序中,时常会有类似代码:

1
go do_something()

go关键字就是用来创建一个goroutine的,后面的函数就是这个goroutine需要执行的代码逻辑。

go关键字对应到调度器的接口就是runtime.newproc

runtime.newproc干的事情很简单,就负责制造一块砖(G),然后将这块砖(G)放入当前这个地鼠(M)的小车(P)中。


每个新的goroutine都需要有一个自己的栈,G结构的 sched字段维护了栈地址以及程序计数器等信息,这是最基本的调度信息

也就是说这个goroutine放弃cpu的时候需要保存这些信息,待下次重新获得cpu的时候需要将这些信息装载到对应的cpu寄存器中。

假设这个时候已经创建了大量的goroutne,就轮到调度器去维护这些goroutine了。


创建内核线程(M)

Go程序中没有语言级的关键字让你去创建一个内核线程,你只能创建goroutine,内核线程只能由runtime根据实际情况去创建。

runtime什么时候创建线程?

以地鼠运砖图来讲,砖(G)太多了,地鼠(M)又太少了,实在忙不过来,刚好还有空闲的小车(P)没有使用,那就从别处再借些地鼠(M)过来直到把小车(p)用完为止。这

里有一个地鼠(M)不够用,从别处借地鼠(M)的过程,这个过程就是创建一个内核线程(M)。创建M的接口函数是:

1
void newm(void (*fn)(void), P *p)

newm函数的核心行为就是调用clone系统调用创建一个内核线程,每个内核线程的开始执行位置都是runtime.mstart函数。

参数p就是一辆空闲的小车(p)。

每个创建好的内核线程都从runtime.mstart函数开始执行了,它们将用分配给自己小车去搬砖了。


调度核心

newm接口只是给新创建的M分配了一个空闲的P,也就是相当于告诉借来的地鼠(M)——“接下来的日子,你将使用1号小车搬砖,记住是1号小车;待会自己到停车场拿车。”

地鼠(M)去拿小车(P)这个过程就是acquirep

runtime.mstart在进入schedul之前会给当前M装配上P,runtime.mstart函数中的代码

1
2
3
4
5
} else if(m != &runtime.m0) {
acquirep(m->nextp);
m->nextp = nil;
}
schedule();

if分支的内容就是为当前M装配上P,nextp就是newm分配的空闲小车(P),只是到这个时候才真正拿到手罢了。

没有P,M是无法执行goroutine的,就像地鼠没有小车无法运砖一样的道理。

对应acquirep的动作是releasep,把M装配的P给载掉;活干完了,地鼠需要休息了,就把小车还到停车场,然后睡觉去。

地鼠(M)拿到属于自己的小车(P)后,就进入工场开始干活了,也就是上面的 schedule调用。简化schedule的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void
schedule(void)
{
G *gp;

gp = runqget(m->p);
if(gp == nil)
gp = findrunnable();

if (m->p->runqhead != m->p->runqtail &&
runtime.atomicload(&runtime.sched.nmspinning) == 0 &&
runtime.atomicload(&runtime.sched.npidle) > 0) // TODO: fast atomic
wakep();

execute(gp);
}

schedule函数被我简化了太多,主要是我不喜欢贴大段大段的代码,因此只保留主干代码了。

这里涉及到4大步逻辑:

  • runqget
    地鼠(M)试图从自己的小车(P)取出一块砖(G),当然结果可能失败,也就是这个地鼠的小车已经空了没有砖了。

  • findrunnable
    如果地鼠自己的小车中没有砖,那也不能闲着不干活是吧,所以地鼠就会试图跑去工场仓库取一块砖来处理;工场仓库也可能没砖啊,出现这种情况的时候,这个地鼠也没有偷懒停下干活,而是悄悄跑出去,随机盯上一个小伙伴(地鼠),然后从它的车里试图偷一半砖到自己车里。如果多次尝试偷砖都失败了,那说明实在没有砖可搬了,这个时候地鼠就会把小车还回停车场,然后 睡觉休息了。如果地鼠睡觉了,下面的过程当然都停止了,地鼠睡觉也就是线程sleep了。

  • wakep
    到这个过程的时候,可怜的地鼠发现自己小车里有好多砖啊,自己根本处理不过来;再回头一看停车场居然有闲置的小车,立马跑到宿舍一看,你妹,居然还有小伙伴在睡觉,直接给屁股一脚,你妹,居然还在睡觉,老子都快累死了,赶紧起来干活,分担点工作。,小伙伴醒了,拿上自己的小车,乖乖干活去了。有时候可怜的地鼠跑到宿舍却发现没有在睡觉的小伙伴,于是会很失望,最后只好向工场老板说:停车场还有闲置的车啊,我快干不动了,赶紧从别的工场借个地鼠来帮忙吧。,最后工场老板就搞来一个新的地鼠干活了。

  • execute
    地鼠拿着砖放入火种欢快的烧练起来。

到这里貌似整个工场都正常的运转起来了,无懈可击的样子。

不对还有一个疑点没解决啊,假设地鼠的车里有很多砖,它把一块砖放入火炉中后,何时把它取出来,放入第二块砖呢?

难道要一直把第一块砖烧练好才取出来吗?那估计后面的砖真的是等得花儿都要谢了。

这里就是要真正解决goroutine的调度上下文切换问题。


调度点

runtime.park

当我们翻看channel的实现代码可以发现,对channel读写操作的时候会触发调用runtime.park函数。

goroutine调用park后,这个goroutine就会被设置位waiting状态放弃CPU。被park的goroutine处于waiting状态,并且这个goroutine不在小车(P)中,如果不对其调用runtime.ready,它是永远不会再被执行的。除了channel操作外,定时器、网络poll等都有可能park goroutine

runtime.gosched

除了park可以放弃cpu外,调用runtime.gosched函数也可以让当前goroutine放弃cpu,但和park完全不同;gosched是将goroutine设置为runnable状态,然后放入到调度器全局等待队列(也就是上面提到的工场仓库,这下就明白为何工场仓库会有砖块(G)了吧)。

除此之外就轮到系统调用了,有些系统调用也会触发重新调度。

Go语言完全是自己封装的系统调用,所以在封装系统调用的时候,可以做不少手脚,也就是进入系统调用的时候执行entersyscall,退出后又执行exitsyscall函数。

也只有封装了entersyscall的系统调用才有可能触发重新调度,它将改变小车(P)的状态为syscall。

还记一开始提到的sysmon线程吗?

这个系统监控线程会扫描所有的小车(P),发现一个小车(P)处于了syscall的状态,就知道这个小车(P)遇到了goroutine在做系统调用,于是系统监控线程就会创建一个新的地鼠(M)去把这个处于syscall的小车给抢过来,开始干活,这样这个小车中的所有砖块(G)就可以绕过之前系统调用的等待了。

从goroutine的调度点可以看出,调度器还是挺粗暴的,调度粒度有点过大,公平性也没有想想的那么好。总之这个调度器还是比较简单的。

综上所述,goroutine上下文切换的调度时机可分为以下几个条件:

  • goroutine阻塞(waiting)
  • 显式调用runtime.gosched()
  • 系统调用system call
1
2
3
协程一般都是这样工作的,但是从1.2开始为了避免饿死其它goroutine,就是在发生任意函数调用的时候,都有机会触发scheduler。

所以从1.2开始如果你的goroutine中是纯计算,没有任何系统调用,scheduler仍然有机会介入,不会永远独占CPU。


现场处理

goroutine在cpu上换入换出,不断上下文切换的时候,必须要保证的事情就是保存现场恢复现场;

  • 保存现场就是在goroutine放弃cpu的时候,将相关寄存器的值给保存到内存中;

  • 恢复现场就是在goroutine重新获得cpu的时候,需要从内存把之前的寄存器信息全部放回到相应寄存器中去。

goroutine在主动放弃cpu的时候(park/gosched),都会涉及到调用runtime.mcall函数,此函数也是汇编实现,主要将goroutine的栈地址和程序计数器保存到G结构的sched字段中

mcall就完成了现场保存。恢复现场的函数是runtime.gogocall,这个函数主要在 execute中调用,就是在执行goroutine前,需要重新装载相应的寄存器。




垃圾回收

  • v1.1 STW
  • v1.3 Mark STW, Sweep 并行
  • v1.5 三色标记法
  • v1.8 hybrid write barrier

标记-清扫

标记-清扫算法是第一种自动内存管理,基于追踪的垃圾收集算法。

算法思想在 70 年代就提出了,是一种非常古老的算法。

内存单元并不会在变成垃圾立刻回收,而是保持不可达状态,直到到达某个阈值或者固定时间长度。

这个时候系统会挂起用户程序也就是 STW,转而执行垃圾回收程序。

垃圾回收程序对所有的存活单元进行一次全局遍历确定哪些单元可以回收。

算法分两个部分:标记(mark)清扫(sweep)

标记阶段表明所有的存活单元,清扫阶段将垃圾单元回收。可视化可以参考下图。

三色标记法

三色标记算法是对标记阶段的改进,原理如下:

  • 1:起初所有对象都是白色。
  • 2:从根出发扫描所有可达对象,标记为灰色,放入待处理队列。
  • 3:从队列取出灰色对象,将其引用对象标记为灰色放入队列,自身标记为黑色。
  • 4:重复 3,直到灰色对象队列为空。此时白色对象即为垃圾,进行回收。

何时触发 GC

在堆上分配大于32K byte对象的时候进行检测此时是否满足垃圾回收条件,如果满足则进行垃圾回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
...
shouldhelpgc := false
// 分配的对象小于 32K byte
if size <= maxSmallSize {
...
} else {
shouldhelpgc = true
...
}
...
// gcShouldStart() 函数进行触发条件检测
if shouldhelpgc && gcShouldStart(false) {
// gcStart() 函数进行垃圾回收
gcStart(gcBackgroundMode, false)
}
}

上面是自动垃圾回收,还有一种是主动垃圾回收,通过调用runtime.GC(),这是阻塞式的。

1
2
3
4
5
6
// GC runs a garbage collection and blocks the caller until the
// garbage collection is complete. It may also block the entire
// program.
func GC() {
gcStart(gcForceBlockMode, false)
}

GC 触发条件

触发条件主要关注下面代码中的中间部分:forceTrigger || memstats.heap_live >= memstats.gc_triggerforceTriggerforceGC的标志;

后面半句的意思是当前堆上的活跃对象大于我们初始化时候设置的 GC 触发阈值。在malloc以及free的时候heap_live会一直进行更新,这里就不再展开了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// gcShouldStart returns true if the exit condition for the _GCoff
// phase has been met. The exit condition should be tested when
// allocating.
//
// If forceTrigger is true, it ignores the current heap size, but
// checks all other conditions. In general this should be false.
func gcShouldStart(forceTrigger bool) bool {
return gcphase == _GCoff && (forceTrigger || memstats.heap_live >= memstats.gc_trigger) && memstats.enablegc && panicking == 0 && gcpercent >= 0
}

//初始化的时候设置 GC 的触发阈值
func gcinit() {
_ = setGCPercent(readgogc())
memstats.gc_trigger = heapminimum
...
}
// 启动的时候通过 GOGC 传递百分比 x
// 触发阈值等于 x * defaultHeapMinimum (defaultHeapMinimum 默认是 4M)
func readgogc() int32 {
p := gogetenv("GOGC")
if p == "off" {
return -1
}
if n, ok := atoi32(p); ok {
return n
}
return 100
}

垃圾回收流程

  • 首先从 root 开始遍历,root 包括全局指针和 goroutine 栈上的指针。
  • mark 有两个过程。
    • 从 root 开始遍历,标记为灰色。遍历灰色队列。
    • re-scan 全局指针和栈。因为 mark 和用户程序是并行的,所以在过程 1 的时候可能会有新的对象分配,这个时候就需要通过写屏障(write barrier)记录下来。re-scan 再完成检查一下。
  • Stop The World 有两个过程。
    • 第一个是 GC 将要开始的时候,这个时候主要是一些准备工作,比如 enable write barrier。
    • 第二个过程就是上面提到的 re-scan 过程。如果这个时候没有 stw,那么 mark 将无休止。

源码步骤

标记 STW phase 1

在 GC 开始之前的准备工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// gcStart 是 GC 的入口函数,根据 gcMode 做处理。
// 1. gcMode == gcBackgroundMode(后台运行,也就是并行), _GCoff -> _GCmark
// 2. 否则 GCoff -> _GCmarktermination,这个时候就是主动 GC
func gcStart(mode gcMode, forceTrigger bool) {
...
}


....


func gcStart(mode gcMode, forceTrigger bool) {
...
//在后台启动 mark worker
if mode == gcBackgroundMode {
gcBgMarkStartWorkers()
}
...
// Stop The World
systemstack(stopTheWorldWithSema)
...
if mode == gcBackgroundMode {
// GC 开始前的准备工作

//处理设置 GCPhase,setGCPhase 还会 enable write barrier
setGCPhase(_GCmark)

gcBgMarkPrepare() // Must happen before assist enable.
gcMarkRootPrepare()

// Mark all active tinyalloc blocks. Since we're
// allocating from these, they need to be black like
// other allocations. The alternative is to blacken
// the tiny block on every allocation from it, which
// would slow down the tiny allocator.
gcMarkTinyAllocs()

// Start The World
systemstack(startTheWorldWithSema)
} else {
...
}
}


Mark

Mark 阶段是并行的运行,通过在后台一直运行 mark worker 来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func gcStart(mode gcMode, forceTrigger bool) {
...
//在后台启动 mark worker
if mode == gcBackgroundMode {
gcBgMarkStartWorkers()
}
}

func gcBgMarkStartWorkers() {
// Background marking is performed by per-P G's. Ensure that
// each P has a background GC G.
for _, p := range &allp {
if p == nil || p.status == _Pdead {
break
}
if p.gcBgMarkWorker == 0 {
go gcBgMarkWorker(p)
notetsleepg(&work.bgMarkReady, -1)
noteclear(&work.bgMarkReady)
}
}
}
// gcBgMarkWorker 是一直在后台运行的,大部分时候是休眠状态,通过 gcController 来调度
func gcBgMarkWorker(_p_ *p) {
for {
// 将当前 goroutine 休眠,直到满足某些条件
gopark(...)
...
// mark 过程
systemstack(func() {
// Mark our goroutine preemptible so its stack
// can be scanned. This lets two mark workers
// scan each other (otherwise, they would
// deadlock). We must not modify anything on
// the G stack. However, stack shrinking is
// disabled for mark workers, so it is safe to
// read from the G stack.
casgstatus(gp, _Grunning, _Gwaiting)
switch _p_.gcMarkWorkerMode {
default:
throw("gcBgMarkWorker: unexpected gcMarkWorkerMode")
case gcMarkWorkerDedicatedMode:
gcDrain(&_p_.gcw, gcDrainNoBlock|gcDrainFlushBgCredit)
case gcMarkWorkerFractionalMode:
gcDrain(&_p_.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit)
case gcMarkWorkerIdleMode:
gcDrain(&_p_.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit)
}
casgstatus(gp, _Gwaiting, _Grunning)
})
...
}
}

Mark 阶段的标记代码主要在函数 gcDrain() 中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// gcDrain scans roots and objects in work buffers, blackening grey
// objects until all roots and work buffers have been drained.
func gcDrain(gcw *gcWork, flags gcDrainFlags) {
...
// Drain root marking jobs.
if work.markrootNext < work.markrootJobs {
for !(preemptible && gp.preempt) {
job := atomic.Xadd(&work.markrootNext, +1) - 1
if job >= work.markrootJobs {
break
}
markroot(gcw, job)
if idle && pollWork() {
goto done
}
}
}

// 处理 heap 标记
// Drain heap marking jobs.
for !(preemptible && gp.preempt) {
...
//从灰色列队中取出对象
var b uintptr
if blocking {
b = gcw.get()
} else {
b = gcw.tryGetFast()
if b == 0 {
b = gcw.tryGet()
}
}
if b == 0 {
// work barrier reached or tryGet failed.
break
}
//扫描灰色对象的引用对象,标记为灰色,入灰色队列
scanobject(b, gcw)
}
}


Mark termination (STW phase 2)

mark termination 阶段会 stop the world。

函数实现在 gcMarkTermination()。1.8 版本已经不会再对 goroutine stack 进行 re-scan 了。细节有点多,这里不细说了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func gcMarkTermination() {
// World is stopped.
// Run gc on the g0 stack. We do this so that the g stack
// we're currently running on will no longer change. Cuts
// the root set down a bit (g0 stacks are not scanned, and
// we don't need to scan gc's internal state). We also
// need to switch to g0 so we can shrink the stack.
systemstack(func() {
gcMark(startTime)
// Must return immediately.
// The outer function's stack may have moved
// during gcMark (it shrinks stacks, including the
// outer function's stack), so we must not refer
// to any of its variables. Return back to the
// non-system stack to pick up the new addresses
// before continuing.
})
...
}


清扫

清扫相对来说就简单很多了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func gcSweep(mode gcMode) {
...
//阻塞式
if !_ConcurrentSweep || mode == gcForceBlockMode {
// Special case synchronous sweep.
...
// Sweep all spans eagerly.
for sweepone() != ^uintptr(0) {
sweep.npausesweep++
}
// Do an additional mProf_GC, because all 'free' events are now real as well.
mProf_GC()
mProf_GC()
return
}

// 并行式
// Background sweep.
lock(&sweep.lock)
if sweep.parked {
sweep.parked = false
ready(sweep.g, 0, true)
}
unlock(&sweep.lock)
}

对于并行式清扫,在 GC 初始化的时候就会启动bgsweep(),然后在后台一直循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func bgsweep(c chan int) {
sweep.g = getg()

lock(&sweep.lock)
sweep.parked = true
c <- 1
goparkunlock(&sweep.lock, "GC sweep wait", traceEvGoBlock, 1)

for {
for gosweepone() != ^uintptr(0) {
sweep.nbgsweep++
Gosched()
}
lock(&sweep.lock)
if !gosweepdone() {
// This can happen if a GC runs between
// gosweepone returning ^0 above
// and the lock being acquired.
unlock(&sweep.lock)
continue
}
sweep.parked = true
goparkunlock(&sweep.lock, "GC sweep wait", traceEvGoBlock, 1)
}
}

func gosweepone() uintptr {
var ret uintptr
systemstack(func() {
ret = sweepone()
})
return ret
}

不管是阻塞式还是并行式,都是通过 sweepone()函数来做清扫工作的。

内存管理都是基于 span 的, mheap_ 是一个全局的变量,所有分配的对象都会记录在 mheap_ 中。

在标记的时候,我们只要找到对对象对应的 span 进行标记,清扫的时候扫描 span,没有标记的 span 就可以回收了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// sweeps one span
// returns number of pages returned to heap, or ^uintptr(0) if there is nothing to sweep
func sweepone() uintptr {
...
for {
s := mheap_.sweepSpans[1-sg/2%2].pop()
...
if !s.sweep(false) {
// Span is still in-use, so this returned no
// pages to the heap and the span needs to
// move to the swept in-use list.
npages = 0
}
}
}

// Sweep frees or collects finalizers for blocks not marked in the mark phase.
// It clears the mark bits in preparation for the next GC round.
// Returns true if the span was returned to heap.
// If preserve=true, don't return it to heap nor relink in MCentral lists;
// caller takes care of it.
func (s *mspan) sweep(preserve bool) bool {
...
}




Select&Channel分析

Select

根据 select 中语句的不同选择了不同的优化路径:

  • 空的 select 语句会被直接转换成 block 函数的调用,直接挂起当前 Goroutine;
  • 如果 select 语句中只包含一个 case,就会被转换成 if ch == nil { block }; n; 表达式;
    • 首先判断操作的 Channel 是不是空的;
    • 然后执行 case 结构中的内容;
  • 如果 select 语句中只包含两个 case 并且其中一个是 default,那么 Channel 和接收和发送操作都会使用 selectnbrecv 和 selectnbsend 非阻塞地执行接收和发送操作;
  • 在默认情况下会通过 selectgo 函数选择需要执行的 case 并通过多个 if 语句执行 case 中的表达式;

在编译器已经对 select 语句进行优化之后,Go 语言会在运行时执行编译期间展开的 selectgo 函数,这个函数会按照以下的过程执行:

  • 随机生成一个遍历的轮询顺序 pollOrder 并根据 Channel 地址生成一个用于遍历的锁定顺序 lockOrder;
  • 根据 pollOrder 遍历所有的 case 查看是否有可以立刻处理的 Channel 消息;
    • 如果有消息就直接获取 case 对应的索引并返回;
  • 如果没有消息就会创建 sudog 结构体,将当前 Goroutine 加入到所有相关 Channel 的 sendq 和 recvq 队列中并调用 gopark 触发调度器的调度;
  • 当调度器唤醒当前 Goroutine 时就会再次按照 lockOrder 遍历所有的 case,从中查找需要被处理的 sudog 结构并返回对应的索引;

然而并不是所有的 select 控制结构都会走到 selectgo 上,很多情况都会被直接优化掉,没有机会调用 selectgo 函数。


Channel

在 Go 语言中,一个最常见的也是经常被人提及的设计模式就是不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存

Go 语言对于并发编程的设计与上述这种共享内存的方式完全不同,虽然我们在 Golang 中也能使用共享内存加互斥锁来实现并发编程,但是与此同时,Go 语言也提供了一种不同的并发模型,也就是 CSP,即通信顺序进程(Communicating sequential processes),

Goroutine 其实就是 CSP 中的实体,Channel 就是用于传递信息的通道,使用 CSP 并发模型的 Goroutine 就会通过 Channel 来传递消息。

上图中的两个 Goroutine,一个会负责向 Channel 中发送消息,另一个会负责从 Channel 中接收消息,它们两者并没有任何直接的关联,能够独立地工作和运行,但是间接地通过 Channel 完成了通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq

lock mutex
}

qcount、dataqsize、buf、sendx、recv 的主要作用就是构建底层的循环队列

  • qcount 保存了当前 Channel 中的元素个数
  • dataqsize 表示 Channel 中的循环队列的长度
  • buf 指向了一个长度为 dataqsiz 的数组
  • sendx 和 recvx 负责标识当前 Channel 的发送和接收已经处理到了数组中的哪个位置
  • elemsize 和 elemtype 分别表示了当前 Channel 能够收发的元素类型和大小
  • sendq 和 recvq 的主要作用就是存储当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
1
2
3
4
type waitq struct {
first *sudog
last *sudog
}

Channel能够执行的操作其实也就只有创建、发送、接收和关闭几种


发送

channel发送数据类似ch <- i表达式

这个表达式会被编译器解析成OSEND节点,同样地在SSA中间代码的生成期间,这些OSEND节点也会被转换成chansend1的函数调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OSEND:
n1 := n.Right
n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")
n1 = walkexpr(n1, init)
n1 = nod(OADDR, n1, nil)
n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)
}
}

....


func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

chansend就是向Channel中发送数据时最终会调用的函数,这个函数负责了发送数据的全部逻辑

在发送数据的逻辑执行之前会先为当前Channel加锁,防止出现竞争条件的问题

如果当前Channel结构已经通过closed字段被标记成了关闭

那么在向该Channel发送数据时就会直接 panic 报出一个非常常见的错误"send on closed channel"并返回。


直接发送

如果目标Channel被关闭并且已经有处于读等待的 Goroutine,那么chansend函数会通过dequeuerecvq中取出最先先入等待的Goroutine并直接向它发送数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}

随后的goready函数会将等待接收数据的Goroutine标记成Grunnable并把该协程放到发送方所在的处理器P上等待执行

该处理器P在下一次调度时就会立刻唤醒消息接收方所在的协程。


缓冲区

Channel中发送数据时遇到的第二种情况就是创建的Channel包含缓冲区并且Channel中的数据没有装满,在这时就会执行下面的这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

在这里我们首先会使用chanbuf计算出下一个可以放置待处理变量的位置,然后通过typedmemmove将发送的消息拷贝到缓冲区中并增加sendx索引和qcount计数器,在函数的最后会释放持有的锁。


阻塞发送

最后要介绍的就是向Channel发送但是遇到下游无法处理的『阻塞发送』了,当然如果传入的参数block=false,那么就会直接释放持有的锁并返回false表示这一次的发送不成功。

在常见的场景中,向Channel发送消息的操作基本上都是阻塞的,在这时就会执行下面的代码,我们可以简单梳理一下这段代码的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
if !block {
unlock(&c.lock)
return false
}

gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

gp.waiting = nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}
  • 调用getg获取发送操作时使用的Goroutine协程;
  • 执行acquireSudog函数获取一个sudog结构体并设置这一次阻塞发送的相关信息,例如发送的Channel、是否在Select控制结构中、发送数据所在的地址等;
  • 将刚刚创建并初始化的sudog结构体加入sendq等待队列,并设置到当前Goroutinewaiting上,表示Goroutine正在等待该sudog准备就绪;
  • 调用goparkunlock函数将当前的Goroutine更新成Gwaiting状态并解锁,该Goroutine可以被调用goready再次唤醒;
  • 当前的Goroutine其实就会在这里陷入阻塞状态等待被调度器唤醒了;
  • 如果被调度器唤醒就会执行一些收尾的工作,将一些属性置零并且释放sudog结构体;

在最后,函数会返回 true 表示这一次发送的结束并继续运行当前 Goroutine 应该执行的逻辑。




接收

分析了Channel发送数据的过程之后,我们就可以继续介绍数据处理的另一端,也就是数据的接收了,我们在 Go 语言中其实有两种不同的方式去接收管道中的数据:

1
2
i <- ch
i, ok <- ch

这两种不同的方法经过编译器的处理都会变成ORECV类型的节点,但是后者会在类型检查阶段被转换成OAS2RECV节点,我们可以简单看一下这里转换的路线图:

虽然这两种不同的接收方式会被转换成 chanrecv1 和 chanrecv2 两种不同函数的调用,但是这两个函数最终调用的还是 chanrecv。

chanrecv 处理数据接收时总共可以分成五种不同的情况,当我们从一个空 Channel 中接收数据时会直接调用 gopark 直接让出当前 Goroutine 处理器的使用权。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}

lock(&c.lock)

if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

如果当前的 Channel 已经被关闭并且缓冲区中不存在任何的数据,那么就会直接解锁当前的 Channel 并清除 ep 指针的数据。

直接接收

当 Channel 的 sendq 队列中包含处于等待状态的 Goroutine 时,我们其实就会直接取出队列头的 Goroutine,这里处理的逻辑和发送时所差无几,只是发送数据时调用的是 send 函数,而这里是 recv 函数:

1
2
3
4
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

recv 函数的实现其实也与 send 非常相似,我们可以简单看一下这里执行的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
  • 如果当前 Channel 中不存在未被处理的数据,就会调用recvDirect,这个函数会将sendq中 Goroutine 存储的 elem 数据拷贝到目标内存地址中;
  • 如果当前 Channel 已经满了,就会通过typedmemmove将队列中的数据拷贝到接收方的内存地址中并将发送方的数据拷贝到队列中,这样我们可以释放一个阻塞的发送方 Goroutine;
  • 在最后会解锁 Channel 并调用 goready 函数将当前处理器的 runnext 设置成发送数据的 Goroutine,随后 <-ch 会返回并执行下面的逻辑;

上图展示了 Channel 在缓冲区已经没有空间并且 sendq 中存在等待的 Goroutine 时,使用 <-ch 发生的变化,sendq 队列中的第一个 sudog 结构中的元素会替换 sendx/recvx 索引所在位置的元素,原有的元素会被拷贝到接收 <-ch 结果的内存空间上。


缓冲区

另一种接收数据时遇到的情况就是,Channel 的缓冲区中已经包含了一些元素,在这时如果使用 <-ch 从 Channel 中接收元素,我们就会直接从缓冲区中 recvx 的索引位置中取出数据进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

如果接收数据的内存地不为空,那么就会直接使用typedmemmove将缓冲区中的数据拷贝到内存中,在这之后会清除队列中的数据并完成收尾工作。

收尾工作就包括递增recvx索引的数据,当发现索引超过了当前队列的容量时,由于这是一个循环队列,所以就会将它归零;除此之外,这个函数还会减少qcount计数器并释放持有Channel的锁。

阻塞接收

当 Channel 的sendq队列中不存在等待的Goroutine并且缓冲区中也不存在任何数据时,从管道中接收数据的操作在大多数时候就会变成一个阻塞的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
if !block {
unlock(&c.lock)
return false, false
}

gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

gp.waiting = nil
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

这种阻塞的情况下其实有一个例外,也就是与select语句结合使用时就可能会使用到非阻塞block=false的接收操作,这段代码在这时就会获取一个 sudog 结构体设置到当前Goroutinewaiting上并将其入队到recvq中。

除此之外,当前代码片段还会调用goparkunlock函数立刻触发Goroutine的调度,将当前Goroutine的状态改成Gwaiting并让出处理器的使用权,在这时Goroutine就会处于休眠状态等待调度器的调度,重新执行时就会从gp.waiting = nil处继续运行下面的代码对数据进行清理。

小结

我们简单梳理一下从 Channel 中接收数据时的几种情况:

  • 如果 Channel 是空的,那么就会直接调用gopark挂起当前的Goroutine
  • 如果 Channel 已经关闭并且缓冲区没有任何数据,chanrecv函数就会直接返回;
  • 如果 Channel 上的sendq队列中存在挂起的Goroutine,就会将recvx索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区中;
  • 如果 Channel 的缓冲区中包含数据就会直接从recvx所在的索引上进行读取;
  • 在默认情况下会直接挂起当前的Goroutine,将sudog结构加入recvq队列并更新Goroutinewaiting属性,最后陷入休眠等待调度器的唤醒;

在从管道中接收数据的过程中,其实会在两个时间点触发 Goroutine 的调度,首先空的 Channel 意味着永远接收不到消息,那么就会直接挂起当前 Goroutine,第二个时间点是缓冲区中不存在数据,在这时也会直接挂起当前的 Goroutine 等待发送方发送数据。




声明

文献资料摘录于