寫這個東西也只是因爲想簡單掌握下 TiDB 的源碼,同事給了一些閱讀思路,很贊。

有些地方如果理解的有問題還請批評教育,對 Go 語言理解的比較有限。

如果不小心誤導了讀者,請見諒

TiDB 模塊是使用 Go 語言開發的,使用 GoLand 編譯器就可以了。

JetBrains出品

閱讀源碼,要尋找好的切入點,我們選擇 main.go [1] 作爲閱讀源碼的入口。

tidb-server/main.go

這裏的 main 函數可以 debug ,也是 TiDB 啓動的開頭。

稍微簡化一下

func main() {
   registerStores()
   registerMetrics()
   config.InitializeConfig(*configPath, *configCheck, *configStrict, reloadConfig, overrideConfig)
   if config.GetGlobalConfig().OOMUseTmpStorage {
      config.GetGlobalConfig().UpdateTempStoragePath()
      initializeTempDir()
   }
   setCPUAffinity()
   setupLog()
   setupTracing() 
   setupBinlogClient()
   setupMetrics()
   createStoreAndDomain()
   createServer()
   runServer()
}

可以看出,啓動流程做的每個步驟都按照函數封裝好了,大致瞭解一下都做什麼

// 註冊store
func registerStores() {
    err := kvstore.Register("tikv", tikv.Driver{}) //註冊TiKV
    tikv.NewGCHandlerFunc = gcworker.NewGCWorker //爲TiKV生成GCworker
    err = kvstore.Register("mocktikv", mockstore.MockDriver{}) //註冊默認存儲引擎MockTiKV
}
//共註冊100+ prometheus監控項,這裏只表一項
func RegisterMetrics() { 
    prometheus.MustRegister(AutoAnalyzeCounter)
}
// get全局config
func InitializeConfig(confPath string, configCheck, configStrict bool, reloadFunc ConfReloadFunc, enforceCmdArgs func(*Config)) {
    cfg := GetGlobalConfig() 
    StoreGlobalConfig(cfg)
}

這個判斷

if config.GetGlobalConfig().OOMUseTmpStorage {
        config.GetGlobalConfig().UpdateTempStoragePath()
        initializeTempDir()
}

設置是否在單條 SQL 語句的內存使用超出 mem-quota-query [2] 限制時爲某些算子啓用臨時磁盤。故若爲 true ,則初始化 TempStoragePath 。

// 設置CPU親和性
func setCPUAffinity() { /
    err := linux.SetAffinity(cpu)
    runtime.GOMAXPROCS(len(cpu))
}
//配置系統log
func setupLog() { 
    err = logutil.InitLogger(cfg.Log.ToLogConfig()) // 這裏配置格式、文件名、slowlog等
}
//註冊分佈式系統追蹤鏈 jaeger
func setupTracing() {
    tracingCfg := cfg.OpenTracing.ToTracingConfig()
    tracingCfg.ServiceName = "TiDB"
    tracer, _, err := tracingCfg.NewTracer()
    opentracing.SetGlobalTracer(tracer)
}
// 設置binlog信息
func setupBinlogClient() {
    if !cfg.Binlog.Enable { //若binlog.enable=false,則不開啓binlog
        return
    }
    if cfg.Binlog.IgnoreError { //若爲true,則忽略binlog報錯
        binloginfo.SetIgnoreError(true)
    }
    if len(cfg.Binlog.BinlogSocket) == 0 { //配置binlog輸出網絡地址
        ...
    }
    binloginfo.SetPumpsClient(client) //配置binlog信息到pump客戶端
}
// 配置監控
func setupMetrics() {
    runtime.SetMutexProfileFraction(10)// 對鎖調用的跟蹤
    systimeErrHandler := func() {
                // 表示TiDB的進程是否仍然存在。
                // 若10分鐘內tidb_monitor_keep_alive_total               
                // 次數<100,TiDB可能退出,此時會報警
        metrics.TimeJumpBackCounter.Inc() 
    }
    callBackCount := 0
    sucessCallBack := func() {
        callBackCount++
        if callBackCount >= 5 {
            callBackCount = 0
            metrics.KeepAliveCounter.Inc() // KeepAlive監控 
        }
    }
}
// 啓動了一些重要的後臺進程
func createStoreAndDomain() {
    fullPath := fmt.Sprintf("%s://%s", cfg.Store, cfg.Path)
    storage, err = kvstore.New(fullPath)
    dom, err = session.BootstrapSession(storage)}
}
// 創建TiDB server
func createServer() {
    driver := server.NewTiDBDriver(storage)
    svr, err = server.NewServer(cfg, driver)
    svr.SetDomain(dom)
}
//啓動服務
runServer()

可以看到, runServer() 是啓動TiDB流程中的的最後一步。

所以我們跳轉到 server.Run() 中,

這裏有很多接受請求時的異常處理邏輯,在此不表。簡化一下大概有四步,如下:

if s.cfg.Status.ReportStatus {
    s.startStatusHTTP() //配置路由信息
}
for{
    conn, err := s.listener.Accept()// 監聽客戶端請求
    clientConn := s.newConn(conn)// 創建connection
    go s.onConn(clientConn)// 使用connection處理請求
}

首先配置了關於 TiDB 組件的很多路由信息,有興趣的可以看看。

server 不斷監聽網絡請求,出現新的客戶端請求就創建一個新的 connection ,使用一個新 goroutine 來持續爲它提供服務。

