zookeeper的一篇概述
摘要:PS: :到了這個階段,ZK集羣才正式對外提供服務,並且Leader可以進行消息廣播,如果有新節點加入,還需要進行同步。System.out.println("----------zk節點數據:" + new String(bytes) + "------------")。
之前在公司由於業務需要,對zookeeper進行了一些知識點的梳理進行分享,對一些剛剛接觸zookeeper的小夥伴來說,或許可以借鑑一下
一、ZOOKEEPER介紹
簡介
Zookeeper致力於提供一個 高性能 、 高可用 ,且具備 嚴格的順序訪問 控制能力的分佈式協調服務。
設計目標
- 簡單的數據結構: 共享的樹形結構,類似文件系統,存儲於內存;
- 可以構建集羣: 避免單點故障,3-5臺機器就可以組成集羣,超過半數,正常工作就能對外提供服務;
- 順序訪問: 對於每個寫請求,zk會分配一個全局唯一的遞增編號,利用 這個特性可以實現高級協調服務;
- 高性能: 基於內存操作,服務於非事務請求,適用於讀操作爲主的業務 場景。3臺zk集羣能達到13w QPS;
應用場景
- 數據發佈訂閱
- 負載均衡
- 命名服務
- Master選舉
- 集羣管理
- 配置管理
- 分佈式隊列
- 分佈式鎖
二、zookeeper特性
會話(session):客戶端與服務端的一次會話連接,本質是TCP長連接,通過會話可以進行心跳檢測和數據傳輸;
數據節點(znode)
- 持久節點(PERSISTENT)
- 持久順序節點(PERSISTENT_SEQUENTIAL)
- 臨時節點(EPHEMERAL)
-
臨時順序節點(EPHEMERAL_SEQUENTIAL)
對於持久節點和臨時節點,同一個znode下,節點的名稱是唯一的:[center red 20px]
Watcher 事件監聽器:客戶端可以在節點上註冊監聽器,當特定的事件發生後,zk會通知到感興趣的客戶端。
EventType: NodeCreated、NodeDeleted、NodeDataChanged、NodeChildrenChange
ACL:Zk採用ACL(access control lists)策略來控制權限
權限類型:create,read,write,delete,admin
三、zookeeper常用命令
- 啓動ZK服務: bin/zkServer.sh start
- 查看ZK服務狀態:bin/zkServer.sh status
- 停止ZK服務: bin/zkServer.sh stop
- 重啓ZK服務: bin/zkServer.sh restart
- 客戶端連接:zkCli.sh -server 127.0.0.1:2181
- 顯示目錄:ls /
- 創建:create /zk “test”
- 獲得值:get /zk
- 修改值:set /zk “test”
- 刪除:delete /zk
-
ACL:
- getAcl / setAcl
- addauth
四、zookeeper的java客戶端
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
public class App { public static void main(String[] args) throws Exception { String connectString = "211.159.174.226:2181"; RetryPolicy retryPolicy = getRetryPolicy(); CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, 5000, 5000, retryPolicy); client.start(); //增刪改查 client.create().withMode(CreateMode.PERSISTENT).forPath("/test-Curator-PERSISTENT-nodata"); client.create().withMode(CreateMode.PERSISTENT).forPath("/test-Curator-PERSISTENT-data", "test-Curator-PERSISTENT-data".getBytes()); client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-Curator-EPHEMERAL-nodata"); client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-Curator-EPHEMERAL-data", "/test-Curator-EPHEMERAL-data".getBytes()); for (int i = 0; i < 5; i++) { client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-Curator-PERSISTENT_SEQUENTIAL-nodata"); } byte[] bytes = client.getData().forPath("/test-Curator-PERSISTENT-data"); System.out.println("----------zk節點數據:" + new String(bytes) + "------------"); client.create().withMode(CreateMode.PERSISTENT).forPath("/test-listener", "test-listener".getBytes()); final NodeCache nodeCache = new NodeCache(client, "/test-listener"); nodeCache.start(); NodeCacheListener listener = new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("node changed : " + nodeCache.getCurrentData()); } }; nodeCache.getListenable().addListener(listener); client.setData().forPath("/test-listener", "/test-listener-change".getBytes()); } /** * RetryOneTime: 只重連一次. * RetryNTime: 指定重連的次數N. * RetryUtilElapsed: 指定最大重連超時時間和重連時間間隔,間歇性重連直到超時或者鏈接成功. * ExponentialBackoffRetry: 基於"backoff"方式重連,和RetryUtilElapsed的區別是重連的時間間隔是動態的 * BoundedExponentialBackoffRetry: 同ExponentialBackoffRetry,增加了最大重試次數的控制. */ public static RetryPolicy getRetryPolicy() { return new ExponentialBackoffRetry(1000, 3); } }
五、分佈式鎖
public class ZookeeperLock { private final String lockPath = "/distributed-lock"; private String connectString; private RetryPolicy retry; private CuratorFramework client; private InterProcessLock interProcessMutex; public void init() throws Exception { connectString = "211.159.174.226:2181"; retry = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry); client.start(); //共享可重入鎖 interProcessMutex = new InterProcessMutex(client,lockPath); } public void lock(){ try { interProcessMutex.acquire(); } catch (Exception e) { System.out.println("鎖失敗了,真慘"); } } public void unlock(){ try { interProcessMutex.release(); } catch (Exception e) { System.out.println("釋放失敗了,更慘"); } } public static void main(String[] args) throws Exception { final ZookeeperLock zookeeperLock = new ZookeeperLock(); zookeeperLock.init(); Executor executor = Executors.newFixedThreadPool(5); for (int i = 0;i<50;i++) { executor.execute(new Runnable() { @Override public void run() { zookeeperLock.lock(); Long time = System.nanoTime(); System.out.println(time); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time); zookeeperLock.unlock(); } }); } while (true){ } } }
六、zab協議
-
ZAB協議所定義的三種節點狀態
- Looking :選舉狀態。
- Following :Follower節點(從節點)所處的狀態。
- Leading :Leader節點(主節點)所處狀態。
-
Zxid(64位的數據結構)
前32位:Leader 週期編號 myid
低32位:事務的自增序列(單調遞增的序列)只要客戶端有請求,就+1
當產生新Leader的時候,就從這個Leader服務器上取出本地log中最大事務zxid,從裏面讀出epoch+1,作爲一個新epoch,並將低32位置0(保證id絕對自增)。 -
崩潰恢復
- 每個server都有一張選票<myid,zxid>,選票投自己。
- 蒐集各個服務器的投票。
- 比較投票,比較邏輯:優先比較zxid,然後才比較myid。
-
改變服務器狀態(崩潰恢復=》數據同步,或者崩潰恢復=》消息廣播)
-
消息廣播(類似2P提交):
- Leader接受請求後,講這個請求賦予全局的唯一64位自增Id(zxid)。
- 將zxid作爲議案發給所有follower。
- 所有的follower接受到議案後,想將議案寫入硬盤後,馬上回復Leader一個ACK(OK)。
- 當Leader接受到合法數量Acks,Leader給所有follower發送commit命令。
- follower執行commit命令。
- PS: :到了這個階段,ZK集羣才正式對外提供服務,並且Leader可以進行消息廣播,如果有新節點加入,還需要進行同步。