摘要:= t { // 如果未觸發sysretake 嘗試調整p.sysmontick.syscalltick次數 pd.syscalltick = uint32(t) pd.syscallwhen = now continue } // On the one hand we don't want to retake Ps if there is no other work to do, // but on the other hand we want to retake them eventually // because they can prevent the sysmon thread from deep sleep. // 如果P的隊列爲空 且已有空閒的P或者有P綁定的M處於spinning(尋找可運行的G)且系統調用時間小於10ms if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } // Drop allpLock so we can take sched.lock. unlock(&allpLock) // Need to decrement number of idle locked M's // (pretending that one more is running) before the CAS. // Otherwise the M from which we retake can exit the syscall, // increment nmidle and report deadlock. incidlelocked(-1) //。func entersyscall() { reentersyscall(getcallerpc(), getcallersp()) } func reentersyscall(pc, sp uintptr) { _g_ := getg() // Disable preemption because during this function g is in Gsyscall status, // but can have inconsistent g->sched, do not let GC observe it. _g_.m.locks++ // Entersyscall must not call any function that might split/grow the stack. // (See details in comment above.) // Catch calls that might, by replacing the stack guard with something that // will trip any stack check and leaving a flag to tell newstack to die. _g_.stackguard0 = stackPreempt _g_.throwsplit = true // Leave SP around for GC and traceback. save(pc, sp) _g_.syscallsp = sp _g_.syscallpc = pc casgstatus(_g_, _Grunning, _Gsyscall) //切換G的狀態 if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp { systemstack(func() { print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n") throw("entersyscall") }) } // 按需記錄調用信息 if trace.enabled { systemstack(traceGoSysCall) // systemstack itself clobbers g.sched.{pc,sp} and we might // need them later when the G is genuinely blocked in a // syscall save(pc, sp) } if atomic.Load(&sched.sysmonwait)。

聲明

下面的分析均基於Golang1.14版本。

狀態圖

P狀態圖

主要流程

1.procresize創建,Go程序初始化時初始CPU數量個P。

2.acquirep進入運行狀態。當有新的G創建或waiting狀態的G變得可運行,並且有P空閒時,通過acquirep獲取p來運行最新的G。

3.entersyscall,進入系統調用狀態。當P綁定的M正在運行的G進入系統調用狀態,P也隨之進入系統調用狀態。

4.exitsyscallfast,退出系統調用狀態。如果G快速退出系統調用,此時P理應等待G,當G退出系統調用時,繼續運行G的代碼。

5.retake,如果G長時間處於系統調用,此時P應與對應的G,M解綁。考慮到如果所有G都阻塞在系統調用,則整個Go程序都阻塞在系統調用,無法執行用戶代碼,因此需要一個將長時間陷入系統調用的P解綁M,重新綁定空閒的M執行用戶代碼。

6.releasep,讓P進入空閒狀態。如果P上可運行的G的隊列空閒,並且無法從其它P中偷取G,此時應讓P進入空閒狀態。

procresize函數

1.誰在什麼時候調用?程序初始化時或者修改CPU數量時(這種情況暫不考慮)調用。

2.P的狀態變化?從無到有的過程,初始化CPU數量個P,所有P的狀態都是_PIdle。

// Change number of processors. The world is stopped, sched is locked.
// gcworkbufs are not being modified by either the GC or
// the write barrier code.
// Returns list of Ps with local work, they need to be scheduled by the caller.
func procresize(nprocs int32) *p {
        old := gomaxprocs // 在這之前P的數量
        if old < 0 || nprocs <= 0 {
        throw("procresize: invalid arg")
    }
        if trace.enabled {
        traceGomaxprocs(nprocs)
    }

    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {
        sched.totaltime += int64(old) * (now - sched.procresizetime) // 此處統計的應該是總的P的存在的時間
    }
    sched.procresizetime = now

    // Grow allp if necessary.
    if nprocs > int32(len(allp)) { // 根據需要擴展allp來存儲P
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        unlock(&allpLock)
    }

    // initialize new P's
    for i := old; i < nprocs; i++ { // 將新創建的初始化 然後投入allp中
        pp := allp[i]
        if pp == nil {
            pp = new(p)
        }
        pp.init(i) // 初始化P 主要是分配mcache
        atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) // 將創建的P放入allp中
    }

    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // 如果當前M綁定的P仍有效
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning
        _g_.m.p.ptr().mcache.prepareForSweep()
    } else {
        // release the current P and acquire allp[0].
        //
        // We must do this before destroying our current P
        // because p.destroy itself has write barriers, so we
        // need to do that from a valid P.
        if _g_.m.p != 0 {
            if trace.enabled {
                // Pretend that we were descheduled
                // and then scheduled again to keep
                // the trace sane.
                traceGoSched()
                traceProcStop(_g_.m.p.ptr())
            }
            _g_.m.p.ptr().m = 0 // M綁定的P已無效 解綁
        }
        // M綁定新的P(allp[0])
        _g_.m.p = 0
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        acquirep(p)
        if trace.enabled {
            traceGoStart()
        }
    }

    // g.m.p is now set, so we no longer need mcache0 for bootstrapping.
    mcache0 = nil // mcache0是爲沒有P的情況下給第一個P分配mcache 此時已經有P 則不需要mcache0

    // release resources from unused P's
    for i := nprocs; i < old; i++ { // 如果allp縮小 則釋放多餘的P
        p := allp[i]
        p.destroy()
        // can't free P itself because it can be referenced by an M in syscall
    }

    // Trim allp.
    if int32(len(allp)) != nprocs { // 縮小切片allp
        lock(&allpLock)
        allp = allp[:nprocs]
        unlock(&allpLock)
    }

    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- { // 將除了當前的P以外的P 進行初始化
        p := allp[i]
        if _g_.m.p.ptr() == p {
            continue
        }
        p.status = _Pidle
        if runqempty(p) { // 如果當前P沒有要運行的G 將P放入Idle隊列
            pidleput(p)
        } else {
            p.m.set(mget()) // 從空閒的M隊列中取出M 將其和P進行綁定
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) // 設置gomaxprocs爲當前P的數量
    return runnablePs
}

