摘要:= t { // 如果未触发sysretake 尝试调整p.sysmontick.syscalltick次数 pd.syscalltick = uint32(t) pd.syscallwhen = now continue } // On the one hand we don't want to retake Ps if there is no other work to do, // but on the other hand we want to retake them eventually // because they can prevent the sysmon thread from deep sleep. // 如果P的队列为空 且已有空闲的P或者有P绑定的M处于spinning(寻找可运行的G)且系统调用时间小于10ms if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } // Drop allpLock so we can take sched.lock. unlock(&allpLock) // Need to decrement number of idle locked M's // (pretending that one more is running) before the CAS. // Otherwise the M from which we retake can exit the syscall, // increment nmidle and report deadlock. incidlelocked(-1) //。func entersyscall() { reentersyscall(getcallerpc(), getcallersp()) } func reentersyscall(pc, sp uintptr) { _g_ := getg() // Disable preemption because during this function g is in Gsyscall status, // but can have inconsistent g->sched, do not let GC observe it. _g_.m.locks++ // Entersyscall must not call any function that might split/grow the stack. // (See details in comment above.) // Catch calls that might, by replacing the stack guard with something that // will trip any stack check and leaving a flag to tell newstack to die. _g_.stackguard0 = stackPreempt _g_.throwsplit = true // Leave SP around for GC and traceback. save(pc, sp) _g_.syscallsp = sp _g_.syscallpc = pc casgstatus(_g_, _Grunning, _Gsyscall) //切换G的状态 if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp { systemstack(func() { print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n") throw("entersyscall") }) } // 按需记录调用信息 if trace.enabled { systemstack(traceGoSysCall) // systemstack itself clobbers g.sched.{pc,sp} and we might // need them later when the G is genuinely blocked in a // syscall save(pc, sp) } if atomic.Load(&sched.sysmonwait)。

声明

下面的分析均基于Golang1.14版本。

状态图

P状态图

主要流程

1.procresize创建,Go程序初始化时初始CPU数量个P。

2.acquirep进入运行状态。当有新的G创建或waiting状态的G变得可运行,并且有P空闲时,通过acquirep获取p来运行最新的G。

3.entersyscall,进入系统调用状态。当P绑定的M正在运行的G进入系统调用状态,P也随之进入系统调用状态。

4.exitsyscallfast,退出系统调用状态。如果G快速退出系统调用,此时P理应等待G,当G退出系统调用时,继续运行G的代码。

5.retake,如果G长时间处于系统调用,此时P应与对应的G,M解绑。考虑到如果所有G都阻塞在系统调用,则整个Go程序都阻塞在系统调用,无法执行用户代码,因此需要一个将长时间陷入系统调用的P解绑M,重新绑定空闲的M执行用户代码。

6.releasep,让P进入空闲状态。如果P上可运行的G的队列空闲,并且无法从其它P中偷取G,此时应让P进入空闲状态。

procresize函数

1.谁在什么时候调用?程序初始化时或者修改CPU数量时(这种情况暂不考虑)调用。

2.P的状态变化?从无到有的过程,初始化CPU数量个P,所有P的状态都是_PIdle。

// Change number of processors. The world is stopped, sched is locked.
// gcworkbufs are not being modified by either the GC or
// the write barrier code.
// Returns list of Ps with local work, they need to be scheduled by the caller.
func procresize(nprocs int32) *p {
        old := gomaxprocs // 在这之前P的数量
        if old < 0 || nprocs <= 0 {
        throw("procresize: invalid arg")
    }
        if trace.enabled {
        traceGomaxprocs(nprocs)
    }

    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {
        sched.totaltime += int64(old) * (now - sched.procresizetime) // 此处统计的应该是总的P的存在的时间
    }
    sched.procresizetime = now

    // Grow allp if necessary.
    if nprocs > int32(len(allp)) { // 根据需要扩展allp来存储P
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        unlock(&allpLock)
    }

    // initialize new P's
    for i := old; i < nprocs; i++ { // 将新创建的初始化 然后投入allp中
        pp := allp[i]
        if pp == nil {
            pp = new(p)
        }
        pp.init(i) // 初始化P 主要是分配mcache
        atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) // 将创建的P放入allp中
    }

    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // 如果当前M绑定的P仍有效
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning
        _g_.m.p.ptr().mcache.prepareForSweep()
    } else {
        // release the current P and acquire allp[0].
        //
        // We must do this before destroying our current P
        // because p.destroy itself has write barriers, so we
        // need to do that from a valid P.
        if _g_.m.p != 0 {
            if trace.enabled {
                // Pretend that we were descheduled
                // and then scheduled again to keep
                // the trace sane.
                traceGoSched()
                traceProcStop(_g_.m.p.ptr())
            }
            _g_.m.p.ptr().m = 0 // M绑定的P已无效 解绑
        }
        // M绑定新的P(allp[0])
        _g_.m.p = 0
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        acquirep(p)
        if trace.enabled {
            traceGoStart()
        }
    }

    // g.m.p is now set, so we no longer need mcache0 for bootstrapping.
    mcache0 = nil // mcache0是为没有P的情况下给第一个P分配mcache 此时已经有P 则不需要mcache0

    // release resources from unused P's
    for i := nprocs; i < old; i++ { // 如果allp缩小 则释放多余的P
        p := allp[i]
        p.destroy()
        // can't free P itself because it can be referenced by an M in syscall
    }

    // Trim allp.
    if int32(len(allp)) != nprocs { // 缩小切片allp
        lock(&allpLock)
        allp = allp[:nprocs]
        unlock(&allpLock)
    }

    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- { // 将除了当前的P以外的P 进行初始化
        p := allp[i]
        if _g_.m.p.ptr() == p {
            continue
        }
        p.status = _Pidle
        if runqempty(p) { // 如果当前P没有要运行的G 将P放入Idle队列
            pidleput(p)
        } else {
            p.m.set(mget()) // 从空闲的M队列中取出M 将其和P进行绑定
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) // 设置gomaxprocs为当前P的数量
    return runnablePs
}

