Hive Read Timeout

An overview of how a JDBC client connects to HiveServer2, and of where the read timeout gets set along the way.

A business team reported that their Hive read jobs were failing.

(screenshot: failed jobs)

Inspecting the failed jobs showed that some of them had hit a read timeout, with the following stack trace:

```
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
    ... 30 more
```

The native method java.net.SocketInputStream#socketRead0 is declared as:

```java
private native int socketRead0(FileDescriptor fd,
                               byte b[], int off, int len,
                               int timeout)
    throws IOException;
```

Its timeout parameter is documented as "the read timeout in ms". The value is passed in at line 141 (the frame in the stack trace above) by the method below, i.e. via impl.getTimeout():

```java
public int read(byte b[], int off, int length) throws IOException {
    return read(b, off, length, impl.getTimeout());
}
```

Here impl is an instance of AbstractPlainSocketImpl; getTimeout() returns its timeout field, which is assigned in java.net.AbstractPlainSocketImpl#setOption:

```java
public void setOption(int opt, Object val) throws SocketException {
    //...
    case SO_TIMEOUT:
        if (val == null || (!(val instanceof Integer)))
            throw new SocketException("Bad parameter for SO_TIMEOUT");
        int tmp = ((Integer) val).intValue();
        if (tmp < 0)
            throw new IllegalArgumentException("timeout < 0");
        timeout = tmp;
        break;
    //...
}
```
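The effect of SO_TIMEOUT is easy to reproduce with a minimal socket pair: the server side accepts the connection but never writes anything, so a read on the client side fails with the same SocketTimeoutException seen in the stack trace above. SoTimeoutDemo and readTimesOut are illustrative names, not part of any library:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SoTimeoutDemo {
    // Returns true if a read against a silent peer times out after soTimeoutMs.
    public static boolean readTimesOut(int soTimeoutMs) throws IOException {
        try (ServerSocket server = new ServerSocket(0); // ephemeral port, never writes
             Socket client = new Socket("127.0.0.1", server.getLocalPort())) {
            server.accept();                  // complete the handshake, send nothing
            client.setSoTimeout(soTimeoutMs); // sets SO_TIMEOUT, i.e. the read timeout
            try {
                client.getInputStream().read(); // blocks until the timeout fires
                return false;
            } catch (SocketTimeoutException e) {
                return true; // same exception class as in the Hive stack trace
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readTimesOut(200)); // prints true
    }
}
```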

This method has three call sites:

(screenshot: call hierarchy of setOption)

Which of them is the one that matters for Hive? With that many callers it is hard to tell by browsing. An easier approach is to start from the Hive side: since execution must eventually reach this method, set a breakpoint here and inspect the call stack when it is hit.

(screenshot: call stack at the breakpoint)
The overall flow is:

- com.dtstack.flinkx.hive.format.HiveInputFormat#openInternal obtains the query SQL (querySql) and executes it.
- The query runs through com.dtstack.flinkx.hive.format.HiveInputFormat#executeQuery, which first acquires a database connection dbConn.
- The connection comes from com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat#getConnection, which wraps DriverManager.getConnection() in a retry loop.
- DriverManager then iterates over the registered drivers and tries each one; see java.sql.DriverManager#getConnection(java.lang.String, java.util.Properties, java.lang.Class<?>):

```java
for (DriverInfo aDriver : registeredDrivers) {
    // If the caller does not have permission to load the driver then
    // skip it.
    if (isDriverAllowed(aDriver.driver, callerCL)) {
        try {
            println("    trying " + aDriver.driver.getClass().getName());
            Connection con = aDriver.driver.connect(url, info); // connect here
            if (con != null) {
                // Success!
                println("getConnection returning " + aDriver.driver.getClass().getName());
                return (con);
            }
        } catch (SQLException ex) {
            // ...
        }

    } else {
        println("    skipping: " + aDriver.getClass().getName());
    }
}
```
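The retry around DriverManager.getConnection inside JdbcInputFormat#getConnection can be sketched generically. RetryDemo and withRetry are hypothetical names for illustration, not FlinkX's actual code:

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

public class RetryDemo {
    // Hypothetical retry helper in the spirit of JdbcInputFormat#getConnection:
    // attempt the action up to maxAttempts times before rethrowing the last error.
    public static <T> T withRetry(Callable<T> action, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // e.g. a transient SQLException from DriverManager.getConnection
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        String result = withRetry(() -> {
            if (++calls[0] < 3) throw new SQLException("transient failure");
            return "connected";
        }, 3);
        System.out.println(result + " after " + calls[0] + " attempts"); // connected after 3 attempts
    }
}
```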

In the Hive case, the registered driver is HiveDriver, whose connect method simply creates a HiveConnection object:

```java
public Connection connect(String url, Properties info) throws SQLException {
    return acceptsURL(url) ? new HiveConnection(url, info) : null;
}
```
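acceptsURL essentially checks that the URL starts with the jdbc:hive2:// prefix. A simplified sketch (HiveUrlCheck is an illustrative class, not Hive's actual implementation):

```java
public class HiveUrlCheck {
    // Simplified stand-in for HiveDriver#acceptsURL: only the URL prefix matters.
    static final String URL_PREFIX = "jdbc:hive2://";

    public static boolean acceptsURL(String url) {
        return url != null && url.startsWith(URL_PREFIX);
    }

    public static void main(String[] args) {
        System.out.println(acceptsURL("jdbc:hive2://hs2:10000/default")); // true
        System.out.println(acceptsURL("jdbc:mysql://db:3306/test"));      // false
    }
}
```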

The HiveConnection constructor looks like this:

```java
public HiveConnection(String uri, Properties info) throws SQLException {
    setupLoginTimeout();
    try {
        connParams = Utils.parseURL(uri);
    } catch (ZooKeeperHiveClientException e) {
        throw new SQLException(e);
    }
    jdbcUriString = connParams.getJdbcUriString();
    // extract parsed connection parameters:
    // JDBC URL: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
    // each list: <key1>=<val1>;<key2>=<val2> and so on
    // sess_var_list -> sessConfMap
    // hive_conf_list -> hiveConfMap
    // hive_var_list -> hiveVarMap
    host = connParams.getHost();
    port = connParams.getPort();
    sessConfMap = connParams.getSessionVars();
    hiveConfMap = connParams.getHiveConfs();

    hiveVarMap = connParams.getHiveVars();
    for (Map.Entry<Object, Object> kv : info.entrySet()) {
        if ((kv.getKey() instanceof String)) {
            String key = (String) kv.getKey();
            if (key.startsWith(HIVE_VAR_PREFIX)) {
                hiveVarMap.put(key.substring(HIVE_VAR_PREFIX.length()), info.getProperty(key));
            } else if (key.startsWith(HIVE_CONF_PREFIX)) {
                hiveConfMap.put(key.substring(HIVE_CONF_PREFIX.length()), info.getProperty(key));
            }
        }
    }

    isEmbeddedMode = connParams.isEmbeddedMode();

    if (isEmbeddedMode) {
        EmbeddedThriftBinaryCLIService embeddedClient = new EmbeddedThriftBinaryCLIService();
        embeddedClient.init(new HiveConf());
        client = embeddedClient;
    } else {
        // extract user/password from JDBC connection properties if its not supplied in the
        // connection URL
        if (info.containsKey(JdbcConnectionParams.AUTH_USER)) {
            sessConfMap.put(JdbcConnectionParams.AUTH_USER, info.getProperty(JdbcConnectionParams.AUTH_USER));
            if (info.containsKey(JdbcConnectionParams.AUTH_PASSWD)) {
                sessConfMap.put(JdbcConnectionParams.AUTH_PASSWD, info.getProperty(JdbcConnectionParams.AUTH_PASSWD));
            }
        }
        if (info.containsKey(JdbcConnectionParams.AUTH_TYPE)) {
            sessConfMap.put(JdbcConnectionParams.AUTH_TYPE, info.getProperty(JdbcConnectionParams.AUTH_TYPE));
        }
        // open the client transport
        openTransport(); // this is where the socket gets created
        // set up the client
        client = new TCLIService.Client(new TBinaryProtocol(transport));
    }
    // add supported protocols
    supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
    supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2);
    supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V3);
    supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V4);
    supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5);
    supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
    supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);

    // open client session
    openSession();

    // Wrap the client with a thread-safe proxy to serialize the RPC calls
    client = newSynchronizedClient(client);
}
```

setupLoginTimeout() initializes loginTimeout by reading java.sql.DriverManager#loginTimeout. That field is a static member of DriverManager, which is exactly the pitfall noted in the references: when multiple JDBC drivers coexist in one JVM, the value set by one component can be silently overwritten by another.
org.apache.hive.jdbc.Utils#parseURL parses the JDBC URL, jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;sess_var_list?hive_conf_list#hive_var_list. Each list is a set of ;-separated key/value pairs, and the three lists correspond to the path, query, and fragment of the URI; internally the parser uses the URI class to extract these parts.
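That path/query/fragment split can be illustrated with java.net.URI directly. HiveUrlParts is a hypothetical helper for a single-host URL; Hive's real parser additionally handles multi-host ZooKeeper discovery URLs:

```java
import java.net.URI;

public class HiveUrlParts {
    // Strip the "jdbc:" prefix so java.net.URI can parse the remainder.
    public static URI parse(String jdbcUrl) throws Exception {
        return new URI(jdbcUrl.substring("jdbc:".length()));
    }

    public static void main(String[] args) throws Exception {
        URI uri = parse("jdbc:hive2://hs2:10000/default;transportMode=binary?hive.exec.parallel=true#myvar=1");
        System.out.println(uri.getHost() + ":" + uri.getPort()); // hs2:10000
        System.out.println(uri.getPath());     // /default;transportMode=binary -> sess_var_list
        System.out.println(uri.getQuery());    // hive.exec.parallel=true       -> hive_conf_list
        System.out.println(uri.getFragment()); // myvar=1                       -> hive_var_list
    }
}
```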
org.apache.hive.jdbc.HiveConnection#openTransport opens the client transport, choosing between the binary and the HTTP protocol:

```java
private void openTransport() throws SQLException {
    int numRetries = 0;
    int maxRetries = 1;
    try {
        maxRetries = Integer.parseInt(sessConfMap.get(JdbcConnectionParams.RETRIES));
    } catch (NumberFormatException e) {
    }

    while (true) {
        try {
            assumeSubject =
                JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap
                    .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE));
            transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); // binary or HTTP transport
            if (!transport.isOpen()) {
                transport.open();
            }
            logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort());
            break;
        } catch (TTransportException e) {
            // We'll retry till we exhaust all HiveServer2 nodes from ZooKeeper
        }
    }
}
```

In our case the transport is binary, so org.apache.hive.jdbc.HiveConnection#createBinaryTransport is called, which in turn calls org.apache.hive.jdbc.HiveConnection#createUnderlyingTransport to create the underlying transport:

```java
private TTransport createUnderlyingTransport() throws TTransportException {
    TTransport transport = null;
    // Note: Thrift returns an SSL socket that is already bound to the specified host:port
    // Therefore an open called on this would be a no-op later
    // Hence, any TTransportException related to connecting with the peer are thrown here.
    // Bubbling them up the call hierarchy so that a retry can happen in openTransport,
    // if dynamic service discovery is configured.
    if (isSslConnection()) {
        // get SSL socket
    } else {
        // get non-SSL socket transport
        transport = HiveAuthUtils.getSocketTransport(host, port, loginTimeout);
    }
    return transport;
}
```

Note that loginTimeout is passed when the transport is created.
The method org.apache.hadoop.hive.common.auth.HiveAuthUtils#getSocketTransport:

```java
public static TTransport getSocketTransport(String host, int port, int loginTimeout) {
    return new TSocket(host, port, loginTimeout);
}
```

Next, the TSocket initialization:

```java
public TSocket(String host, int port, int timeout) {
    this.socket_ = null;
    this.host_ = null;
    this.port_ = 0;
    this.timeout_ = 0;
    this.host_ = host;
    this.port_ = port;
    this.timeout_ = timeout;
    this.initSocket(); // configures the socket below
}

private void initSocket() {
    this.socket_ = new Socket();

    try {
        this.socket_.setSoLinger(false, 0);
        this.socket_.setTcpNoDelay(true);
        this.socket_.setKeepAlive(true);
        this.socket_.setSoTimeout(this.timeout_); // loginTimeout ends up as the read timeout
    } catch (SocketException var2) {
        LOGGER.error("Could not configure socket.", var2);
    }
}
```

Finally, java.net.Socket#setSoTimeout:

```java
public synchronized void setSoTimeout(int timeout) throws SocketException {
    if (isClosed())
        throw new SocketException("Socket is closed");
    if (timeout < 0)
        throw new IllegalArgumentException("timeout can't be negative");

    getImpl().setOption(SocketOptions.SO_TIMEOUT, new Integer(timeout)); // reaches AbstractPlainSocketImpl#setOption
}
```

What is set here is the socket read timeout; note that timeout_ is in fact also used as the socket connect timeout.
Finally, where does java.sql.DriverManager#loginTimeout get set? Using the same breakpoint technique, it turns out to be set in com.dtstack.flinkx.util.ClassUtil#forName(java.lang.String, java.lang.ClassLoader):

```java
public static void forName(String clazz, ClassLoader classLoader) {
    synchronized (LOCK_STR) {
        try {
            Class.forName(clazz, true, classLoader);
            DriverManager.setLoginTimeout(10); // 10s
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```
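Because loginTimeout is a JVM-wide static on DriverManager, the 10-second value set here applies to every driver in the process, and whichever component calls setLoginTimeout last wins. A minimal demonstration of that last-writer-wins behavior:

```java
import java.sql.DriverManager;

public class LoginTimeoutDemo {
    public static void main(String[] args) {
        // FlinkX's ClassUtil.forName sets 10 seconds while loading one driver...
        DriverManager.setLoginTimeout(10);
        // ...but the field is process-global: any later caller silently overwrites
        // it for ALL drivers, including HiveDriver, whose HiveConnection passes
        // the value down to TSocket as the socket read timeout.
        DriverManager.setLoginTimeout(30);
        System.out.println(DriverManager.getLoginTimeout()); // prints 30; the 10s value is gone
    }
}
```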

(screenshot: breakpoint stack showing ClassUtil#forName setting the login timeout)