01 ZooKeeper 数据模型:节点的特性与应用

ZooKeeper 基础知识基本分为三大模块:

  • 数据模型
  • ACL 权限控制
  • Watch 监控

其中,数据模型是最重要的,很多 ZooKeeper 中典型的应用场景都是利用这些基础模块实现的。比如我们可以利用数据模型中的临时节点和 Watch 监控机制来实现一个发布订阅的功能。

数据模型

计算机最根本的作用其实就是处理和存储数据,作为一款分布式一致性框架,ZooKeeper 也是如此。数据模型就是 ZooKeeper 用来存储和处理数据的一种逻辑结构。就像我们用 MySQL 数据库一样,要想处理复杂业务。前提是先学会如何往里边新增数据。ZooKeeper 数据模型最根本的功能就像一个数据库。

现在,数据模型对我们来说还是一个比较抽象的概念,接下来我们开始部署一个开发测试环境,并在上面做一些简单的操作。来看看 ZooKeeper 的数据模型究竟是什么样的:

  1. 配置文件
1
2
3
4
5
tickTime=2000

dataDir=/var/lib/zookeeper

clientPort=2181
  1. 服务启动
1
bin/zkServer.sh start
  1. 使用客户端连接服务器
1
bin/zkCli.sh -server 127.0.0.1:2181
  1. 这样单机版的开发环境就已经构建完成了,接下来我们通过 ZooKeeper 提供的 create 命令来创建几个节点,分别是:“/locks”“/servers”“/works”:
1
2
3
4
5
create /locks

create /servers

create /works

最终在 ZooKeeper 服务器上会得到一个具有层级关系的数据结构,如下图所示,这个数据结构就是 ZooKeeper 中的数据模型。

image-20240601214357114

ZooKeeper 中的数据模型是一种树形结构,非常像电脑中的文件系统,有一个根文件夹,下面还有很多子文件夹。ZooKeeper 的数据模型也具有一个固定的根节点(/),我们可以在根节点下创建子节点,并在子节点下继续创建下一级节点。ZooKeeper 树中的每一层级用斜杠(/)分隔开,且只能用绝对路径(如“get /work/task1”)的方式查询 ZooKeeper 节点,而不能使用相对路径。具体的结构你可以看看下面这张图:

image-20240601214410026

znode 节点类型与特性

知道了 ZooKeeper 的数据模型是一种树形结构,就像在 MySQL 中数据是存在于数据表中,ZooKeeper 中的数据是由多个数据节点最终构成的一个层级的树状结构,和我们在创建 MySOL 数据表时会定义不同类型的数据列字段,ZooKeeper 中的数据节点也分为持久节点、临时节点和有序节点三种类型:

1、持久节点

我们第一个介绍的是持久节点,这种节点也是在 ZooKeeper 最为常用的,几乎所有业务场景中都会包含持久节点的创建。之所以叫作持久节点是因为一旦将节点创建为持久节点,该数据节点会一直存储在 ZooKeeper 服务器上,即使创建该节点的客户端与服务端的会话关闭了,该节点依然不会被删除。如果我们想删除持久节点,就要显式调用 delete 函数进行删除操作。

2、临时节点

接下来我们来介绍临时节点。从名称上我们可以看出该节点的一个最重要的特性就是临时性。所谓临时性是指,如果将节点创建为临时节点,那么该节点数据不会一直存储在 ZooKeeper 服务器上。当创建该临时节点的客户端会话因超时或发生异常而关闭时,该节点也相应在 ZooKeeper 服务器上被删除。同样,我们可以像删除持久节点一样主动删除临时节点。

在平时的开发中,我们可以利用临时节点的这一特性来做服务器集群内机器运行情况的统计,将集群设置为“/servers”节点,并为集群下的每台服务器创建一个临时节点“/servers/host”,当服务器下线时该节点自动被删除,最后统计临时节点个数就可以知道集群中的运行情况。如下图所示:

image-20240601214425685

3、有序节点

最后我们再说一下有序节点,其实有序节点并不算是一种单独种类的节点,而是在之前提到的持久节点和临时节点特性的基础上,增加了一个节点有序的性质。所谓节点有序是说在我们创建有序节点的时候,ZooKeeper 服务器会自动使用一个单调递增的数字作为后缀,追加到我们创建节点的后边。例如一个客户端创建了一个路径为 works/task- 的有序节点,那么 ZooKeeper 将会生成一个序号并追加到该节点的路径后,最后该节点的路径为 works/task-1。通过这种方式我们可以直观的查看到节点的创建顺序。

到目前为止我们知道在 ZooKeeper 服务器上存储数据的基本信息,知道了 ZooKeeper 中的数据节点种类有持久节点和临时节点等。上述这几种数据节点虽然类型不同,但 ZooKeeper 中的每个节点都维护有这些内容:一个二进制数组(byte data[]),用来存储节点的数据、ACL 访问控制信息、子节点数据(因为临时节点不允许有子节点,所以其子节点字段为 null),除此之外每个数据节点还有一个记录自身状态信息的字段 stat。

下面我们详细说明节点的状态信息。

节点的状态结构

每个节点都有属于自己的状态信息,这就很像我们每个人的身份信息一样,我们打开之前的客户端,执行 stat /zk_test,可以看到控制台输出了一些信息,这些就是节点状态信息。

image-20240601214440092

每一个节点都有一个自己的状态属性,记录了节点本身的一些信息,这些属性包括的内容我列在了下面这个表格里:

image-20240601214456572

数据节点的版本

这里我们重点讲解一下版本相关的属性,在 ZooKeeper 中为数据节点引入了版本的概念,每个数据节点有 3 种类型的版本信息,对数据节点的任何更新操作都会引起版本号的变化。ZooKeeper 的版本信息表示的是对节点数据内容、子节点信息或者是 ACL 信息的修改次数。

使用 ZooKeeper 实现锁

学习了 ZooKeeper 的数据模型和数据节点的相关知识,下面我们通过实际的应用进一步加深理解。

设想这样一个情景:一个购物网站,某个商品库存只剩一件,客户 A 搜索到这件商品并准备下单,但在这期间客户 B 也查询到了该件商品并提交了购买,于此同时,客户 A 也下单购买了此商品,这样就出现了只有一件库存的商品实际上卖出了两件的情况。为了解决这个问题,我们可以在客户 A 对商品进行操作的时候对这件商品进行锁定从而避免这种超卖的情况发生。

实现锁的方式有很多中,这里我们主要介绍两种:悲观锁、乐观锁。

悲观锁 悲观锁认为进程对临界区的竞争总是会出现,为了保证进程在操作数据时,该条数据不被其他进程修改。数据会一直处于被锁定的状态。

我们假设一个具有 n 个进程的应用,同时访问临界区资源,我们通过进程创建 ZooKeeper 节点 /locks 的方式获取锁。

线程 a 通过成功创建 ZooKeeper 节点“/locks”的方式获取锁后继续执行,如下图所示:

image-20240601214512591

这时进程 b 也要访问临界区资源,于是进程 b 也尝试创建“/locks”节点来获取锁,因为之前进程 a 已经创建该节点,所以进程 b 创建节点失败无法获得锁。

image-20240601214529182

这样就实现了一个简单的悲观锁,不过这也有一个隐含的问题,就是当进程 a 因为异常中断导致 /locks 节点始终存在,其他线程因为无法再次创建节点而无法获取锁,这就产生了一个死锁问题。针对这种情况我们可以通过将节点设置为临时节点的方式避免。并通过在服务器端添加监听事件来通知其他进程重新获取锁。

乐观锁 乐观锁认为,进程对临界区资源的竞争不会总是出现,所以相对悲观锁而言。加锁方式没有那么激烈,不会全程的锁定资源,而是在数据进行提交更新的时候,对数据的冲突与否进行检测,如果发现冲突了,则拒绝操作。

**乐观锁基本可以分为读取、校验、写入三个步骤。**CAS(Compare-And-Swap),即比较并替换,就是一个乐观锁的实现。CAS 有 3 个操作数,内存值 V,旧的预期值 A,要修改的新值 B。当且仅当预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做。

在 ZooKeeper 中的 version 属性就是用来实现乐观锁机制中的“校验”的,ZooKeeper 每个节点都有数据版本的概念,在调用更新操作的时候,假如有一个客户端试图进行更新操作,它会携带上次获取到的 version 值进行更新。而如果在这段时间内,ZooKeeper 服务器上该节点的数值恰好已经被其他客户端更新了,那么其数据版本一定也会发生变化,因此肯定与客户端携带的 version 无法匹配,便无法成功更新,因此可以有效地避免一些分布式更新的并发问题。

在 ZooKeeper 的底层实现中,当服务端处理 setDataRequest 请求时,首先会调用 checkAndIncVersion 方法进行数据版本校验。ZooKeeper 会从 setDataRequest 请求中获取当前请求的版本 version,同时通过 getRecordForPath 方法获取服务器数据记录 nodeRecord, 从中得到当前服务器上的版本信息 currentversion。如果 version 为 -1,表示该请求操作不使用乐观锁,可以忽略版本对比;如果 version 不是 -1,那么就对比 version 和 currentversion,如果相等,则进行更新操作,否则就会抛出 BadVersionException 异常中断操作。

总结

主要介绍了ZooKeeper的基础知识点——数据模型。并深入介绍了节点类型、stat 状态属性等知识,并利用目前学到的知识解决了集群中服务器运行情况统计、悲观锁、乐观锁等问题

