Apache Curator是用於Apache ZooKeeper的一個Java客戶端庫;它包括一個高級API框架和實用程序,使使用Apache ZooKeeper更加容易和可靠。Curator之於ZooKeeper就像Cuava之於Java。

本文件主要介紹使用Curator操作Zookeeper,文中所使用到的軟件版本:Java 1.8.0_191、Zookeeper 3.6.0、Junit 4.13。

1、引入依賴

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
</dependency>

2、基本操作

package com.inspur.demo.general.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

/**
 * 使用Curator操作Zookeeper
 */
public class CuratorCase {
    //Zookeeper地址,集羣多個地址間用逗號分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
    private static String connectString = "10.49.196.10:2181";
    private static int sessionTimeout = 20 * 1000;
    private static int connectionTimeout = 10 * 1000;

    private CuratorFramework cf;
    @Before
    public void before() {
        RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000,3);
        cf = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        cf.start();
    }

    @After
    public void after() throws Exception {
        cf.close();
    }

    /**
     * 創建節點
     */
    @Test
    public void create() throws Exception {
        /*
         * 同步創建節點
         * 1.除非指明創建節點的類型,默認是持久節點
         * 2.臨時節點沒有子節點;所以遞歸創建出來的節點,只有最後的數據節點纔是指定類型的節點,其父節點是持久節點
         */
        //創建一個內容爲空的節點
        cf.create().forPath("/curator/node1");
        //創建一個內容爲aaa的節點
        cf.create().forPath("/curator/node2", "aaa".getBytes());
        //創建一個臨時節點
        cf.create().withMode(CreateMode.EPHEMERAL).forPath("/curator/node3");
        //遞歸創建,最後的節點類型爲臨時節點
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/curator/node4/a/b");
        //創建一個節點,ACL爲digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
        cf.create().withACL(Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg=")))).forPath("/curator/node5");

        /*
         * 異步創建節點
         *  可以指定線程池,不指定則使用Zookeeper的EventThread線程對事件進行串行處理
         */
        CountDownLatch counter = new CountDownLatch(2);
        cf.create().inBackground(new BackgroundCallback(){
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println(event);
                counter.countDown();
            }
        }, Executors.newFixedThreadPool(1)).forPath("/curator/node6");
        cf.create().inBackground(new BackgroundCallback(){
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println(event);
                counter.countDown();
            }
        }).forPath("/curator/node7");
        counter.await();
    }

    /**
     * 獲取節點內容
     * @throws Exception
     */
    @Test
    public void getData() throws Exception {
        Stat stat = new Stat();
        byte[] bytes = cf.getData()
                .storingStatIn(stat)//狀態,可選
                .forPath("/curator/node2");
        System.out.println("狀態信息:" + stat);
        System.out.println("內容:" + new String(bytes));

        //異步獲取數據
        CountDownLatch counter = new CountDownLatch(1);
        cf.getData().inBackground(new BackgroundCallback(){
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event:" + event);
                System.out.println("內容:"+ new String(event.getData()));
                counter.countDown();
            }
        }).forPath("/curator/node2");
        counter.await();
    }

    /**
     * 設置節點的值
     * @throws Exception
     */
    @Test
    public void setData() throws Exception {
        cf.setData()
                .withVersion(0) //指定版本,可選
                .forPath("/curator/node2", "測試修改".getBytes());
    }

    /**
     * 刪除節點
     * @throws Exception
     */
    @Test
    public void delete() throws Exception {
        cf.delete()
                .guaranteed() //如果刪除失敗,只要會話有效就會不斷的重試,直到刪除成功爲止
                .deletingChildrenIfNeeded()//刪除子節點,可選
                .withVersion(0) //指定版本,可選
                .forPath("/curator/node4");
    }

    /**
     * 獲取子節點
     * @throws Exception
     */
    @Test
    public void getChildren() throws Exception {
        List<String> list = cf.getChildren().forPath("/curator");
        System.out.println("子節點:" + list);
    }
}

3、監控數據變化

package com.inspur.demo.general.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.*;

public class CuratorWatchCase {
    //Zookeeper地址,集羣多個地址間用逗號分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
    private static String connectString = "10.49.196.10:2181";
    private static int sessionTimeout = 20 * 1000;
    private static int connectionTimeout = 10 * 1000;

    private CuratorFramework cf;
    @Before
    public void before() {
        RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000,3);
        cf = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        cf.start();
    }

    @After
    public void after() throws Exception {
        cf.close();
    }

    /**
     * 監控節點變化
     * @throws Exception
     */
    @Test
    public void watchNode() throws Exception {
        CountDownLatch counter = new CountDownLatch(1);

        NodeCache cache = new NodeCache(cf, "/curator/node2", false);
        cache.start();
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("路徑爲:" + cache.getCurrentData().getPath());
                System.out.println("數據爲:" + new String(cache.getCurrentData().getData()));
                System.out.println("狀態爲:" + cache.getCurrentData().getStat());

                //某種情況下退出監控
                //if (...) {
                //    counter.countDown();
                //}
            }
        });

        counter.await();
    }

    /**
     * 監控子節點變化
     * @throws Exception
     */
    @Test
    public void watchChildren() throws Exception {
        //使用自定義的線程池
        ExecutorService threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(32), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch counter = new CountDownLatch(1);

        PathChildrenCache cache = new PathChildrenCache(cf, "/curator/node2", true);
        cache.start();
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED");
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED");
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED");
                        break;
                    default:
                        System.out.println(event.getType());
                }
                System.out.println("子節點信息:" + event.getData());

                //某種情況下退出監控
                //if (...) {
                //    counter.countDown();
                //}
            }
        }, threadPool);

        counter.await();
        threadPool.shutdownNow();
    }
}

可以看到不管是基本的增刪改查還是監控數據變化,Curator都比原生的API好用很多。

相關文章