ZooKeeper源码分析(一)—会话创建之客户端

  |   0 评论   |   0 浏览

开发人员主要使用zk的客户端,所以我们先来了解zk客户端的创建过程原理。

1. 概述

zk客户端的核心组件如下:

  • ZooKeeper实例 :客户端入口
  • ClientWatcherManager :客户端Watcher管理器
  • HostProvider:客户端地址列表管理器
  • ClientCnxn:客户端核心线程。包含两个线程,即SendThread和EventThread。前者是一个I/O线程,主要负责ZooKeeper客户端和服务端之间的网络I/O通信,后者是一个事件线程,主要负责对服务端事件进行处理。

类图说明:
te

ZooKeeper客户端的初始化与启动环节,实际上就是ZooKeeper对象的实例化过程,分析一个zk客户端的构造方法:

 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly,
  HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
}

connectString:逗号隔开的 host:port对,表示节点路径,比如:127.0.0.1:3002/app/a 表示客户端的root节点为/app/a
sessionTimeout:session过期时间,单位:毫秒
watcher:默认的Watcher,会收到状态更新、节点事件的消息推送
sessionId:重连时特定的session Id
canBeReadOnly:是否是只读模式,这种情况只允许读流量,写入将被拒绝
aHostProvider:
clientConfig:客户端配置信息

客户端初始化和启动过程大体分为三个步骤:

  • 设置默认Watcher
  • 设置ZooKeeper服务器地址列表
  • 创建ClientCnxn

2.会话的创建过程

2.1.初始化阶段

  1. 初始化ZooKeeper对象。
    通过调用Zookeeper的构造方法来实例化一个ZooKeeper对象,初始化过程中,会创建一个客户端的Watcher管理器
  2. 设置会话默认Watcher
    如果在构造方法中传入一个Wat对象,那么客户端会将这个对象作为默认Watcher保存在ClientWatchManager中。
  3. 构造ZooKeeper服务器地址列表管理器:HostProvider
    对于构造方法中传入的服务器地址,客户端会将其存放在服务器地址累不管理器中
  4. 创建并初始化客户端网络连接器:ClientCnxn
    Zook客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建ClientCnxn的同事,还会初始化客户端的两个核心队列outgoingQueue和pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。
  5. 初始化SendThread和EventThread。
    客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者用于客户端事件处理。客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事件。

源码分析:

if (clientConfig == null) {
    clientConfig = new ZKClientConfig();
}

this.clientConfig = clientConfig;

// 设置默认Watcher管理器
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;

// 客户端连接地址解析,支持IPV6,这里用到了ChrootPath,表示根节点
ConnectStringParser connectStringParser = new ConnectStringParser(
        connectString);
// 设置服务端地址管理器
hostProvider = aHostProvider;

// 初始化网络连接器
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
  hostProvider, sessionTimeout, this, watchManager,
  getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
  
cnxn.seenRwServerBefore = true; // since user has provided sessionId
cnxn.start();


客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器

前面提到了这样一句话,看下具体如何实现:
new ClientCnxn时调用 getClientCnxnSocket()创建了一个 ClientCnxnSocket

ClientCnxnSocket创建过程如下:

private ClientCnxnSocket getClientCnxnSocket() throws IOException {

    // 从配置信息中获取线程名称
    String clientCnxnSocketName = getClientConfig().getProperty(
            ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
   // 默认NIO方式
   if (clientCnxnSocketName == null) {
	clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
   }
   try {
        // 1.通过反射获取clientCxn构造器,ZKClientConfig.class是构造器需要的参数
	Constructor clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);

	// 2.实例化ClientCnxnSocket
	ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
	return clientCxnSocket;
    } catch (Exception e) {
	IOException ioe = new IOException("Couldn't instantiate "
	+ clientCnxnSocketName);
	ioe.initCause(e);
	throw ioe;
    }
}