为什么 ZooKeeper 不能采用相对路径查找节点呢?

这是因为 ZooKeeper 大多是应用场景是定位数据模型上的节点,并在相关节点上进行操作。像这种查找与给定值相等的记录问题最适合用散列来解决。因此 ZooKeeper 在底层实现的时候,使用了一个 hashtable,即 hashtableConcurrentHashMap nodes ,用节点的完整路径来作为 key 存储节点数据。这样就大大提高了 ZooKeeper 的性能。

02 发布订阅模式:如何使用 Watch 机制实现分布式通知

我们来学习 ZooKeeper 又一关键技术——Watch 监控机制,并用它实现一个发布订阅功能。

在日常生活中也有很多订阅发布的场景。比如我们喜欢观看某一个剧集,视频网站会有一个订阅按钮,用户可以订阅自己喜欢的电视剧,当有新的剧集发布时,网站会通知该用户第一时间观看。或者我们在网站上看到一件心仪的商品,但是当前没有库存,网站会提供到货通知的功能,我们开启这个商品的到货通知功能后,商品补货的时候会通知我们,之后就可以进行购买了。ZooKeeper 中的 Watch 机制很像这些日常的应用场景,其中的客户端就是用户,而服务端的数据节点就好像是我们订阅的商品或剧集。

现在我们可以从技术实现的角度分析一下上边提到的这些场景,无论是订阅一集电视剧还是订购一件商品。都有几个核心节点,即用户端注册服务、服务端处理请求、客户端收到回调后执行相应的操作。接下来我们也带着这个观点来看一下 ZooKeeper 中的 Watch 机制是如何实现的。

Watch 机制是如何实现的

正如我们可以通过点击视频网站上的”收藏“按钮来订阅我们喜欢的内容,ZooKeeper 的客户端也可以通过 Watch 机制来订阅当服务器上某一节点的数据或状态发生变化时收到相应的通知,我们可以通过向 ZooKeeper 客户端的构造方法中传递 Watcher 参数的方式实现:

1
new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

上面代码的意思是定义了一个了 ZooKeeper 客户端对象实例,并传入三个参数:

1
2
3
4
5
connectString 服务端地址

sessionTimeout:超时时间

Watcher:监控事件

这个 Watcher 将作为整个 ZooKeeper 会话期间的上下文 ,一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中。

除此之外,ZooKeeper 客户端也可以通过 getData、exists 和 getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher,从而方便地在不同的情况下添加 Watch 事件:

1
getData(String path, Watcher watcher, Stat stat)

知道了 ZooKeeper 添加服务器监控事件的方式,下面我们来讲解一下触发通知的条件。

image-20240601214646207

上图中列出了客户端在不同会话状态下,相应的在服务器节点所能支持的事件类型。例如在客户端连接服务端的时候,可以对数据节点的创建、删除、数据变更、子节点的更新等操作进行监控。

现在我们已经从应用层的角度了解了 ZooKeeper 中的 Watch 机制,而学习 ZooKeeper 过程中一个大问题就是入门容易精通难,像上边我们通过几个简单的 API 调用就可以对服务器的节点状态变更进行监控,但是在实际生产环境中我们会遇到很多意想不到的问题,要想解决好这些问题就要深入理解 Watch 的底层实现机制。

Watch 机制的底层原理

现在我们就深入底层了解其背后的实现原理。与上个课时直接通过底层代码的调用过程来分析不同,在 Watch 底层实现的分析阶段,由于 Watch 机制涉及了客户端和服务端的多个函数和操作节点,单单按照程序执行流程分析跳跃性对整体实现机制的理解难度大,这也是我在学习 Watch 这部分底层实现遇到的问题。为了更好地阐述 Watch 机制,我们另辟蹊径,从设计模式角度出发来分析其底层实现:

image-20240601214659082

【简单说,思考整个过程,就是网络发送,回调处理】

最初我在开始学习 Watch 机制的时候,它给我的第一印象是,其结构很像设计模式中的”观察者模式“,一个对象或者数据节点可能会被多个客户端监控,当对应事件被触发时,会通知这些对象或客户端。我们可以将 Watch 机制理解为是分布式环境下的观察者模式。所以接下来我们就以观察者模式的角度点来看看 ZooKeeper 底层 Watch 是如何实现的。

image-20240601214711961

通常我们在实现观察者模式时,最核心或者说关键的代码就是创建一个列表来存放观察者。 而在 ZooKeeper 中则是在客户端和服务器端分别实现两个存放观察者列表,即:ZKWatchManager 和 WatchManager。其核心操作就是围绕着这两个展开的。

客户端 Watch 注册实现过程

我们先看一下客户端的实现过程,在发送一个 Watch 监控事件的会话请求时,ZooKeeper 客户端主要做了两个工作:

  • 标记该会话是一个带有 Watch 事件的请求
  • 将 Watch 事件存储到 ZKWatchManager

我们以 getData 接口为例。当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,之后通过 DataWatchRegistration 类来保存 watcher 事件和节点的对应关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public byte[] getData(final String path, Watcher watcher, Stat stat){

...

WatchRegistration wcb = null;

if (watcher != null) {

wcb = new DataWatchRegistration(watcher, clientPath);

}

RequestHeader h = new RequestHeader();

request.setWatch(watcher != null);

...

GetDataResponse response = new GetDataResponse();

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

}

之后客户端向服务器发送请求时,是将请求封装成一个 Packet 对象,并添加到一个等待发送队列 outgoingQueue 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Packet queuePacket(RequestHeader h, ReplyHeader r,...) {

Packet packet = null;

...

packet = new Packet(h, r, request, response, watchRegistration);

...

outgoingQueue.add(packet);

...

return packet;

}

最后,ZooKeeper 客户端就会向服务器端发送这个请求,完成请求发送后。调用负责处理服务器响应的 SendThread 线程类中的 readResponse 方法接收服务端的回调,并在最后执行 finishPacket()方法将 Watch 注册到 ZKWatchManager 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
private void finishPacket(Packet p) {

int err = p.replyHeader.getErr();

if (p.watchRegistration != null) {

p.watchRegistration.register(err);

}

...

}

服务端 Watch 注册实现过程

介绍完客户端对 Watch 请求的发送过程,下面我们来看一下服务端是如何处理一个 Watch 事件。

Zookeeper 服务端处理 Watch 事件基本有 2 个过程:

  • 解析收到的请求是否带有 Watch 注册事件
  • 将对应的 Watch 事件存储到 WatchManager

下面我们分别对这 2 个步骤进行分析:

当 ZooKeeper 服务器接收到一个客户端请求后,首先会对请求进行解析,判断该请求是否包含 Watch 事件。这在 ZooKeeper 底层是通过 FinalRequestProcessor 类中的 processRequest 函数实现的。当 getDataRequest.getWatch() 值为 True 时,表明该请求需要进行 Watch 监控注册。并通过 zks.getZKDatabase().getData 函数将 Watch 事件注册到服务端的 WatchManager 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void processRequest(Request request) {

...

byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,

getDataRequest.getWatch() ? cnxn : null);

rsp = new GetDataResponse(b, stat);

..

}

服务端 Watch 事件的触发过程

在客户端和服务端都对 watch 注册完成后,我们接下来看一下在 ZooKeeper 中触发一个 Watch 事件的底层实现过程:

我们以 setData 接口即“节点数据内容发生变更”事件为例。在 setData 方法内部执行完对节点数据的变更后,会调用 WatchManager.triggerWatch 方法触发数据变更事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
public Stat setData(String path, byte data[], ...){

Stat s = new Stat();

DataNode n = nodes.get(path);

...

dataWatches.triggerWatch(path, EventType.NodeDataChanged);

return s;

}

下面我们进入 triggerWatch 函数内部来看看他究竟做了哪些工作。首先,封装了一个具有会话状态、事件类型、数据节点 3 种属性的 WatchedEvent 对象。之后查询该节点注册的 Watch 事件,如果为空说明该节点没有注册过 Watch 事件。如果存在 Watch 事件则添加到定义的 Wathcers 集合中,并在 WatchManager 管理中删除。最后,通过调用 process 方法向客户端发送通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Set<Watcher> triggerWatch(String path, EventType type...) {

WatchedEvent e = new WatchedEvent(type,

KeeperState.SyncConnected, path);

Set<Watcher> watchers;

synchronized (this) {

watchers = watchTable.remove(path);

...

for (Watcher w : watchers) {

Set<String> paths = watch2Paths.get(w);

if (paths != null) {

paths.remove(path);

}

}

}

for (Watcher w : watchers) {

if (supress != null && supress.contains(w)) {

continue;

}

w.process(e);

}

return watchers;

}

客户端回调的处理过程

知道了服务器端 Watch 事件的触发过程后,我们来看一下客户端接收到通知后如何进行操作的。

