整个Zookeeper集群就是一个多节点分布式一致性算法的实现,底层采用的实现协议为ZAB。
ZAB = Zookeeper Atomic Broadcast(Zookeeper原子广播协议)
ZAB协议是Zookeeper专门设计的,支持 “原子广播”和“崩溃恢复”的协议;
Zookeeper是一个保证CP的主从架构,所有的写操作必须通过Leader节点进行,读操作可以通过Leader、Follower、Observer任意节点进行。
所有的事务操作被设计成 两阶段提交(2PC),大致过程简化为:

一、原子广播
ZAB协议的消息广播过程是一个原子广播协议,类似一个“两阶段提交过程(2PC)”,对于客户端发送的所有写请求(事务操作),全部会由Leader节点处理;
-
Leader节点在收到请求后,会将请求封装成一个事务Proposal,将起发送给所有的Follower节点,Follower节点在收到Proposal后,会给Leader节点返回ack;(第一阶段)
-
如果Leader节点收到了超过半数以上的ack(包含Leader节点自己的一次ack),就会认为本次事务操作可以被执行,然后就像所有的Follower节点发送commit;同时通知Observer节点存储消息(第二阶段)

一些细节点:
-
如果客户端的写请求被Follower节点接收到,那么集群会自动将请求转发到Leader节点进行处理;
-
Leader节点在接收到写请求时,会为本次请求生成一个zxid,并将请求封装为一个Proposal发送给所有的Follower节点,同时自己还要将本次事务操作写入到自己本地的事务日志中(磁盘),并直接给自己返回一次ack(统计ack时候千万别忘了自己的ack);而Follower节点收到Proposal后也执行相同的操作,写入本地事务日志文件(磁盘),再给Leader节点返回ack;
-
ZAB协议要求保证事务的顺序,所有所有节点会将所有的事务,按照zxid进行排序后,再进行处理(主要时通过队列完成);
-
zxid是一个64位的数字,前32位用来表示本次事务所属的epoch(集群周期),后32位是一个自增的计数;有了epoch的设计,就保证了,即使旧的集群leader生成了新的zxid,但是当集群恢复时,由于它的epoch值小于新的集群中的对应的zxid,旧leader的zxid对应的事务就会被丢弃,而以新集群的为准。(旧Leader虽然会有一次多余的zxid,但是绝对不会被commit执行,因为收不到半数的ack,见于“Zookeeper是如何避免脑裂问题的?”)
-
Leader节点和Follower节点完成第一阶段的ack统计后,进行commit操作,同时不要忘记给Observer节点进行inform通知,Observer节点之前可一直没有参与!
-
Leader和Follower在进行数据发送时,中间还做了一层队列,用来实现解耦,防止同步阻塞!
public class LearnerHandler extends ZooKeeperThread {
final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
}
// 循环忘所有的follower节点发送对应的数据,包括Proposal、commit、ping等
void sendPacket(QuorumPacket qp) {
synchronized (forwardingFollowers) {
for (LearnerHandler f : forwardingFollowers) {
f.queuePacket(qp);
}
}
}
void queuePacket(QuorumPacket p) {
queuedPackets.add(p);
}
二、崩溃恢复
当主节点挂掉,或者失去了与过半节点的心跳检测,整个集群就会进入崩溃恢复模式(此时集群不可用,典型的CP牺牲高可用性,保证强一致性!)
整个集群新的选举过程,与集群启动时的选举过程一致,只不过现在的epoch值不再是从0开始!
崩溃恢复模式需要保证解决2个问题:
1、已经被commit的事务不能丢;
如果Leader已经发出了commit后,挂了,那么所有的Follower中只要有一个成哥commit,那么这条数据就能够被成功commit,不会被丢弃;
这也就保证了,如果一条事务被集群的某个节点成功commit执行,他也必然会被集群中的其它节点执行成功!
2、没被commit的事务需要被丢弃;
如果Leader还没有发出commit,就挂了,那么这条事务就会被新的集群丢弃;
因为新的Leader产生后,他会对比自己的“磁盘事务日志文件” 和“内存数据”,对于那些还未被commited的事务日志将直接被丢弃。
三、其它问题
1、Zookeeper如何保证事务消息执行的顺序性
首先就是依赖zxid(前32位为epoch值,后32位为递增的计数),当所有的follower接收到proposal提案时,除了将事务操作写入本地事务日志外,还会将本次request记录到一个阻塞队列pendingTxns中:
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
syncProcessor.processRequest(request);
}
当之后,收到来之Leader节点的commit时,会拿commit中的zxid与pendingTxns队列的头部txid进行对比,必须一样,如果不一样,那么本Follower会退出,然后重新加入集群,再次跟Leader进行同步数据!
/**
* 当收到 COMMIT 消息时,最终会调用此方法,它将来自 COMMIT 的 zxid 与(希望)pendingTxns 队列的头部进行匹配,并将其交给 commitProcessor 进行提交。
* @param zxid - 如果存在,则必须对应于 pendingTxns 的头部
*/
public void commit(long zxid) {
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ " without seeing txn");
return;
}
long firstElementZxid = pendingTxns.element().zxid;
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
Request request = pendingTxns.remove();
commitProcessor.commit(request);
}
(一言不合就去找Leader同步,而Leader节点中的事务执行顺序肯定是有保障的)
2、Zookeeper的所有节点为什么是先写磁盘,后写内存?
因为整个Zookeeper集群的数据读取,是从内存中的dataTree读取。
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); // 从内存dataTree中读取数据
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}
// ZKDatabase中维护的核心就是dataTree
public class ZKDatabase {
protected DataTree dataTree; // 核心数据
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
protected FileTxnSnapLog snapLog;
protected long minCommittedLog, maxCommittedLog;
}
// dataTree的数据结构为:
public class DataTree {
// 核心数据是一个ConcurrentHashMap = (path, DataNode)
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
private final WatchManager dataWatches = new WatchManager(); // get -w 监听本身数据
private final WatchManager childWatches = new WatchManager(); // ls -w 监听子节点
}
// 每一个DataNode的结构为:
public class DataNode implements Record {
byte data[]; // 字节数组存储实际数据
Long acl;
public StatPersisted stat; // -s 状态信息
private Set<String> children = null; // 子节点列表
}
如果先写内存的话,那么即使没有完成2PC操作,有客户端来读取时,也会直接从内存中把数据读取走,ZAB一致性协议也就失效了!
4、旧Leader断线后,重新加入集群,会乖乖地做Follower
因为旧的Leader的epoch值小于新集群Leader的epoch值,它只能乖乖做小弟;
对于数据中的zxid不一致的数据,只能以新Leader的为准,因为前32位表示的epoch值比新集群的小。
四、为什么Zookeeper不存在脑裂问题?
所谓的脑裂问题:可能由于网络等问题,导致集群中的机器被分为两个阵营,如果不考虑“过半机制”,那么两个阵营都会选出新的Leader,这样出现了两个Leader的现象就被称为“脑裂”现象!