然后执行 new ClientCnxn

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
  ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
  long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
  // 设置参数
  this.zooKeeper = zooKeeper;
  this.watcher = watcher;
  this.sessionId = sessionId;
  this.sessionPasswd = sessionPasswd;
  this.sessionTimeout = sessionTimeout;
  this.hostProvider = hostProvider;
  this.chrootPath = chrootPath;

  connectTimeout = sessionTimeout / hostProvider.size();
  readTimeout = sessionTimeout * 2 / 3;
  readOnly = canBeReadOnly;
  // 初始化SendThread,管理客户端和服务端之间的网络I/O,依赖于clientCnxnSocket,守护线程
  sendThread = new SendThread(clientCnxnSocket);
  // 初始化EventThread,用于事件处理,会被设置为守护线程
  eventThread = new EventThread();
  this.clientConfig=zooKeeper.getClientConfig();
  // 初始化超时机制
  initRequestTimeout();
}

至此,初始化阶段完成。

2.2. 会话创建阶段

  1. 启动SendThread和EventThread
    SendThread首先会判断当前客户端的状态,进行一系列清理工作,为客户端发送会话创建请求做准备
  2. 获取一个服务器地址
    开始创建Tcp连接之前,SendThread首先需要获取一个ZooKeeper服务器的目标地址,通常是从HostProvider中随机选出一个,然后委托给ClientCnxnSocket去创建与ZooKeeper服务器之间的TCP连接
    选取规则:serverAddress = hostProvider.next(1000);
  3. 创建TCP连接
    获取到一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长链接
  4. 构造ConnectRequest请求
    SendThread会负责根据当前客户端的实际设置,构造出一个ConnectRequest请求,该请求代表了客户端试图与服务端创建一个会话。同时,ZooKeeper客户端还会进一步将该请求包装成网络I/O层的Packet对象,放入请求发送队列outgoingQueue中去
  5. 发送请求
    ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer向服务端进行发送

源码分析:
初始化SendThread时,会触发run()方法。这里面有个 startConnect(InetSocketAddress addr)方法,负责建立连接。有两种实现方式,NIONetty

NIO connent:

@Override
void connect(InetSocketAddress addr) throws IOException {
	// 创建socket
    SocketChannel sock = createSock();
   try {
	 // 注册
	registerAndConnect(sock, addr);
  } catch (IOException e) {
	LOG.error("Unable to open socket to " + addr);
  sock.close();
	throw e;
  }
   initialized = false;

  /*
   * Reset incomingBuffer 
   */  
   lenBuffer.clear();
   incomingBuffer = lenBuffer;
}

选择器注册:

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
throws IOException {

  // selector注册
 sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
 boolean immediateConnect = sock.connect(addr);
 if (immediateConnect) {
	   // 连接的主要逻辑:设置会话、权限处理、watches处理
        sendThread.primeConnection();
  }
}

Netty方式的代码在 ClientCnxnSocketNetty中,代码比较多,就不详细说了。达成的效果是一样的。

接下来是构造构造ConnectRequest的过程,都是在 primeConnection()方法中完成。

// 创建request对象
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);

// 构造器
public ConnectRequest(
   int protocolVersion,
   long lastZxidSeen,
   int timeOut,
   long sessionId,
   byte[] passwd) {
	this.protocolVersion=protocolVersion;
   this.lastZxidSeen=lastZxidSeen;
   this.timeOut=timeOut;
   this.sessionId=sessionId;
   this.passwd=passwd;
}

数据格式如上:

数据包封装代码如下:

// 设置packet数据包
RequestHeader header = new RequestHeader(-8, OpCode.setWatches);
Packet packet = new Packet(header, new ReplyHeader(), sw, null, null);
// 放入outgoingQueue中去
outgoingQueue.addFirst(packet);

