mongo 任务执行成功但数据量不对
记录 flinkx 中 mongo 同步任务由于错误设置问题导致的任务应当失败而未失败的问题。
背景
业务反馈了一个同步 hive 数据到 mongo 的作业,同步结束后,mongo 里面的数据量与 hive 的对不上,任务运行日志中有 118 条记录写失败了。
分析
初看这个任务有一点奇怪:flinkx 中有一个数据量的检测机制,如果写失败了,错误记录数或者错误记录比例超过一定阈值,作业就失败了。这个作业有记录写失败了,为什么还是成功的呢?带着这个疑惑,重新看了下错误检测机制,分为两个部分:初始化和校验
初始化
在com.dtstack.flinkx.outputformat.BaseRichOutputFormat#open
内部会调用方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#initStatisticsAccumulator
初始化累加器(比如错误记录数 errCounter
),具体是通过 flink runtime 创建计数器,并将这些计数器添加到指标组中。
1 | protected void initStatisticsAccumulator(){ |
然后调用方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#initAccumulatorCollector
初始话计数器收集器accumulatorCollector
,内部会启动一个定时任务,执行方法com.dtstack.flinkx.metrics.AccumulatorCollector#collectAccumulatorWithApi
从 JM 获取计数器的值。
校验
校验过程有两处:每一行数据写之前;以及所有数据写完。
写之前是在方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#writeSingleRecord
中
1 | protected void writeSingleRecord(Row row) { |
这里先调用方法com.dtstack.flinkx.writer.ErrorLimiter#acquire
获取错误的情况,
1 | public void acquire() { |
从计数器收集器accumulatorCollector
中获取错误数errors
和错误比例(错误数/读取数)errorRatio = (**double**) errors / numRead
。如果超过最大错误数maxErrors
或者最大错误比例maxErrorRatio
,抛出异常。
由于上面的检测是在写之前,对于最后一次写,还是有可能失败的,所以写完之后,又有一次检测,也是调用方法com.dtstack.flinkx.writer.ErrorLimiter#acquire
。具体是在方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#checkErrorLimit
中,由com.dtstack.flinkx.outputformat.BaseRichOutputFormat#close
方法调用。
至此,错误检测机制说明白了。那为什么没有生效呢?可能的原因就是检测条件不成立,即maxErrors
为 null 或者 0;maxErrorRatio
为 null。顺着这个思路找到这两个值初始化的源头,com.dtstack.flinkx.mongodb.writer.MongodbWriter#writeData
1 | public DataStreamSink<?> writeData(List<DataStream<Row>> dataSets) { |
看到这里处理了 errors
,没有处理errorRatio
。而errors
是从配置中获取的,取值为 0。如此两个检测条件都不成立,所以任务没有失败。
1 | "errorLimit": { |
处理
在方法com.dtstack.flinkx.mongodb.writer.MongodbWriter#writeData
中设置errorRatio
或者在创建配置时将errorLimit.record
设置为 1。