本文来介绍原生的API来操作ZK节点,从而引出下节要说明的Curator客户端。
环境准备
新建一个普通的java项目即可,然后引入一些jar包的依赖:
代码在code-for-chapter9
客户端与ZK建立连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZKConnect implements Watcher {
public static void main(String[] args) throws IOException { String serverPath = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
ZooKeeper zk = new ZooKeeper(serverPath,5*1000,new ZKConnect());
System.out.println("客户端开始连接zk...,连接状态为:"+zk.getState());
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("连接状态为:"+zk.getState());
}
@Override public void process(WatchedEvent watchedEvent) { System.out.println("接受到的watch通知:"+watchedEvent); } }
|
会话重连机制
主要的思路就是上一个程序注释中所描述:用上一次连接的sessionId和sessionPasswd这两个参数代入到下次连接中,就可以重新获取上一次的连接了。
下面就是不断地看seesionId来判断是否为同一个连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public class ZKConnectSessionWatcher implements Watcher {
public static void main(String[] args) throws IOException { String serverPath = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"; long sessionId = 999L; byte[] sessionPasswd = null;
ZooKeeper zk = new ZooKeeper(serverPath,5*1000,new ZKConnectSessionWatcher());
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if (zk.getState().equals(ZooKeeper.States.CONNECTED)){ sessionId = zk.getSessionId(); System.out.println(sessionId);
String ssid = "0x" + Long.toHexString(sessionId); System.out.println(ssid);
sessionPasswd = zk.getSessionPasswd(); }
System.out.println("会话重连...");
ZooKeeper zkSession = new ZooKeeper(serverPath,5*1000,new ZKConnectSessionWatcher(),sessionId,sessionPasswd);
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("重连之后的sessionId为:"+zkSession.getSessionId()); }
@Override public void process(WatchedEvent watchedEvent) { System.out.println("接受到的watch通知:"+watchedEvent); } }
|
节点的增删改查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
| public class ZKNodeOperator implements Watcher {
final static String serverPath = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
private ZooKeeper zooKeeper = null;
public ZooKeeper getZooKeeper(){ return zooKeeper; }
public void setZooKeeper(ZooKeeper zooKeeper){ this.zooKeeper = zooKeeper; }
public ZKNodeOperator(){}
public ZKNodeOperator(String connectionString,int sessionTimeout){ try { zooKeeper = new ZooKeeper(connectionString,sessionTimeout,new ZKNodeOperator()); } catch (IOException e) { e.printStackTrace(); if(zooKeeper != null){ try { zooKeeper.close(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } }
public static void main(String[] args) { ZKNodeOperator zkServer = new ZKNodeOperator(serverPath,5*1000);
String ctx = "{'delete':'success'}"; zkServer.getZooKeeper().delete("/testNode1",0,new DeleteCallBack(),ctx); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }
@Override public void process(WatchedEvent watchedEvent) { System.out.println("watch被触发..."+watchedEvent); }
}
|
countDownLatch
上面学习了对于节点的增删改,还差一个查,这里先学习一下countDownLatch
:
demo的场景是:有一个监控中心,监控很多地方的调度中心的情况,每检查一个,就返回一个状态,直到所有的调度中心都检查完。
代码在文件夹countdownlatchdemo
.
这个玩意就是一个计数器。引入这个玩意,是为了配合下面的案例,使得线程能挂起,我们可以测试数据变化一下,然后触发watcher,拿到变化后的值,然后主线程执行结束。
获取父节点数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| public class ZKGetNodeData implements Watcher {
private ZooKeeper zooKeeper;
final static String serverPath = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
private static Stat stat = new Stat();
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public ZKGetNodeData(){}
public ZKGetNodeData(String connectionString,int sessionTimeout){ try { zooKeeper = new ZooKeeper(connectionString,sessionTimeout,new ZKGetNodeData()); } catch (IOException e) { e.printStackTrace(); if(zooKeeper != null){ try { zooKeeper.close(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } }
public static void main(String[] args) throws InterruptedException, KeeperException { ZKGetNodeData zkServer = new ZKGetNodeData(serverPath,5*1000);
byte[] resByte = zkServer.getZooKeeper().getData("/hello",true,stat); String result = new String(resByte); System.out.println("当前值:"+result); countDownLatch.await(); }
@Override public void process(WatchedEvent event) { try { if(event.getType() == Event.EventType.NodeDataChanged){ ZKGetNodeData zkServer = new ZKGetNodeData(serverPath,5*1000);
byte[] resByte = zkServer.getZooKeeper().getData("/hello",true,stat); String result = new String(resByte);
System.out.println("更改后的值:"+result); System.out.println("版本:"+stat.getVersion());
countDownLatch.countDown(); }else if(event.getType() == Event.EventType.NodeCreated){
}else if(event.getType() == Event.EventType.NodeChildrenChanged){
}else if(event.getType() == Event.EventType.NodeDeleted){
} }catch (Exception e){ e.printStackTrace(); } }
public ZooKeeper getZooKeeper() { return zooKeeper; }
public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } }
|
获取子节点数据
基本同上,见代码ZKGetChildrenList
判断节点是否存在
基本同上,见代码ZKNodeExist
。