客户端使用 SendThread.readResponse() 方法来统一处理服务端的相应。首先反序列化服务器发送请求头信息 replyHdr.deserialize(bbia, “header”),并判断相属性字段 xid 的值为 -1,表示该请求响应为通知类型。在处理通知类型时,首先将己收到的字节流反序列化转换成 WatcherEvent 对象。接着判断客户端是否配置了 chrootPath 属性,如果为 True 说明客户端配置了 chrootPath 属性。需要对接收到的节点路径进行 chrootPath 处理。最后调用 eventThread.queueEvent( )方法将接收到的事件交给 EventThread 线程进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
if (replyHdr.getXid() == -1) {

...

WatcherEvent event = new WatcherEvent();

event.deserialize(bbia, "response");

...

if (chrootPath != null) {

String serverPath = event.getPath();

if(serverPath.compareTo(chrootPath)==0)

event.setPath("/");

...

event.setPath(serverPath.substring(chrootPath.length()));

...

}

WatchedEvent we = new WatchedEvent(event);

...

eventThread.queueEvent( we );

}

接下来我们来看一下 EventThread.queueEvent() 方法内部的执行逻辑。其主要工作分为 2 点: 第 1 步按照通知的事件类型,从 ZKWatchManager 中查询注册过的客户端 Watch 信息。客户端在查询到对应的 Watch 信息后,会将其从 ZKWatchManager 的管理中删除。因此这里也请你多注意,客户端的 Watcher 机制是一次性的,触发后就会被删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public Set<Watcher> materialize(...)

{

Set<Watcher> result = new HashSet<Watcher>();

...

switch (type) {

...

case NodeDataChanged:

case NodeCreated:

synchronized (dataWatches) {

addTo(dataWatches.remove(clientPath), result);

}

synchronized (existWatches) {

addTo(existWatches.remove(clientPath), result);

}

break;

....

}

return result;

}

完成了第 1 步工作获取到对应的 Watcher 信息后,将查询到的 Watcher 存储到 waitingEvents 队列中,调用 EventThread 类中的 run 方法会循环取出在 waitingEvents 队列中等待的 Watcher 事件进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void run() {

try {

isRunning = true;

while (true) {

Object event = waitingEvents.take();

if (event == eventOfDeath) {

wasKilled = true;

} else {

processEvent(event);

}

if (wasKilled)

synchronized (waitingEvents) {

if (waitingEvents.isEmpty()) {

isRunning = false;

break;

}

}

}

...

}

最后调用 processEvent(event) 方法来最终执行实现了 Watcher 接口的 process()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void processEvent(Object event) {

...

if (event instanceof WatcherSetEventPair) {



WatcherSetEventPair pair = (WatcherSetEventPair) event;

for (Watcher watcher : pair.watchers) {

try {

watcher.process(pair.event);

} catch (Throwable t) {

LOG.error("Error while calling watcher ", t);

}

}

}

}

到目前为止我们将 ZooKeeper 中 Watch 机制的处理过程全部学习了一遍,大体上讲 ZooKeeper 实现的方式是通过客服端和服务端分别创建有观察者的信息列表。客户端调用 getData、exist 等接口时,首先将对应的 Watch 事件放到本地的 ZKWatchManager 中进行管理。服务端在接收到客户端的请求后根据请求类型判断是否含有 Watch 事件,并将对应事件放到 WatchManager 中进行管理。

在事件触发的时候服务端通过节点的路径信息查询相应的 Watch 事件通知给客户端,客户端在接收到通知后,首先查询本地的 ZKWatchManager 获得对应的 Watch 信息处理回调操作。这种设计不但实现了一个分布式环境下的观察者模式,而且通过将客户端和服务端各自处理 Watch 事件所需要的额外信息分别保存在两端,减少彼此通信的内容。大大提升了服务的处理性能。

订阅发布场景实现

现在我们已经知道 Watch 事件在 ZooKeeper 中的完整处理过程,接下来我们通过一个实际应用来加深我们对 ZooKeeper 中 Watch 机制的理解。

提到 ZooKeeper 的应用场景,你可能第一时间会想到最为典型的发布订阅功能。发布订阅功能可以看作是一个一对多的关系,即一个服务或数据的发布者可以被多个不同的消费者调用。一般一个发布订阅模式的数据交互可以分为消费者主动请求生产者信息的拉取模式,和生产者数据变更时主动推送给消费者的推送模式。ZooKeeper 采用了两种模式结合的方式实现订阅发布功能。下面我们来分析一个具体案例:

在系统开发的过程中会用到各种各样的配置信息,如数据库配置项、第三方接口、服务地址等,这些配置操作在我们开发过程中很容易完成,但是放到一个大规模的集群中配置起来就比较麻烦了。通常这种集群中,我们可以用配置管理功能自动完成服务器配置信息的维护,利用ZooKeeper 的发布订阅功能就能解决这个问题。

我们可以把诸如数据库配置项这样的信息存储在 ZooKeeper 数据节点中。如图中的 /confs/data_item1。服务器集群客户端对该节点添加 Watch 事件监控,当集群中的服务启动时,会读取该节点数据获取数据配置信息。而当该节点数据发生变化时,ZooKeeper 服务器会发送 Watch 事件给各个客户端,集群中的客户端在接收到该通知后,重新读取节点的数据库配置信息。

image-20240601214733308

我们使用 Watch 机制实现了一个分布式环境下的配置管理功能,通过对 ZooKeeper 服务器节点添加数据变更事件,实现当数据库配置项信息变更后,集群中的各个客户端能接收到该变更事件的通知,并获取最新的配置信息。要注意一点是,我们提到 Watch 具有一次性,所以当我们获得服务器通知后要再次添加 Watch 事件。

总结

学习了 ZooKeeper 中非常重要的基础知识——Watch 监控机制。详细分析了 ZooKeeper 在处理 Watch 事件的底层实现,并通过我们掌握的知识实现了一个集群环境下的配置管理功能。

现在我有一个思考题留给你:“当服务端某一节点发生数据变更操作时,所有曾经设置了该节点监控事件的客户端都会收到服务器的通知吗?答案是否定的,通过本课时对 ZooKeeper 内部实现机制的解析可以知道,Watch 事件的触发机制取决于会话的连接状态和客户端注册事件的类型,所以当客户端会话状态或数据节点发生改变时,都会触发对应的 Watch 事件。

03 ACL 权限控制:如何避免未经授权的访问?

之前,我们学习了数据模型节点、Watch 监控机制等知识。并利用这些知识实现了在分布式环境中经常用到的诸如分布式锁、配置管理等功能。这些功能的本质都在于操作数据节点,而如果作为分布式锁或配置项的数据节点被错误删除或修改,那么对整个分布式系统有很大的影响,甚至会造成严重的生产事故。而作为在分布式领域应用最为广泛的一致性解决框架,ZooKeeper 提供一个很好的解决方案那就是 ACL 权限控制。

说到 ACL 可能你会觉得陌生,但是提到权限控制相信你一定很熟悉。比如 Linux 系统将对文件的使用者分为三种身份,即 User、Group、Others。使用者对文件拥有读(read) 写(write)以及执行(execute)3 种方式的控制权。这种权限控制方式相对比较粗糙,在复杂的授权场景下往往并不适用。比如下边一个应用场景。

image-20240601214908300

上图给出了某个技术开发公司的一个工作项目 /object 。项目中的每个开发人员都可以读取和修改该项目中的文件,作为开发组长也对这个项目文件具有读取和修改的权限。其他技术开发组的员工则不能访问这个项目。如果我们用之前说到的 Linux 权限应该怎么设计呢?

首先作为技术组长使用 User 身份,具有读、写、执行权限。项目组其他成员使用 Group 身份,具有读写权限,其他项目组的人员则没有任何权限。这样就实现了满足要求的权限设定了。

但是,如果技术组新加入一个实习人员,为了能让他熟悉项目,必须具有该项目的读取的权限。但是目前他不具备修改项目的能力,所以并没给他赋予写入的权限。而如果使用现有的权限设置,显然将其分配给 User 用户或者 Group 用户都并不合适。而如果修改 Others 用户的权限,其他项目组的成员也能访问该项目文件。显然普通的三种身份的权限划分是无法满足要求的。而 ZooKeeper 中的 ACl 就能应对这种复杂的权限应用场景。

ACL 的使用

下面我们来讲解一下如何使用 ZooKeeper 的 ACL 机制来实现客户端对数据节点的访问控制。

一个 ACL 权限设置通常可以分为 3 部分,分别是:权限模式(Scheme)、授权对象(ID)、权限信息(Permission)。最终组成一条例如“scheme:id:permission”格式的 ACL 请求信息。下面我们具体看一下这 3 部分代表什么意思:

权限模式:Scheme

权限模式就是用来设置 ZooKeeper 服务器进行权限验证的方式。ZooKeeper 的权限验证方式大体分为两种类型,一种是范围验证,另外一种是口令验证。所谓的范围验证就是说 ZooKeeper 可以针对一个 IP 或者一段 IP 地址授予某种权限。比如我们可以让一个 IP 地址为“ip:192.168.0.11”的机器对服务器上的某个数据节点具有写入的权限。或者也可以通过“ip:192.168.0.11/22”给一段 IP 地址的机器赋权。

另一种权限模式就是口令验证,也可以理解为用户名密码的方式,这是我们最熟悉也是日常生活中经常使用的模式,比如我们打开自己的电脑或者去银行取钱都需要提供相应的密码。在 ZooKeeper 中这种验证方式是 Digest 认证,我们知道通过网络传输相对来说并不安全,所以“绝不通过明文在网络发送密码”也是程序设计中很重要的原则之一,而 Digest 这种认证方式首先在客户端传送“username:password”这种形式的权限表示符后,ZooKeeper 服务端会对密码 部分使用 SHA-1 和 BASE64 算法进行加密,以保证安全性。另一种权限模式 Super 可以认为是一种特殊的 Digest 认证。具有 Super 权限的客户端可以对 ZooKeeper 上的任意数据节点进行任意操作。下面这段代码给出了 Digest 模式下客户端的调用方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//创建节点

create /digest_node1

//设置digest权限验证

setAcl /digest_node1 digest:用户名:base64格式密码:rwadc

//查询节点Acl权限

getAcl /digest_node1

//授权操作

addauth digest user:passwd

最后一种授权模式是 world 模式,其实这种授权模式对应于系统中的所有用户,本质上起不到任何作用。设置了 world 权限模式系统中的所有用户操作都可以不进行权限验证。

授权对象(ID)

接下来我们再看一下授权对象部分,其实这个很好理解,所谓的授权对象就是说我们要把权限赋予谁,而对应于 4 种不同的权限模式来说,如果我们选择采用 IP 方式,使用的授权对象可以是一个 IP 地址或 IP 地址段;而如果使用 Digest 或 Super 方式,则对应于一个用户名。如果是 World 模式,是授权系统中所有的用户。

权限信息(Permission)

介绍完授权方式以及授权对象,下面我们学习 ACL 请求信息中的最后一项:权限(Permission)。权限就是指我们可以在数据节点上执行的操作种类,如下图所示:在 ZooKeeper 中已经定义好的权限有 5 种:

  • 数据节点(create)创建权限,授予权限的对象可以在数据节点下创建子节点;
  • 数据节点(wirte)更新权限,授予权限的对象可以更新该数据节点;
  • 数据节点(read)读取权限,授予权限的对象可以读取该节点的内容以及子节点的信息;
  • 数据节点(delete)删除权限,授予权限的对象可以删除该数据节点的子节点;
  • 数据节点(admin)管理者权限,授予权限的对象可以对该数据节点体进行 ACL 权限设置。

image-20240601214925230

需要注意的一点是,每个节点都有维护自身的 ACL 权限数据,即使是该节点的子节点也是有自己的 ACL 权限而不是直接继承其父节点的权限。如下中“172.168.11.1”服务器有“/Config”节点的读取权限,但是没有其子节点的“/Config/dataBase_Config1”权限。

image-20240601214939192

实现自己的权限口控制

通过上边的介绍我们了解了 ZooKeeper 中的权限相关知识,虽然 ZooKeeper 自身的权限控制机制已经做得很细,但是它还是提供了一种权限扩展机制来让用户实现自己的权限控制方式。官方文档中对这种机制的定义是 “Pluggable ZooKeeper Authenication”,意思是可插拔的授权机制,从名称上我们可以看出它的灵活性。那么这种机制是如何实现的呢?

首先,要想实现自定义的权限控制机制,最核心的一点是实现 ZooKeeper 提供的权限控制器接口 AuthenticationProvider。下面这张图片展示了接口的内部结构,用户通过该接口实现自定义的权限控制。

image-20240601214950210

实现了自定义权限后,如何才能让 ZooKeeper 服务端使用自定义的权限验证方式呢?接下来就需要将自定义的权限控制注册到 ZooKeeper 服务器中,而注册的方式通常有两种。

第一种是通过设置系统属性来注册自定义的权限控制器:

1
-Dzookeeper.authProvider.x=CustomAuthenticationProvider

另一种是在配置文件 zoo.cfg 中进行配置:

1
authProvider.x=CustomAuthenticationProvider

ACL 内部实现原理

到目前为止我们学习了 ACL 权限控制机制应用层方面的相关知识,下面就深入到底层学习一下 ZooKeeper 是如何实现的。

客户端处理过程

我们先看一下客户端是如何操作的,我们以节点授权 addAuth 接口为例,首先客户端通过 ClientCnxn 类中的 addAuthInfo 方法向服务端发送 ACL 权限信息变更请求,该方法首先将 scheme 和 auth 封装成 AuthPacket 类,并通过 RequestHeader 方法表示该请求是权限操作请求,最后将这些数据统一封装到 packet 中,并添加到 outgoingQueue 队列中发送给服务端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void addAuthInfo(String scheme, byte auth[]) {

if (!state.isAlive()) {

return;

}

authInfo.add(new AuthData(scheme, auth));

queuePacket(new RequestHeader(-4, OpCode.auth), null,

new AuthPacket(0, scheme, auth), null, null, null, null,

null, null);

}

ACL 权限控制机制的客户端实现相对简单,只是封装请求类型为权限请求,方便服务器识别处理,而发送到服务器的信息包括我们之前提到的权限校验信息。

服务端实现过程

相比于客户端的处理过程,服务器端对 ACL 内部实现就比较复杂,当节点授权请求发送到服务端后,在服务器的处理中首先调用 readRequest()方法作为服务器处理的入口,其内部只是调用 processPacket 方法。

1
2
3
4
5
private void readRequest() throws IOException {

zkServer.processPacket(this, incomingBuffer);

}

而在 processPacket 方法的内部,首先反序列化客户端的请求信息并封装到 AuthPacket 对象中。之后通过 getServerProvider 方法根据不同的 scheme 判断具体的实现类,这里我们使用 Digest 模式为例,因此该实现类是 DigestAuthenticationProvider 。之后调用其 handleAuthentication() 方法进行权限验证。如果返 KeeperException.Code.OK 则表示该请求已经通过了权限验证,如果返回的状态是其他或者抛出异常则表示权限验证失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {

...

ServerAuthenticationProvider ap=ProviderRegistry.getServerProvider(scheme);

if(ap != null) {

authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());

}

...

}

