gRPC負載均衡(客戶端負載均衡)
摘要:= nil { return nil, err } for _, ev := range resp.Kvs { s.SetServiceList(string(ev.Key), string(ev.Value)) } s.cc.NewAddress(s.getServices()) //監視前綴,修改變更的server go s.watcher(prefix) return s, nil } // ResolveNow 監視目標更新 func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) { log.Println("ResolveNow") } //Scheme return schema func (s *ServiceDiscovery) Scheme() string { return schema } //Close 關閉 func (s *ServiceDiscovery) Close() { log.Println("Close") s.cli.Close() } //watcher 監聽前綴 func (s *ServiceDiscovery) watcher(prefix string) { rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix()) log.Printf("watching prefix:%s now...", prefix) for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { case mvccpb.PUT: //新增或修改 s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value)) case mvccpb.DELETE: //刪除 s.DelServiceList(string(ev.Kv.Key)) } } } } //SetServiceList 新增服務地址 func (s *ServiceDiscovery) SetServiceList(key, val string) { s.lock.Lock() defer s.lock.Unlock() s.serverList[key] = resolver.Address{Addr: val} s.cc.NewAddress(s.getServices()) log.Println("put key :", key, "val:", val) } //DelServiceList 刪除服務地址 func (s *ServiceDiscovery) DelServiceList(key string) { s.lock.Lock() defer s.lock.Unlock() delete(s.serverList, key) s.cc.NewAddress(s.getServices()) log.Println("del key:", key) } //GetServices 獲取服務地址 func (s *ServiceDiscovery) getServices() []resolver.Address { addrs := make([]resolver.Address, 0, len(s.serverList)) for _, v := range s.serverList { addrs = append(addrs, v) } return addrs }。= nil { log.Fatal(err) } return &ServiceDiscovery{ cli: cli, } } //Build 爲給定目標創建一個新的`resolver`,當調用`grpc.Dial()`時執行 func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { log.Println("Build") s.cc = cc s.serverList = make(map[string]resolver.Address) prefix := "/" + target.Scheme + "/" + target.Endpoint + "/" //根據前綴獲取現有的key resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix()) if err。
前言
上篇 介紹瞭如何使用 etcd
實現服務發現,本篇將基於etcd的服務發現前提下,介紹如何實現gRPC客戶端負載均衡。
gRPC負載均衡
gRPC官方文檔提供了關於gRPC負載均衡方案 Load Balancing in gRPC ,此方案是爲gRPC設計的,下面我們對此進行分析。
1、對每次調用進行負載均衡
gRPC中的負載平衡是以每次調用爲基礎,而不是以每個連接爲基礎。換句話說,即使所有的請求都來自一個客戶端,我們仍希望它們在所有的服務器上實現負載平衡。
2、負載均衡的方法
-
集中式
(Proxy Model)
在服務消費者和服務提供者之間有一個獨立的負載均衡(LB),通常是專門的硬件設備如 F5,或者基於軟件如 LVS,HAproxy等實現。LB上有所有服務的地址映射表,通常由運維配置註冊,當服務消費方調用某個目標服務時,它向LB發起請求,由LB以某種策略,比如輪詢(Round-Robin)做負載均衡後將請求轉發到目標服務。LB一般具備健康檢查能力,能自動摘除不健康的服務實例。
該方案主要問題:服務消費方、提供方之間增加了一級,有一定性能開銷,請求量大時,效率較低。
可能有讀者會認爲集中式負載均衡存在這樣的問題,一旦負載均衡服務掛掉,那整個系統將不能使用。 解決方案:可以對負載均衡服務進行DNS負載均衡,通過對一個域名設置多個IP地址,每次DNS解析時輪詢返回負載均衡服務地址,從而實現簡單的DNS負載均衡。
-
客戶端負載
(Balancing-aware Client)
針對第一個方案的不足,此方案將LB的功能集成到服務消費方進程裏,也被稱爲軟負載或者客戶端負載方案。服務提供方啓動時,首先將服務地址註冊到服務註冊表,同時定期報心跳到服務註冊表以表明服務的存活狀態,相當於健康檢查,服務消費方要訪問某個服務時,它通過內置的LB組件向服務註冊表查詢,同時緩存並定期刷新目標服務地址列表,然後以某種負載均衡策略選擇一個目標服務地址,最後向目標服務發起請求。LB和服務發現能力被分散到每一個服務消費者的進程內部,同時服務消費方和服務提供方之間是直接調用,沒有額外開銷,性能比較好。
該方案主要問題:要用多種語言、多個版本的客戶端編寫和維護負載均衡策略,使客戶端的代碼大大複雜化。
-
獨立LB服務
(External Load Balancing Service)
該方案是針對第二種方案的不足而提出的一種折中方案,原理和第二種方案基本類似。
不同之處是將LB和服務發現功能從進程內移出來,變成主機上的一個獨立進程。主機上的一個或者多個服務要訪問目標服務時,他們都通過同一主機上的獨立LB進程做服務發現和負載均衡。該方案也是一種分佈式方案沒有單點問題,服務調用方和LB之間是進程內調用性能好,同時該方案還簡化了服務調用方,不需要爲不同語言開發客戶庫。
本篇將介紹第二種負載均衡方法,客戶端負載均衡。
實現gRPC客戶端負載均衡
gRPC已提供了簡單的負載均衡策略(如:Round Robin),我們只需實現它提供的 Builder
和 Resolver
接口,就能完成gRPC客戶端負載均衡。
type Builder interface { Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error) Scheme() string }
Builder
接口:創建一個 resolver
(本文稱之服務發現),用於監視名稱解析更新。 Build
方法:爲給定目標創建一個新的 resolver
,當調用 grpc.Dial()
時執行。 Scheme
方法:返回此 resolver
支持的方案, Scheme
定義可參考:https://github.com/grpc/grpc/blob/master/doc/naming.md
type Resolver interface { ResolveNow(ResolveNowOption) Close() }
Resolver
接口:監視指定目標的更新,包括地址更新和服務配置更新。 ResolveNow
方法:被 gRPC 調用,以嘗試再次解析目標名稱。只用於提示,可忽略該方法。 Close
方法:關閉 resolver
根據以上兩個接口,我們把服務發現的功能寫在 Build
方法中,把獲取到的負載均衡服務地址返回到客戶端,並監視服務更新情況,以修改客戶端連接。 修改服務發現代碼, discovery.go
package etcdv3 import ( "context" "log" "sync" "time" "github.com/coreos/etcd/mvcc/mvccpb" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc/resolver" ) const schema = "grpclb" //ServiceDiscovery 服務發現 type ServiceDiscovery struct { cli *clientv3.Client //etcd client cc resolver.ClientConn serverList map[string]resolver.Address //服務列表 lock sync.Mutex } //NewServiceDiscovery 新建發現服務 func NewServiceDiscovery(endpoints []string) resolver.Builder { cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, }) if err != nil { log.Fatal(err) } return &ServiceDiscovery{ cli: cli, } } //Build 爲給定目標創建一個新的`resolver`,當調用`grpc.Dial()`時執行 func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { log.Println("Build") s.cc = cc s.serverList = make(map[string]resolver.Address) prefix := "/" + target.Scheme + "/" + target.Endpoint + "/" //根據前綴獲取現有的key resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix()) if err != nil { return nil, err } for _, ev := range resp.Kvs { s.SetServiceList(string(ev.Key), string(ev.Value)) } s.cc.NewAddress(s.getServices()) //監視前綴,修改變更的server go s.watcher(prefix) return s, nil } // ResolveNow 監視目標更新 func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) { log.Println("ResolveNow") } //Scheme return schema func (s *ServiceDiscovery) Scheme() string { return schema } //Close 關閉 func (s *ServiceDiscovery) Close() { log.Println("Close") s.cli.Close() } //watcher 監聽前綴 func (s *ServiceDiscovery) watcher(prefix string) { rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix()) log.Printf("watching prefix:%s now...", prefix) for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { case mvccpb.PUT: //新增或修改 s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value)) case mvccpb.DELETE: //刪除 s.DelServiceList(string(ev.Kv.Key)) } } } } //SetServiceList 新增服務地址 func (s *ServiceDiscovery) SetServiceList(key, val string) { s.lock.Lock() defer s.lock.Unlock() s.serverList[key] = resolver.Address{Addr: val} s.cc.NewAddress(s.getServices()) log.Println("put key :", key, "val:", val) } //DelServiceList 刪除服務地址 func (s *ServiceDiscovery) DelServiceList(key string) { s.lock.Lock() defer s.lock.Unlock() delete(s.serverList, key) s.cc.NewAddress(s.getServices()) log.Println("del key:", key) } //GetServices 獲取服務地址 func (s *ServiceDiscovery) getServices() []resolver.Address { addrs := make([]resolver.Address, 0, len(s.serverList)) for _, v := range s.serverList { addrs = append(addrs, v) } return addrs }
代碼主要修改以下地方:
-
把獲取的服務地址轉成
resolver.Address
,供gRPC客戶端連接。 -
根據
schema
的定義規則,修改key
格式。
服務註冊主要修改 key
存儲格式, register.go
package etcdv3 import ( "context" "log" "time" "go.etcd.io/etcd/clientv3" ) //ServiceRegister 創建租約註冊服務 type ServiceRegister struct { cli *clientv3.Client //etcd client leaseID clientv3.LeaseID //租約ID //租約keepalieve相應chan keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse key string //key val string //value } //NewServiceRegister 新建註冊服務 func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) { cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, }) if err != nil { log.Fatal(err) } ser := &ServiceRegister{ cli: cli, key: "/" + schema + "/" + serName + "/" + addr, val: addr, } //申請租約設置時間keepalive if err := ser.putKeyWithLease(lease); err != nil { return nil, err } return ser, nil } //設置租約 func (s *ServiceRegister) putKeyWithLease(lease int64) error { //設置租約時間 resp, err := s.cli.Grant(context.Background(), lease) if err != nil { return err } //註冊服務並綁定租約 _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID)) if err != nil { return err } //設置續租 定期發送需求請求 leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID) if err != nil { return err } s.leaseID = resp.ID s.keepAliveChan = leaseRespChan log.Printf("Put key:%s val:%s success!", s.key, s.val) return nil } //ListenLeaseRespChan 監聽 續租情況 func (s *ServiceRegister) ListenLeaseRespChan() { for leaseKeepResp := range s.keepAliveChan { log.Println("續約成功", leaseKeepResp) } log.Println("關閉續租") } // Close 註銷服務 func (s *ServiceRegister) Close() error { //撤銷租約 if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil { return err } log.Println("撤銷租約") return s.cli.Close() }
客戶端修改gRPC連接服務的部分代碼即可:
func main() { r := etcdv3.NewServiceDiscovery(EtcdEndpoints) resolver.Register(r) // 連接服務器 conn, err := grpc.Dial(r.Scheme()+"://8.8.8.8/simple_grpc", grpc.WithBalancerName("round_robin"), grpc.WithInsecure()) if err != nil { log.Fatalf("net.Connect err: %v", err) } defer conn.Close() // 建立gRPC連接 grpcClient = pb.NewSimpleClient(conn)
gRPC內置了簡單的負載均衡策略 round_robin
,根據負載均衡地址,以輪詢的方式進行調用服務。
服務端啓動時,把服務地址註冊到 etcd
中即可:
func main() { // 監聽本地端口 listener, err := net.Listen(Network, Address) if err != nil { log.Fatalf("net.Listen err: %v", err) } log.Println(Address + " net.Listing...") // 新建gRPC服務器實例 grpcServer := grpc.NewServer() // 在gRPC服務器註冊我們的服務 pb.RegisterSimpleServer(grpcServer, &SimpleService{}) //把服務註冊到etcd ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 5) if err != nil { log.Fatalf("register service err: %v", err) } defer ser.Close() //用服務器 Serve() 方法以及我們的端口信息區實現阻塞等待,直到進程被殺死或者 Stop() 被調用 err = grpcServer.Serve(listener) if err != nil { log.Fatalf("grpcServer.Serve err: %v", err) } }
運行效果
我們先啓動並註冊三個服務
然後客戶端進行調用
看服務端接收到的請求
關閉 localhost:8000
服務,剩餘 localhost:8001
和 localhost:8002
服務接收請求
重新打開 localhost:8000
服務
可以看到,gRPC客戶端負載均衡運行良好。
總結
本文介紹了gRPC客戶端負載均衡的實現,它簡單實現了gRPC負載均衡的功能。但在對接其他語言時候比較麻煩,需要每種語言都實現一套服務發現和負載策略,且如果要較爲複雜的負載策略,需要修改客戶端代碼才能完成。
下篇將介紹如何實現官方推薦的負載均衡策略( External Load Balancing Service
)。
源碼地址:https://github.com/Bingjian-Zhu/etcd-example
參考:
-
https://segmentfault.com/a/1190000008672912
-
https://github.com/wothing/wonaming