後續就是通過 go s.onConn(clientConn) 處理客戶端請求,我們來一探究竟接下來的流程,簡化下代碼,大致如下

func (s *Server) onConn(conn *clientConn) {
    ctx := logutil.WithConnID(context.Background(), conn.connectionID)
    if err := conn.handshake(ctx); err != nil {
        if plugin.IsEnable(plugin.Audit) && conn.ctx != nil {
            conn.ctx.GetSessionVars().ConnectionInfo = conn.connectInfo()
            })
        }
    }
    connectedTime := time.Now()
    conn.Run(ctx)
}

將建鏈 info 寫入到 session 中,統計一下鏈路建立成功的時間,成功後,通過 conn.Run(ctx) 處理客戶端請求。

func (cc *clientConn) Run(ctx context.Context) {
    for {
        waitTimeout := cc.getSessionVarsWaitTimeout(ctx)
        cc.pkt.setReadTimeout(time.Duration(waitTimeout) * time.Second)
        data, err := cc.readPacket()
        if err = cc.dispatch(ctx, data); err != nil {
            ...
        }
    }
}

簡化後,處理邏輯大致是,

不停的通過 cc.readPacket() 讀取客戶端發來的網絡包。如果這個期間發生了等待網絡包超時的現象,則 close connection 。

讀取 data 成功後,將它傳入 cc.dispatch(ctx,data) ,這也是處理 SQL 請求的入口了。

當 SQL 來到 很合理 ,顧名思義,是對不同的 MySQL 協議進行調度,

server/conn.go

Then.

func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { 
    cc.lastPacket = data
    cmd := data[0]
    data = data[1:]
    dataStr := string(hack.String(data))
}

客戶端請求MySQL協議報文格式

TiDB 也要根據這個格式進行解析, data[0] 就是 data 的第一個 byte ,其餘的是命令。

MySQL 請求報文的命令列表

0x00 COM_SLEEP 內部線程狀態
0x01 COM_QUIT 關閉連接
0x02 COM_INIT_DB 切換數據庫
0x03 COM_QUERY SQL查詢請求
0x04 COM_FIELD_LIST 獲取數據表字段信息
0x05 COM_CREATE_DB 創建數據庫
0x06 COM_DROP_DB 刪除數據庫
0x07 COM_REFRESH 清除緩存
0x08 COM_SHUTDOWN 停止服務器
0x09 COM_STATISTICS 獲取服務器統計信息
0x0A COM_PROCESS_INFO 獲取當前連接的列表
0x0B COM_CONNECT 內部線程狀態
0x0C COM_PROCESS_KILL 中斷某個連接
0x0D COM_DEBUG 保存服務器調試信息
0x0E COM_PING 測試連通性
0x0F COM_TIME 內部線程狀態
0x10 COM_DELAYED_INSERT 內部線程狀態
0x11 COM_CHANGE_USER 重新登陸
0x12 COM_BINLOG_DUMP 獲取二進制日誌信息
0x13 COM_TABLE_DUMP 獲取數據表結構信息
0x14 COM_CONNECT_OUT 內部線程狀態
0x15 COM_REGISTER_SLAVE 從服務器向主服務器進行註冊
0x16 COM_STMT_PREPARE 預處理SQL語句
0x17 COM_STMT_EXECUTE 執行預處理語句
0x18 COM_STMT_SEND_LONG_DATA 發送BLOB類型的數據
0x19 COM_STMT_CLOSE 銷燬預處理語句
0x1A COM_STMT_RESET 清除預處理語句參數緩存
0x1B COM_SET_OPTION 設置語句選項
0x1C COM_STMT_FETCH 獲取預處理語句的執行結果

我們看一下 dispatch 接下來的部分,也就是目前 TiDB 實現的部分 MySQL 協議了

switch cmd {
    case mysql.ComPing, 
             mysql.ComStmtClose, 
             mysql.ComStmtSendLongData, 
             mysql.ComStmtReset,
         mysql.ComSetOption, 
             mysql.ComChangeUser:
        cc.ctx.SetProcessInfo("", t, cmd, 0)
    case mysql.ComInitDB:
        cc.ctx.SetProcessInfo("use "+dataStr, t, cmd, 0)
    }
switch cmd {
    case mysql.ComSleep:
    case mysql.ComQuit:
    case mysql.ComQuery:
    case mysql.ComPing:
    case mysql.ComInitDB:
    case mysql.ComFieldList:
    case mysql.ComStmtPrepare:
    case mysql.ComStmtExecute:
    case mysql.ComStmtFetch:
    case mysql.ComStmtClose:
    case mysql.ComStmtSendLongData:
    case mysql.ComStmtReset:
    case mysql.ComSetOption:
    case mysql.ComChangeUser
    default: // other not support

目前 TiDB 的絕大部分 SQL 語句都是走的 ComQuery ,感興趣的同學可以自行 debug 一下。

case mysql.ComQuery: 
    if len(data) > 0 && data[len(data)-1] == 0 {
        data = data[:len(data)-1]
        dataStr = string(hack.String(data))
    }
    return cc.handleQuery(ctx, dataStr)

於是我們仔細看看這個 case , 要去看下 cc.handleQuery(ctx , dataStr) 。

func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
     stmts, err := cc.ctx.Execute(ctx, sql)
}

之後就要進入解析、優化 SQL 的部分 ,我們還是要先簡單理解下 Lex & Yacc。

有疑問加站長微信聯繫

相關文章