现在我们知道了权限认证的最终实现函数是 handleAuthentication 函数,而这个函数内部实现的逻辑就很清晰简单了,主要的工作就是解析客户端传递的权限验证类型,并通过 addAuthInfo 函数将权限信息添加到 authInfo 集合属性中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public KeeperException.Code 

handleAuthentication(ServerCnxn cnxn, byte[] authData)

{

String id = new String(authData);

try {

String digest = generateDigest(id);

if (digest.equals(superDigest)) {

cnxn.addAuthInfo(new Id("super", ""));

}

cnxn.addAuthInfo(new Id(getScheme(), digest));

return KeeperException.Code.OK;

...

}

这里我们重点讲解一下 addAuthInfo 函数,其作用是将解析到的权限信息存储到 ZooKeeper 服务器的内存中,该信息在整个会话存活期间一直会保存在服务器上,如果会话关闭,该信息则会被删,这个特性很像我们之前学过的数据节点中的临时节点。

经过上面的步骤,服务器已经将客户端 ACL 请求解析并将对应的会话权限信息存储在服务器上,下面我们再看一下服务器是如何进行权限验证的。首先,在处理一次权限请求时,先通过 PrepRequestProcessor 中的 checkAcl 函数检查对应的请求权限,如果该节点没有任何权限设置则直接返回,如果该节点有权限设置则循环遍历节点信息进行检查,如果具有相应的权限则直接返回表明权限认证成功,否则最后抛出 NoAuthException 异常中断操作表明权限认证失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void checkACL(...){

...

for (ACL a : acl) {

if(authId.getScheme().equals(id.getScheme()..){

return;

}

}

throw new KeeperException.NoAuthException();

}

到目前为止我们对 ACL 权限在 ZooKeeper 服务器客户端和服务端的底层实现过程进行了深度的分析。总体来说,客户端在 ACL 权限请求发送过程的步骤比较简单:首先是封装该请求的类型,之后将权限信息封装到 request 中并发送给服务端。而服务器的实现比较复杂,首先分析请求类型是否是权限相关操作,之后根据不同的权限模式(scheme)调用不同的实现类验证权限最后存储权限信息。本课时的例子采用了授权接口 addAuth 而没有采用权限设置接口 setAcl,是因为权限设置接口相对简单,其核心功能点已经包括在授权接口实现中。而在授权接口中,值得注意的是会话的授权信息存储在 ZooKeeper 服务端的内存中,如果客户端会话关闭,授权信息会被删除。下次连接服务器后,需要重新调用授权接口进行授权。

总结

ZooKeeper 作为分布式系统协调框架,往往在一个分布式系统下起到关键的作用。尤其是在分布式锁、配置管理等应用场景中。如果因为错误操作对重要数据节点进行变更或删除,对整个分布式系统影响很大,甚至会导致整个分布式服务不可用。所以当你在设计使用 ZooKeeper 的时候一定要考虑对关键节点添加权限控制。

04 ZooKeeper 如何进行序列化?

我们大概清楚了使用 ZooKeeper 实现一些功能的主要方式,也就是通过客户端与服务端之间的相互通信。那么首先要解决的问题就是通过网络传输数据,而要想通过网络传输我们定义好的 Java 对象数据,必须要先对其进行序列化。例如,我们通过 ZooKeeper 客户端发送 ACL 权限控制请求时,需要把请求信息封装成 packet 类型,经过序列化后才能通过网络将 ACL 信息发送给 ZooKeeper 服务端进行处理。

什么是序列化,为什么要进行序列化操作

序列化是指将我们定义好的 Java 类型转化成数据流的形式。之所以这么做是因为在网络传输过程中,TCP 协议采用“流通信”的方式,提供了可以读写的字节流。而这种设计的好处在于避免了在网络传输过程中经常出现的问题:比如消息丢失、消息重复和排序等问题。那么什么时候需要序列化呢?如果我们需要通过网络传递对象或将对象信息进行持久化的时候,就需要将该对象进行序列化。

我们较为熟悉的序列化操作是在 Java中,当我们要序列化一个对象的时候,首先要实现一个 Serializable 接口。

1
2
3
4
5
6
7
8
9
10
11
public class User implements Serializable{

private static final long serialVersionUID = 1L;

private Long ids;

private String name;

...

}

实现了 Serializable 接口后其实没有做什么实际的工作,它是一个没有任何内容的空接口,起到的作用就是标识该类是需要进行序列化的,这个就与我们后边要重点讲解的 ZooKeeper 序列化实现方法有很大的不同,这里请你先记住当前的写法,后边我们会展开讲解。

1
2
3
public interface Serializable {

}

定义好序列化接口后,我们再看一下如何进行序列化和反序列化的操作。Java 中进行序列化和反序列化的过程中,主要用到了 ObjectInputStream 和 ObjectOutputStream 两个 IO 类。

ObjectOutputStream 负责将对象进行序列化并存储到本地。而 ObjectInputStream 从本地存储中读取对象信息反序列化对象。

1
2
3
4
5
6
7
8
9
10
11
//序列化

ObjectOutputStream oo = new ObjectOutputStream()

oo.writeObject(user);

//反序列化

ObjectInputStream ois = new ObjectInputStream();

User user = (User) ois.readObject();

到目前为止我们了解了什么是序列化,以及为什么要进行序列化,并通过我们熟悉的 Java 编程语言中的序列化实现,进一步对序列化操作有更加具体的了解。我们知道,当我们要把对象进行本地存储或网络传输时是需要进行序列化操作的,而在 ZooKeeper 中需要频繁的网络传输工作,那么在 ZooKeeper 中是如何进行序列化的呢,我们带着这个问题继续下面的学习。

ZooKeeper 中的序列化方案

在 ZooKeeper 中并没有采用和 Java 一样的序列化方式,而是采用了一个 Jute 的序列解决方案作为 ZooKeeper 框架自身的序列化方式,说到 Jute 框架,它最早作为 Hadoop 中的序列化组件。之后 Jute 从 Hadoop 中独立出来,成为一个独立的序列化解决方案。ZooKeeper 从最开始就采用 Jute 作为其序列化解决方案,直到其最新的版本依然没有更改。

虽然 ZooKeeper 一直将 Jute 框架作为序列化解决方案,但这并不意味着 Jute 相对其他框架性能更好,反倒是 Apache Avro、Thrift 等框架在性能上优于前者。之所以 ZooKeeper 一直采用 Jute 作为序列化解决方案,主要是新老版本的兼容等问题,这里也请你注意,也许在之后的版本中,ZooKeeper 会选择更加高效的序列化解决方案。

使用 Jute 实现序列化

简单介绍了 Jute 框架的发展过程,下面我们来看一下如何使用 Jute 在 ZooKeeper 中实现序列化。如果我们要想将某个定义的类进行序列化,首先需要该类实现 Record 接口的 serilize 和 deserialize 方法,这两个方法分别是序列化和反序列化方法。下边这段代码给出了我们一般在 ZooKeeper 中进行序列化的具体实现:首先,我们定义了一个 test_jute 类,为了能够对它进行序列化,需要该 test_jute 类实现 Record 接口,并在对应的 serialize 序列化方法和 deserialize 反序列化方法中编辑具体的实现逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class test_jute implements Record{

private long ids;

private String name;

...

public void serialize(OutpurArchive a_,String tag){

...

}

public void deserialize(INputArchive a_,String tag){

...

}

}

而在序列化方法 serialize 中,我们要实现的逻辑是,首先通过字符类型参数 tag 传递标记序列化标识符,之后使用 writeLong 和 writeString 等方法分别将对象属性字段进行序列化。

1
2
3
4
5
6
7
8
9
10
11
public void serialize(OutpurArchive a_,String tag) throws ...{

a_.startRecord(this.tag);

a_.writeLong(ids,"ids");

a_.writeString(type,"name");

a_.endRecord(this,tag);

}

而调用 derseralize 在实现反序列化的过程则与我们上边说的序列化过程正好相反。

1
2
3
4
5
6
7
8
9
10
11
public void deserialize(INputArchive a_,String tag) throws {

a_.startRecord(tag);

ids = a_.readLong("ids");

name = a_.readString("name");

a_.endRecord(tag);

}

到这里我们就介绍完了如何在 ZooKeeper 中使用 Jute 实现序列化,需要注意的是,在实现了Record 接口后,具体的序列化和反序列化逻辑要我们自己在 serialize 和 deserialize 函数中完成

序列化和反序列化的实现逻辑编码方式相对固定,首先通过 startRecord 开启一段序列化操作,之后通过 writeLong、writeString 或 readLong、 readString 等方法执行序列化或反序列化。本例中只是实现了长整型和字符型的序列化和反序列化操作,除此之外 ZooKeeper 中的 Jute 框架还支持 整数类型(Int)、布尔类型(Bool)、双精度类型(Double)以及 Byte/Buffer 类型。

Jute 在 ZooKeeper 中的底层实现

正因为 ZooKeeper 的设计目的是将复杂的底层操作封装成简单易用的接口,从而方便用户调用,也使得我们在使用 ZooKeeper 实现序列化的时候能够更加容易。

学会了利用 Jute 实现序列化和反序列化后,我们深入底层,看一下 ZooKeeper 框架具体是如何实现序列化操作的。正如上边我们提到的,通过简单的实现 Record 接口就可以实现序列化,那么我们接下来就以这个接口作为入口,详细分析其底层原理。

Record 接口可以理解为 ZooKeeper 中专门用来进行网络传输或本地存储时使用的数据类型。因此所有我们实现的类要想传输或者存储到本地都要实现该 Record 接口。

1
2
3
4
5
6
7
8
9
10
11
public interface Record{

public void serialize(OutputArchive archive, String tag)

throws IOException;

public void deserialize(InputArchive archive, String tag)

throws IOException;

}

Record 接口的内部实现逻辑非常简单,只是定义了一个 序列化方法 serialize 和一个反序列化方法 deserialize 。而在 Record 起到关键作用的则是两个重要的类:OutputArchive 和 InputArchive ,其实这两个类才是真正的序列化和反序列化工具类。

在 OutputArchive 中定义了可进行序列化的参数类型,根据不同的序列化方式调用不同的实现类进行序列化操作。如下图所示,Jute 可以通过 Binary 、 Csv 、Xml 等方式进行序列化操作。

image-20240601215044427

而对应于序列化操作,在反序列化时也会相应调用不同的实现类,来进行反序列化操作。 如下图所示:

image-20240601215058769

注意:无论是序列化还是反序列化,都可以对多个对象进行操作,所以当我们在定义序列化和反序列化方法时,需要字符类型参数 tag 表示要序列化或反序列化哪个对象。

总结

为什么需要序列化

在计算机网络中,数据通常会在不同的系统和设备之间进行传输。这些系统和设备可能有不同的架构和数据表示方式。序列化是将复杂的数据结构转换为字节流的过程,这使得数据能够在网络中进行传输,或者被存储到磁盘上。

以下是序列化的几个主要原因:

  1. 跨平台通信:不同的系统或语言有自己的内存管理和数据表示方式。序列化可以将数据转换为通用的格式,使得不同的系统和语言能够理解和使用。
  2. 网络传输:网络传输的基本单位是字节,复杂的数据结构(如对象、数组等)无法直接在网络中传输。序列化可以将这些数据结构转换为字节流,从而可以在网络中进行传输。
  3. 持久化存储:序列化也可以用于将数据持久化存储到磁盘中。序列化的数据可以在后续的程序运行中重新加载和使用,实现了数据的持久化。

因此,无论是为了实现跨平台通信,还是为了网络传输,或者是为了持久化存储,序列化都是必不可少的过程。

05 深入分析 Jute 的底层实现原理

上个课时我们讲解了 ZooKeeper 中采用 Jute 作为序列化解决的方案,并介绍了其应用层的使用技巧。本课时我们就深入 Jute 框架的内部核心,来看一看其内部的实现原理和算法。而通过研究 Jute 序列化框架的内部的实现原理,能够让我们在日常工作中更加高效安全地使用 Jute 序列化框架。

简述 Jute 序列化

通过前面的课时我们知道了序列化就是将 Java 对象转化成字节码的形式,从而方便进行网络传输和本地化存储,那么具体的序列化方法都有哪些呢?这里我们结合 ZooKeeper 中使用到的序列化解决方案 Jute 来进行介绍,Jute 框架给出了 3 种序列化方式,分别是 Binary 方式、Csv 方式、XML 方式。序列化方式可以通俗地理解成我们将 Java 对象通过转化成特定的格式,从而更加方便在网络中传输和本地化存储。之所以采用这 3 种方式的格式化文件,也是因为这 3 种方式具有跨平台和普遍的规约特性,后面我将会对这三种方法的特性进行具体讲解。接下来我将深入 Jute 的底层,看一下这 3 种实现方式的底层实现过程。

Jute 内部核心算法

上个课时中我们提到过,ZooKeeper 在实现序列化的时候要实现 Record 接口,而在 Record 接口的内部,真正起作用的是两个工具类,分别是 OutPutArchive 和 InputArchive。下边我们分别来看一下它们在 Jute 内部是如何实现的。

OutPutArchive 是一个接口,规定了一系列序列化相关的操作。而要实现具体的相关操作,Jute 是通过三个具体实现类分别实现了 Binary、Csv、XML 三种方式的序列化操作。而这三种方式有什么不同,我们在日常工作中应该如何选择呢?带着这些问题我们来深入到 Jute 的内部实现来找寻答案

Binary 方式的序列化

首先我们来看一下 Jute 中的第 1 种序列化方式:Binary 序列化方式,即二进制的序列化方式。正如我们前边所提到的,采用这种方式的序列化就是将 Java 对象信息转化成二进制的文件格式。

在 Jute 中实现 Binary 序列化方式的类是 BinaryOutputArchive。该 BinaryOutputArchive 类通过实现 OutPutArchive 接口,在 Jute 框架采用二进制的方式实现序列化的时候,采用其作为具体的实现类。

在这里我们通过调用 Record 接口中的 writeString 方法为例,该方法是将 Java 对象的 String 字符类型进行序列化。当调用 writeString 方法后,首先判断所要进行序列化的字符串是否为空。如果是空字符串则采用 writeInt 方法,将空字符串当作值为 -1 的数字类型进行序列化;如果不为空,则调用 stringtoByteBuffer 方法对字符串进行序列化操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void writeString (String s, Sring tag){

if(s==null){

writeInt(-1,"len");

return

}

ByteBuffer bb = stringtoByteBuffer(s);

...

}

而 stringToByteBuffer 方法也是 BinaryOutputArchive 类的内部核心方法,除了 writeString 序列化方法外,其他的比如 writeInt、wirteDoule 等序列化方法则是调用 DataOutPut 接口中的相关方法来实现具体的序列化操作。

在调用 BinaryOutputArchive 类的 stringToByteBuffer 方法时,在将字符串转化成二进制字节流的过程中,首选将字符串转化成字符数组 CharSequence 对象,并根据 ascii 编码判断字符类型,如果是字母等则使用1个 byte 进行存储。如果是诸如 “¥” 等符号则采用两个 byte 进程存储。如果是汉字则采用3个 byte 进行存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
...

private ByteBuffer bb = ByteBuffer.allocate(1024);

...

ByteBuffer stringToByteBuffer(CharSeuquece s){

if (c < 0x80) {

bb.put((byte) c);

} else if (c < 0x800) {

bb.put((byte) (0xc0 | (c >> 6)));

bb.put((byte) (0x80 | (c & 0x3f)));

} else {

bb.put((byte) (0xe0 | (c >> 12)));

bb.put((byte) (0x80 | ((c >> 6) & 0x3f)));

bb.put((byte) (0x80 | (c & 0x3f)));

}

}

Binary 二进制序列化方式的底层实现相对简单,只是采用将对应的 Java 对象转化成二进制字节流的方式。Binary 方式序列化的优点有很多:无论是 Windows 操作系统、还是 Linux 操作系统或者是苹果的 macOS 操作系统,其底层都是对二进制文件进行操作,而且所有的系统对二进制文件的编译与解析也是一样的,所有操作系统都能对二进制文件进行操作,跨平台的支持性更好。而缺点则是会存在不同操作系统下,产生大端小端的问题。

XML 方式的序列化

说完了 Binary 的序列化方式,我们再来看看 Jute 中的另一种序列化方式 XML 方式。XML 是一种可扩展的标记语言。当初设计的目的就是用来传输和存储数据,很像我们都很熟悉的 HTML 语言,而与 HTML 语言不同的是我们需要自己定义标签。在 XML 文件中每个标签都是我们自己定义的,而每个标签就对应一项内容。一个简单的 XML 的格式如下面这段代码所示:

1
2
3
4
5
6
7
8
9
10
11
<note>

<to>学生</to>

<from>老师</form>

<heading>上课提醒</heading>

<body>记得9:00来上课</body>

</note>

大概了解了 XML 文件,接下来我们看一下 Jute 框架中是如何采用 XML 方式进行序列化操作的。在 Jute 中使用 XmlOutPutArchive 类作用 XML 方式序列化的具体实现类。与上面讲解二进制的序列化实现一样 ,这里我们还是以 writeString 方法的 XML 序列化方式的实现为例。 首先,当采用XML 方式进行序列化时,调用 writeString 方法将 Java 中的 String 字符串对象进行序列化时,在 writeString 内部首先调用 printBeginEnvelope 方法并传入 tag 参数,标记我们要序列化的字段名称。之后采用“”和“”作用自定义标签,封装好传入的 Java 字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
void writeString(String s, String tag){

printBeginEnvelope(tag);

stream.print("<string>");

stream.print(Utils.toXMLString(s));

stream.print("</string>");

printEndEnvelope(tag);

}

而在 printBeginEnvelope 方法中,其主要作用就是添加该字段的名称、字段值等信息,用于之后反序列化的过程中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void printBeginEnvelope (String tag){

...

if ("struct".equals(s)) {

putIndent();

stream.print("<member>\n");

addIndent();

putIndent();

stream.print("<name>"+tag+"</name>\n");

putIndent();

stream.print("<value>");

} else if ("vector".equals(s)) {

stream.print("<value>");

} else if ("map".equals(s)) {

stream.print("<value>");

}

} else {

stream.print("<value>");

}

}

通过上面在 Jute 框架中,对采用 XML 方式序列化的实现类:XmlOutPutArchive 中的底层实现过程分析,我们可以了解到其实现的基本原理,也就是根据 XML 格式的要求,解析传入的序列化参数,并将参数按照 Jute 定义好的格式,采用设定好的默认标签封装成对应的序列化文件。

而采用 XML 方式进行序列化的优点则是,通过可扩展标记协议,不同平台或操作系统对序列化和反序列化的方式都是一样的,不存在因为平台不同而产生的差异性,也不会出现如 Binary 二进制序列化方法中产生的大小端的问题。而缺点则是序列化和反序列化的性能不如二进制方式。在序列化后产生的文件相比与二进制方式,同样的信息所产生的文件更大。

Csv 方式的序列化

最后我们来学习一下 Jute 序列化框架的最后一种序列化方式:Csv,它和 XML 方式很像,只是所采用的转化格式不同,Csv 格式采用逗号将文本进行分割,我们日常使用中最常用的 Csv 格式文件就是 Excel 文件。

在 Jute 框架中实现 Csv 序列化的类是 CsvOutputArchive,我们还是以 String 字符对象序列化为例,在调用 CsvOutputArchive 的 writeString 方法时,writeString 方法首先调用 printCommaUnlessFirst 方法生成一个逗号分隔符,之后将要序列化的字符串值转换成 CSV 编码格式追加到字节数组中。

1
2
3
4
5
6
7
8
9
void writeString(String s, String tag){

printCommaUnlessFirst();

stream.print(Utils.toCSVString(s));

throwExceptionOnError(tag);

}

到这里我们已经对 Jute 框架的 3 种序列化方式的底层实现有了一个整体了解,这 3 种方式相比,二进制底层的实现方式最为简单,性能也最好。而 XML 作为可扩展的标记语言跨平台性更强。而 CSV 方式介于两者之间实现起来也相比 XML 格式更加简单。

总结

在 ZooKeeper 中默认的序列化实现方式是 Binary 二进制方式。这是因为二进制具有更好的性能,以及大多数平台对二进制的实现都不尽相同。

06 ZooKeeper 的网络通信协议详解

课我们将学习 ZooKeeper 的网络通信协议。同时,本节课也是基础篇中的最后一节课。在 ZooKeeper 中无论是客户端和服务器之间的通信,还是集群之间服务器的内部协同工作都是基于网络进行通信的。而网络通信协议则是影响 ZooKeeper 性能和稳定性的核心点。

ZooKeeper 协议简述

说到网络通信协议我们最为熟悉的应该就是 TCP/IP 协议。而 ZooKeeper 则是在 TCP/IP 协议的基础上实现了自己特有的通信协议格式。在 ZooKeeper 中一次客户端的请求协议由请求头、请求体组成。而在一次服务端的响应协议中由响应头和响应体组成。

ZooKeeper 协议的底层实现

我们大概了解了 ZooKeeper 中的网络通信协议的结构后。接下来我们看一下在 ZooKeeper 中的内部对于网络通信协议的底层是怎么样实现的。

请求协议

请求协议就是客户端向服务端发送的协议。比如我们经常用到的会话创建、数据节点查询等操作。都是客户端通过网络向 ZooKeeper 服务端发送请求协议完成的。

客户端请求头底层解析

首先,我们先看一下请求头的内部的实现原理。在 ZooKeeper 中请求头是通过 RequestHeader 类实现的。首先 RequestHeader 类实现了 Record 接口,用于之后在网络传输中进行序列化操作。

我们可以看到 RequestHeader 类中只有两个属性字段分别是 xid 和 type。这两个字段在我们第一节课 ZooKeeper 的数据模型中介绍过,分别代表客户端序号用于记录客户端请求的发起顺序以及请求操作的类型。

1
2
3
4
5
6
7
class RequestHeader implements Record{

private int xid;

private int type;

}

客户端请求体底层解析

我们接下来再看一下客户端请求协议的请求体,协议的请求体包括了协议处理逻辑的全部内容,一次会话请求的所有操作内容都涵盖在请求体中。在 ZooKeeper 的内部实现中,根据不同的请求操作类型,会采用不同的结构封装请求体。接下来我们就以最常用的创建一次会话和数据节点的查询和更新这三种操作来介绍,深入底层看看 ZooKeeper 在内部是如何实现的。

会话创建

前面的课程我们已经介绍了 ZooKeeper 中的会话创建以及会话管理等相关知识。通过之前的学习我们知道了在 ZooKeeper 客户端发起会话时,会向服务端发送一个会话创建请求,该请求的作用就是通知 ZooKeeper 服务端需要处理一个来自客户端的访问链接。

而服务端处理会话创建请求时所需要的所有信息都包括在请求体内。在 ZooKeeper 中该请求体是通过 ConnectRequest 类实现的,其内部一共包括了五种属性字段。分别是 protocolVersion 表示该请求协议的版本信息、lastZxidSeen 最后一次接收到的服务器的 zxid 序号、timeOut 会话的超时时间、会话标识符 sessionId 以及会话的密码 password。有了这些信息 ZooKeeper 服务端在接收一个请求时,就可以根据请求体的信息进行相关的操作了。

1
2
3
4
5
6
7
8
9
10
11
12
13
ConnectRequest implements Record {

private int protocolVersion;

private long lastZxidSeen;

private int timeOut;

private long sessionId;

private byte[] passwd;

}

节点查询

在我们通过客户端 API 查询 ZooKeeper 服务器上的数据节点时,客户端会向服务端发送 GetDataRequest 会话请求。与上面介绍的会话请求不同。ZooKeeper 在处理获取数据节点会话请求时,选择了另一种结构作为该协议的请求体。而具体的实现类则是 GetDataRequest 。在 GetDataRequest 类中首先实现了 Record 接口用于序列化操作。其具有两个属性分别是字符类型 path 表示要请求的数据节点路径以及布尔类型 watch 表示该节点是否注册了 Watch 监控。

节点路径如下:

1
2
3
4
5
public class GetDataRequest implements Record {

private String path;

private boolean watch;

节点更新

最后,我们来看一下最后一种会话操作类型即节点的更新操作,同样的在客户端向服务端发送一个数据节点更新操作时,其在网络上实际发送的是更新操作的请求协议。而在 ZooKeeper 中对于协议内部的请求体,ZooKeeper 通过 SetDataRequest 类进行了封装。在 SetDataRequest 内部也包含了三种属性,分别是 path 表示节点的路径、data 表示节点数据信息以及 version 表示节点期望的版本号用于锁的验证。

1
2
3
4
5
6
7
8
9
public class SetDataRequest implements Record {

private String path;

private byte[] data;

private int version;

}

到目前为止我们就对 ZooKeeper 客户端在一次网络会话请求中所发送的请求协议的内部结构和底层实现都做了介绍,然而这些都是客户端向服务器端的请求协议,接下来我们就继续分析 ZooKeeper 服务端向客户端发送的响应协议是如何实现的。

响应协议

响应协议可以理解为服务端在处理客户端的请求后,返回相关信息给客户端。而服务端所采用的响应协议类型则要根据客户端的请求协议类型来选择。

服务端请求头解析

在服务端接收到客户端的请求后,执行相关操作将结果通知给客户端。而在 ZooKeeper 服务端向客户单发送的响应协议中,也是包括了请求头和请求体。而与客户端的请求头不同的是在 ZooKeeper 服务端的请求头多了一个错误状态字段。具体的实现类是 ReplyHeader。

1
2
3
4
5
6
7
8
9
public class ReplyHeader implements Record {

private int xid;

private long zxid;

private int err;

}

服务端请求体解析

下面我们再看一下响应协议的请求体部分,服务端的请求体可以理解为对客户端所请求内容的封装,一个服务端的请求体包含了客户端所要查询的数据而对于不同的请求类型,在 ZooKeeper 的服务端也是采用了不同的结构进行处理的。与上面我们讲解客户端请求体的方法一样,我们还是通过会话的创建、数据节点的查询和修改这三种请求操作来介绍,看看 ZooKeeper 服务端是如何响应客户端请求的。

响应会话创建

对于客户端发起的一次会话连接操作,ZooKeeper 服务端在处理后,会返回给客户端一个 Response 响应。而在底层代码中 ZooKeeper 是通过 ConnectRespose 类来实现的。在该类中有四个属性,分别是 protocolVersion 请求协议的版本信息、timeOut 会话超时时间、sessionId 会话标识符以及 passwd 会话密码。

1
2
3
4
5
6
7
8
9
10
11
public class ConnectResponse implements Record {

private int protocolVersion;

private int timeOut;

private long sessionId;

private byte[] passwd;

}

响应节点查询

在客户端发起查询节点数据的请求时,服务端根据客户端发送的节点路径,并验证客户端具有相应的权限后,会将节点数据返回给客户端。而 ZooKeeper 服务端通过 GetDataResponse 类来封装查询到的节点相关信息到响应协议的请求体中。在 GetDataResponse 内部有两种属性字段分别是 data 属性表示节点数据的内容和 stat 属性表示节点的状态信息。

1
2
3
4
5
6
7
public class GetDataResponse implements Record {

private byte[] data;

private org.apache.zookeeper.data.Stat stat;

}

响应节点更新

在客户端发送一个节点变更操作后, ZooKeeper 服务端在处理完相关逻辑后,会发送一个响应给客户端。而在 ZooKeeper 中更新完节点后会将操作结果返回给客户端,节点更新操作的响应协议请求体通过 SetDataResponse 类来实现。而在该类的内部只有一个属性就是 stat 字段,表示该节点数据更新后的最新状态信息。

1
2
3
4
5
public class SetDataResponse implements Record {

private org.apache.zookeeper.data.Stat stat;

}

总结

Zookeeper为什么需要自定义协议?

ooKeeper选择使用自定义协议的主要原因是为了提高性能和灵活性。

  1. 性能:自定义协议允许ZooKeeper精确地定义和优化网络通信,以满足其特定的需求。例如,ZooKeeper的自定义协议可以更有效地处理大量的小数据包,这是ZooKeeper常见的使用场景。
  2. 灵活性:使用自定义协议,ZooKeeper可以更灵活地调整和改进其网络通信。例如,它可以添加新的消息类型或者调整消息格式来支持新的功能。
  3. 适应性:ZooKeeper的自定义协议可以更好地适应其特定的应用场景。例如,ZooKeeper需要在分布式环境中维护和同步状态,这需要一种能够有效处理这种需求的协议。

总的来说,虽然使用自定义协议可能需要更多的开发工作,但是它可以带来更好的性能和更高的灵活性,使得ZooKeeper能够更好地满足其特定的需求。

21 ZooKeeper 分布式锁:实现和原理解析

什么是分布式锁

在开始着手开发商业级的分布式锁之前,我们首先要弄清楚什么是分布式锁,以及分布式锁在日常工作的使用场景。明确了这些,我们才能设计出一个安全稳定的分布式锁。

在日常开发中,我们最熟悉也常用的分布式锁场景是在开发多线程的时候。为了协调本地应用上多个线程对某一资源的访问,就要对该资源或数值变量进行加锁,以保证在多线程环境下系统能够正确地运行。在一台服务器上的程序内部,线程可以通过系统进行线程之间的通信,实现加锁等操作。而在分布式环境下,执行事务的线程存在于不同的网络服务器中,要想实现在分布式网络下的线程协同操作,就要用到分布式锁

分布式死锁

在单机环境下,多线程之间会产生死锁问题。同样,在分布式系统环境下,也会产生分布式死锁的问题。

当死锁发生时,系统资源会一直被某一个线程占用,从而导致其他线程无法访问到该资源,最终使整个系统的业务处理或运行性能受到影响,严重的甚至可能导致服务器无法对外提供服务。

所以当我们在设计开发分布式系统的时候,要准备一些方案来面对可能会出现的死锁问题,当问题发生时,系统会根据我们预先设计的方案,避免死锁对整个系统的影响。常用的解决死锁问题的方法有超时方法和死锁检测

  • 超时方法:在解决死锁问题时,超时方法可能是最简单的处理方式了。超时方式是在创建分布式线程的时候,对每个线程都设置一个超时时间。当该线程的超时时间到期后,无论该线程是否执行完毕,都要关闭该线程并释放该线程所占用的系统资源。之后其他线程就可以访问该线程释放的资源,这样就不会造成分布式死锁问题。但是这种设置超时时间的方法也有很多缺点,最主要的就是很难设置一个合适的超时时间。如果时间设置过短,可能造成线程未执行完相关的处理逻辑,就因为超时时间到期就被迫关闭,最终导致程序执行出错。
  • 死锁检测:死锁检测是处理死锁问题的另一种方法,它解决了超时方法的缺陷。与超时方法相比,死锁检测方法主动检测发现线程死锁,在控制死锁问题上更加灵活准确。你可以把死锁检测理解为一个运行在各个服务器系统上的线程或方法,该方法专门用来探索发现应用服务上的线程是否发生了死锁。如果发生死锁,就会触发相应的预设处理方案。

锁的实现

在介绍完分布式锁的基本性质和潜在问题后,接下来我们就通过 ZooKeeper 来实现两种比较常用的分布式锁。

排他锁

排他锁也叫作独占锁,从名字上就可以看出它的实现原理。当我们给某一个数据对象设置了排他锁后,只有具有该锁的事务线程可以访问该条数据对象,直到该条事务主动释放锁。否则,在这期间其他事务不能对该数据对象进行任何操作。在第二课时我们已经学习了利用 ZooKeeper 实现排他锁,这里不再赘述。

共享锁

另一种分布式锁的类型是共享锁。它在性能上要优于排他锁,这是因为在共享锁的实现中,只对数据对象的写操作加锁,而不为对象的读操作进行加锁。这样既保证了数据对象的完整性,也兼顾了多事务情况下的读取操作。可以说,共享锁是写入排他,而读取操作则没有限制。

接下来我就通过 ZooKeeper 来实现一个排他锁。

创建锁

首先,我们通过在 ZooKeeper 服务器上创建数据节点的方式来创建一个共享锁。其实无论是共享锁还是排他锁,在锁的实现方式上都是一样的。唯一的区别在于,共享锁为一个数据事务创建两个数据节点,来区分是写入操作还是读取操作。如下图所示,在 ZooKeeper 数据模型上的 Locks_shared 节点下创建临时顺序节点,临时顺序节点的名称中带有请求的操作类型分别是 R 读取操作、W 写入操作。

image-20240602214101116

获取锁

当某一个事务在访问共享数据时,首先需要获取锁。ZooKeeper 中的所有客户端会在 Locks_shared 节点下创建一个临时顺序节点。根据对数据对象的操作类型创建不同的数据节点,如果是读操作,就创建名称中带有 R 标志的顺序节点,如果是写入操作就创建带有 W 标志的顺序节点。

image-20240602214114736

释放锁

事务逻辑执行完毕后,需要对事物线程占有的共享锁进行释放。我们可以利用 ZooKeeper 中数据节点的性质来实现主动释放锁和被动释放锁两种方式。

主动释放锁是当客户端的逻辑执行完毕,主动调用 delete 函数删除ZooKeeper 服务上的数据节点。而被动释放锁则利用临时节点的性质,在客户端因异常而退出时,ZooKeeper 服务端会直接删除该临时节点,即释放该共享锁。

这种实现方式正好和上面介绍的死锁的两种处理方式相对应。到目前为止,我们就利用 ZooKeeper 实现了一个比较完整的共享锁。如下图所示,在这个实现逻辑中,首先通过创建数据临时数据节点的方式实现获取锁的操作。创建数据节点分为两种,分别是读操作的数据节点和写操作的数据节点。当锁节点删除时,注册了该 Watch 监控的其他客户端也会收到通知,重新发起创建临时节点尝试获取锁。当事务逻辑执行完成,客户端会主动删除该临时节点释放锁。

image-20240602214127208

总结

1、客户端调用 create 方法创建类似定义锁方式的临时顺序节点。

2、客户端调用 getChildren 接口来获取所有已创建的子节点列表。

3、判断是否获得锁,对于读请求如果所有比自己小的子节点都是读请求或者没有比自己序号小的子节点,表明已经成功获取共享锁,同时开始执行度逻辑。对于写请求,如果自己不是序号最小的子节点,那么就进入等待。

4、如果没有获取到共享锁,读请求向比自己序号小的最后一个写请求节点注册 watcher 监听,写请求向比自己序号小的最后一个节点注册watcher 监听。

理解

ZooKeeper中的锁机制可以通过一种称为“子节点列表”的方式来实现。这个过程可以类比为在一个电影院购买电影票。

  1. 首先,客户端(也就是观众)通过调用getChildren接口(也就是询问售票员),来获取所有已创建的子节点列表(也就是看看有哪些座位已经被预订)。
  2. 然后,客户端会判断是否获得了锁。对于读请求(也就是想要查看电影信息),如果所有比自己小的子节点都是读请求(也就是其他的观众都只是在查看电影信息)或者没有比自己序号小的子节点(也就是没有其他的观众),那么就表明已经成功获取了共享锁,可以开始执行读取逻辑(也就是查看电影信息)。对于写请求(也就是想要预订座位),如果自己不是序号最小的子节点(也就是有其他的观众先预订了座位),那么就需要等待。
  3. 如果没有获取到共享锁,那么读请求会向比自己序号小的最后一个写请求节点(也就是最后一个预订座位的观众)注册watcher监听(也就是询问售票员何时有座位可用),写请求会向比自己序号小的最后一个节点(也就是最后一个操作的观众)注册watcher监听。

这就是ZooKeeper中获取共享锁的过程

实操

https://www.runoob.com/w3cnote/zookeeper-tutorial.html

1
2
3
4
5
6
7
8
9
10
11
tar -zxvf zookeeper-3.4.14.tar.gz
CHANGES.txt dist-maven ivy.xml NOTICE.txt src zookeeper-3.3.4.jar.md5
(base) sv@sv-NF5280M5:/home/sv/pengeHome/mprpc/package/zookeeper-3.3.4$ cd conf/
(base) sv@sv-NF5280M5:/home/sv/pengeHome/mprpc/package/zookeeper-3.3.4/conf$ cp zoo_sample.cfg zoo.cfg
(base) sv@sv-NF5280M5:/home/sv/pengeHome/mprpc/package/zookeeper-3.3.4/conf$ cd ..
(base) sv@sv-NF5280M5:/home/sv/pengeHome/mprpc/package/zookeeper-3.3.4$ cd bin/
(base) sv@sv-NF5280M5:/home/sv/pengeHome/mprpc/package/zookeeper-3.3.4/bin$ sudo sh zkServer.sh start
JMX enabled by default
Using config: /home/sv/pengeHome/mprpc/package/zookeeper-3.3.4/bin/../conf/zoo.cfg
Starting zookeeper ... zkServer.sh: 103: cannot create /tmp/zookeeper/zookeeper_server.pid: Directory nonexistent
FAILED TO WRITE PID

ZooKeeper JMX enabled by default Using config…完美解决

上述问题解决:通过修改里面的路径解决的!

1
(base) sv@sv-NF5280M5:/home/sv/pengeHome/mprpc/package/zookeeper-3.3.4/conf$ sudo vim zoo.cfg
1
sudo apt install openjdk-8-jdk

数据模型

查看信息

1
2
[zk: localhost:2181(CONNECTED) 2] ls /
[zookeeper]
1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: localhost:2181(CONNECTED) 4] get /zookeeper

cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
Zxid 创建节点时的事务ID
ctime 创建节点时的时间
mZxid 最后修改节点时的事务ID
mtime 最后修改节点时的时间
pZxid 表示该节点的子节点列表最后一次修改的事务ID,添加子节点或删除子节点就会影响子节点列表,但是修改子节点的数据内容则不影响该ID**(注意,只有子节点列表变更了才会变更pzxid,子节点内容变更不会影响pzxid)**
cversion 子节点版本号,子节点每次修改版本号加1
dataversion 数据版本号,数据每次修改该版本号加1
aclversion 权限版本号,权限每次修改该版本号加1
ephemeralOwner 创建该临时节点的会话的sessionID。**(**如果该节点是持久节点,那么这个属性值为0)
dataLength 该节点的数据长度
numChildren 该节点拥有子节点的数量**(只统计直接子节点的数量)**

修改节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[zk: localhost:2181(CONNECTED) 4] get /zookeeper

cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: localhost:2181(CONNECTED) 5] set /zookeeper 1
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x2
mtime = Sun Jun 02 21:31:29 CST 2024
pZxid = 0x0
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 1

