
A few days ago in a PDD interview I was asked this question. I recited the usual GMP-scheduling boilerplate, but the interviewer was clearly unsatisfied and pointed out that he didn't want memorized answers; he wanted to hear the runtime's actual scheduling process. (Though perhaps I simply hadn't memorized enough boilerplate.) So I went through the source to understand it, and to summarize the GMP model along the way.
The scheduling model
First, a summary of GMP scheduling, which no Golang interview question set ever manages to avoid.
The GM model
Before Go 1.1, the runtime used the GM scheduling model. G, the goroutine, represents a Go coroutine; M, the Machine, is the struct that handles thread operations and talks directly to the operating system, so it can be viewed simply as an OS thread. A newly created G is placed on a global queue to wait for an M to handle it.
Let's look at the scheduler code from the early 1.0.1 release:
```c
// One round of scheduler: find a goroutine and run it.
// The argument is the goroutine that was running before
// schedule was called, or nil if this is the first call.
// Never returns.
static void
schedule(G *gp)
{
    int32 hz;
    uint32 v;

    schedlock();
    if(gp != nil) {
        // Just finished running gp.
        // Unbind the current G from M; decrement the global running-G count.
        gp->m = nil;
        runtime·sched.grunning--;

        // atomic { mcpu-- }
        v = runtime·xadd(&runtime·sched.atomic, -1<<mcpuShift);
        if(atomic_mcpu(v) > maxgomaxprocs)
            runtime·throw("negative mcpu in scheduler");

        switch(gp->status){
        case Grunnable: // why would a runnable G end up here?
        case Gdead:
            // Shouldn't have been running!
            runtime·throw("bad gp->status in sched");
        case Grunning:
            gp->status = Grunnable; // back onto the run queue
            gput(gp);
            break;
        case Gmoribund:
            // a dying goroutine: tear down its resources
            gp->status = Gdead;
            if(gp->lockedm) {
                gp->lockedm = nil;
                m->lockedg = nil;
            }
            gp->idlem = nil;
            unwindstack(gp, nil); // free its stack
            gfput(gp);            // recycle the G onto the free list
            if(--runtime·sched.gcount == 0)
                runtime·exit(0);
            break;
        }
        // If gp set readyonstop (it must wake some G when it stops)
        if(gp->readyonstop){
            gp->readyonstop = 0;
            readylocked(gp);
        }
    } else if(m->helpgc) {
        // clean up after a GC helper
        // Bootstrap m or new m started by starttheworld.
        // atomic { mcpu-- }
        v = runtime·xadd(&runtime·sched.atomic, -1<<mcpuShift);
        if(atomic_mcpu(v) > maxgomaxprocs)
            runtime·throw("negative mcpu in scheduler");
        // Compensate for increment in starttheworld().
        runtime·sched.grunning--;
        m->helpgc = 0;
    } else if(m->nextg != nil) {
        // New m started by matchmg.
        // this m has already been handed a G to run
    } else {
        runtime·throw("invalid m state in scheduler");
    }

    // Find (or wait for) g to run.  Unlocks runtime·sched.
    // pick the next g to run from the queue
    gp = nextgandunlock();
    gp->readyonstop = 0;
    gp->status = Grunning;
    m->curg = gp;
    gp->m = m;

    // Check whether the profiler needs to be turned on or off.
    hz = runtime·sched.profilehz;
    if(m->profilehz != hz)
        runtime·resetcpuprofiler(hz);

    if(gp->sched.pc == (byte*)runtime·goexit) { // kickoff
        runtime·gogocall(&gp->sched, (void(*)(void))gp->entry);
    }
    runtime·gogo(&gp->sched, 0); // actually switch to gp's execution context
}
```
First, schedlock() locks the scheduler to protect the global queue. Then gp's status is examined: if it has finished, it is destroyed for good; otherwise the current goroutine is recycled normally. Finally, the next g is taken from the queue to run.
```c
static G*
nextgandunlock(void)
{
    G *gp;
    uint32 v;

top:
    if(atomic_mcpu(runtime·sched.atomic) >= maxgomaxprocs)
        runtime·throw("negative mcpu");

    // If there is a g waiting as m->nextg, the mcpu++
    // happened before it was passed to mnextg.
    if(m->nextg != nil) {
        gp = m->nextg;
        m->nextg = nil;
        schedunlock();
        return gp;
    }
    ......
```
Problems with the GM model?
- As the code above shows, every scheduling pass has to take the global lock, causing heavy lock contention.
- Ms frequently hand Gs to one another (through m->nextg), which adds extra overhead.
- Each M carries its own independent memory cache M.mcache, wasting resources and hurting data locality.
- Ms are constantly blocked and unblocked by system calls, adding yet more overhead.
The GMP scheduling model
The problems of the GM model are exactly why P was introduced. P stands for Processor and represents the execution context of an M; at any moment each running M is bound to one P.
Every P maintains its own local run queue, which reduces dependence on the global queue and therefore lock contention. The work-stealing mechanism this enables also cuts down on idle Ms and improves resource utilization.
[figure: G, M, P]
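To see the Ps, Ms, global queue, and per-P local queues live, run any Go program with the scheduler trace enabled. GODEBUG=schedtrace is a standard runtime flag; the busy-loop workload below is just an arbitrary way to keep the queues populated:

```go
package main

import "time"

func main() {
    // Spawn more busy goroutines than GOMAXPROCS so the local run
    // queues (and possibly the global queue) stay non-empty.
    for i := 0; i < 64; i++ {
        go func() {
            for {
            }
        }()
    }
    time.Sleep(3 * time.Second)
}
```

Run it with GODEBUG=schedtrace=1000 go run main.go and the runtime prints a line per second like SCHED 2008ms: gomaxprocs=8 idleprocs=0 threads=10 spinningthreads=0 idlethreads=1 runqueue=12 [5 7 6 8 4 9 7 10], where runqueue is the length of the global queue and the bracketed numbers are each P's local queue length.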
The GMP scheduling flow
- After go func() creates a goroutine, it is added to a P's local queue; if the local queue is full it goes to the global queue instead.
- Each P is bound to an M, and the M takes Gs from its P's local queue to execute. If the local queue of the M's P is empty, the M steals Gs from other Ps' local queues.
- If a G blocks in a system call, the P detaches from the current M and looks for an idle M, creating a new one if none is free (the sketch after this list makes this observable). If a G blocks on a channel or network I/O, the M simply picks another G in the Grunnable state. Once the blocked G becomes runnable again, it re-enters a P's local queue to wait for execution.
- When a G finishes executing, its resources are released.
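As a rough way to observe the system-call hand-off from step 3, the sketch below (Linux-only because of syscall.Pipe, and the exact counts depend on timing and runtime version) parks several Ms inside raw read(2) calls that bypass the netpoller, then watches the thread-creation count grow:

```go
package main

import (
    "fmt"
    "runtime/pprof"
    "syscall"
    "time"
)

func main() {
    threads := pprof.Lookup("threadcreate")
    fmt.Println("threads before:", threads.Count())

    // Raw syscall.Read is not integrated with the netpoller, so each of
    // these goroutines pins its M inside a blocking read(2). sysmon
    // notices the stuck syscalls, detaches the Ps, and the runtime
    // starts replacement Ms.
    for i := 0; i < 8; i++ {
        go func() {
            fds := make([]int, 2)
            if err := syscall.Pipe(fds); err != nil {
                panic(err)
            }
            buf := make([]byte, 1)
            syscall.Read(fds[0], buf) // no writer: blocks this M forever
        }()
    }

    time.Sleep(time.Second) // give sysmon time to retake the Ps
    fmt.Println("threads after:", threads.Count())
}
```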
schedule()
```go
func schedule() {
    mp := getg().m // the M the current G is running on
    ......
top:
    pp := mp.p.ptr() // the P this M currently owns
    pp.preempt = false
    ......
    gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
    ......
    execute(gp, inheritTime)
}
```

After many rounds of iteration, the 1.23 implementation of schedule() has grown far more complex. The code that actually picks a G is wrapped in findRunnable(); that function is much too long to quote, so here is just a summary of its logic.
What findRunnable() does: find a runnable goroutine (gp), and report whether it inherits the remaining time slice (inheritTime) and whether an extra P should be woken (tryWakeP).
It first tries to schedule the traceReader or a GC worker; these are runtime system goroutines and take priority. Next it checks the local queue, and if that is empty it tries to take a G from the global queue. To keep a saturated local queue on one P from starving everyone else, the global queue is also polled first once in a while (on every 61st scheduler tick).
If none of that turns up work and the number of spinning Ms is below the limit, it tries to steal tasks from other Ps' queues. Anything stolen is executed; if nothing is stolen but a new timer or GC task appeared, it retries.
Finally, if there is still no work, it prepares to release the P and block the M.
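The priority order is easier to see in a toy model. The sketch below is not the runtime's code, just a simplified, runnable imitation of its run-queue hierarchy (runnext slot, per-P local queue, global queue, stealing); the %61 fairness check is the one detail borrowed directly from findRunnable:

```go
package main

import "fmt"

type G struct{ id int }

type P struct {
    runnext *G
    local   []*G // stands in for the runtime's 256-entry ring buffer
}

type Sched struct {
    global []*G
    ps     []*P
    tick   int
}

// find mimics findRunnable's order of preference for one P.
func (s *Sched) find(pp *P) *G {
    s.tick++
    // Fairness: once in a while take from the global queue first, so a
    // saturated local queue cannot starve everyone else.
    if s.tick%61 == 0 && len(s.global) > 0 {
        gp := s.global[0]
        s.global = s.global[1:]
        return gp
    }
    if gp := pp.runnext; gp != nil { // ready'd by the current G: runs next
        pp.runnext = nil
        return gp
    }
    if len(pp.local) > 0 { // the local queue (lock-free in the runtime)
        gp := pp.local[0]
        pp.local = pp.local[1:]
        return gp
    }
    if len(s.global) > 0 { // local queue empty: fall back to the global one
        gp := s.global[0]
        s.global = s.global[1:]
        return gp
    }
    for _, victim := range s.ps { // last resort: steal half a victim's queue
        if victim != pp && len(victim.local) > 0 {
            n := (len(victim.local) + 1) / 2
            stolen := victim.local[:n]
            victim.local = victim.local[n:]
            pp.local = append(pp.local, stolen[1:]...)
            return stolen[0]
        }
    }
    return nil // real runtime: netpoll, then release the P and park the M
}

func main() {
    p0 := &P{local: []*G{{1}, {2}}}
    p1 := &P{}
    s := &Sched{ps: []*P{p0, p1}, global: []*G{{3}}}
    for gp := s.find(p1); gp != nil; gp = s.find(p1) {
        fmt.Println("p1 runs g", gp.id) // drains global, then steals from p0
    }
}
```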
So what does the runtime actually do?
The compiler translates go func() { … } into a call to runtime.newproc():
```go
// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {
    gp := getg()
    // the caller's program counter (return address), used to attribute
    // the call site in debugging and stack traces
    pc := sys.GetCallerPC()
    // switch to the current M's system stack (g0)
    systemstack(func() {
        newg := newproc1(fn, gp, pc, false, waitReasonZero)

        // get the current thread's P and put the new newg on its local queue
        pp := getg().m.p.ptr()
        runqput(pp, newg, true)

        if mainStarted {
            wakep()
        }
    })
}
```
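You can confirm this translation yourself by dumping the compiler's assembly for a trivial program (the file name is arbitrary):

```go
// main.go
package main

func main() {
    go func() {}() // the go statement we want to inspect
    select {}      // park main so the snippet is a complete program
}
```

Building it with go build -gcflags=-S main.go prints the generated assembly, and the listing for main.main contains a CALL runtime.newproc(SB) right where the go statement was.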
Now let's look at the internals of newproc1:
```go
// Create a new g in state _Grunnable (or _Gwaiting if parked is true), starting at fn.
// callerpc is the address of the go statement that created this. The caller is responsible
// for adding the new g to the scheduler. If parked is true, waitreason must be non-zero.
func newproc1(fn *funcval, callergp *g, callerpc uintptr, parked bool, waitreason waitReason) *g {
    if fn == nil {
        fatal("go of nil func value")
    }

    mp := acquirem() // disable preemption because we hold M and P in local vars.
    pp := mp.p.ptr()
    newg := gfget(pp)
    if newg == nil {
        newg = malg(stackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }

    ......

    totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
    totalSize = alignUp(totalSize, sys.StackAlign)
    sp := newg.stack.hi - totalSize

    ......

    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)
    newg.parentGoid = callergp.goid
    newg.gopc = callerpc
    newg.ancestors = saveAncestors(callergp)
    newg.startpc = fn.fn

    ...... // check whether this is a system G and mark it accordingly

    // Track initial transition?
    newg.trackingSeq = uint8(cheaprand())
    if newg.trackingSeq%gTrackingPeriod == 0 {
        newg.tracking = true
    }
    gcController.addScannableStack(pp, int64(newg.stack.hi-newg.stack.lo))

    ...... // trace instrumentation

    // Set up race context.
    if raceenabled {
        newg.racectx = racegostart(callerpc)
        newg.raceignore = 0
        if newg.labels != nil {
            // See note in proflabel.go on labelSync's role in synchronizing
            // with the reads in the signal handler.
            racereleasemergeg(newg, unsafe.Pointer(&labelSync))
        }
    }
    releasem(mp) // release the M, re-enabling preemption

    return newg
}
```
After acquiring the current M, it first tries to reuse a G from the current P's free-G pool; if there is none, it allocates a new G with a fresh stack. It then initializes the new G's scheduling context and metadata, and finally returns this G.
Summary
Finally, to answer the question in the title:
- When a goroutine is created with the go keyword, the runtime calls newproc to produce a new G.
- Inside newproc, systemstack() switches to the (g0) system stack and calls newproc1.
- newproc1 first tries to reuse an idle G from the P's free list; if there is none, it creates a new G instance and allocates stack space for it.
- It initializes the scheduling context (sched).
- It assigns a unique goid and records metadata such as the parent G's information, the creating call site, and labels.
- It sets the G's initial status to _Grunnable, i.e. ready to run.
- It puts this G on the current P's local queue (or the global queue if the local one is full), for schedule() to pick up and execute later.
On top of that, the runtime does some extra bookkeeping: tracing, GC accounting, registering the new stack, race-detector setup, and so on.
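If you want to watch newproc and schedule() at work on a real program, the runtime/trace package records every goroutine creation and scheduling transition. A minimal harness (the output path is arbitrary):

```go
package main

import (
    "os"
    "runtime/trace"
    "sync"
)

func main() {
    f, err := os.Create("trace.out")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    // Record scheduler events: goroutine creation (go statements),
    // runnable/running transitions, syscalls, and so on.
    if err := trace.Start(f); err != nil {
        panic(err)
    }
    defer trace.Stop()

    var wg sync.WaitGroup
    for i := 0; i < 16; i++ {
        wg.Add(1)
        go func() { // each of these shows up as a newly created G in the trace
            defer wg.Done()
            sum := 0
            for j := 0; j < 1e7; j++ {
                sum += j
            }
            _ = sum
        }()
    }
    wg.Wait()
}
```

Opening the result with go tool trace trace.out shows a per-P timeline of exactly which Gs each P executed.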
Appendix
```go
type p struct {
    id          int32
    status      uint32 // one of pidle/prunning/...
    link        puintptr
    schedtick   uint32     // incremented on every scheduler call
    syscalltick uint32     // incremented on every system call
    sysmontick  sysmontick // last tick observed by sysmon
    m           muintptr   // back-link to associated m (nil if idle)
    mcache      *mcache
    pcache      pageCache
    raceprocctx uintptr

    deferpool    []*_defer // pool of available defer structs (see panic.go)
    deferpoolbuf [32]*_defer

    // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
    goidcache    uint64
    goidcacheend uint64

    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    // runnext, if non-nil, is a runnable G that was ready'd by
    // the current G and should be run next instead of what's in
    // runq if there's time remaining in the running G's time
    // slice. It will inherit the time left in the current time
    // slice. If a set of goroutines is locked in a
    // communicate-and-wait pattern, this schedules that set as a
    // unit and eliminates the (potentially large) scheduling
    // latency that otherwise arises from adding the ready'd
    // goroutines to the end of the run queue.
    //
    // Note that while other P's may atomically CAS this to zero,
    // only the owner P can CAS it to a valid G.
    runnext guintptr

    // Available G's (status == Gdead)
    gFree struct {
        gList
        n int32
    }

    sudogcache []*sudog
    sudogbuf   [128]*sudog

    // Cache of mspan objects from the heap.
    mspancache struct {
        // We need an explicit length here because this field is used
        // in allocation codepaths where write barriers are not allowed,
        // and eliminating the write barrier/keeping it eliminated from
        // slice updates is tricky, more so than just managing the length
        // ourselves.
        len int
        buf [128]*mspan
    }

    // Cache of a single pinner object to reduce allocations from repeated
    // pinner creation.
    pinnerCache *pinner

    trace pTraceState

    palloc persistentAlloc // per-P to avoid mutex

    // Per-P GC state
    gcAssistTime         int64 // Nanoseconds in assistAlloc
    gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic)

    // limiterEvent tracks events for the GC CPU limiter.
    limiterEvent limiterEvent

    // gcMarkWorkerMode is the mode for the next mark worker to run in.
    // That is, this is used to communicate with the worker goroutine
    // selected for immediate execution by
    // gcController.findRunnableGCWorker. When scheduling other goroutines,
    // this field must be set to gcMarkWorkerNotWorker.
    gcMarkWorkerMode gcMarkWorkerMode
    // gcMarkWorkerStartTime is the nanotime() at which the most recent
    // mark worker started.
    gcMarkWorkerStartTime int64

    // gcw is this P's GC work buffer cache. The work buffer is
    // filled by write barriers, drained by mutator assists, and
    // disposed on certain GC state transitions.
    gcw gcWork

    // wbBuf is this P's GC write barrier buffer.
    //
    // TODO: Consider caching this in the running G.
    wbBuf wbBuf

    runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

    // statsSeq is a counter indicating whether this P is currently
    // writing any stats. Its value is even when not, odd when it is.
    statsSeq atomic.Uint32

    // Timer heap.
    timers timers

    // maxStackScanDelta accumulates the amount of stack space held by
    // live goroutines (i.e. those eligible for stack scanning).
    // Flushed to gcController.maxStackScan once maxStackScanSlack
    // or -maxStackScanSlack is reached.
    maxStackScanDelta int64

    // gc-time statistics about current goroutines
    // Note that this differs from maxStackScan in that this
    // accumulates the actual stack observed to be used at GC time (hi - sp),
    // not an instantaneous measure of the total stack size that might need
    // to be scanned (hi - lo).
    scannedStackSize uint64 // stack size of goroutines scanned by this P
    scannedStacks    uint64 // number of goroutines scanned by this P

    // preempt is set to indicate that this P should be enter the
    // scheduler ASAP (regardless of what G is running on it).
    preempt bool

    // gcStopTime is the nanotime timestamp that this P last entered _Pgcstop.
    gcStopTime int64

    // Padding is no longer needed. False sharing is now not a worry because p is large enough
    // that its size class is an integer multiple of the cache line size (for any of our architectures).
}
```
```go
type g struct {
    // Stack parameters.
    // stack describes the actual stack memory: [stack.lo, stack.hi).
    // stackguard0 is the stack pointer compared in the Go stack growth prologue.
    // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
    // stackguard1 is the stack pointer compared in the //go:systemstack stack growth prologue.
    // It is stack.lo+StackGuard on g0 and gsignal stacks.
    // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
    stack       stack   // offset known to runtime/cgo
    stackguard0 uintptr // offset known to liblink
    stackguard1 uintptr // offset known to liblink

    _panic    *_panic // innermost panic - offset known to liblink
    _defer    *_defer // innermost defer
    m         *m      // current m; offset known to arm liblink
    sched     gobuf
    syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
    syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
    syscallbp uintptr // if status==Gsyscall, syscallbp = sched.bp to use in fpTraceback
    stktopsp  uintptr // expected sp at top of stack, to check in traceback
    // param is a generic pointer parameter field used to pass
    // values in particular contexts where other storage for the
    // parameter would be difficult to find. It is currently used
    // in four ways:
    // 1. When a channel operation wakes up a blocked goroutine, it sets param to
    //    point to the sudog of the completed blocking operation.
    // 2. By gcAssistAlloc1 to signal back to its caller that the goroutine completed
    //    the GC cycle. It is unsafe to do so in any other way, because the goroutine's
    //    stack may have moved in the meantime.
    // 3. By debugCallWrap to pass parameters to a new goroutine because allocating a
    //    closure in the runtime is forbidden.
    // 4. When a panic is recovered and control returns to the respective frame,
    //    param may point to a savedOpenDeferState.
    param        unsafe.Pointer
    atomicstatus atomic.Uint32
    stackLock    uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
    goid         uint64
    schedlink    guintptr
    waitsince    int64      // approx time when the g become blocked
    waitreason   waitReason // if status==Gwaiting

    preempt       bool // preemption signal, duplicates stackguard0 = stackpreempt
    preemptStop   bool // transition to _Gpreempted on preemption; otherwise, just deschedule
    preemptShrink bool // shrink stack at synchronous safe point

    // asyncSafePoint is set if g is stopped at an asynchronous
    // safe point. This means there are frames on the stack
    // without precise pointer information.
    asyncSafePoint bool

    paniconfault bool // panic (instead of crash) on unexpected fault address
    gcscandone   bool // g has scanned stack; protected by _Gscan bit in status
    throwsplit   bool // must not split stack
    // activeStackChans indicates that there are unlocked channels
    // pointing into this goroutine's stack. If true, stack
    // copying needs to acquire channel locks to protect these
    // areas of the stack.
    activeStackChans bool
    // parkingOnChan indicates that the goroutine is about to
    // park on a chansend or chanrecv. Used to signal an unsafe point
    // for stack shrinking.
    parkingOnChan atomic.Bool
    // inMarkAssist indicates whether the goroutine is in mark assist.
    // Used by the execution tracer.
    inMarkAssist bool
    coroexit     bool // argument to coroswitch_m

    raceignore    int8  // ignore race detection events
    nocgocallback bool  // whether disable callback from C
    tracking      bool  // whether we're tracking this G for sched latency statistics
    trackingSeq   uint8 // used to decide whether to track this G
    trackingStamp int64 // timestamp of when the G last started being tracked
    runnableTime  int64 // the amount of time spent runnable, cleared when running, only used when tracking
    lockedm       muintptr
    fipsIndicator uint8
    sig           uint32
    writebuf      []byte
    sigcode0      uintptr
    sigcode1      uintptr
    sigpc         uintptr
    parentGoid    uint64          // goid of goroutine that created this goroutine
    gopc          uintptr         // pc of go statement that created this goroutine
    ancestors     *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
    startpc       uintptr         // pc of goroutine function
    racectx       uintptr
    waiting       *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
    cgoCtxt       []uintptr      // cgo traceback context
    labels        unsafe.Pointer // profiler labels
    timer         *timer         // cached timer for time.Sleep
    sleepWhen     int64          // when to sleep until
    selectDone    atomic.Uint32  // are we participating in a select and did someone win the race?

    // goroutineProfiled indicates the status of this goroutine's stack for the
    // current in-progress goroutine profile
    goroutineProfiled goroutineProfileStateHolder

    coroarg   *coro // argument during coroutine transfers
    syncGroup *synctestGroup

    // Per-G tracer state.
    trace gTraceState

    // Per-G GC state

    // gcAssistBytes is this G's GC assist credit in terms of
    // bytes allocated. If this is positive, then the G has credit
    // to allocate gcAssistBytes bytes without assisting. If this
    // is negative, then the G must correct this by performing
    // scan work. We track this in bytes to make it fast to update
    // and check for debt in the malloc hot path. The assist ratio
    // determines how this corresponds to scan work debt.
    gcAssistBytes int64
}
```