利用zk的watcher功能实时监控zk节点的变化,可以利用这个功能做报警、监控,例如监控kafka的broker,otter的node等,如果有节点挂掉,立时通知
class MonitorZk(var zkHost: String, var zkPath: String) extends Watcher {
var zoo: ZooKeeper = new ZooKeeper(zkHost, 10000, this)
var brokerListStr = ""
var brokers: util.List[String] = _
private val logger: Logger = Logger.getLogger(MonitorZk.getClass)
override def process(event: WatchedEvent): Unit = {
if (event == null) return
//取得连接状态
val state = event.getState
//取得事件类型
val eventType = event.getType
//哪一个节点路径发生变更
val nodePath = event.getPath
brokers = zoo.getChildren(zkPath, true)
import scala.collection.JavaConversions._
brokers.foreach(x => {
brokerListStr += s"$x "
})
if (KeeperState.SyncConnected eq state) {
//连接成功
// 没有任何节点,表示创建连接成功(客户端与服务器端创建连接成功后没有任何节点信息)
if (EventType.None eq eventType) {
logger.warn(s"成功链接zookeeper服务器,节点数量:${
brokers.size()
}, 节点列表:$brokerListStr")
} else if (EventType.NodeCreated eq eventType) {
//当服务器端创建节点的时候触发
logger.warn(s"zookeeper服务端创建新的节点, 节点数量:${
brokers.size()
}, 节点列表:$brokerListStr")
} else if (EventType.NodeDataChanged eq eventType) {
//被监控该节点的数据发生变更的时候触发
logger.warn(s"节点的数据更新,节点数量:${
brokers.size()
}, 节点列表:$brokerListStr")
} else if (EventType.NodeChildrenChanged eq eventType) {
// 对应本代码而言只能监控根节点的一级节点变更。如:在根节点直接创建一级节点,
//或者删除一级节点时触发。如修改一级节点的数据,不会触发,创建二级节点时也不会触发。
logger.warn(s"子节点发生变更,节点数量:${
brokers.size()
}, 节点列表:$brokerListStr")
} else if (EventType.NodeDeleted eq eventType) {
logger.warn(s"节点:$nodePath 被删除,节点数量:${
brokers.size()
}, 节点列表:$brokerListStr")
}
} else if (KeeperState.Disconnected eq state) logger.warn("客户端连接zookeeper服务器端失败")
else if (KeeperState.Expired eq state) logger.warn("客户端与zookeeper服务器端会话失败")
else if (KeeperState.AuthFailed eq state) logger.warn("权限认证失败")
if (brokers.length < nodeCount.toInt) {
logger.warn(s"报警 $title, 节点列表:$brokerListStr")
SendMail.sendMail(title, s"节点列表:$brokerListStr", "html")
}
brokerListStr = ""
}
}
object MonitorZk {
def main(args: Array[String]): Unit = {
val parseParam = new ZkParam
val jCommander = new JCommander(parseParam, args: _*)
if (parseParam.help) {
jCommander.usage()
System.exit(0)
}
val zt = new MonitorZk(parseParam.zkHost, parseParam.nodePath)
zt.logger.warn("程序启动....")
//这句必须先执行,才能实现监控
zt.brokers = zt.zoo.getChildren(parseParam.nodePath, true)
scala.sys.addShutdownHook({
zt.logger.warn(s"${parseParam.title} 程序关闭")
})
//让主线程一直在
Thread.sleep(Integer.MAX_VALUE)
}
}