2.3.响应处理阶段

  1. 接收服务端响应
    ClientCnxnSocket 接收到服务端的响应后,会判断当前客户端状态是否是已初始化,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方法来处理响应。
  2. 处理Response
    ClientCnxnSocket会对接收到的服务端响应进行反序列化,得到ConnectResponse对象,并充中获取到ZooKeeper服务端分配的会话sessionId
  3. 连接成功
    连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,并更新客户端状态;另一方面,需要通知地址管理器HostProvider当前连接成功的服务器地址。
  4. 生成事件:SyncConnected-None
    为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件 SyncConnected-None,代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程。
  5. 查询Watcher
    EventThread线程收到事件后,会从ClientWatchManager中查询对应的Watcher,然后放入waitingEvents队列中去
  6. 处理事件
    EventThread不断从waitingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的Process接口方法,以达到触发Watcher的目的。

源码分析:

if (!incomingBuffer.hasRemaining()) {
    incomingBuffer.flip();
   if (incomingBuffer == lenBuffer) {
	recvCount.getAndIncrement();
	readLength();
   } else if (!initialized) {
	 // 未初始化完成,交由readConnectResult方法处理
	readConnectResult();
	enableRead();
	 if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
	  // Since SASL authentication has completed (if client is configured to do so),
	  // outgoing packets waiting in the outgoingQueue can now be sent.
	  enableWrite();
	  }
	lenBuffer.clear();
	incomingBuffer = lenBuffer;
	updateLastHeard();
	initialized = true;
	} else {
	// 处理Response
	  sendThread.readResponse(incomingBuffer);
	  lenBuffer.clear();
	  incomingBuffer = lenBuffer;
	  updateLastHeard();
	}
}

处理Response:

反序列化:

ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();

replyHdr.deserialize(bbia, "header");

分xid处理,如果xid = -2,处理ping信息的。打印日志返回。

if (replyHdr.getXid() == -2) {
    // -2 is the xid for pings
  if (LOG.isDebugEnabled()) {
        LOG.debug("Got ping response for sessionid: 0x"
  + Long.toHexString(sessionId)
                + " after "
  + ((System.nanoTime() - lastPingSentNs) / 1000000)
                + "ms");
  }
    return;
}

xid = -4,处理鉴权

if (replyHdr.getXid() == -4) {
    // -4 is the xid for AuthPacket             
 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
        state = States.AUTH_FAILED;                  
 eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
 Watcher.Event.KeeperState.AuthFailed, null) );
  eventThread.queueEventOfDeath();
  }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got auth sessionid:0x"
  + Long.toHexString(sessionId));
  }
    return;
}

xid = -1 处理通知信息:

WatcherEvent event = new WatcherEvent();
// 反序列化event信息
event.deserialize(bbia, "response");
WatchedEvent we = new WatchedEvent(event);

// 放入event队列中去处理
eventThread.queueEvent( we );

EventThread事件处理run():

public void run() {
   try {
      isRunning = true;
      while (true) {
         Object event = waitingEvents.take();
         if (event == eventOfDeath) {
            wasKilled = true;
         } else {
            // 事件处理
            processEvent(event);
         }
         if (wasKilled)
            synchronized (waitingEvents) {
               if (waitingEvents.isEmpty()) {
                  isRunning = false;
                  break;
               }
            }
      }
   } catch (InterruptedException e) {
      LOG.error("Event thread exiting due to interruption", e);
   }

    LOG.info("EventThread shut down for session: 0x{}",
             Long.toHexString(getSessionId()));
}

processEvent() 方法逻辑:调用每个Watch的process方法,达到触发Watcher的目的

if (event instanceof WatcherSetEventPair) {
  // each watcher will process the event
  WatcherSetEventPair pair = (WatcherSetEventPair) event;
  for (Watcher watcher : pair.watchers) {
      try {
	  // watcher触发
          watcher.process(pair.event);
      } catch (Throwable t) {
          LOG.error("Error while calling watcher ", t);
      }
  }
}

客户端会话创建流程:
s

至此,客户端一次完整的会话创建过程就已经完成了。


标题:ZooKeeper源码分析(一)—会话创建之客户端
作者:guobing
地址:http://www.guobingwei.tech/articles/2019/03/15/1552608908888.html