P的状态转换
摘要:= t { // 如果未触发sysretake 尝试调整p.sysmontick.syscalltick次数 pd.syscalltick = uint32(t) pd.syscallwhen = now continue } // On the one hand we don't want to retake Ps if there is no other work to do, // but on the other hand we want to retake them eventually // because they can prevent the sysmon thread from deep sleep. // 如果P的队列为空 且已有空闲的P或者有P绑定的M处于spinning(寻找可运行的G)且系统调用时间小于10ms if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } // Drop allpLock so we can take sched.lock. unlock(&allpLock) // Need to decrement number of idle locked M's // (pretending that one more is running) before the CAS. // Otherwise the M from which we retake can exit the syscall, // increment nmidle and report deadlock. incidlelocked(-1) //。func entersyscall() { reentersyscall(getcallerpc(), getcallersp()) } func reentersyscall(pc, sp uintptr) { _g_ := getg() // Disable preemption because during this function g is in Gsyscall status, // but can have inconsistent g->sched, do not let GC observe it. _g_.m.locks++ // Entersyscall must not call any function that might split/grow the stack. // (See details in comment above.) // Catch calls that might, by replacing the stack guard with something that // will trip any stack check and leaving a flag to tell newstack to die. _g_.stackguard0 = stackPreempt _g_.throwsplit = true // Leave SP around for GC and traceback. save(pc, sp) _g_.syscallsp = sp _g_.syscallpc = pc casgstatus(_g_, _Grunning, _Gsyscall) //切换G的状态 if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp { systemstack(func() { print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n") throw("entersyscall") }) } // 按需记录调用信息 if trace.enabled { systemstack(traceGoSysCall) // systemstack itself clobbers g.sched.{pc,sp} and we might // need them later when the G is genuinely blocked in a // syscall save(pc, sp) } if atomic.Load(&sched.sysmonwait)。
声明
下面的分析均基于Golang1.14版本。
状态图
P状态图
主要流程
1.procresize创建,Go程序初始化时初始CPU数量个P。
2.acquirep进入运行状态。当有新的G创建或waiting状态的G变得可运行,并且有P空闲时,通过acquirep获取p来运行最新的G。
3.entersyscall,进入系统调用状态。当P绑定的M正在运行的G进入系统调用状态,P也随之进入系统调用状态。
4.exitsyscallfast,退出系统调用状态。如果G快速退出系统调用,此时P理应等待G,当G退出系统调用时,继续运行G的代码。
5.retake,如果G长时间处于系统调用,此时P应与对应的G,M解绑。考虑到如果所有G都阻塞在系统调用,则整个Go程序都阻塞在系统调用,无法执行用户代码,因此需要一个将长时间陷入系统调用的P解绑M,重新绑定空闲的M执行用户代码。
6.releasep,让P进入空闲状态。如果P上可运行的G的队列空闲,并且无法从其它P中偷取G,此时应让P进入空闲状态。
procresize函数
1.谁在什么时候调用?程序初始化时或者修改CPU数量时(这种情况暂不考虑)调用。
2.P的状态变化?从无到有的过程,初始化CPU数量个P,所有P的状态都是_PIdle。
// Change number of processors. The world is stopped, sched is locked. // gcworkbufs are not being modified by either the GC or // the write barrier code. // Returns list of Ps with local work, they need to be scheduled by the caller. func procresize(nprocs int32) *p { old := gomaxprocs // 在这之前P的数量 if old < 0 || nprocs <= 0 { throw("procresize: invalid arg") } if trace.enabled { traceGomaxprocs(nprocs) } // update statistics now := nanotime() if sched.procresizetime != 0 { sched.totaltime += int64(old) * (now - sched.procresizetime) // 此处统计的应该是总的P的存在的时间 } sched.procresizetime = now // Grow allp if necessary. if nprocs > int32(len(allp)) { // 根据需要扩展allp来存储P // Synchronize with retake, which could be running // concurrently since it doesn't run on a P. lock(&allpLock) if nprocs <= int32(cap(allp)) { allp = allp[:nprocs] } else { nallp := make([]*p, nprocs) // Copy everything up to allp's cap so we // never lose old allocated Ps. copy(nallp, allp[:cap(allp)]) allp = nallp } unlock(&allpLock) } // initialize new P's for i := old; i < nprocs; i++ { // 将新创建的初始化 然后投入allp中 pp := allp[i] if pp == nil { pp = new(p) } pp.init(i) // 初始化P 主要是分配mcache atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) // 将创建的P放入allp中 } _g_ := getg() if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // 如果当前M绑定的P仍有效 // continue to use the current P _g_.m.p.ptr().status = _Prunning _g_.m.p.ptr().mcache.prepareForSweep() } else { // release the current P and acquire allp[0]. // // We must do this before destroying our current P // because p.destroy itself has write barriers, so we // need to do that from a valid P. if _g_.m.p != 0 { if trace.enabled { // Pretend that we were descheduled // and then scheduled again to keep // the trace sane. traceGoSched() traceProcStop(_g_.m.p.ptr()) } _g_.m.p.ptr().m = 0 // M绑定的P已无效 解绑 } // M绑定新的P(allp[0]) _g_.m.p = 0 p := allp[0] p.m = 0 p.status = _Pidle acquirep(p) if trace.enabled { traceGoStart() } } // g.m.p is now set, so we no longer need mcache0 for bootstrapping. mcache0 = nil // mcache0是为没有P的情况下给第一个P分配mcache 此时已经有P 则不需要mcache0 // release resources from unused P's for i := nprocs; i < old; i++ { // 如果allp缩小 则释放多余的P p := allp[i] p.destroy() // can't free P itself because it can be referenced by an M in syscall } // Trim allp. if int32(len(allp)) != nprocs { // 缩小切片allp lock(&allpLock) allp = allp[:nprocs] unlock(&allpLock) } var runnablePs *p for i := nprocs - 1; i >= 0; i-- { // 将除了当前的P以外的P 进行初始化 p := allp[i] if _g_.m.p.ptr() == p { continue } p.status = _Pidle if runqempty(p) { // 如果当前P没有要运行的G 将P放入Idle队列 pidleput(p) } else { p.m.set(mget()) // 从空闲的M队列中取出M 将其和P进行绑定 p.link.set(runnablePs) runnablePs = p } } stealOrder.reset(uint32(nprocs)) var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) // 设置gomaxprocs为当前P的数量 return runnablePs }
acquirep
1.谁在什么时候调用?当有新的G创建(newproc)或有G从waiting进入Runnable状态(ready)时,如果有空闲的P且没有正在找可运行的G的P,则唤醒一个P来运行这个刚创建的G。
2.P的状态转换?P从原先的Idle状态进入Running状态。
entersyscall
1.谁在什么时候调用?当P绑定的M要进入系统调用或者CGo调用时,触发entersyscall。
2.P的状态转换?P从Running状态进入Syscall状态,并且和当前绑定的M解绑。
func entersyscall() { reentersyscall(getcallerpc(), getcallersp()) } func reentersyscall(pc, sp uintptr) { _g_ := getg() // Disable preemption because during this function g is in Gsyscall status, // but can have inconsistent g->sched, do not let GC observe it. _g_.m.locks++ // Entersyscall must not call any function that might split/grow the stack. // (See details in comment above.) // Catch calls that might, by replacing the stack guard with something that // will trip any stack check and leaving a flag to tell newstack to die. _g_.stackguard0 = stackPreempt _g_.throwsplit = true // Leave SP around for GC and traceback. save(pc, sp) _g_.syscallsp = sp _g_.syscallpc = pc casgstatus(_g_, _Grunning, _Gsyscall) //切换G的状态 if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp { systemstack(func() { print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n") throw("entersyscall") }) } // 按需记录调用信息 if trace.enabled { systemstack(traceGoSysCall) // systemstack itself clobbers g.sched.{pc,sp} and we might // need them later when the G is genuinely blocked in a // syscall save(pc, sp) } if atomic.Load(&sched.sysmonwait) != 0 { // 如果系统监控线程在等待 则执行下面的函数 具体不深究 systemstack(entersyscall_sysmon) save(pc, sp) } if _g_.m.p.ptr().runSafePointFn != 0 { // 如果runSafePointFn不为0 则执行 具体不深究 // runSafePointFn may stack split if run on this stack systemstack(runSafePointFn) save(pc, sp) } _g_.m.syscalltick = _g_.m.p.ptr().syscalltick _g_.sysblocktraced = true // m p 解绑 pp := _g_.m.p.ptr() pp.m = 0 _g_.m.oldp.set(pp) _g_.m.p = 0 atomic.Store(&pp.status, _Psyscall) //切换P的状态 if sched.gcwaiting != 0 { // 正在gc时 则调用以下代码 详细的不深究 systemstack(entersyscall_gcwait) save(pc, sp) } _g_.m.locks-- }
exitsyscallfast
1.谁在什么时候调用?进入系统调用的G,迅速退出系统调用后,此时P尚未和别的M绑定,于是M和P重新绑定,继续运行该G。
2.P的状态转换?P从Syscall状态进入Running状态,M和P重新绑定。
// 删除了部分无关代码 func exitsyscallfast(oldp *p) bool { _g_ := getg() // Freezetheworld sets stopwait but does not retake P's. if sched.stopwait == freezeStopWait { return false } // Try to re-acquire the last P. sysmon线程未剥夺P if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) { // There's a cpu for us, so we can run. // 切换p的状态 并且重新绑定m p wirep(oldp) exitsyscallfast_reacquired() return true } return false }
retake
1.谁在什么时候调用?sysmon(系统监控线程)在P进入Syscall一段时间后,将P从Syscall状态切为Idle状态。
2.P的状态转换?P从Syscall进入Idle状态。
func retake(now int64) uint32 { n := 0 // Prevent allp slice changes. This lock will be completely // uncontended unless we're already stopping the world. lock(&allpLock) // 给allp加锁 // We can't use a range loop over allp because we may // temporarily drop the allpLock. Hence, we need to re-fetch // allp each time around the loop. for i := 0; i < len(allp); i++ { _p_ := allp[i] if _p_ == nil { // This can happen if procresize has grown // allp but not yet created new Ps. continue } pd := &_p_.sysmontick s := _p_.status sysretake := false if s == _Prunning || s == _Psyscall { // Preempt G if it's running for too long. t := int64(_p_.schedtick) // schedtick表示p执行execute的次数 if int64(pd.schedtick) != t { // 调整p.sysmontick里的schedtick次数 pd.schedtick = uint32(t) pd.schedwhen = now } else if pd.schedwhen+forcePreemptNS <= now { // 如果P运行时间过长 则抢占P preemptone(_p_) // In case of syscall, preemptone() doesn't // work, because there is no M wired to P. sysretake = true // Syscal状态P未绑定M 无法抢占 } } if s == _Psyscall { // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us). t := int64(_p_.syscalltick) // 当P从Syscall状态退出时++ if !sysretake && int64(pd.syscalltick) != t { // 如果未触发sysretake 尝试调整p.sysmontick.syscalltick次数 pd.syscalltick = uint32(t) pd.syscallwhen = now continue } // On the one hand we don't want to retake Ps if there is no other work to do, // but on the other hand we want to retake them eventually // because they can prevent the sysmon thread from deep sleep. // 如果P的队列为空 且已有空闲的P或者有P绑定的M处于spinning(寻找可运行的G)且系统调用时间小于10ms if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } // Drop allpLock so we can take sched.lock. unlock(&allpLock) // Need to decrement number of idle locked M's // (pretending that one more is running) before the CAS. // Otherwise the M from which we retake can exit the syscall, // increment nmidle and report deadlock. incidlelocked(-1) // ??? if atomic.Cas(&_p_.status, s, _Pidle) { // P转为Idle状态 if trace.enabled { traceGoSysBlock(_p_) traceProcStop(_p_) } n++ _p_.syscalltick++ handoffp(_p_) // 尝试把P放入空闲队列中 } incidlelocked(1) lock(&allpLock) } } unlock(&allpLock) return uint32(n) } func handoffp(_p_ *p) { // handoffp must start an M in any situation where // findrunnable would return a G to run on _p_. // if it has local work, start it straight away if !runqempty(_p_) || sched.runqsize != 0 { // 如果本地队列不为空或者全局运行队列不为空 startm(_p_, false) return } // if it has GC work, start it straight away if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) { startm(_p_, false) return } // no local work, check that there are no spinning/idle M's, // otherwise our help is not required // 如果没有正在spinning状态的M 且 没有P处于Idle状态 则不退出 if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic startm(_p_, true) return } lock(&sched.lock) if sched.gcwaiting != 0 { _p_.status = _Pgcstop sched.stopwait-- if sched.stopwait == 0 { notewakeup(&sched.stopnote) } unlock(&sched.lock) return } if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) { sched.safePointFn(_p_) sched.safePointWait-- if sched.safePointWait == 0 { notewakeup(&sched.safePointNote) } } if sched.runqsize != 0 { // 如果全局G运行队列不为空 unlock(&sched.lock) startm(_p_, false) return } // If this is the last running P and nobody is polling network, // need to wakeup another M to poll network. // 如果是最后一个P 并且... if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 { unlock(&sched.lock) startm(_p_, false) return } if when := nobarrierWakeTime(_p_); when != 0 { wakeNetPoller(when) } pidleput(_p_) // 将P放入空闲队列中 unlock(&sched.lock) }
releasep
1.谁在什么时候调用?当P本地的可运行G的队列为空且无法从其它地方窃取可运行的G时,P和M解绑,进入空闲状态。
2.P的状态转换?P和M解绑,从Running进入Idle。
func releasep() *p { _g_ := getg() if _g_.m.p == 0 { throw("releasep: invalid arg") } _p_ := _g_.m.p.ptr() if _p_.m.ptr() != _g_.m || _p_.status != _Prunning { print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", hex(_p_.m), " p->status=", _p_.status, "\n") throw("releasep: invalid p state") } if trace.enabled { traceProcStop(_g_.m.p.ptr()) } _g_.m.p = 0 _p_.m = 0 _p_.status = _Pidle return _p_ }
总结
1.P和G的状态转换息息相关,理解G的状态转换后,理解P的状态转顺理成章。
欢迎关注我们的微信公众号,每天学习Go知识