acquirep

1.誰在什麼時候調用?當有新的G創建(newproc)或有G從waiting進入Runnable狀態(ready)時,如果有空閒的P且沒有正在找可運行的G的P,則喚醒一個P來運行這個剛創建的G。

2.P的狀態轉換?P從原先的Idle狀態進入Running狀態。

entersyscall

1.誰在什麼時候調用?當P綁定的M要進入系統調用或者CGo調用時,觸發entersyscall。

2.P的狀態轉換?P從Running狀態進入Syscall狀態,並且和當前綁定的M解綁。

func entersyscall() {
    reentersyscall(getcallerpc(), getcallersp())
}

func reentersyscall(pc, sp uintptr) {
    _g_ := getg()

    // Disable preemption because during this function g is in Gsyscall status,
    // but can have inconsistent g->sched, do not let GC observe it.
    _g_.m.locks++

    // Entersyscall must not call any function that might split/grow the stack.
    // (See details in comment above.)
    // Catch calls that might, by replacing the stack guard with something that
    // will trip any stack check and leaving a flag to tell newstack to die.
    _g_.stackguard0 = stackPreempt
    _g_.throwsplit = true

    // Leave SP around for GC and traceback.
    save(pc, sp)
    _g_.syscallsp = sp
    _g_.syscallpc = pc
    casgstatus(_g_, _Grunning, _Gsyscall) //切換G的狀態
    if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
        systemstack(func() {
            print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
            throw("entersyscall")
        })
    }

    // 按需記錄調用信息
    if trace.enabled {
        systemstack(traceGoSysCall)
        // systemstack itself clobbers g.sched.{pc,sp} and we might
        // need them later when the G is genuinely blocked in a
        // syscall
        save(pc, sp)
    }

    if atomic.Load(&sched.sysmonwait) != 0 { // 如果系統監控線程在等待 則執行下面的函數 具體不深究
        systemstack(entersyscall_sysmon)
        save(pc, sp)
    }

    if _g_.m.p.ptr().runSafePointFn != 0 { // 如果runSafePointFn不爲0 則執行 具體不深究
        // runSafePointFn may stack split if run on this stack
        systemstack(runSafePointFn)
        save(pc, sp)
    }

    _g_.m.syscalltick = _g_.m.p.ptr().syscalltick
    _g_.sysblocktraced = true
    // m p 解綁
    pp := _g_.m.p.ptr()
    pp.m = 0
    _g_.m.oldp.set(pp)
    _g_.m.p = 0
    atomic.Store(&pp.status, _Psyscall) //切換P的狀態
    if sched.gcwaiting != 0 { // 正在gc時 則調用以下代碼 詳細的不深究
        systemstack(entersyscall_gcwait)
        save(pc, sp)
    }

    _g_.m.locks--
}

