ZK的Session源码分析


ZK的Session源码分析

本篇文章主要讲解服务端 Session 会话处理流程。

服务端Session属性分析

Zookeeper服务端会话操作如下图:
在这里插入图片描述
在服务端通过 SessionTrackerImplExpiryQueue来保存Session会话信息。

SessionTrackerImpl 有以下属性:

  1. sessionsById: 用来存储ConcurrentHashMap<Long, SessionImpl>
    {sessionId:SessionImpl}
  2. sessionExpiryQueue :ExpiryQueue失效队列
  3. sessionsWithTimeout :ConcurrentMap<Long, Integer>存储的是{sessionId:sessionTimeout}
  4. nextSessionId :下一个sessionId

ExpiryQueue 失效队列有以下属性:

  1. elemMap: ConcurrentHashMap<E, Long> 存储的是{SessionImpl:newExpiryTime} Session实例对象,失效时间。

  2. expiryMap :ConcurrentHashMap<Long, Set>存储的是{time: set} 失效时间,当前失效时间的Session对象集合。

  3. nextExpirationTime: 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。

  4. expirationInterval :失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。

Session创建

概览图:
在这里插入图片描述

假如客户端发起请求后,后端如何识别是第一次创建请求?在之前的
案例源码 NIOServerCnxn.readPayload()中有所体现,NIOServerCnxn.readPayload() 部分关键源码如下:
在这里插入图片描述
此时如果 initialized=false ,表示第一次连接 需要创建Session,此处调用readConnectRequest()后,在 readConnectRequest() 方法中会将 initialized 设置为 true ,只有在处理完连接请求之后才会把 initialized 设置为 true ,才可以处理客户端其他命令。
在这里插入图片描述
上面方法还调用了processConnectRequest ()处理连接请求, processConnectRequest 第一次从请求中获取到的 sessionId=0 ,此时会把创建 Session 作为一个业务,会调用createSession()方法,
processConnectRequest 方法部分关键代码如下:
在这里插入图片描述
创建会话调用 createSession(),该方法会首先创建一个sessionId,并把该sessionId 作为会话ID创建一个创建session会话的请求,并将该请求交给业务链作为一个业务处理, createSession() 源码如下:
在这里插入图片描述
上面方法用到的sessionTracker.createSession(timeout)做了2个操作分别是创建sessionId和配置sessionId的跟踪信息,方法源码如下:
在这里插入图片描述
会话信息的跟踪其实就是将会话信息添加到队列中,任何地方可以根据会话ID找到会话信息,trackSession 方法实现了Session创建、Session队列存储、 Session 过期队列存储, trackSession方法源码如下:
在这里插入图片描述
PrepRequestProcessor 的 run 方法中调用pRequest()->pRequestHelper() 最终调用pRequest2Txn,关键代码如下:
在这里插入图片描述
SyncRequestProcessortxn(创建session的操作)进行持久化,在 FinalRequestProcessor 会对Session进行提交,其实就是把 Session 的ID和 Timeout 存到 sessionsWithTimeout 中去。

由于FinalRequestProcessor 中调用链路太复杂,我们把调用链路写出来,可以按照这个顺序跟踪:
在这里插入图片描述
上面调用链路中 processTxnForSessionEvents(request, hdr, request.getTxn()); 方法代码如下:
在这里插入图片描述
其中的SessionTrackerImpl.commitSession()代码如下:
在这里插入图片描述

Session刷新

概览图:
在这里插入图片描述

服务端无论接受什么请求命令都会更新Session的过期时间 。我们做增删或者ping命令的时候,都会经过 RequestThrottler , RequestThrottler 的run方法中调用zks.submitRequestNow(),而 zks.submitRequestNow(request) 中调用了touch(si.cnxn);,该方法源码如下:

在这里插入图片描述
touchSession() 方法更新sessionExpiryQueue失效队列中的失效时间,源码如下:

在这里插入图片描述
其实内部是调用了sessionExpiryQueue.update(s, timeout);源码如下:
在这里插入图片描述
update() 方法会在当前时间的基础上增加timeout,并更新失效时间为newExpiryTime,**关键源码(并非全部源码)**如下:
在这里插入图片描述

Session失效

概览图:
在这里插入图片描述
SessionTrackerImpl 是一个线程类,继承了 ·ZooKeeperCriticalThread· ,我们可以看它的run方法,它首先获取了下一个会话过期时间,并休眠等待会话过期时间到期,然后获取过期的客户端会话集合并循环关闭,源码如下:
在这里插入图片描述
上面方法中调用了sessionExpiryQueue.poll(),该方法代码主要是获取过期时间对应的客户端会话集合,也就是key为过期时间,value为会话集合,源码如下:
在这里插入图片描述
最上面的 setSessionClosing()方法其实是把Session会话的 isClosing 状态设置为了true,方法源码如下:
在这里插入图片描述
而让客户端失效的方法expirer.expire(s); 其实也是一个业务操作,主要调用了ZooKeeperServer.expire() 方法,而该方法获取SessionId后,又创建了一个
OpCode.closeSession的请求,并交给业务链处理,我们查看ZooKeeperServer.expire() 方法源码如下:
在这里插入图片描述

在 业务处理链的PrepRequestProcessor.pRequest2Txn()方法中 OpCode.closeSession操作里最后部分代理明确将会话Session的isClosing设置为了true,源码如下:
在这里插入图片描述
业务链处理对象 FinalRequestProcessor.processRequest()方法调用了
ZooKeeperServer.processTxn(),并且在 processTxn() 方法中执行了
processTxnForSessionEvents() ,而 processTxnForSessionEvents() 方法正好移除了会话信息,方法源码如下:
在这里插入图片描述
移除会话的方法 SessionTrackerImpl.removeSession()会移除会话ID以及过期会话对象,源码如下:
在这里插入图片描述


文章作者: fFee-ops
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 fFee-ops !
评论
  目录