ZooKeeper源码分析(三)—服务端启动之单机模式

  |   0 评论   |   0 浏览

1. 预启动

1.1. QuorumPeerMain作为启动入口

跟集群模式一样,启动入口也是在 QuorumPeerMainmain方法中。

1.2. 解析配置文件 zoo.cfg

配置文件包含了zk运行时需要的基本参数,比如tickTime、dataDir和clientPort。解析的主要逻辑如下:

try {
    File configFile = (new VerifyingFileFactory.Builder(LOG)
        .warnForRelativePath()
        .failForNonExistingPath()
        .build()).create(path);
      
    Properties cfg = new Properties();
    FileInputStream in = new FileInputStream(configFile);
    try {
    	// 文件加载
        cfg.load(in);
        configFileStr = path;
    } finally {
        in.close();
    }
    // 解析配置文件
    parseProperties(cfg);
} catch (IOException e) {
    throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
    throw new ConfigException("Error processing " + path, e);
}   

1.3. 创建并启动历史文件清理器DatadirCleanupManager

ZooKeeper 的自动清理文件机制,对快照文件和事务日志定期进行清理。实例创建的代码如下:

DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());

可以看到,创建实例时需要知道dataDir、dataLogDir、快照保存个数、清理间隔这些参数。内部基于 TimerTask起了一个定时任务,会根据 purgeInterval定期去清理文件。

timer = new Timer("PurgeTask", true);
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

1.4. 判断启动模式

根据解析出的服务器地址列表判断走单机模式还是集群模式。

 if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
} else {
    LOG.warn("Either no config or no quorum defined in config, running "
            + " in standalone mode");
    // there is only server in the quorum -- run as standalone
    ZooKeeperServerMain.main(args);
}

这里的 args 是启动命令行输入的配置文件名称,当文件名称只有一个,并且配置文件里配置了 distributed=true时走集群模式,否则走单机模式。单机启动的代码都在 ZooKeeperServerMain.main(args)

1.5. 再次进行配置解析

单机启动时,命令输入的参数会有两种情况

  • 输入了配置文件名称
  • 直接输入了配置参数,比如按顺序输入了 clientPortAddressdataDirdataLogDirtickTimemaxClientCnxns

所以要根据这两种情况进行不同的解析

if (args.length == 1) {
    config.parse(args[0]);
} else {
    config.parse(args);
}

当命令行输入的文件名称时,根据路径构造File对象,进行解析,然后把配置信息读入到ServerConfig中

public void parse(String path) throws ConfigException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    // 根据文件名称解析配置文件
    config.parse(path);

    // let qpconfig parse the file and then pull the stuff we are
    // interested in
    // 从配置中读取配置信息,给ServerConfig赋值
    readFrom(config);
}

直接输入配置参数时,需要保证输入参数的顺序,代码中是按顺序取的,顺序见上面描述。

public void parse(String[] args) {
    if (args.length < 2 || args.length > 4) {
        throw new IllegalArgumentException("Invalid number of arguments:" + Arrays.toString(args));
    }
    // clientPortAddress是第一顺序
    clientPortAddress = new InetSocketAddress(Integer.parseInt(args[0]));
    // 第二顺序是dataDir
    dataDir = new File(args[1]);
    dataLogDir = dataDir;
    if (args.length >= 3) {
    	// 第三顺序是tickTime
        tickTime = Integer.parseInt(args[2]);
    }
    if (args.length == 4) {
    	// 第四顺序是maxClientCnxns
        maxClientCnxns = Integer.parseInt(args[3]);
    }
}

2. 初始化

接下来会执行 runFromConfig(ServerConfig config)方法,进行服务器实例化过程,分步骤来看。

2.1. 创建 FileTxnSnapLog

FileTxnSnapLog是ZooKeeper上层服务于底层数据存储之间的对接层,提供了一系列操作数据文件的接口。包括事务日志文件和快照数据文件。根据dataDir和snapDir来创建FileTxnSnapLog。