exitsyscallfast

1.誰在什麼時候調用?進入系統調用的G,迅速退出系統調用後,此時P尚未和別的M綁定,於是M和P重新綁定,繼續運行該G。

2.P的狀態轉換?P從Syscall狀態進入Running狀態,M和P重新綁定。

// 刪除了部分無關代碼
func exitsyscallfast(oldp *p) bool {
    _g_ := getg()

    // Freezetheworld sets stopwait but does not retake P's.
    if sched.stopwait == freezeStopWait {
        return false
    }

    // Try to re-acquire the last P. sysmon線程未剝奪P
    if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
        // There's a cpu for us, so we can run.
        // 切換p的狀態 並且重新綁定m p
        wirep(oldp)
        exitsyscallfast_reacquired()
        return true
    }
    return false
}

retake

1.誰在什麼時候調用?sysmon(系統監控線程)在P進入Syscall一段時間後,將P從Syscall狀態切爲Idle狀態。

2.P的狀態轉換?P從Syscall進入Idle狀態。

func retake(now int64) uint32 {
    n := 0
    // Prevent allp slice changes. This lock will be completely
    // uncontended unless we're already stopping the world.
    lock(&allpLock) // 給allp加鎖
    // We can't use a range loop over allp because we may
    // temporarily drop the allpLock. Hence, we need to re-fetch
    // allp each time around the loop.
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick) // schedtick表示p執行execute的次數
            if int64(pd.schedtick) != t { // 調整p.sysmontick裏的schedtick次數
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now { // 如果P運行時間過長 則搶佔P
                preemptone(_p_)
                // In case of syscall, preemptone() doesn't
                // work, because there is no M wired to P.
                sysretake = true // Syscal狀態P未綁定M 無法搶佔
            }
        }
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick) // 當P從Syscall狀態退出時++
            if !sysretake && int64(pd.syscalltick) != t { // 如果未觸發sysretake 嘗試調整p.sysmontick.syscalltick次數
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
            // 如果P的隊列爲空 且已有空閒的P或者有P綁定的M處於spinning(尋找可運行的G)且系統調用時間小於10ms
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1) // ???
            if atomic.Cas(&_p_.status, s, _Pidle) { // P轉爲Idle狀態
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_) // 嘗試把P放入空閒隊列中
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    if !runqempty(_p_) || sched.runqsize != 0 { // 如果本地隊列不爲空或者全局運行隊列不爲空
        startm(_p_, false)
        return
    }
    // if it has GC work, start it straight away
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
        startm(_p_, false)
        return
    }
    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    // 如果沒有正在spinning狀態的M 且 沒有P處於Idle狀態 則不退出
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock)
    if sched.gcwaiting != 0 {
        _p_.status = _Pgcstop
        sched.stopwait--
        if sched.stopwait == 0 {
            notewakeup(&sched.stopnote)
        }
        unlock(&sched.lock)
        return
    }
    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
        sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {
            notewakeup(&sched.safePointNote)
        }
    }
    if sched.runqsize != 0 { // 如果全局G運行隊列不爲空
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    // 如果是最後一個P 並且...
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    if when := nobarrierWakeTime(_p_); when != 0 {
        wakeNetPoller(when)
    }
    pidleput(_p_) // 將P放入空閒隊列中
    unlock(&sched.lock)
}

releasep

1.誰在什麼時候調用?當P本地的可運行G的隊列爲空且無法從其它地方竊取可運行的G時,P和M解綁,進入空閒狀態。

2.P的狀態轉換?P和M解綁,從Running進入Idle。

func releasep() *p {
    _g_ := getg()

    if _g_.m.p == 0 {
        throw("releasep: invalid arg")
    }
    _p_ := _g_.m.p.ptr()
    if _p_.m.ptr() != _g_.m || _p_.status != _Prunning {
        print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", hex(_p_.m), " p->status=", _p_.status, "\n")
        throw("releasep: invalid p state")
    }
    if trace.enabled {
        traceProcStop(_g_.m.p.ptr())
    }
    _g_.m.p = 0
    _p_.m = 0
    _p_.status = _Pidle
    return _p_
}

總結

1.P和G的狀態轉換息息相關,理解G的狀態轉換後,理解P的狀態轉順理成章。

歡迎關注我們的微信公衆號,每天學習Go知識

相關文章