连不上 HS2

介绍 JDBC 连接 Hive Server 过程中,通过 ZK 实现高可用的逻辑。

作业执行失败报错如下

1
Tried all existing HiveServer2 uris from ZooKeeper.

目前 HS2 通过 ZK 做了 HA,从报错日志看,将 ZK 里面配置的 HS2 依次连接了一遍,都失败了。过往的经验是有两个可能的问题

jdbc 连接串( jdbc:hive2://10.1.183.246,10.1.178.113,10.1.166.18:2181/usercenter;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sgp -com-hive;hive.server2.proxy.user=sg_ib_dw?mapred.job.queue.name=root.ib.sgoffline )有问题
HS2 地址中使用的是主机名,从主机名解析获取 IP 地址失败

第一个问题通过 beeline 连接给定的 jdbc 连接串验证,可以连上,说明不是这个问题;

第二个问题可以从 ZK 中获取 HS2 地址,通过 ping 或者 telnet 命令验证,验证通过,说明也不是这个问题

1
2
3
4
5
6
7
8
9
10
11
12
# 获取 HS2 地址
[zk: 10.1.166.18:2181(CONNECTED) 0] ls /sgp-com-hive
[serverUri=10-94-12-217.sgp-com-hiveserver2.sgp:10000;version=2.3.7-amzn-1;sequence=0000001812, serverUri=10-94-104-44.sgp-com-hiveserver2.sgp:10000;version=2.3.7-amzn-1;sequence=0000001795, serverUri=10-94-106-212.sgp-com-hiveserver2.sgp:10000;version=2.3.7-amzn-1;sequence=0000001771]

# telnet 验证
[service@10-94-48-163.datahub-api.sgp lib]$ telnet 10-94-12-217.sgp-com-hiveserver2.sgp 10000
Trying 10.94.12.217...
Connected to 10-94-12-217.sgp-com-hiveserver2.sgp.
Escape character is '^]'.
^]
telnet> quit
Connection closed.

接下来,看下 hive jdbc 是如何创建连接的,从异常栈中是看到相关的类是 org.apache.hive.jdbc.HiveConnection#HiveConnection ,之前在hive-读取超时中有描述 jdbc 打开 hive 连接的大致流程,接下来看下其中遍历 ZK 中配置的 HS2,依次尝试连接,直到成功或者失败的过程。

从方法 org.apache.hive.jdbc.Utils#parseURL 开始解析 jdbc url 连接, jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;sess_var_list?hive_conf_list#hive_var_list ,其中几个 list 是以 ; 分隔的键值对,分别对应 URI 的 path、query和fragment,内部也是用 URI 类来提取的各个部分的。最后会调用 org.apache.hive.jdbc.Utils#configureConnParams 处理 connParams ,从 ZK 中获取真实的主机和端口,并替换 dummyAuthorityString

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException,
SQLException, ZooKeeperHiveClientException {
JdbcConnectionParams connParams = new JdbcConnectionParams();
// ...
// Extract host, port
if (connParams.isEmbeddedMode()) {
// In case of embedded mode we were supplied with an empty authority.
// So we never substituted the authority with a dummy one.
connParams.setHost(jdbcURI.getHost());
connParams.setPort(jdbcURI.getPort());
} else {
// Configure host, port and params from ZooKeeper if used,
// and substitute the dummy authority with a resolved one
configureConnParams(connParams); //就是这里
// We check for invalid host, port while configuring connParams with configureConnParams()
String authorityStr = connParams.getHost() + ":" + connParams.getPort();
LOG.info("Resolved authority: " + authorityStr);
uri = uri.replace(dummyAuthorityString, authorityStr);
connParams.setJdbcUriString(uri);
}
return connParams;
}

方法 org.apache.hive.jdbc.Utils#configureConnParams 内部会判断服务发现模式是不是 zooKeeper (在 jdbc url 的 session var 中有给出,即: serviceDiscoveryMode=zooKeeper; ),执行 org.apache.hive.jdbc.ZooKeeperHiveClientHelper#configureConnParams 从 ZK 获取配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void configureConnParams(JdbcConnectionParams connParams)
throws JdbcUriParseException, ZooKeeperHiveClientException {
String serviceDiscoveryMode =
connParams.getSessionVars().get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE);
if ((serviceDiscoveryMode != null)
&& (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER
.equalsIgnoreCase(serviceDiscoveryMode))) {
// Set ZooKeeper ensemble in connParams for later use
connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ","));
// Configure using ZooKeeper
ZooKeeperHiveClientHelper.configureConnParams(connParams);
} else {
// ...
}
}
}

看方法 org.apache.hive.jdbc.ZooKeeperHiveClientHelper#configureConnParams 内部实现
从指定空间(在 jdbc url 的 session var 中指定:即 zooKeeperNamespace=sgp-com-hive )中获取 HS2 列表 serverHosts
serverHosts 中已经尝试连接过的 HS2 剔除掉(连接失败的 HS2 会记录到^^拒绝连接列表^^中,下文中有描述)
如果没有可用的 HS2,就抛出前面任务失败的异常 Tried all existing HiveServer2 uris from ZooKeeper.
从可用的 HS2 随机选择一个,获取对应 znode 内容 serverConfStr ,即对应 HS2 的配置:

1
2
3
4
[zk: 10.1.166.18:2181(CONNECTED) 0] ls /sgp-com-hive
[serverUri=10-94-12-217.sgp-com-hiveserver2.sgp:10000;version=2.3.7-amzn-1;sequence=0000001812, serverUri=10-94-104-44.sgp-com-hiveserver2.sgp:10000;version=2.3.7-amzn-1;sequence=0000001795, serverUri=10-94-106-212.sgp-com-hiveserver2.sgp:10000;version=2.3.7-amzn-1;sequence=0000001771]
[zk: 10.1.166.18:2181(CONNECTED) 1] get /sgp-com-hive/serverUri=10-94-12-217.sgp-com-hiveserver2.sgp:10000;version=2.3.7-amzn-1;sequence=0000001812
hive.server2.authentication=CUSTOM;hive.server2.transport.mode=binary;hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=10-94-12-217.sgp-com-hiveserver2.sgp;hive.server2.thrift.port=10000;hive.server2.use.SSL=false

重新整理后如下

1
2
3
4
5
6
hive.server2.authentication=CUSTOM;
hive.server2.transport.mode=binary;
hive.server2.thrift.sasl.qop=auth;
hive.server2.thrift.bind.host=10-94-12-217.sgp-com-hiveserver2.sgp;
hive.server2.thrift.port=10000;
hive.server2.use.SSL=false

调用方法 applyConfs 应用配置 serverConfStr ,即将上面的配置依次设置到 connParams

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static void configureConnParams(JdbcConnectionParams connParams)
throws ZooKeeperHiveClientException {
//省略...
serverHosts = zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace);
// Remove the znodes we've already tried from this list
serverHosts.removeAll(connParams.getRejectedHostZnodePaths());
if (serverHosts.isEmpty()) {
throw new ZooKeeperHiveClientException(
"Tried all existing HiveServer2 uris from ZooKeeper.");
}
// Now pick a server node randomly
serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
connParams.setCurrentHostZnodePath(serverNode);
// Read config string from the znode for this server node
String serverConfStr =
new String(
zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode),
Charset.forName("UTF-8"));
applyConfs(serverConfStr, connParams);
//省略 ...
}

支持连接参数 connParams 初始化完成,接下来调用方法 org.apache.hive.jdbc.HiveConnection#openTransport 打开与 ThriftServer 的连接,在一个 while true 的循环中尝试连接,处理异常。看下其中处理打开异常的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void openTransport() throws SQLException {
while (true) {
try {
transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
if (!transport.isOpen()) {
transport.open();
}
logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort());
break;
} catch (TTransportException e) {
// We'll retry till we exhaust all HiveServer2 nodes from ZooKeeper
if (isZkDynamicDiscoveryMode()) {
try {
Utils.updateConnParamsFromZooKeeper(connParams); //这里
//...
} else {
}
}
}
}

在通过 ZK 做动态发现模式下(方法 isZkDynamicDiscoveryMode() ),调用方法 org.apache.hive.jdbc.Utils#updateConnParamsFromZooKeeper 重新获取一个 HS2
首先,将当前尝试连接的节点节点添加到^^拒绝节点列表^^中
暂存当前使用 HS2 的 host 和 port
调用前面介绍的方法 org.apache.hive.jdbc.ZooKeeperHiveClientHelper#configureConnParams 重新获取一个 HS2
使用新获取的 HS2 的 host 和 port 替换旧的

1
2
3
4
5
6
7
8
9
10
11
12
static void updateConnParamsFromZooKeeper(JdbcConnectionParams connParams)
throws ZooKeeperHiveClientException {
// Add current host to the rejected list
connParams.getRejectedHostZnodePaths().add(connParams.getCurrentHostZnodePath());
String oldServerHost = connParams.getHost();
int oldServerPort = connParams.getPort();
// Update connection params (including host, port) from ZooKeeper
ZooKeeperHiveClientHelper.configureConnParams(connParams);
connParams.setJdbcUriString(connParams.getJdbcUriString().replace(
oldServerHost + ":" + oldServerPort, connParams.getHost() + ":" + connParams.getPort()));
LOG.info("Selected HiveServer2 instance with uri: " + connParams.getJdbcUriString());
}

至此,通过 ZK 来实现 HS2 高可用的流程就结束了。

看来问题还是在连接 HS2 出现了异常,即 transport.open(); 这里,内部的代码没有细看,大致用了 SASL 做认证,其中会从 sessConfMap 中获取用户和密码,会不会是认证出现了问题了?

用户名和密码是在创建对象 HiveConnection 是,从传入变量 info 中拿到后设置到 sessConfMap 中的

1
2
3
4
5
6
if (info.containsKey(JdbcConnectionParams.AUTH_USER)) {
sessConfMap.put(JdbcConnectionParams.AUTH_USER, info.getProperty(JdbcConnectionParams.AUTH_USER));
if (info.containsKey(JdbcConnectionParams.AUTH_PASSWD)) {
sessConfMap.put(JdbcConnectionParams.AUTH_PASSWD, info.getProperty(JdbcConnectionParams.AUTH_PASSWD));
}
}

结合前面的异常栈,这个 info 就是 com.dtstack.flinkx.hive.format.HiveInputFormat#executeQuery 这里的 properties ,这个 properties 是从配置文件中获取的,检查后,发现配置是 null

1
2
3
4
"increColumn": "",
"properties": null,
"splitPk": "",
"startLocation": "",

这些问题大致明白了——没有获取到用户名和密码导致 HS2 连接失败。