acquirep

1.谁在什么时候调用?当有新的G创建(newproc)或有G从waiting进入Runnable状态(ready)时,如果有空闲的P且没有正在找可运行的G的P,则唤醒一个P来运行这个刚创建的G。

2.P的状态转换?P从原先的Idle状态进入Running状态。

entersyscall

1.谁在什么时候调用?当P绑定的M要进入系统调用或者CGo调用时,触发entersyscall。

2.P的状态转换?P从Running状态进入Syscall状态,并且和当前绑定的M解绑。

func entersyscall() {
    reentersyscall(getcallerpc(), getcallersp())
}

func reentersyscall(pc, sp uintptr) {
    _g_ := getg()

    // Disable preemption because during this function g is in Gsyscall status,
    // but can have inconsistent g->sched, do not let GC observe it.
    _g_.m.locks++

    // Entersyscall must not call any function that might split/grow the stack.
    // (See details in comment above.)
    // Catch calls that might, by replacing the stack guard with something that
    // will trip any stack check and leaving a flag to tell newstack to die.
    _g_.stackguard0 = stackPreempt
    _g_.throwsplit = true

    // Leave SP around for GC and traceback.
    save(pc, sp)
    _g_.syscallsp = sp
    _g_.syscallpc = pc
    casgstatus(_g_, _Grunning, _Gsyscall) //切换G的状态
    if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
        systemstack(func() {
            print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
            throw("entersyscall")
        })
    }

    // 按需记录调用信息
    if trace.enabled {
        systemstack(traceGoSysCall)
        // systemstack itself clobbers g.sched.{pc,sp} and we might
        // need them later when the G is genuinely blocked in a
        // syscall
        save(pc, sp)
    }

    if atomic.Load(&sched.sysmonwait) != 0 { // 如果系统监控线程在等待 则执行下面的函数 具体不深究
        systemstack(entersyscall_sysmon)
        save(pc, sp)
    }

    if _g_.m.p.ptr().runSafePointFn != 0 { // 如果runSafePointFn不为0 则执行 具体不深究
        // runSafePointFn may stack split if run on this stack
        systemstack(runSafePointFn)
        save(pc, sp)
    }

    _g_.m.syscalltick = _g_.m.p.ptr().syscalltick
    _g_.sysblocktraced = true
    // m p 解绑
    pp := _g_.m.p.ptr()
    pp.m = 0
    _g_.m.oldp.set(pp)
    _g_.m.p = 0
    atomic.Store(&pp.status, _Psyscall) //切换P的状态
    if sched.gcwaiting != 0 { // 正在gc时 则调用以下代码 详细的不深究
        systemstack(entersyscall_gcwait)
        save(pc, sp)
    }

    _g_.m.locks--
}

