It is hard to find a good source-level write-up of HiveServer2 HA online, so with some spare time in the evening I could not resist digging into how HiveServer2 HA load balancing is implemented at the source level.
HA comes in several flavors: one is the failover Active/Standby model used by NameNode/ResourceManager/Flink and the like; another is the load-balancing model used by HiveServer2 HA (there are probably others as well).
The idea behind the HiveServer2 HA model is fairly simple:
- Server side: start a HiveServer2 process on each of several machines; every process registers its ThriftServer host/port (and related information) under a designated ZooKeeper namespace;
- Client side: pick one ThriftServer host/port at random from that ZooKeeper namespace and connect to it, which spreads connections evenly across the instances (see the sketch right after this list);
- HA itself is handled through ZooKeeper's watch mechanism: when the HiveServer2 process on some machine dies, the corresponding znode is removed (the registration is an ephemeral node, and the server also deregisters explicitly on shutdown), so the next client connection automatically filters that node out;
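As a quick illustration of the client-side selection described above, here is a minimal standalone sketch (not the actual Hive code, which is walked through below): it lists the znodes registered under the namespace and picks one at random. The ZooKeeper connect string and the namespace name hiveserver2 are placeholders.
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class PickHiveServer2Instance {
  public static void main(String[] args) throws Exception {
    // Placeholder ensemble/namespace; replace with your own values.
    String ensemble = "zk1:2181,zk2:2181,zk3:2181";
    String namespace = "hiveserver2";
    CuratorFramework zk = CuratorFrameworkFactory.newClient(
        ensemble, new ExponentialBackoffRetry(1000, 3));
    zk.start();
    try {
      // Each child znode represents one live HiveServer2 instance.
      List<String> servers = zk.getChildren().forPath("/" + namespace);
      String chosen = servers.get(new Random().nextInt(servers.size()));
      // The znode data holds either "host:port" or a key=value config string.
      byte[] data = zk.getData().forPath("/" + namespace + "/" + chosen);
      System.out.println(chosen + " -> " + new String(data, StandardCharsets.UTF_8));
    } finally {
      zk.close();
    }
  }
}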
HiveServer2 server side: ZooKeeper registration
In HiveServer2's start() method, after the underlying services have been started, the configuration is checked for HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY (dynamic service discovery); if it is enabled, addServerInstanceToZooKeeper() is called to register this ThriftServer's host/port information in ZooKeeper:
@Override
public synchronized void start() {
super.start();
// If we're supporting dynamic service discovery, we'll add the service uri for this
// HiveServer2 instance to Zookeeper as a znode.
HiveConf hiveConf = this.getHiveConf();
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
try {
addServerInstanceToZooKeeper(hiveConf);
} catch (Exception e) {
LOG.error("Error adding this HiveServer2 instance to ZooKeeper: ", e);
throw new ServiceException(e);
}
}
if (webServer != null) {
try {
webServer.start();
LOG.info("Web UI has started on port " + webServer.getPort());
} catch (Exception e) {
LOG.error("Error starting Web UI: ", e);
throw new ServiceException(e);
}
}
}
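For reference, the dynamic service discovery path relies on three settings: the discovery switch checked above, the ZooKeeper quorum, and the registration namespace (both used by addServerInstanceToZooKeeper(), shown below). A minimal sketch of setting them programmatically on a HiveConf; the quorum hosts and namespace value are placeholders, and in practice these normally live in hive-site.xml:
import org.apache.hadoop.hive.conf.HiveConf;

public class Hs2DiscoveryConf {
  public static HiveConf build() {
    HiveConf hiveConf = new HiveConf();
    // Enable registration of this HiveServer2 instance in ZooKeeper.
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY, true);
    // ZooKeeper ensemble used for registration and discovery (placeholder hosts).
    hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181,zk3:2181");
    // Parent znode under which each instance publishes itself.
    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE, "hiveserver2");
    return hiveConf;
  }
}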
addServerInstanceToZooKeeper() takes the HiveConf, connects to the ZooKeeper ensemble given by HIVE_ZOOKEEPER_QUORUM, and registers this instance under the HIVE_SERVER2_ZOOKEEPER_NAMESPACE namespace:
private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
String instanceURI = getServerInstanceURI();
setUpZooKeeperAuth(hiveConf);
int sessionTimeout =
(int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
TimeUnit.MILLISECONDS);
int baseSleepTime =
(int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
TimeUnit.MILLISECONDS);
int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
// Create a CuratorFramework instance to be used as the ZooKeeper client
// Use the zooKeeperAclProvider to create appropriate ACLs
zooKeeperClient =
CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
.sessionTimeoutMs(sessionTimeout).aclProvider(zooKeeperAclProvider)
.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
zooKeeperClient.start();
// Create the parent znodes recursively; ignore if the parent already exists.
try {
zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
} catch (KeeperException e) {
if (e.code() != KeeperException.Code.NODEEXISTS) {
LOG.error("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
throw e;
}
}
// Create a znode under the rootNamespace parent for this instance of the server
// Znode name: serverUri=host:port;version=versionInfo;sequence=sequenceNumber
try {
String pathPrefix =
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
+ "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
String znodeData = "";
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS)) {
// HiveServer2 configs that this instance will publish to ZooKeeper,
// so that the clients can read these and configure themselves properly.
Map<String, String> confsToPublish = new HashMap<String, String>();
addConfsToPublish(hiveConf, confsToPublish);
// Publish configs for this instance as the data on the node
znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish);
} else {
znodeData = instanceURI;
}
byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"));
znode =
new PersistentEphemeralNode(zooKeeperClient,
PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
znode.start();
// We'll wait for 120s for node creation
long znodeCreationTimeout = 120;
if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
}
setDeregisteredWithZooKeeper(false);
znodePath = znode.getActualPath();
// Set a watch on the znode
if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
// No node exists, throw exception
throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
}
LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
} catch (Exception e) {
LOG.error("Unable to create a znode for this server instance", e);
if (znode != null) {
znode.close();
}
throw (e);
}
}
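Putting the pieces above together, the znode created for an instance ends up looking roughly like the following (host, port, version and sequence number are made-up values; the trailing sequence is appended automatically by the EPHEMERAL_SEQUENTIAL mode):
/hiveserver2/serverUri=hadoop710.lt.163.org:10001;version=2.1.1;sequence=0000000003
Its data is either the bare instance URI (host:port) or, when HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS is enabled, a semicolon-separated key=value config string built by the Joiner call above.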
Client connection flow
On the client side, Beeline connects to the HiveServer2 HA cluster like this:
beeline> !connect jdbc:hive2://hadoop710.lt.163.org:2181,hadoop711.lt.163.org:2181,hadoop712.lt.163.org:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-lsm hadoop ""
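The same URL can be used from any plain JDBC client, not just Beeline; a minimal sketch, with the URL, user and empty password copied from the beeline example above and hive-jdbc assumed to be on the classpath:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Hs2HaJdbcExample {
  public static void main(String[] args) throws Exception {
    // ZooKeeper-based service discovery: the driver resolves an actual
    // HiveServer2 host:port from the hiveserver2-lsm namespace.
    String url = "jdbc:hive2://hadoop710.lt.163.org:2181,hadoop711.lt.163.org:2181,"
        + "hadoop712.lt.163.org:2181/;serviceDiscoveryMode=zooKeeper;"
        + "zooKeeperNamespace=hiveserver2-lsm";
    try (Connection conn = DriverManager.getConnection(url, "hadoop", "");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("show databases")) {
      while (rs.next()) {
        System.out.println(rs.getString(1));
      }
    }
  }
}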
Beeline calls parseURL() in org.apache.hive.jdbc.Utils to parse the JDBC connection string; configureConnParams() then branches on the serviceDiscoveryMode parameter:
- if it is zooKeeper, the connection parameters are resolved through ZooKeeperHiveClientHelper;
- otherwise the JDBC host/port are parsed directly from the URL;
private static void configureConnParams(JdbcConnectionParams connParams)
throws JdbcUriParseException, ZooKeeperHiveClientException {
String serviceDiscoveryMode =
connParams.getSessionVars().get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE);
if ((serviceDiscoveryMode != null)
&& (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER
.equalsIgnoreCase(serviceDiscoveryMode))) {
// Set ZooKeeper ensemble in connParams for later use
connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ","));
// Configure using ZooKeeper
ZooKeeperHiveClientHelper.configureConnParams(connParams);
} else {
String authority = connParams.getAuthorityList()[0];
URI jdbcURI = URI.create(URI_HIVE_PREFIX + "//" + authority);
// Check to prevent unintentional use of embedded mode. A missing "/"
// to separate the 'path' portion of URI can result in this.
// The missing "/" common typo while using secure mode, eg of such url -
// jdbc:hive2://localhost:10000;principal=hive/HiveServer2Host@YOUR-REALM.COM
if (jdbcURI.getAuthority() != null) {
String host = jdbcURI.getHost();
int port = jdbcURI.getPort();
if (host == null) {
throw new JdbcUriParseException("Bad URL format. Hostname not found "
+ " in authority part of the url: " + jdbcURI.getAuthority()
+ ". Are you missing a '/' after the hostname ?");
}
// Set the port to default value; we do support jdbc url like:
// jdbc:hive2://localhost/db
if (port <= 0) {
port = Integer.parseInt(Utils.DEFAULT_PORT);
}
connParams.setHost(host);
connParams.setPort(port);
}
}
}
configureConnParams() in org.apache.hive.jdbc.ZooKeeperHiveClientHelper then does the following:
- fetch all znodes under the configured namespace from ZooKeeper;
- drop the znodes that have already been tried (connParams.getRejectedHostZnodePaths());
- pick one of the remaining znodes at random (remembering it via setCurrentHostZnodePath);
- parse that znode's data, which carries the host/port (or the full config string) the JDBC connection needs;
static void configureConnParams(JdbcConnectionParams connParams)
throws ZooKeeperHiveClientException {
String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
String zooKeeperNamespace =
connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE);
if ((zooKeeperNamespace == null) || (zooKeeperNamespace.isEmpty())) {
zooKeeperNamespace = JdbcConnectionParams.ZOOKEEPER_DEFAULT_NAMESPACE;
}
List<String> serverHosts;
Random randomizer = new Random();
String serverNode;
CuratorFramework zooKeeperClient =
CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
try {
zooKeeperClient.start();
serverHosts = zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace);
// Remove the znodes we've already tried from this list
serverHosts.removeAll(connParams.getRejectedHostZnodePaths());
if (serverHosts.isEmpty()) {
throw new ZooKeeperHiveClientException(
"Tried all existing HiveServer2 uris from ZooKeeper.");
}
// Now pick a server node randomly
serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
connParams.setCurrentHostZnodePath(serverNode);
// Read data from the znode for this server node
// This data could be either config string (new releases) or server end
// point (old releases)
String dataStr =
new String(
zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode),
Charset.forName("UTF-8"));
Matcher matcher = kvPattern.matcher(dataStr);
// If dataStr is not null and dataStr is not a KV pattern,
// it must be the server uri added by an older version HS2
if ((dataStr != null) && (!matcher.find())) {
String[] split = dataStr.split(":");
if (split.length != 2) {
throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper: "
+ dataStr);
}
connParams.setHost(split[0]);
connParams.setPort(Integer.parseInt(split[1]));
} else {
applyConfs(dataStr, connParams);
}
} catch (Exception e) {
throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e);
} finally {
// Close the client connection with ZooKeeper
if (zooKeeperClient != null) {
zooKeeperClient.close();
}
}
}
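When the znode data is in the newer key=value form (the counterpart of the Joiner call on the server side), applyConfs() turns it back into connection parameters. A minimal sketch of decoding such a string with Guava; the keys and values below are made up purely for illustration:
import java.util.Map;

import com.google.common.base.Splitter;

public class ZnodeDataDemo {
  public static void main(String[] args) {
    // Illustrative data string in the key=value;key=value form produced
    // when HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS is enabled.
    String dataStr = "hive.server2.thrift.bind.host=hadoop710.lt.163.org;"
        + "hive.server2.thrift.port=10001;hive.server2.transport.mode=binary";
    Map<String, String> confs =
        Splitter.on(';').withKeyValueSeparator('=').split(dataStr);
    System.out.println(confs.get("hive.server2.thrift.bind.host")
        + ":" + confs.get("hive.server2.thrift.port"));
  }
}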
HA handling
When the HiveServer2 process on a host is shut down, HiveServer2.stop() is invoked. It checks whether HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY mode is enabled and, if so, deletes this instance's znode from ZooKeeper (if the process dies abruptly instead, the ephemeral znode disappears anyway once its ZooKeeper session expires):
// Remove this server instance from ZooKeeper if dynamic service discovery is set
if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
try {
removeServerInstanceFromZooKeeper();
} catch (Exception e) {
LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
}
}
The znode can also go away without stop() being run locally, for example when it is deleted directly in ZooKeeper; the DeRegisterWatcher that addServerInstanceToZooKeeper() registered on the znode handles that case:
/**
* The watcher class which sets the de-register flag when the znode corresponding to this server
* instance is deleted. Additionally, it shuts down the server if there are no more active client
* sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
*/
private class DeRegisterWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
if (znode != null) {
try {
znode.close();
LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
+ "The server will be shut down after the last client sesssion completes.");
} catch (IOException e) {
LOG.error("Failed to close the persistent ephemeral znode", e);
} finally {
HiveServer2.this.setDeregisteredWithZooKeeper(true);
// If there are no more active client sessions, stop the server
if (cliService.getSessionManager().getOpenSessionCount() == 0) {
LOG.warn("This instance of HiveServer2 has been removed from the list of server "
+ "instances available for dynamic service discovery. "
+ "The last client session has ended - will shutdown now.");
HiveServer2.this.stop();
}
}
}
}
}
}
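Deleting the znode by hand (or letting the ZooKeeper session expire) is exactly the event this watcher reacts to. As far as I know, Hive also ships a manual deregistration command for draining instances gracefully, along the lines of:
hive --service hiveserver2 --deregister <version number>
which removes the znodes registered by instances of the given version, so that each of them shuts down once its last client session ends.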