mongo 任务执行成功但数据量不对
记录 flinkx 中 mongo 同步任务由于错误设置问题导致的任务应当失败而未失败的问题。
背景
业务反馈了一个同步 hive 数据到 mongo 的作业,同步结束后,mongo 里面的数据量与 hive 的对不上,任务运行日志中有 118 条记录写失败了。
分析
初看这个任务有一点奇怪:flinkx 中有一个数据量的检测机制,如果写失败了,错误记录数或者错误记录比例超过一定阈值,作业就失败了。这个作业有记录写失败了,为什么还是成功的呢?带着这个疑惑,重新看了下错误检测机制,分为两个部分:初始化和校验
初始化
在com.dtstack.flinkx.outputformat.BaseRichOutputFormat#open 内部会调用方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#initStatisticsAccumulator初始化累加器(比如错误记录数 errCounter),具体是通过 flink runtime 创建计数器,并将这些计数器添加到指标组中。
1 | protected void initStatisticsAccumulator(){ |
然后调用方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#initAccumulatorCollector初始话计数器收集器accumulatorCollector,内部会启动一个定时任务,执行方法com.dtstack.flinkx.metrics.AccumulatorCollector#collectAccumulatorWithApi从 JM 获取计数器的值。
校验
校验过程有两处:每一行数据写之前;以及所有数据写完。
写之前是在方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#writeSingleRecord中
1 | protected void writeSingleRecord(Row row) { |
这里先调用方法com.dtstack.flinkx.writer.ErrorLimiter#acquire获取错误的情况,
1 | public void acquire() { |
从计数器收集器accumulatorCollector中获取错误数errors和错误比例(错误数/读取数)errorRatio = (**double**) errors / numRead。如果超过最大错误数maxErrors或者最大错误比例maxErrorRatio,抛出异常。
由于上面的检测是在写之前,对于最后一次写,还是有可能失败的,所以写完之后,又有一次检测,也是调用方法com.dtstack.flinkx.writer.ErrorLimiter#acquire。具体是在方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#checkErrorLimit中,由com.dtstack.flinkx.outputformat.BaseRichOutputFormat#close方法调用。
至此,错误检测机制说明白了。那为什么没有生效呢?可能的原因就是检测条件不成立,即maxErrors为 null 或者 0;maxErrorRatio 为 null。顺着这个思路找到这两个值初始化的源头,com.dtstack.flinkx.mongodb.writer.MongodbWriter#writeData
1 | public DataStreamSink<?> writeData(List<DataStream<Row>> dataSets) { |
看到这里处理了 errors,没有处理errorRatio。而errors是从配置中获取的,取值为 0。如此两个检测条件都不成立,所以任务没有失败。
1 | "errorLimit": { |
处理
在方法com.dtstack.flinkx.mongodb.writer.MongodbWriter#writeData中设置errorRatio 或者在创建配置时将errorLimit.record设置为 1。