exitsyscallfast

1.谁在什么时候调用?进入系统调用的G,迅速退出系统调用后,此时P尚未和别的M绑定,于是M和P重新绑定,继续运行该G。

2.P的状态转换?P从Syscall状态进入Running状态,M和P重新绑定。

// 删除了部分无关代码
func exitsyscallfast(oldp *p) bool {
    _g_ := getg()

    // Freezetheworld sets stopwait but does not retake P's.
    if sched.stopwait == freezeStopWait {
        return false
    }

    // Try to re-acquire the last P. sysmon线程未剥夺P
    if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
        // There's a cpu for us, so we can run.
        // 切换p的状态 并且重新绑定m p
        wirep(oldp)
        exitsyscallfast_reacquired()
        return true
    }
    return false
}

retake

1.谁在什么时候调用?sysmon(系统监控线程)在P进入Syscall一段时间后,将P从Syscall状态切为Idle状态。

2.P的状态转换?P从Syscall进入Idle状态。

func retake(now int64) uint32 {
    n := 0
    // Prevent allp slice changes. This lock will be completely
    // uncontended unless we're already stopping the world.
    lock(&allpLock) // 给allp加锁
    // We can't use a range loop over allp because we may
    // temporarily drop the allpLock. Hence, we need to re-fetch
    // allp each time around the loop.
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick) // schedtick表示p执行execute的次数
            if int64(pd.schedtick) != t { // 调整p.sysmontick里的schedtick次数
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now { // 如果P运行时间过长 则抢占P
                preemptone(_p_)
                // In case of syscall, preemptone() doesn't
                // work, because there is no M wired to P.
                sysretake = true // Syscal状态P未绑定M 无法抢占
            }
        }
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick) // 当P从Syscall状态退出时++
            if !sysretake && int64(pd.syscalltick) != t { // 如果未触发sysretake 尝试调整p.sysmontick.syscalltick次数
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
            // 如果P的队列为空 且已有空闲的P或者有P绑定的M处于spinning(寻找可运行的G)且系统调用时间小于10ms
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1) // ???
            if atomic.Cas(&_p_.status, s, _Pidle) { // P转为Idle状态
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_) // 尝试把P放入空闲队列中
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    if !runqempty(_p_) || sched.runqsize != 0 { // 如果本地队列不为空或者全局运行队列不为空
        startm(_p_, false)
        return
    }
    // if it has GC work, start it straight away
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
        startm(_p_, false)
        return
    }
    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    // 如果没有正在spinning状态的M 且 没有P处于Idle状态 则不退出
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock)
    if sched.gcwaiting != 0 {
        _p_.status = _Pgcstop
        sched.stopwait--
        if sched.stopwait == 0 {
            notewakeup(&sched.stopnote)
        }
        unlock(&sched.lock)
        return
    }
    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
        sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {
            notewakeup(&sched.safePointNote)
        }
    }
    if sched.runqsize != 0 { // 如果全局G运行队列不为空
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    // 如果是最后一个P 并且...
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    if when := nobarrierWakeTime(_p_); when != 0 {
        wakeNetPoller(when)
    }
    pidleput(_p_) // 将P放入空闲队列中
    unlock(&sched.lock)
}

releasep

1.谁在什么时候调用?当P本地的可运行G的队列为空且无法从其它地方窃取可运行的G时,P和M解绑,进入空闲状态。

2.P的状态转换?P和M解绑,从Running进入Idle。

func releasep() *p {
    _g_ := getg()

    if _g_.m.p == 0 {
        throw("releasep: invalid arg")
    }
    _p_ := _g_.m.p.ptr()
    if _p_.m.ptr() != _g_.m || _p_.status != _Prunning {
        print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", hex(_p_.m), " p->status=", _p_.status, "\n")
        throw("releasep: invalid p state")
    }
    if trace.enabled {
        traceProcStop(_g_.m.p.ptr())
    }
    _g_.m.p = 0
    _p_.m = 0
    _p_.status = _Pidle
    return _p_
}

总结

1.P和G的状态转换息息相关,理解G的状态转换后,理解P的状态转顺理成章。

欢迎关注我们的微信公众号,每天学习Go知识

相关文章