Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
... 30 more
方法java.net.SocketInputStream#socketRead0的定义是
1 2 3 4
privatenativeintsocketRead0(FileDescriptor fd, byte b[], int off, int len, int timeout) throws IOException;
其中,参数timeout的定义是the read timeout in ms。timeout在 141 行调用,由下面的方法传入,即impl.getTimeout()
1 2 3
publicintread(byte b[], int off, int length)throws IOException { return read(b, off, length, impl.getTimeout()); }
if (isEmbeddedMode) { EmbeddedThriftBinaryCLIServiceembeddedClient=newEmbeddedThriftBinaryCLIService(); embeddedClient.init(newHiveConf()); client = embeddedClient; } else { // extract user/password from JDBC connection properties if its not supplied in the // connection URL if (info.containsKey(JdbcConnectionParams.AUTH_USER)) { sessConfMap.put(JdbcConnectionParams.AUTH_USER, info.getProperty(JdbcConnectionParams.AUTH_USER)); if (info.containsKey(JdbcConnectionParams.AUTH_PASSWD)) { sessConfMap.put(JdbcConnectionParams.AUTH_PASSWD, info.getProperty(JdbcConnectionParams.AUTH_PASSWD)); } } if (info.containsKey(JdbcConnectionParams.AUTH_TYPE)) { sessConfMap.put(JdbcConnectionParams.AUTH_TYPE, info.getProperty(JdbcConnectionParams.AUTH_TYPE)); } // open the client transport openTransport(); // 从这里开始 // set up the client client = newTCLIService.Client(newTBinaryProtocol(transport)); } // add supported protocols supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V3); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V4); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
// open client session openSession();
// Wrap the client with a thread-safe proxy to serialize the RPC calls client = newSynchronizedClient(client); }
调用方法setupLoginTimeout()设置loginTimeout,内部获取的是java.sql.DriverManager#loginTimeout,这个变量是类的 static 成员,就是参考资料中提到的多个 JDBC driver 驱动存在时,会被覆盖导致问题 调用org.apache.hive.jdbc.Utils#parseURL解析 jdbc url 连接, jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;sess_var_list?hive_conf_list#hive_var_list,其中几个 list 是以;分隔的键值对,分别对应 URI 的 path、query和fragment,内部也是用 URI 类来提取的各个部分的 调用方法org.apache.hive.jdbc.HiveConnection#openTransport打开客户端连接,判断是个二进制协议还是 http 协议,
while (true) { try { assumeSubject = JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE)); transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); //判断是个二进制协议还是 http 协议, if (!transport.isOpen()) { transport.open(); } logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort()); break; } catch (TTransportException e) { // We'll retry till we exhaust all HiveServer2 nodes from ZooKeeper } } }
这里是二进制,调用方法org.apache.hive.jdbc.HiveConnection#createBinaryTransport,内部调用org.apache.hive.jdbc.HiveConnection#createUnderlyingTransport创建底层的 transport
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
private TTransport createUnderlyingTransport()throws TTransportException { TTransporttransport=null; // Note: Thrift returns an SSL socket that is already bound to the specified host:port // Therefore an open called on this would be a no-op later // Hence, any TTransportException related to connecting with the peer are thrown here. // Bubbling them up the call hierarchy so that a retry can happen in openTransport, // if dynamic service discovery is configured. if (isSslConnection()) { // get SSL socket } else { // get non-SSL socket transport transport = HiveAuthUtils.getSocketTransport(host, port, loginTimeout); } return transport; }
这里获取 transport 的时候,传了loginTimeout 方法org.apache.hadoop.hive.common.auth.HiveAuthUtils#getSocketTransport
1 2 3
publicstatic TTransport getSocketTransport(String host, int port, int loginTimeout) { returnnewTSocket(host, port, loginTimeout); }
publicsynchronizedvoidsetSoTimeout(int timeout)throws SocketException { if (isClosed()) thrownewSocketException("Socket is closed"); if (timeout < 0) thrownewIllegalArgumentException("timeout can't be negative");