这篇文章原本是跟着 B 站教学视频学习 Zookeeper 时记录下的笔记,但后来再次查看时,发现原本笔记中很多内容并不是太清晰,因此也整理参考了一些其他的文章,进行综合整理,希望能够尽可能详尽地触及到 ZK 相关的所有基础知识点。
# 概述
Zookeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 Hbase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
quote from 百度百科_zookeeper
Zookeeper
从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储数据,并接受观察者的注册,一旦这些数据的状态发生变化,ZK 就会负责通知已经在 ZK 上注册的那些观察者做出相应的反应。
# Zookeeper 的特点
- 它是由一个
Leader
,多个Follower
组成的继承。 - 全局数据一致:每个 Server 保存一份相同的数据副本,Client 无论连接到哪个 Server,数据都是一致的。
- 可靠性:如果消息被其中一台服务器接收,那么将被所有服务器接收。
- 顺序性:更新请求顺序执行,来自同一个客户端的更新请求按其发送顺序依次执行。
- 原子性:一次数据更新要么成功,要么失败。
- 实时性:在一定时间范围内,客户端能读到最新的数据。
- 半数机制:集群中只要有半数以上的节点存活,zookeeper 集群就能正常提供服务,所以 zookeeper 适合安装奇数台服务器。
# Zookeeper 架构图
Zookeeper 数据模型的结构与 Unix 文件系统类似,整体上可以看做是一棵树,每个节点被称为一个 ZNode
。每个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。
Zookeeper 角色:
- Leader:它是 Zookeeper 集群工作的核心,事务请求(写操作)的唯一调度和处理者,保证集群事务处理的顺序性;集群内部各个服务的调度者。对于所有服务节点中具有 create,setData,delete 等有写性质的请求,需要统一转发给 leader 处理,Leader 需要决定编号、执行操作,这个过程称为一个事务。
- Follower:处理客户端非事务(读操作)请求,转发事务请求给 Leader 参与集群 leader 选举投票,参与选举的所有服务器节点数应为
2 * n - 1
台。此外,针对访问量较大的 zookeeper 集群,还可以新增观察者角色。 - Observer:观察者角色,观察 zooKeeper 集群的最新状态变化并将这些状态进行同步,其对于非事务请求可以进行独立处理,对于事务请求,则会转发给 Leader 服务器处理。它只提供服务,不参与任何形式的投票,通常用于在不影响集群事务处理能力的前提下提升集群的非事务处理能力。
# Zookeeper 的应用场景
Zookeeper 是一个典型的 发布/订阅模式
的分布式数据管理与协调框架,它提供基于类似于文件系统的目录节点树方式的数据进行存储(共享的内存中的树型结构)。利用 zooKeeper 可以非常方便构建一系列分布式应用中都会涉及到的核心功能。
Zookeeper 提供的服务主要包括:
数据的发布和订阅
数据发布 / 订阅的一个常见的场景是配置中心,发布者将数据发布到 zookeeper 上供订阅者订阅,从而达到动态获取数据的目的。
这种应用场景通常有以下几个特点:
- 数据内容通常是数据量较小的键值对。
- 数据在客户端运行期间,可能会发生动态变化。
- 集群环境下,在不同客户端之间需要进行配置共享和保持一致。
Zookeeper 发布与订阅采用推拉结合的方式:
- 推:服务端将事件推送给注册了监控节点的客户端,客户端通过 Watcher 获取到事件通知。
- 拉:客户端获取到通知后,主动到服务端拉取最新数据。
软负载均衡
Zookeeper 中记录了每台服务器的访问数,它可以根据一定的算法将请求分发到压力最小的服务器进行处理。
统一命名服务
提供类 JNDI 功能,可以把系统中各种服务的名称、地址以及目录信息存放在 Zookeeper,需要的时候去 Zookeeper 中读取。
制作分布式的序列号生成器。
利用 zooKeeper 顺序节点的特性,制作分布式的序列号生成器,或者叫 id 生成器。在创建节点时设置节点是有序的,zookeeper 就会自动在创建的节点名称后追加序号。
分布式协调 / 通知
在 zookeeper 中,可以使用其临时节点的特性,不同机器在 zooKeeper 的一个指定节点下创建临时子节点,不同机器之间可以根据这个临时节点来判断客户端机器是否存活。
统一集群管理
集群管理主要指集群监控和集群控制两个方面。前者侧重于集群运行时的状态的收集,后者则是对集群进行操作与控制。
利用 zookeeper 可以很方便地实现集群管理和监控组件,其思路主要如下:
当某一主机上线时,将该主机节点添加到指定的 zookeeper 目录下,如
/servers/[hostname]
,此时关注/servers
节点的监控中心将会收到子节点变更事件(即上线通知),并可以进行相应的业务逻辑处理,从而实现了服务器动态上下线。分布式锁
通过对 zookeeper 进行逻辑处理,也可以实现分布式锁控制。(后文将做详细讲解)
分布式队列
# 安装
# 本地安装
下载地址:https://zookeeper.apache.org/releases.html
安装前提
环境准备
参考文章:
- VMware15 上安装 CentOS8 图形化界面
- Linux 下安装 JDK
- 如何使用 Xshell 连接 VMware 上的 Linux 虚拟机
- VMware 如何通过现有虚拟机克隆新的虚拟机
将下载的 zookeeper 安装包拷贝到 Linux 目录下。
假定上传后的文件位置为
/opt/software/apache-zookeeper-3.5.7-bin.tar.gz
。tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
cd ../module/
mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
cd zookeeper-3.5.7/conf/
mv zoo_sample.cfg zoo.cfg
mkdir ../zkData
vim zoo.cfg
修改配置文件
# 此处为简洁显示,删除了原文件的注释
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper-3.5.7/zkData
clientPort=2181
启动
../bin/zkServer.sh start
jps -l
../bin/zkCli.sh
ls /
quit
其他操作
../bin/zkServer.sh status
../bin/zkServer.sh stop
配置参数说明
从官网下载 zookeeper 后,其提供的配置示例文件
zoo_sample.cfg
内容如下:# 此处为简洁显示,删除了原文件的注释
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
- tickTime 表示通信心跳时间,即 zk 客户端与服务端通信频率,单位:毫秒。
- initLimit 表示 Leader 与 Follower 初始通信时限,超出该时间意味着通信连接失败,单位:tickTime。
- syncLimit 表示 Leader 与 Follower 同步通信时限,如果超出该时间,Leader 则 Follower 判定为死亡,单位:tickTime。
- dataDir:保存 zk 的数据目录。
- clientPort:客户端连接端口。
# 集群安装
准备奇数台服务器(或虚拟机)。
将相同环境部署到其他主机。
按照本地安装步骤,分别在其他服务器上安装 zk。(也可以直接克隆当前服务器,然后修改主机、网络等信息)
这里假定三台主机分别为:hadoop01,hadoop02,hadoop03。
配置 myid
对于 zk 集群环境,必须在每一台主机上配置唯一标识。
cd /opt/module/zookeeper-3.5.7/zkData
vim myid
myid
文件中输入当前 zk 的数字编号,可以随意指定,但需要与同集群下的其他主机的 zk 数字编号不同。这里假定主机与 myid 的对应关系为:hadoop01 -> 1,hadoop02 -> 2,hadoop03 -> 3。
编辑所有主机上的文件 /opt/module/zookeeper-3.5.7/conf/zoo.cfg 并在其末尾追加内容:
server.1=hadoop01:2888:3888;2181
server.2=hadoop02:2888:3888;2181
server.3=hadoop03:2888:3888;2181
注意:此处的
server.[myid]:[hostname]:2888:3888;2181
必须与myid
及主机名保持一致。依次启动
由于同集群下的主机各自未配置防火墙出入站规则,可以选择关闭防火墙。
防火墙相关命令:
systemctl status firewalld.service # 查看防火墙状态
systemctl start firewalld.service # 打开防火墙
systemctl stop firewalld.service # 关闭防火墙
systemctl enable firewalld.service # 开启防火墙
systemctl disable firewalld.service # 禁用防火墙
依次启动 zk 服务:
/opt/module/zookeeper-3.5.7/bin/zkServer.sh start
/opt/module/zookeeper-3.5.7/bin/zkServer.sh status # 查看 zk 状态
当所有服务节点启动成功后,分别查看各自状态,可以发现其中有且仅有一个服务节点为
Mode: leader
,其他节点均为Mode: follower
,这其中涉及到 zookeeper 的选举机制
,这将在后文进行说明。
# 集群启动脚本
创建脚本文件 zk.sh
# !/bin/bash | |
case $1 in | |
"start") { | |
for i in hadoop01 hadoop02 hadoop03 | |
do | |
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start" | |
echo ------------ zookeeper $i is started ------------ | |
done | |
};; | |
"stop") { | |
for i in hadoop01 hadoop02 hadoop03 | |
do | |
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop" | |
echo ------------ zookeeper $i is stopped ------------ | |
done | |
};; | |
"stop") { | |
for i in hadoop01 hadoop02 hadoop03 | |
do | |
echo ------------ zookeeper $i status ------------ | |
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status" | |
done | |
};; | |
esac |
修改文件权限:
chmod a+x zk.sh |
注意:
- 脚本中 hadoop01 等作为主机名,需要在对应的主机上配置了对应的 hosts 才会有效。
- 脚本执行过程中,如果主机权限拦截,需要输入密码。
# Docker 安装
使用 Docker 安装 Zookeeper(单节点)
拉取 zookeeper 镜像:
docker pull zookeeper # 拉取指定版本镜像 |
预先创建 zookeeper 挂载目录:
mkdir -p /home/docker/zookeeper/conf | |
chmod 777 /home/docker/zookeeper/conf | |
mkdir -p /home/docker/zookeeper/data | |
mkdir -p /home/docker/zookeeper/log |
创建自定义网络:
docker network create --driver bridge --subnet 127.0.0.1/16 zk_network | |
docker network ls # 查看是否创建成功 |
启动镜像:
docker run -d -p 2181:2181 -p 2888:2888 -p 3888:3888 --name zookeeper --privileged --restart always --network zk_network \ | |
-v /home/docker/zookeeper/conf:/conf \ | |
-v /home/docker/zookeeper/data:/data \ | |
-v /home/docker/zookeeper/datalog:/datalog \ | |
zookeeper |
检验是否启动成功:
docker exec -it zookeeper zkCli.sh | |
# 进入客户端后,如需退出,可以使用 quit 指令 |
该命令其实相当于:
docker exec -it zookeeper /bin/bash | |
cd bin/ | |
./zkCli.sh -server 127.0.0.1:2181 |
如果出现 [zk: localhost:2181(CONNECTED) 0]
则表示 zookeeper 启动成功。
查看当前 zookeeper 模式及配置:
./zkServer.sh status | |
Zookeeper JMX enabled by default | |
Using config: /conf/zoo.cfg | |
Client port found: 2181. Client address: localhost. Client SSL: false. | |
Mode: standalone |
注意:这里的 zookeeper 的配置文件位于容器中的 /conf/zoo.cfg 位置,并被映射到宿主机的 /home/docker/zookeeper/conf 目录。
这里仅列出了必要的命令,更多 Docker 相关知识,可参考文章《给,你要的 Docker 详细教程》。
提示:
退出容器使用
Ctrl + P + Q
。Docker 安装 zookeeper 默认未指定日志文件,如需要日志文件配置,可从官网下载完整 zookeeper,并将
/conf/log4j.properties
拷贝至配置目录,然后重启即可。
启动和停止 zookeeper 服务:
./zkServer.sh start # 启动 zookeeper 服务 | |
./zkServer.sh stop # 停止 zookeeper 服务 |
# Zookeeper 选举机制
依次启动集群节点,当节点超过半数时,将在已启动的集群节点中找出 ID 最大的节点,并被选举为 Leader。一旦 Leader 确定,新加入节点自动标记为 Follower。
Zookeeper 节点状态:
- LOOKING:寻找 Leader 状态,处于该状态需要进入选举流程。
- LEADING:领导者状态,处于该状态的节点说明是角色已经是 Leader。
- FOLLOWING:跟随者状态,表示 Leader 已经选举出来,当前节点角色是 Follower。
- OBSERVER:观察者状态,表明当前节点角色是 observer。
选举相关概念:
- SID:服务器 ID,用来唯一标识 ZK 集群中的每一台及其,它和 myid 保持一致。
- ZXID:事务 ID,用来标识一次服务器状态的变更。
- Epoch:每个 Leader 任期的代号。
自制图例(点击查看大图):
注:图是画着玩的,能理解就好,不相关的不要纠结哈。
# Zookeeper 工作流
一旦 Zookeeper 集合启动,它将等待客户端连接。客户端将连接到 Zookeeper 集合中的一个节点。它可以是 Leader 或 Follower 节点。一旦客户端被连接,节点将向特定客户端分配会话 ID 并向该客户端发送确认。如果客户端没有收到确认,它将尝试连接 Zookeeper 集合中的另一个节点。 一旦连接到节点,客户端将以有规律的间隔向节点发送心跳,以确保连接不会丢失。
- 如果客户端想要读取特定的 znode,它将会向具有 znode 路径的节点发送读取请求,并且节点通过从其自己的数据库获取来返回所请求的 znode。因此,在 Zookeeper 集合中读取速度快。
- 如果客户端想要将数据存储在 Zookeeper 集合中,则会将 znode 路径和数据发送到服务器。连接的服务器将该请求转发给 Leader,然后 Leader 将向所有的 Follower 重新发出写入请求。如果只有大部分节点成功响应,而写入请求成功,则成功返回代码将被发送到客户端。 否则,写入请求失败。绝大多数节点被称为
Quorum
。
Zookeeper 工作流示意图:
Zookeeper 工作流组件描述:
写入(write)
写入过程由 Leader 节点处理。Leader 将写入请求转发到所有 znode,并等待 znode 的回复。如果一半的 znode 回复,则写入过程完成。
读取(read)
读取由特定连接的 znode 在内部执行,因此不需要与集群进行交互。
复制数据库(replicated database)
它用于在 zookeeper 中存储数据。每个 znode 都有自己的数据库,每个 znode 在一致性的帮助下每次都有相同的数据。
领导者(Leader)
Leader 是负责处理写入请求的 znode。
跟随者(Follower)
Follower 从客户端接收写入请求,并将它们转发到 Leader znode。
请求处理器(request processor)
只存在于 Leader 节点。它管理来自 Follower 节点的写入请求。
原子广播(atomic broadcasts)
负责广播从 Leader 节点到 Follower 节点的变化。
# Zookeeper CLI
/opt/module/zookeeper-3.5.7/bin/zkCli.sh -server hadoop01:2181 |
# 节点类型
Zookeeper 创建节点的语法为 create -[params] [path] [data]
,它主要有以下四种节点类型:
PERSISTENT(持久节点)
节点创建后,即使服务器重启,节点都会一直存在,直到主动删除该节点。
create /node "node"
PERSISTENT_SEQUENTIAL(持久顺序节点)
具有顺序命名特点的持久节点,父节点会为其第一季子节点维护一份时序,记录子节点的先后顺序。该节点类型可以通过参数
-s
进行指定。create -s /node/seq_node "seq_node"
EPHEMERAL(临时节点)
在客户端会话失效后节点自动清除,临时节点下不能创建子节点。该节点类型可以通过参数
-e
进行指定。create -e /node/temp_node "temp_node"
EPHEMERAL_SEQUENTIAL(顺序临时节点)
具有顺序命名特点的临时节点,该节点类型可以通过参数
-e -s
进行指定。create -e -s /node/temp_seq_node "temp_seq_node"
# 节点操作
ls / # 查看根节点列表 | |
create /nodeA "info about nodeA" # 创建节点并指定内容(持久节点) | |
create /nodeA/node "info about node, parent is nodeA" # 创建子节点并指定内容 | |
ls /nodeA # 查看子节点列表 | |
get -s /zodeA # 获取节点内容 | |
create -s /node "info about node which sorted by no" # 创建顺序编号的节点并指定内容 | |
quit # 退出节点(退出后,再次进入持久节点仍然存在 ----------) | |
create -e /nodeB "info about nodeB, temp node" # 创建临时节点 | |
create -e -s /nodeB/node "info about nodeB, temp node, sorted by no" # 创建顺序编号的临时节点 | |
set /nodeA/node "node info is updated" # 修改节点内容 | |
delete /nodeA/node # 移除节点 | |
deleteall /nodeA # 递归移除节点及其子节点 |
# 监听器
Zookeeper 通过参数 -w
可以设置监听节点数据及节点的增删变化:
get -w /nodeA # 监听节点数据的变化 | |
ls -w /nodeA # 监听子节点增删变化 |
Zookeeper 监听器原理:
- 在创建 zookeeper 客户端时,会创建两个线程,一个负责网络连接通信(connet),一个负责事件监听(listener)。
- 通过 connet 线程将注册的监听事件发送给 zookeeper。
- zookeeper 将注册的监听事件添加到监听器列表。
- 当 zookeeper 监听到数据或路径发生变化,就会将消息发送给 listener 线程。
- listener 线程内部调用了
process()
方法进行处理。
# Zookeeper API
Zookeeper 有一个绑定 Java 和 C 的官方 API。Zookeeper 社区为大多数语言(.NET,python 等)提供非官方 API。使用 Zookeeper API,应用程序可以连接,交互,操作数据,协调,最后断开与 Zookeeper 集合的连接。
本文以 Java 语言作为示例。
# Java 简单示例
准备工作
- 创建普通 maven 工程。
- 启动可用的 zookeeper 集群服务。
引入依赖
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
测试文件
public class ZkTest {
private String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
private int sessionTimeout = 2000; // 会话超时,毫秒
private Zookeeper zkClient = null;
@Before
public void init() throws IOException {
zkClient = new Zookeeper(connectString, sessionTimeout, watchedEvent -> {
// 处理监视器任务:获取所有根节点
List<String> children = null;
try {
children = zkClient.getChildren("/", true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
for (String child : children) {
System.out.println(child);
}
});
}
@Test
public void create() throws KeeperException, InterruptedException {
String result = zkClient.create("/test", "node for test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@Test
public void exists() throws KeeperException, InterruptedException {
Stat stat = zkClient.exists("/test", false);
System.out.println(stat == null ? "not exist" : "exist");
}
}
代码优化
在实际使用过程中,我们通常会将相关的连接方法提取作为单独的连接器,例如:
public class ZookeeperConnector {
private Zookeeper client;
final CountDownLatch connectedSignal = new CountDownLatch(1);
public Zookeeper connect(String host) throws IOException, InterruptedException {
client = new Zookeeper(host, 5000, watcher -> {
if (watcher.getState() == Watcher.Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
}
});
connectedSignal.await();
return client;
}
public void close() throws InterruptedException {
client.close();
}
}
在使用时直接调用连接器示例即可:
public class ZkCreate {
private static Zookeeper zk;
private static ZookeeperConnector conn;
public static void create(String path, byte[] data) throws KeeperException, InterruptedException {
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public static void main(String[] args) {
String path = "/MyZnode";
byte[] data = "My Znode".getBytes();
try {
conn = new ZookeeperConnector();
zk = conn.connect("hadoop01:2181,hadoop02:2181,hadoop03:2181");
create(path, data);
conn.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
# Curator 框架
官方地址:http://curator.apache.org/curator-framework
用法示例:
public class ZkCurator { | |
private final String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181"; | |
@Test | |
public void test() throws Exception { | |
// 重试策略 | |
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); | |
// 创建客户端,方式一 | |
// CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, 5000, 5000, retryPolicy); | |
// 创建客户端,方式二 | |
CuratorFramework client = CuratorFrameworkFactory.builder() | |
.connectString(connectString) | |
.sessionTimeoutMs(5000) // 会话超时时间 | |
.connectionTimeoutMs(5000) // 连接超时时间 | |
.retryPolicy(retryPolicy) | |
.namespace("base") // 包含隔离名称 | |
.build(); | |
client.start(); | |
System.out.println("已创建并启动客户端"); | |
// 创建数据节点 | |
client.create().creatingParentContainersIfNeeded() // 递归创建所需父节点 | |
.withMode(CreateMode.PERSISTENT) // 创建类型为持久节点 | |
.forPath("/nodeA", "init".getBytes()); // 目录及内容 | |
System.out.println("成功创建持久节点"); | |
// 获取节点数据 | |
byte[] bytes = client.getData().forPath("/nodeA"); | |
System.out.println(new String(bytes));// init | |
// 修改节点数据 | |
client.setData() | |
.withVersion(0) // 指定版本修改 | |
.forPath("/nodeA", "data".getBytes()); | |
// 事务处理 | |
Collection<CuratorTransactionResult> commit = client.inTransaction().check().forPath("/nodeA") | |
.and() | |
.create().withMode(CreateMode.EPHEMERAL).forPath("/nodeB", "init".getBytes()) | |
.and() | |
.create().withMode(CreateMode.EPHEMERAL).forPath("/nodeC", "init".getBytes()) | |
.and() | |
.commit(); | |
commit.forEach(c -> { | |
System.out.println("Path: " + c.getForPath()); | |
}); | |
Stat stat = client.checkExists().forPath("/nodeA");// 检查是否存在 | |
if (stat != null) { | |
System.out.println("/base/nodeA 节点存在。"); | |
} | |
List<String> strings = client.getChildren().forPath("/nodeA");// 获取子节点的路径 | |
for (String string : strings) { | |
System.out.println("path: " + string); | |
} | |
// 异步回调 | |
Executor executor = Executors.newFixedThreadPool(2); | |
client.create() | |
.creatingParentsIfNeeded() | |
.withMode(CreateMode.PERSISTENT) | |
.inBackground((curatorFramework, curatorEvent) -> { | |
System.out.println(String.format("eventType:%s,resultCode:%s", curatorEvent.getType(), curatorEvent.getResultCode())); | |
}, executor) | |
.forPath("/syncNode"); | |
Thread.sleep(2000); | |
// 删除数据节点 | |
client.delete() | |
.guaranteed() // 强制保证删除 | |
.deletingChildrenIfNeeded() // 递归删除子节点 | |
.withVersion(0) // 指定删除的版本号 | |
.forPath("/"); | |
System.out.println("已成功删除节点[/base]"); | |
} | |
} |
# 参考
- https://www.bilibili.com/video/BV1to4y1C7gw (个人感觉讲得不是太好)
- https://zhuanlan.zhihu.com/p/59669985
- https://www.w3cschool.cn/zookeeper