但是由于Zookeeper的“过半机制”,脑裂问题旧不存在了,体现在两个环节:
-
选举阶段:当网络被分为两个分区时,由于“选举过半机制”,那么只能会选出“1个或者0个”Leader节点,用于不会出现2个Leader!
-
写数据阶段:当旧Leader节点从大网络分区分离出去时,大网络分区会选举出一个新的Leader,而旧Leader并不知道,它还在执行“写数据”的操作,但是由于ZAB的“proposal-ack过半机制”,就导致这一条事务并不会被写成功,同时由于 1/2 以上的心跳检测失败,很快旧Leader将不在是Leader。所以集群很快就会只有1或0个Leader,所以永远不会出现脑裂问题;
五、Zookeeper的监听机制的实现?何时触发?
以“创建节点”为例,任意节点,在接收到commit后,会执行processTxn()方法:
private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
Record txn) {
ProcessTxnResult rc;
int opCode = request != null ? request.type : hdr.getType();
long sessionId = request != null ? request.sessionId : hdr.getClientId();
if (hdr != null) {
rc = getZKDatabase().processTxn(hdr, txn); // 调用DataTree创建节点
} else {
rc = new ProcessTxnResult();
}
}
DataTree中的processTxn()方法(加深印象:Zookeeper的数据就是这个DataTree):
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn)
{
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode( // 去创建DataNode节点
createTxn.getPath(),
createTxn.getData(),
createTxn.getAcl(),
createTxn.getEphemeral() ? header.getClientId() : 0,
createTxn.getParentCVersion(),
header.getZxid(), header.getTime(), null);
break;
}
}
}
createNode()方法的最后:
public void createNode(final String path, byte data[], List<ACL> acl,
long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat){
...创建节点的操作...
// 触发
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
}
org.apache.zookeeper.server.WatchManager#triggerWatch(…):
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path); // watch是一次性的,取完一次就remove
// ...周边逻辑,如果为空,就返回了...
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e); //遍历watch,进行处理,其实就是通过Netty或者NIO给客户端发送event事件
}
return watchers;
}
我们很有必要看看中各重要的watchTable容器是什么?
private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>(); //通过追溯,我们会发现,watch其实就是cnxn,而cnxn就是NettyServerCnxn或者NIOServerCnxn连接 watch = getDataRequest.getWatch() ? cnxn : null
所以我们最终的结论就是:
Zookeeper服务端在接收到请求时,会判断watch这个字段为true还是false,如果为true,则将本次的cnxn连接作为watch对象,保存在watchTable中,key为path,值为cnxn的HashSet;
watchs只需要保存在处理本次请求的节点即可,不需要同步给其他节点;
当有其他“写请求”发生时,这台服务节点最终肯定会接收到对应的事务处理,如:createNode,在createNode的最后,会尝试从本地的watchTables中获取watchs,如果有就遍历通过对应的cncx连接发送event事件。
所有的watch都是一次性的,因为使用的是ConcurrentHashMap.remove()方法。