this.dataDir = new File(dataDir, version + VERSION);
this.snapDir = new File(snapDir, version + VERSION);

主要的方法有如下一些,后面会作为专题来讲

// 读取快照文件和事务日志之后恢复服务端数据
restore(DataTree, Map, Integer>, PlayBackListener):long

// 把最新的事务日志快速更新到server database,与restore不同的是只处理事务日志
fastForwardFromEdits(DataTree, Map, Integer>,PlayBackListener):long

// 把dataTree和sessions放入快照文件中
save(DataTree,ConcurrentHashMap, Integer>, boolean):void

// 在dataTree上处理事务
processTransaction(TxnHeader,DataTree ,Map, Integer> , Record ):void

2.2. 创建 MetricsProvider

MetricsProvider是一个可以收集指标、将当前值发送到外部设备的一个组件。数据在server端和client端可以共享。在《ZooKeeper源码分析二-服务端启动之集群模式》讲过,这里不再细说。

2.3. 创建服务器实例 ZooKeeperServer

依赖txnLog、tickTime、minSessionTimeout、maxSessionTimeout、listenBacklog等参数实例化ZooKeeperServer:

ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,  config.tickTime, config.minSessionTimeout, config.maxSessionTimeout,  config.listenBacklog, null);

2.4. 创建ServerStats

ServerStats是一个服务端统计器,包含了最基本的运行时信息:

  • packetsSent 从zk启动开始,或是最近一次重置服务端统计信息之后,服务端向客户端发送的响应包次数
  • packetsReceived 从zk启动开始,或是最近一次重置服务端统计信息之后,服务端接收到来自客户端的请求包次数
  • requestLatency 从zk启动开始,或是最近一次重置服务端统计信息之后服务端请求处理的最大延时、最小延时、平均延时以及总延时
  • clientResponseStats 提供jute 序列化缓冲区使用情况的实时统计信息
  • fsyncThresholdExceedCount 同步内存数据到存储设备超过阈值的次数
  • startTime zk启动后开始统计的时间

2.5. Registers shutdown handler

ZooKeeperServerShutdownHandler是zk用于处理异常的组件。当系统发生错误时,会使用CountDownLatch通知其他线程停止工作

class ZooKeeperServerShutdownHandler {

    private final CountDownLatch shutdownLatch;

    ZooKeeperServerShutdownHandler(CountDownLatch shutdownLatch) {
        this.shutdownLatch = shutdownLatch;
    }

    /**
     * This will be invoked when the server transition to a new server state.
     *
     * @param state new server state
     */
    void handle(State state) {
        if (state == State.ERROR || state == State.SHUTDOWN) {
            shutdownLatch.countDown();
        }
    }
}

2.6. 启动AdminServer

AdminServer用来管理ZooKeeperServer。有两种实现方式JettyAdminServer和DummyAdminServer。

zookeeper.admin.enableServer为true时才启动AdminServer,通过反射的方式创建实例

public static AdminServer createAdminServer() {
    if (!"false".equals(System.getProperty("zookeeper.admin.enableServer"))) {
        try {
            Class<?> jettyAdminServerC = Class.forName("org.apache.zookeeper.server.admin.JettyAdminServer");
            Object adminServer = jettyAdminServerC.getConstructor().newInstance();
            return (AdminServer) adminServer;

        } catch (ClassNotFoundException e) {
            LOG.warn("Unable to start JettyAdminServer", e);
        }
    }
    return new DummyAdminServer();
}

创建完AdminServer后设置ZooKeeperServer,启动jetty容器

adminServer = AdminServerFactory.createAdminServer();
adminServer.setZooKeeperServer(zkServer);
adminServer.start();

Jetty容器启动代码如下:

@Override
public void start() throws AdminServerException {
    try {
        server.start();
    } catch (Exception e) {
        // Server.start() only throws Exception, so let's at least wrap it
        // in an identifiable subclass
        throw new AdminServerException(String.format(
                "Problem starting AdminServer on address %s,"
                        + " port %d and command URL %s", address, port,
                commandUrl), e);
    }
    LOG.info(String.format("Started AdminServer on address %s, port %d"
            + " and command URL %s", address, port, commandUrl));
}

