一句话告诉我Curator是什么?
(Zookeeper的背景就不在这里介绍了.)
Curator是对ZK的高阶封装. 与操作原生的Zookeeper相比, 它提供了对ZK的完美封装, 简化了对集群的连接, 错误的处理; 实现了一系列经典"模式", 比如分布式锁, Leader选举等.
既然是为了简单
首先, 获取一个连接实例(CuratorFramework). 对于每个集群, 只需要创建一个实例.
CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)
这里我们需要指定一个retryPolicy, 这里用一个指数补偿策略:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
然后我们就可以调用Zookeeper的一系列操作了, 比如创建:
client.create().forPath("/my/path", myData)
这里如果出现了连接问题,curator也会帮你自动重连. 所以, 很简单咯?
我们继续, C/R/U/D:
// this will create the given ZNode with the given data
client.create().forPath(path, payload);
// set data for the given node
client.setData().forPath(path, payload);
// set data for the given node asynchronously. The completion notification
// is done via the CuratorListener.
client.setData().inBackground().forPath(path, payload);
// delete the given node
client.delete().forPath(path);
public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception
{
/**
* Get children and set a watcher on the node. The watcher notification will come through the
* CuratorListener (see setDataAsync() above).
*/
return client.getChildren().watched().forPath(path);
}
模式
分布式锁
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
try
{
// do some work inside of the critical section here
}
finally
{
lock.release();
}
}
Leader选举
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{
public void takeLeadership(CuratorFramework client) throws Exception
{
// this callback will get called when you are the leader
// do whatever leader work you need to and only exit
// this method when you want to relinquish leadership
}
}
LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue(); // not required, but this is behavior that you will probably expect
selector.start();
事务
Collection<CuratorTransactionResult> results = client.inTransaction()
.create().forPath("/a/path", "some data".getBytes())
.and()
.setData().forPath("/another/path", "other data".getBytes())
.and()
.delete().forPath("/yet/another/path")
.and()
.commit(); // IMPORTANT! The transaction is not submitted until commit() is called
附录
模块 | 简介 |
---|---|
Framework | 简化Zookeeper的使用,提供了一系列高阶API, 在Zookeeper集群连接的管理, 以及重连操作上化繁为简. |
Receipts | 实现了一些ZooKeeper的基本"模式", 该实现基于Curator Framework. |
Utilities | 一系列工具 |
Client | ZooKeeper类的替代 |
Errors | 定义了curator如何处理错误,连接问题,可恢复的异常等. |
Extensions | 实现了一些Zookeeper文档中提及的通用模式. |
版本
- Curator 2.x.x - ZooKeeper 3.4.x, ZooKeeper 3.5.x
- Curator 3.x.x - 只兼容ZooKeeper 3.5.x; 支持dynamic reconfiguration等新feature.