gRPC-go服務發現&負載均衡
前言
以下示例基於 https://github.com/grpc/grpc-go v1.30.0,關於proto文件定義,服務生成參考 gRPC 官方文檔中文版
client
grpc使用的是客戶端負載均衡模式,每次新建連接的時候會根據負載均衡算法選出服務端的IP然後建立連接。現在grpc默認支持兩種算法pick_first(第一次地址) 和 round_robin(輪詢)
pick_first:
pick_first每次都是嘗試連接第一個地址,如果連接失敗就會嘗試下一個,直到連接成功爲止,之後的RPC請求都會使用這個連接
round_robin:
round_robin會對每個地址建立連接,之後的RPC請求會依次通過這些連接發送到後端
客戶端新建一個連接
conn, err := grpc.Dial( fmt.Sprintf("%s:///%s", "game", baseService), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)), grpc.WithInsecure(), //grpc.WithUnaryInterceptor(unaryClientInterceptor), //grpc.WithBlock(), //grpc.WithCompressor Deprecated )
客戶端每次發起請求都需要通過grpc.dail創建一個ClientConn,然後通過ClientConn.XXXX發送請求。
建立連接的各項參數:
grpc.WithInsecure
:禁用傳輸認證,沒有這個選項必須設置一種認證方式
grpc.WithCompressor:
在grpc.Dial參數中設置壓縮的方式將要被廢棄,推薦使用UseCompressor
grpc.UseCompressor(gzip.Name) conn, err := grpc.Dial( //... )
PS:壓縮方式客戶端應該和服務端對應
grpc.WithBlock():
grpc.Dial默認建立連接是異步的,加了這個參數後會等待所有連接建立成功後再返回
grpc.WithUnaryInterceptor:
一元攔截器,適用於普通rpc連接,相應的還有流攔截器。攔截器只有第一個生效,所以一般設置一個。攔截器是對請求的一次封裝,客戶端和服務端都可以設置攔截器,請求的發送/執行都是在攔截器內操作的,所以在請求的前後都可以嵌入用戶自定義的代碼,類似hook
//客戶端攔截器 func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { var credsConfigured bool for _, o := range opts { _, ok := o.(grpc.PerRPCCredsCallOption) if ok { credsConfigured = true break } } if !credsConfigured { opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{ AccessToken: fallbackToken, }))) } start := time.Now() err := invoker(ctx, method, req, reply, cc, opts...) end := time.Now() logger("RPC: %s, start time: %s, end time: %s, err: %v", method, start.Format("Basic"), end.Format(time.RFC3339), err) return err } //服務端攔截器 func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // authentication (token verification) md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, errMissingMetadata } if !valid(md["authorization"]) { return nil, errInvalidToken } m, err := handler(ctx, req) if err != nil { logger("RPC failed with error %v", err) } return m, err }
grpc.WithDefaultServiceConfig:
舊的版本可以通過grpc.RoundRobin(),和grpc.WithBalancer()來設置負載均衡,這個版本grpc.RoundRobin()已經取消了,grpc.WithBalancer()和grpc. 也WithBalancerName()標記爲廢棄。
//service config example { "loadBalancingConfig": [ { "round_robin": {} } ], "methodConfig": [ { "name": [ { "service": "foo", "method": "bar" }, { "service": "baz" } ], "timeout": "1.0000000001s" } ] }
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name))
可以這樣設置BalancingPolicy
target: grpc.Dial:
的第一個參數,這個參數的主要作用的通過它來找到對應的服務端地址,target傳入是一個字符串,統一格式爲 scheme://authority/endpoint ,然後通過以下方式解析爲Target struct
type Target struct { Scheme string Authority string Endpoint string } func parseTarget(target string) (ret resolver.Target) { var ok bool ret.Scheme, ret.Endpoint, ok = split2(target, "://") if !ok { return resolver.Target{Endpoint: target} } ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/") if !ok { return resolver.Target{Endpoint: target} } return ret }
解析target的時候有以下幾種情況:
- 當前參數有沒有直接設置resolverBuilder,如果設置了,直接設置Endpoint=target
- 如果未直接設置resolverBuilder,則通過Scheme來找到resolverBuilder
- 如果通過Scheme沒有找到resolverBuilder,resolverBuilder爲默認的dns builder,設置
Endpoint=target
所以,真正獲取IP地址是通過resolverBuilder這個接口
type Builder interface { Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) Scheme() string }
Build():
爲給定目標創建一個新的resolver,當調用grpc.Dial()時執行。
Scheme():
返回此resolver方案的名稱
type Resolver interface { ResolveNow(ResolveNowOptions) Close() }
ResolveNow():
被 gRPC 調用,以嘗試再次解析目標名稱。只用於提示,可忽略該方法。
Close方法:
關閉resolver
下面我們看一個示例
func init() { resolver.Register(&exampleResolverBuilder{}) /* //註冊的時候將Scheme => builder保存到m func Register(b Builder) { m[b.Scheme()] = b } */ } const ( exampleScheme = "example" exampleServiceName = "lb.example.grpc.io" ) var addrs = []string{"localhost:50051", "localhost:50052"} type exampleResolverBuilder struct{} func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { r := &exampleResolver{ target: target, cc: cc, addrsStore: map[string][]string{ exampleServiceName: addrs, }, } r.start() return r, nil } func (*exampleResolverBuilder) Scheme() string { return exampleScheme } type exampleResolver struct { target resolver.Target cc resolver.ClientConn addrsStore map[string][]string } func (r *exampleResolver) start() { addrStrs := r.addrsStore[r.target.Endpoint] addrs := make([]resolver.Address, len(addrStrs)) for i, s := range addrStrs { addrs[i] = resolver.Address{Addr: s} } r.cc.UpdateState(resolver.State{Addresses: addrs}) } func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {} func (*exampleResolver) Close() {} func main() { //... roundrobinConn, err := grpc.Dial( // Target{Scheme:exampleScheme,Endpoint:exampleServiceName} fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)), grpc.WithInsecure(), grpc.WithBlock(), ) //... }
grpc.Dial() 會調用Scheme=>builder 的Build() 方法,之後調用r.start()
r.cc.UpdateState(resolver.State{Addresses: addrs})
UpdateState()將addr更新到cc,也就是外部的連接中,供其他接口使用。
server
server相對來說啓動比較簡單,一般都會加攔截器來獲取matedata或者去recover() panic,又或者打印一些日誌
grpc.UseCompressor(gzip.Name) s := grpc.NewServer(grpc.UnaryInterceptor(unaryServerInterceptor)) //...
matedata:
matedata是一個map[string][]string的結構,用來在客戶端和服務器之間傳輸數據。其中的一個作用是可以傳遞分佈式調用環境中的鏈路id,方便跟蹤調試。另外也可以傳一些業務相關的數據
客戶端攔截器中設置metedata
md := metadata.Pairs("XXX_id",xxxID, "YYY_id", yyyID) mdOld, _ := metadata.FromIncomingContext(ctx) md = metadata.Join(mdOld, md) ctx = metadata.NewOutgoingContext(ctx, md) //... invoker(ctx, method, req, reply, cc, opts...)
服務端攔截器獲取metadata
var xxxID,yyyID md, _ := metadata.FromIncomingContext(ctx) if arr := md["XXX_id"]; len(arr) > 0 { xxxID = arr[0] } if arr := md["YYY_id"]; len(arr) > 0 { yyyID = arr[0] } m, err := handler(ctx, req) if err != nil { logger("RPC failed with error %v", err) }
在server啓動之後,需要將這個服務註冊到etcd 。
用etcd3在編譯的時候出現了和groc-go版本不兼容的問題
首先當前用的etcd 版本是 3.4.9,支持的grpc-go最高版本是v1.26.0,於是需要將grpc-go降級
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
降級之後之前生成的proto.pb.go 又出現了錯誤,於是將protobuf降級
replace github.com/golang/protobuf => github.com/golang/protobuf v1.2.0
以上的問題網上其他人也遇到過,下面的這個不清楚是我本地環境有問題還是其他原因
報錯原因是 google.golang.org/genproto這個包下面生成的proto.pb.go裏面指定了protobuf1.4的版本變量,解決辦法還是降級,版本號是在$GOPATH/pkg/mod/... 下面找到的
replace google.golang.org/genproto => google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8
關於etcd的內容之後再整理吧。
小結
結合etcd 的watch功能,很容易檢測某一個路徑節點的變化,如果,server端註冊兩個服務到etcd
key = /project/service/user/1 val = 127.0.0.1:9999
key = /project/service/user/2 val = 127.0.0.1:9998
在客戶端,如果我們自定義了一個名叫example的resolverBuilder,
同時開啓一個watch協程 ,監測/project/service下面的節點,動態維護Build()中addrsStore,這個時候我們設置addrsStore[user] = {127.0.0.1:9999,127.0.0.1:9998}。
然後在客戶端grpc.Dai中令target = example:///user
那麼在r.start()中就可以獲取到 {127.0.0.1:9999,127.0.0.1:9998}(具體可以看上面示例中r.start()方法)
server註冊的key,Build()中addrsStore中的key,以及target 後面的endPoint 的不同選擇可以實現不通粒度的服務劃分。
歡迎關注我們的微信公衆號,每天學習Go知識