A few days ago I was asked this question in a PDD interview. I recited the standard GMP-scheduling interview boilerplate, but the interviewer was clearly unhappy and pointed out that he didn't want memorized answers; he wanted to hear how the runtime actually performs scheduling. (Though maybe I simply hadn't memorized enough boilerplate.) So I went through the source code to understand it properly, and took the chance to summarize the GMP model along the way.
Scheduling Model
First, a quick summary of GMP scheduling, which no round of Go interview questions can avoid.
The GM Model
Before Go 1.1, the runtime used the GM scheduling model. G stands for goroutine. M stands for Machine, the structure that handles thread operations and talks directly to the operating system; you can treat it as an OS thread. Newly created Gs are placed on a global run queue, where they wait to be picked up by an M.
Let's look at the scheduler code from the early 1.0.1 release:
// One round of scheduler: find a goroutine and run it.
// The argument is the goroutine that was running before
// schedule was called, or nil if this is the first call.
// Never returns.
static void
schedule(G *gp)
{
int32 hz;
uint32 v;
schedlock();
if(gp != nil) {
// Just finished running gp.
// Unbind the current G from the M and decrement the scheduler's running-G count
gp->m = nil;
runtime·sched.grunning--;
// atomic { mcpu-- }
v = runtime·xadd(&runtime·sched.atomic, -1<<mcpuShift);
if(atomic_mcpu(v) > maxgomaxprocs)
runtime·throw("negative mcpu in scheduler");
switch(gp->status){
case Grunnable:
// Why would a G still in Grunnable state end up here? (it falls through to the throw below)
case Gdead:
// Shouldn't have been running!
// huh, how are you already dead?
runtime·throw("bad gp->status in sched");
case Grunning:
gp->status = Grunnable;
// put it back on the run queue
gput(gp);
break;
case Gmoribund:
// The goroutine is moribund (about to die): tear down its resources
gp->status = Gdead; // bury it
if(gp->lockedm) {
gp->lockedm = nil;
m->lockedg = nil;
}
gp->idlem = nil;
unwindstack(gp, nil); // free its stack
gfput(gp); // recycle the G onto the free list
if(--runtime·sched.gcount == 0)
runtime·exit(0);
break;
}
// If gp has readyonstop set, mark it runnable again now that it has stopped
if(gp->readyonstop){
gp->readyonstop = 0;
readylocked(gp);
}
} else if(m->helpgc) { // this M was helping with GC
// Bootstrap m or new m started by starttheworld.
// atomic { mcpu-- }
v = runtime·xadd(&runtime·sched.atomic, -1<<mcpuShift);
if(atomic_mcpu(v) > maxgomaxprocs)
runtime·throw("negative mcpu in scheduler");
// Compensate for increment in starttheworld().
runtime·sched.grunning--;
m->helpgc = 0;
} else if(m->nextg != nil) {
// New m started by matchmg.
// the M has already been handed a G to run
} else {
runtime·throw("invalid m state in scheduler");
}
// Find (or wait for) g to run. Unlocks runtime·sched.
// pick the next g from the queue to run
gp = nextgandunlock();
gp->readyonstop = 0;
gp->status = Grunning;
m->curg = gp;
gp->m = m;
// Check whether the profiler needs to be turned on or off.
hz = runtime·sched.profilehz;
if(m->profilehz != hz)
runtime·resetcpuprofiler(hz);
if(gp->sched.pc == (byte*)runtime·goexit) { // kickoff
runtime·gogocall(&gp->sched, (void(*)(void))gp->entry);
}
runtime·gogo(&gp->sched, 0); // actually switch to gp's execution context
}
schedlock() first locks the scheduler to protect the global queue. The code then inspects gp's status: if the goroutine has finished, it is destroyed completely; otherwise the current goroutine is recycled as usual. Finally, the next g is taken from the queue to run.
static G*
nextgandunlock(void)
{
G *gp;
uint32 v;
top:
if(atomic_mcpu(runtime·sched.atomic) >= maxgomaxprocs)
runtime·throw("negative mcpu");
// If there is a g waiting as m->nextg, the mcpu++
// happened before it was passed to mnextg.
if(m->nextg != nil) {
gp = m->nextg;
m->nextg = nil;
schedunlock();
return gp;
}
......
Problems with the GM model
As the code above shows, every scheduling round has to take the global lock, which causes heavy lock contention. Ms frequently hand Gs to one another via m->nextg, adding extra overhead. On top of that, each M keeps its own independent memory cache (M.mcache), which wastes resources and hurts data locality. And during system calls, Ms are constantly being blocked and unblocked, which costs even more.
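To make the contention point concrete, here is a deliberately naive sketch of my own (not runtime code) of the GM idea: one global run queue guarded by a single mutex, with every worker thread ("M") fighting over the same lock.

package main

import (
	"fmt"
	"sync"
)

// Toy "GM" scheduler: a single global run queue protected by one mutex.
// Every worker ("M") has to take the same lock to fetch work, which is
// exactly the contention problem described above.
type G func()

type globalSched struct {
	mu   sync.Mutex
	runq []G
}

func (s *globalSched) put(g G) {
	s.mu.Lock()
	s.runq = append(s.runq, g)
	s.mu.Unlock()
}

func (s *globalSched) get() (G, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.runq) == 0 {
		return nil, false
	}
	g := s.runq[0]
	s.runq = s.runq[1:]
	return g, true
}

func main() {
	sched := &globalSched{}
	for i := 0; i < 10; i++ {
		id := i
		sched.put(func() { fmt.Println("ran G", id) })
	}

	var wg sync.WaitGroup
	for m := 0; m < 4; m++ { // four "M"s all contending on one lock
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				g, ok := sched.get()
				if !ok {
					return
				}
				g()
			}
		}()
	}
	wg.Wait()
}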
The GMP Scheduling Model
These problems with the GM model are exactly why P was introduced. P stands for Processor and represents the execution context of an M; each running M is bound to a P.
Every P maintains its own local run queue, which reduces reliance on the global queue and therefore reduces lock contention. The work-stealing mechanism that comes with it also cuts down on idle Ms and improves resource utilization.
(Figure: G, M, P)
The GMP Scheduling Flow
After go func() creates a goroutine, the new G is added to a P's local run queue; if the local queue is full, it goes to the global queue.
Each P is bound to an M, and the M takes Gs from its P's local queue to execute. If the local queue of the M's P is empty, it steals Gs from other Ps' local queues.
If a G blocks in a system call, the P detaches from the current M and looks for an idle M, creating a new one if none is available. If a G blocks on a channel or network I/O, the M just looks for another G in the Grunnable state. Once the blocked G becomes ready again, it re-enters a P's local queue to wait for execution.
When a G finishes, its resources are released.
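A small, runnable illustration of the pieces above (plain user code, nothing runtime-internal): the number of Ps equals runtime.GOMAXPROCS, and every go statement below goes through the enqueue path described in the first step.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	// The number of Ps; passing 0 only queries the current value.
	fmt.Println("GOMAXPROCS (number of Ps):", runtime.GOMAXPROCS(0))

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(id int) { // each go statement becomes a call to runtime.newproc
			defer wg.Done()
			fmt.Println("goroutine", id, "picked up and run by some M")
		}(i)
	}
	wg.Wait()
}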
schedule()
func schedule() {
	mp := getg().m // the M the current G is running on
	......
top:
	pp := mp.p.ptr() // the P currently owned by this M
	pp.preempt = false
	......
	gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
	......
	execute(gp, inheritTime)
}
After many rounds of iteration, the schedule() implementation in Go 1.23 has become a lot more complex. The code that actually picks a G is wrapped in findRunnable(), which is far too long to quote in full, so here is just an outline of its logic.
What findRunnable() does is: find a runnable goroutine (gp), and also report whether it should inherit the current time slice (inheritTime) and whether a new P should be woken up (tryWakeP).
It first tries to schedule the traceReader or a GC worker; these are runtime system goroutines and have the highest priority. Then it looks at the local run queue. If the local queue is empty, it tries to get a G from the global queue. To prevent one P's saturated local queue from starving the others, it periodically (every 61st scheduler tick) checks the global queue before the local one.
If none of that turns up work, and the number of spinning Ms has not exceeded the limit, it tries to steal work from other Ps' queues. If the steal succeeds, the stolen G runs; if it fails but new timers or GC work have appeared, it retries.
Finally, if there is still nothing to do, it prepares to release the P and park the M.
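To make that priority order easier to see at a glance, here is a heavily simplified sketch of my own (stub types, no atomics; it omits the trace reader, GC workers, netpoll, timers, and the spinning-M bookkeeping that the real findRunnable in runtime/proc.go handles):

package sketch

// gSketch and pSketch are stand-ins, not the real runtime g/p.
type gSketch struct{ id int }

type pSketch struct {
	runq      []*gSketch // local run queue
	schedtick uint32     // incremented on every scheduler call
}

var globalRunq []*gSketch // stand-in for sched.runq (normally guarded by sched.lock)

func findRunnableSketch(pp *pSketch, allp []*pSketch) *gSketch {
	// 1. Every 61st tick, check the global queue first so it cannot be
	//    starved by a busy local queue.
	if pp.schedtick%61 == 0 && len(globalRunq) > 0 {
		return popGlobal()
	}
	// 2. The P's local run queue.
	if len(pp.runq) > 0 {
		gp := pp.runq[0]
		pp.runq = pp.runq[1:]
		return gp
	}
	// 3. The global run queue.
	if gp := popGlobal(); gp != nil {
		return gp
	}
	// 4. Work stealing: take half of another P's local queue.
	for _, victim := range allp {
		if victim == pp || len(victim.runq) == 0 {
			continue
		}
		half := (len(victim.runq) + 1) / 2
		stolen := victim.runq[:half]
		victim.runq = victim.runq[half:]
		pp.runq = append(pp.runq, stolen[1:]...)
		return stolen[0]
	}
	// 5. Still nothing: the real runtime releases the P and parks the M here.
	return nil
}

func popGlobal() *gSketch {
	if len(globalRunq) == 0 {
		return nil
	}
	gp := globalRunq[0]
	globalRunq = globalRunq[1:]
	return gp
}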
So what does the runtime actually do?
The compiler translates go func() { … } into a call to runtime.newproc():
// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {
gp := getg()
// Get the caller's program counter (return address), used to attribute the go statement when tracing and analyzing stack traces
pc := sys.GetCallerPC()
// Switch to the current M's system stack (g0)
systemstack(func() {
newg := newproc1(fn, gp, pc, false, waitReasonZero)
// Get the current M's P and put the newly created newg on its local run queue
pp := getg().m.p.ptr()
runqput(pp, newg, true)
if mainStarted {
wakep()
}
})
}
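Note that runqput is called with next set to true: the new G is placed in the P's runnext slot rather than at the tail of the local queue (see the runnext field of the p struct in the appendix), so it inherits the remaining time slice. Below is a rough sketch of my own of that behaviour, again with stand-in types and ignoring the ring buffer, atomics, and randomization in the real runqput:

package sketch

type gSketch struct{ id int }

type pSketch struct {
	runnext *gSketch   // G to run next, inherits the remaining time slice
	runq    []*gSketch // local run queue (a fixed 256-entry ring in the real runtime)
}

var globalRunq []*gSketch // stand-in for the global run queue

// runqputSketch roughly mirrors runqput(pp, gp, next): with next == true the
// new G displaces the previous runnext, which falls back to the local queue;
// if the local queue is full, half of it plus gp is pushed to the global queue.
func runqputSketch(pp *pSketch, gp *gSketch, next bool) {
	if next {
		old := pp.runnext
		pp.runnext = gp
		if old == nil {
			return
		}
		gp = old // the displaced G goes through the normal path below
	}
	const runqSize = 256
	if len(pp.runq) < runqSize {
		pp.runq = append(pp.runq, gp)
		return
	}
	// Local queue full: move half of it, plus gp, to the global queue.
	half := len(pp.runq) / 2
	globalRunq = append(globalRunq, pp.runq[:half]...)
	globalRunq = append(globalRunq, gp)
	pp.runq = pp.runq[half:]
}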
Now let's look at the internals of newproc1:
// Create a new g in state _Grunnable (or _Gwaiting if parked is true), starting at fn.
// callerpc is the address of the go statement that created this. The caller is responsible
// for adding the new g to the scheduler. If parked is true, waitreason must be non-zero.
func newproc1(fn *funcval, callergp *g, callerpc uintptr, parked bool, waitreason waitReason) *g {
if fn == nil {
fatal("go of nil func value")
}
mp := acquirem() // disable preemption because we hold M and P in local vars.
pp := mp.p.ptr()
newg := gfget(pp)
if newg == nil {
newg = malg(stackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
......
totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
totalSize = alignUp(totalSize, sys.StackAlign)
sp := newg.stack.hi - totalSize
......
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.parentGoid = callergp.goid
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
...... // check whether this is a system G and mark it accordingly
// Track initial transition?
newg.trackingSeq = uint8(cheaprand())
if newg.trackingSeq%gTrackingPeriod == 0 {
newg.tracking = true
}
gcController.addScannableStack(pp, int64(newg.stack.hi-newg.stack.lo))
...... // trace bookkeeping
// Set up race context.
if raceenabled {
newg.racectx = racegostart(callerpc)
newg.raceignore = 0
if newg.labels != nil {
// See note in proflabel.go on labelSync's role in synchronizing
// with the reads in the signal handler.
racereleasemergeg(newg, unsafe.Pointer(&labelSync))
}
}
releasem(mp) // release the M acquired above (re-enables preemption)
return newg
}
After acquiring the current M, newproc1 first tries to reuse a G from the current P's free-G pool; if no free G is available, it allocates a new one with a fresh stack. It then initializes the new G's scheduling context and metadata, and finally returns the G.
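One subtle detail in the code above: newg.sched.pc is first set to the address of goexit, and gostartcallfn then pushes that address onto the new stack as a fake return address before pointing pc at fn. So when the goroutine's function returns, it "returns" straight into goexit, which tears the goroutine down. A toy sketch of the idea (gobufSketch is my own stand-in; the real logic lives in runtime.gostartcall):

package sketch

import "unsafe"

// gobufSketch mimics just the sp/pc part of the runtime's gobuf.
type gobufSketch struct {
	sp uintptr
	pc uintptr
}

// gostartcallSketch mirrors the idea of runtime.gostartcall: reserve one word
// on the new stack, store the old pc (the address of goexit) there as a fake
// return address, then point pc at fn. When fn returns, it falls into goexit.
// This is only a sketch; don't call it against arbitrary addresses.
func gostartcallSketch(buf *gobufSketch, fn uintptr) {
	sp := buf.sp
	sp -= unsafe.Sizeof(uintptr(0))          // make room for the fake return address
	*(*uintptr)(unsafe.Pointer(sp)) = buf.pc // buf.pc currently holds goexit's address
	buf.sp = sp
	buf.pc = fn // the new goroutine starts executing at fn
}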
Summary
To wrap up, here is the answer to the question in the title:
When a new goroutine is created with the go keyword, the runtime calls newproc to create a new G.
Inside newproc, systemstack() switches to the (g0) system stack and calls newproc1.
In newproc1, the runtime first tries to reuse an idle G cached on the P; if there is none, it creates a new G instance and allocates its stack.
It initializes the scheduling context (sched).
It assigns a unique goid and records metadata such as the parent G, the creating call site, and labels.
It sets the G's initial status to _Grunnable, i.e. ready to run.
It puts the G on the current P's local run queue (or the global queue if the local one is full), so that schedule() will pick it up later.
In addition, the runtime does some extra work such as tracing, GC marking, registering the new stack, and race-detector setup.
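If you want to see the first step for yourself, compile a tiny program with go tool compile -S; the assembly for main.main should contain a CALL to runtime.newproc(SB) generated for the go statement (the file name below is arbitrary):

package main

// Build the assembly listing with: go tool compile -S main.go
// In main.main you should find a CALL to runtime.newproc(SB) for the
// go statement below.
func main() {
	done := make(chan struct{})
	go func() { // this line is what becomes the runtime.newproc call
		println("hello from a new G")
		close(done)
	}()
	<-done
}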
Appendix
type p struct {
id int32
status uint32 // one of pidle/prunning/...
link puintptr
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
m muintptr // back-link to associated m (nil if idle)
mcache *mcache
pcache pageCache
raceprocctx uintptr
deferpool []*_defer // pool of available defer structs (see panic.go)
deferpoolbuf [32]*_defer
// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
goidcache uint64
goidcacheend uint64
// Queue of runnable goroutines. Accessed without lock.
runqhead uint32
runqtail uint32
runq [256]guintptr
// runnext, if non-nil, is a runnable G that was ready'd by
// the current G and should be run next instead of what's in
// runq if there's time remaining in the running G's time
// slice. It will inherit the time left in the current time
// slice. If a set of goroutines is locked in a
// communicate-and-wait pattern, this schedules that set as a
// unit and eliminates the (potentially large) scheduling
// latency that otherwise arises from adding the ready'd
// goroutines to the end of the run queue.
//
// Note that while other P's may atomically CAS this to zero,
// only the owner P can CAS it to a valid G.
runnext guintptr
// Available G's (status == Gdead)
gFree struct {
gList
n int32
}
sudogcache []*sudog
sudogbuf [128]*sudog
// Cache of mspan objects from the heap.
mspancache struct {
// We need an explicit length here because this field is used
// in allocation codepaths where write barriers are not allowed,
// and eliminating the write barrier/keeping it eliminated from
// slice updates is tricky, more so than just managing the length
// ourselves.
len int
buf [128]*mspan
}
// Cache of a single pinner object to reduce allocations from repeated
// pinner creation.
pinnerCache *pinner
trace pTraceState
palloc persistentAlloc // per-P to avoid mutex
// Per-P GC state
gcAssistTime int64 // Nanoseconds in assistAlloc
gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic)
// limiterEvent tracks events for the GC CPU limiter.
limiterEvent limiterEvent
// gcMarkWorkerMode is the mode for the next mark worker to run in.
// That is, this is used to communicate with the worker goroutine
// selected for immediate execution by
// gcController.findRunnableGCWorker. When scheduling other goroutines,
// this field must be set to gcMarkWorkerNotWorker.
gcMarkWorkerMode gcMarkWorkerMode
// gcMarkWorkerStartTime is the nanotime() at which the most recent
// mark worker started.
gcMarkWorkerStartTime int64
// gcw is this P's GC work buffer cache. The work buffer is
// filled by write barriers, drained by mutator assists, and
// disposed on certain GC state transitions.
gcw gcWork
// wbBuf is this P's GC write barrier buffer.
//
// TODO: Consider caching this in the running G.
wbBuf wbBuf
runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point
// statsSeq is a counter indicating whether this P is currently
// writing any stats. Its value is even when not, odd when it is.
statsSeq atomic.Uint32
// Timer heap.
timers timers
// maxStackScanDelta accumulates the amount of stack space held by
// live goroutines (i.e. those eligible for stack scanning).
// Flushed to gcController.maxStackScan once maxStackScanSlack
// or -maxStackScanSlack is reached.
maxStackScanDelta int64
// gc-time statistics about current goroutines
// Note that this differs from maxStackScan in that this
// accumulates the actual stack observed to be used at GC time (hi - sp),
// not an instantaneous measure of the total stack size that might need
// to be scanned (hi - lo).
scannedStackSize uint64 // stack size of goroutines scanned by this P
scannedStacks uint64 // number of goroutines scanned by this P
// preempt is set to indicate that this P should be enter the
// scheduler ASAP (regardless of what G is running on it).
preempt bool
// gcStopTime is the nanotime timestamp that this P last entered _Pgcstop.
gcStopTime int64
// Padding is no longer needed. False sharing is now not a worry because p is large enough
// that its size class is an integer multiple of the cache line size (for any of our architectures).
}
type g struct {
// Stack parameters.
// stack describes the actual stack memory: [stack.lo, stack.hi).
// stackguard0 is the stack pointer compared in the Go stack growth prologue.
// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
// stackguard1 is the stack pointer compared in the //go:systemstack stack growth prologue.
// It is stack.lo+StackGuard on g0 and gsignal stacks.
// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
stack stack // offset known to runtime/cgo
stackguard0 uintptr // offset known to liblink
stackguard1 uintptr // offset known to liblink
_panic *_panic // innermost panic - offset known to liblink
_defer *_defer // innermost defer
m *m // current m; offset known to arm liblink
sched gobuf
syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
syscallbp uintptr // if status==Gsyscall, syscallbp = sched.bp to use in fpTraceback
stktopsp uintptr // expected sp at top of stack, to check in traceback
// param is a generic pointer parameter field used to pass
// values in particular contexts where other storage for the
// parameter would be difficult to find. It is currently used
// in four ways:
// 1. When a channel operation wakes up a blocked goroutine, it sets param to
// point to the sudog of the completed blocking operation.
// 2. By gcAssistAlloc1 to signal back to its caller that the goroutine completed
// the GC cycle. It is unsafe to do so in any other way, because the goroutine's
// stack may have moved in the meantime.
// 3. By debugCallWrap to pass parameters to a new goroutine because allocating a
// closure in the runtime is forbidden.
// 4. When a panic is recovered and control returns to the respective frame,
// param may point to a savedOpenDeferState.
param unsafe.Pointer
atomicstatus atomic.Uint32
stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
goid uint64
schedlink guintptr
waitsince int64 // approx time when the g become blocked
waitreason waitReason // if status==Gwaiting
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
preemptStop bool // transition to _Gpreempted on preemption; otherwise, just deschedule
preemptShrink bool // shrink stack at synchronous safe point
// asyncSafePoint is set if g is stopped at an asynchronous
// safe point. This means there are frames on the stack
// without precise pointer information.
asyncSafePoint bool
paniconfault bool // panic (instead of crash) on unexpected fault address
gcscandone bool // g has scanned stack; protected by _Gscan bit in status
throwsplit bool // must not split stack
// activeStackChans indicates that there are unlocked channels
// pointing into this goroutine's stack. If true, stack
// copying needs to acquire channel locks to protect these
// areas of the stack.
activeStackChans bool
// parkingOnChan indicates that the goroutine is about to
// park on a chansend or chanrecv. Used to signal an unsafe point
// for stack shrinking.
parkingOnChan atomic.Bool
// inMarkAssist indicates whether the goroutine is in mark assist.
// Used by the execution tracer.
inMarkAssist bool
coroexit bool // argument to coroswitch_m
raceignore int8 // ignore race detection events
nocgocallback bool // whether disable callback from C
tracking bool // whether we're tracking this G for sched latency statistics
trackingSeq uint8 // used to decide whether to track this G
trackingStamp int64 // timestamp of when the G last started being tracked
runnableTime int64 // the amount of time spent runnable, cleared when running, only used when tracking
lockedm muintptr
fipsIndicator uint8
sig uint32
writebuf []byte
sigcode0 uintptr
sigcode1 uintptr
sigpc uintptr
parentGoid uint64 // goid of goroutine that created this goroutine
gopc uintptr // pc of go statement that created this goroutine
ancestors *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
startpc uintptr // pc of goroutine function
racectx uintptr
waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
cgoCtxt []uintptr // cgo traceback context
labels unsafe.Pointer // profiler labels
timer *timer // cached timer for time.Sleep
sleepWhen int64 // when to sleep until
selectDone atomic.Uint32 // are we participating in a select and did someone win the race?
// goroutineProfiled indicates the status of this goroutine's stack for the
// current in-progress goroutine profile
goroutineProfiled goroutineProfileStateHolder
coroarg *coro // argument during coroutine transfers
syncGroup *synctestGroup
// Per-G tracer state.
trace gTraceState
// Per-G GC state
// gcAssistBytes is this G's GC assist credit in terms of
// bytes allocated. If this is positive, then the G has credit
// to allocate gcAssistBytes bytes without assisting. If this
// is negative, then the G must correct this by performing
// scan work. We track this in bytes to make it fast to update
// and check for debt in the malloc hot path. The assist ratio
// determines how this corresponds to scan work debt.
gcAssistBytes int64
}