2.7. 创建ServerCnxnFactory

早期版本,都是自己实现NIO框架,从3.4.0版本引入了Netty,可以通过 zookeeper.serverCnxnFactory来指定使用NIO还是Netty作为Zookeeper服务端网络连接工厂。

2.8. 初始化ServerCnxnFactory

Zookeeper首先会初始化一个Thread,作为整个ServerCnxnFactory的主线程,然后再初始化NIO服务器。

// 初始化selectorThreads
for (int i = 0; i < numSelectorThreads; ++i) {
    selectorThreads.add(new SelectorThread(i));
}
// 初始化expirerThread
expirerThread = new ConnectionExpirerThread();

// 初始化acceptThread
acceptThread = new AcceptThread(ss, addr, selectorThreads);

2.9. 启动ServerCnxnFactory主线程

启动步骤2.7中已经初始化的主线程的主逻辑(run方法)。这时候ZooKeeper的NIO服务器已经对外开放端口,客户端能够访问2181端口,但是此时ZooKeeper服务器是无法正常处理客户端请求的

@Override
public void start() {
    stopped = false;
    if (workerPool == null) {
        workerPool = new WorkerService(
                "NIOWorker", numWorkerThreads, false);
    }
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}

2.10. 恢复本地数据

从本地快照和事务日志文件中进行数据恢复。详细的数据恢复过程后面专题会讲

public void startdata() throws IOException, InterruptedException {
    //check to see if zkDb is not null
    if (zkDb == null) {
        zkDb = new ZKDatabase(this.txnLogFactory);
    }
    if (!zkDb.isInitialized()) {
        loadData();
    }
}

2.11. 创建并启动会话管理器

创建会话管理器SessionTracker,SessionTracker主要负责ZooKeeper服务端的会话管理,创建SessionTracker时,会设置expireInterval、NextExpirationTime和SessionWithTimeout,还会计算出一个初始化的SessionID

if (sessionTracker == null) {
    createSessionTracker();
}
startSessionTracker();

2.12. 初始化ZooKeeper的请求处理链

典型的责任链方式实现,在ZooKeeper服务器上,会有多个请求处理器一次来处理一个客户端请求,在服务器启动的时候,会将这些请求处理器串联起来形成一个请求处理链。单机版服务器的请求处理链主要包括 PrepRequestProcessorSyncRequestProcessorFinalRequestProcessor

PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
单机版ZooKeeper服务器的请求处理链

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

2.13. 注册JMX服务

ZooKeeper会将服务器运行时的一些信息以JMS的方式暴露给外部,具体实现如下:

protected void registerJMX() {
    // register with JMX
    try {
        jmxServerBean = new ZooKeeperServerBean(this);
        MBeanRegistry.getInstance().register(jmxServerBean, null);

        try {
            jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
            MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxDataTreeBean = null;
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxServerBean = null;
    }
}

2.14. 注册ZooKeeper服务器实例

前面的步骤过后,ZooKeeper已经将ServerCnxnFactory主线程启动,但是同时ZooKeeper依旧无法处理客户端请求,原因是网络层不能够访问ZooKeeper实例,经过后续步骤的初始化,ZooKeeper服务器实例已经初始化完毕,只需要注册给ServerCnxnFactory即可,之后ZooKeeper就可以对外提供正常的服务了。

final public void setZooKeeperServer(ZooKeeperServer zks) {
    this.zkServer = zks;
    if (zks != null) {
        if (secure) {
            zks.setSecureServerCnxnFactory(this);
        } else {
            zks.setServerCnxnFactory(this);
        }
    }
}

至此,单机版的ZooKeeper服务器启动完毕。

3. 流程总结

1


标题:ZooKeeper源码分析(三)—服务端启动之单机模式
作者:guobing
地址:http://www.guobingwei.tech/articles/2019/03/23/1553295398628.html