2022-10-2814:31:37.178[flink-akka.actor.default-dispatcher-21] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: postgresqlreader (1/1) (77bd8cb9299189456dceee9609b5d03a) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@55bedd88. java.lang.IllegalArgumentException: open() failed.ERROR: temporary file size exceeds temp_file_limit (15728640kB) at com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat.openInternal(JdbcInputFormat.java:154) at com.dtstack.flinkx.inputformat.BaseRichInputFormat.open(BaseRichInputFormat.java:183) at com.dtstack.flinkx.streaming.api.functions.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:124) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) Caused by: org.postgresql.util.PSQLException: ERROR: temporary file size exceeds temp_file_limit (15728640kB) at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2433) at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2178) at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:306) at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441) at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365) at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:307) at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:293) at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:270) at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:224) at com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat.executeQuery(JdbcInputFormat.java:868) at com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat.openInternal(JdbcInputFormat.java:149) ... 5 common frames omitted
注意到,这个 sql 的最后多了 order by,这是个多余的操作,而且为了做排序,必定会使用到临时空间,考虑到数据量级,空间不足就是理应会发生的了。 看看这个 order by 是在哪里添加的—— flinkx 中同步关系型数据库的逻辑,简单的概括就是InputFormat的open方法中创建一个数据库连接,执行 SQL,得到结果集,其中执行的 SQL 的生成分成两步