创建节点

create 命令

create 命令用于创建节点并赋值。

格式:

1
create [-s] [-e] path data acl
  • [-s] [-e]:-s 和 -e 都是可选的,-s 代表顺序节点, -e 代表临时节点,注意其中 -s 和 -e 可以同时使用的,并且临时节点不能再创建子节点。
  • path:指定要创建节点的路径,比如 /runoob
  • data:要在此节点存储的数据。
  • acl:访问权限相关,默认是 world,相当于全世界都能访问。

create 命令用于创建节点并赋值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[zk: localhost:2181(CONNECTED) 6] create -e /zookeeper/child 0
Created /zookeeper/child
[zk: localhost:2181(CONNECTED) 7] get /zookeeper
1
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x2
mtime = Sun Jun 02 21:31:29 CST 2024
pZxid = 0x3
cversion = 1
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 2
[zk: localhost:2181(CONNECTED) 8] ls /zookeeper
[quota, child]

详细查看信息

1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: localhost:2181(CONNECTED) 9] ls2 /zookeeper
[quota, child]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x2
mtime = Sun Jun 02 21:31:29 CST 2024
pZxid = 0x3
cversion = 1
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 2

watch机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
[zk: localhost:2181(CONNECTED) 10] get /zookeeper/child watch
0
cZxid = 0x3
ctime = Sun Jun 02 21:32:29 CST 2024
mZxid = 0x3
mtime = Sun Jun 02 21:32:29 CST 2024
pZxid = 0x3
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0xff8fd92315090000
dataLength = 1
numChildren = 0
[zk: localhost:2181(CONNECTED) 11] set /zookeeper/child 1

WATCHER::

WatchedEvent state:SyncConnected type:NodeDataChanged path:/zookeeper/child
cZxid = 0x3
ctime = Sun Jun 02 21:32:29 CST 2024
mZxid = 0x4
mtime = Sun Jun 02 21:35:17 CST 2024
pZxid = 0x3
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0xff8fd92315090000
dataLength = 1
numChildren = 0

参考:https://www.cnblogs.com/leesf456/p/6091208.html