PG 读取大量数据因临时空间不足导致任务失败

记录 flinkx 1.11 同步 PG 数据量过亿时,由于生成的同步 SQL 中包含的排序子句,临时空间不够导致任务失败。

背景

业务方有一个同步 PG 的作业,数据量级大致是 1.5+ 亿条。任务执行失败,报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2022-10-28 14:31:37.178 [flink-akka.actor.default-dispatcher-21] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: postgresqlreader (1/1) (77bd8cb9299189456dceee9609b5d03a) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@55bedd88.
java.lang.IllegalArgumentException: open() failed.ERROR: temporary file size exceeds temp_file_limit (15728640kB)
at com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat.openInternal(JdbcInputFormat.java:154)
at com.dtstack.flinkx.inputformat.BaseRichInputFormat.open(BaseRichInputFormat.java:183)
at com.dtstack.flinkx.streaming.api.functions.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:124)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: org.postgresql.util.PSQLException: ERROR: temporary file size exceeds temp_file_limit (15728640kB)
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2433)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2178)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:306)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:307)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:293)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:270)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:224)
at com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat.executeQuery(JdbcInputFormat.java:868)
at com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat.openInternal(JdbcInputFormat.java:149)
... 5 common frames omitted

分析

这看起来就是 PG 数据库的问题,需要增加配置temp_file_limit。不过业务说 1.0 上可以执行,不放心用 2.0 的配置在 1.0 上手动提交了一个作业,居然可以。
从 2.0 的执行日志中获取了执行的 SQL

1
SELECT "ts","asm_name","sequence","process_name","thread_name","pid","tid","energy_raw","uniform_time_ms","curr_time" FROM "ctp_metrics27" WHERE 1=1   and "curr_time" >= '2022-10-27 00:00:00.000000' and "curr_time" < '2022-10-28 00:00:00.000000' order by curr_time

注意到,这个 sql 的最后多了 order by,这是个多余的操作,而且为了做排序,必定会使用到临时空间,考虑到数据量级,空间不足就是理应会发生的了。
看看这个 order by 是在哪里添加的——
flinkx 中同步关系型数据库的逻辑,简单的概括就是InputFormatopen方法中创建一个数据库连接,执行 SQL,得到结果集,其中执行的 SQL 的生成分成两步

  1. 在调用方法com.dtstack.flinkx.rdb.datareader.JdbcDataReader#readData处理配置时,调用com.dtstack.flinkx.rdb.datareader.QuerySqlBuilder#buildSql生成一个 SQL 模板,其中包含了增量/恢复、切分条件的占位符
  2. 在调用方法com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat#openInternal打开数据库时,调用com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat#buildQuerySql将模板中的占位符替换掉,得到最终的 SQL

PG 特殊的地方在于它将父类的方法com.dtstack.flinkx.postgresql.reader.PostgresqlQuerySqlBuilder#buildQuerySql给覆盖掉了,覆盖后的方法与父类的差异多了一行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected String buildQuerySql(){
//...
sb.append(buildOrderSql()); // 多了这一行
//...
}

//com.dtstack.flinkx.rdb.datareader.QuerySqlBuilder#buildOrderSql
protected String buildOrderSql(){
String column;
if(isIncrement){
column = incrementColumn;
} else if(isRestore){
column = restoreColumn;
} else {
column = orderByColumn;
}

return StringUtils.isEmpty(column) ? "" : String.format(" order by %s", column);
}

可以看到,当任务是增量任务时,会添加order by 增量字段

处理

这段逻辑是没有必要的,去掉即可。