mongo 任务执行成功但数据量不对

记录 flinkx 中 mongo 同步任务由于错误设置问题导致的任务应当失败而未失败的问题。

背景

业务反馈了一个同步 hive 数据到 mongo 的作业,同步结束后,mongo 里面的数据量与 hive 的对不上,任务运行日志中有 118 条记录写失败了。

分析

初看这个任务有一点奇怪:flinkx 中有一个数据量的检测机制,如果写失败了,错误记录数或者错误记录比例超过一定阈值,作业就失败了。这个作业有记录写失败了,为什么还是成功的呢?带着这个疑惑,重新看了下错误检测机制,分为两个部分:初始化和校验

初始化

com.dtstack.flinkx.outputformat.BaseRichOutputFormat#open 内部会调用方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#initStatisticsAccumulator初始化累加器(比如错误记录数 errCounter),具体是通过 flink runtime 创建计数器,并将这些计数器添加到指标组中。

1
2
3
4
5
6
7
8
9
10
protected void initStatisticsAccumulator(){
errCounter = context.getLongCounter(Metrics.NUM_ERRORS);
// ...

outputMetric = new BaseMetric(context);
outputMetric.addMetric(Metrics.NUM_ERRORS, errCounter);
// ...

startTime = System.currentTimeMillis();
}

然后调用方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#initAccumulatorCollector初始话计数器收集器accumulatorCollector,内部会启动一个定时任务,执行方法com.dtstack.flinkx.metrics.AccumulatorCollector#collectAccumulatorWithApi从 JM 获取计数器的值。

校验

校验过程有两处:每一行数据写之前;以及所有数据写完。
写之前是在方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#writeSingleRecord

1
2
3
4
5
6
7
8
9
10
protected void writeSingleRecord(Row row) {
if(errorLimiter != null) {
errorLimiter.acquire();
}

try {
writeSingleRecordInternal(row);
}
//...
}

这里先调用方法com.dtstack.flinkx.writer.ErrorLimiter#acquire获取错误的情况,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void acquire() {
String errorDataStr = "";
if(errorData != null){
errorDataStr = errorData.toString() + "\n";
}

long errors = accumulatorCollector.getAccumulatorValue(Metrics.NUM_ERRORS);
if(maxErrors != null && !maxErrors.equals(0)){
Preconditions.checkArgument(errors <= maxErrors, "WritingRecordError: error writing record [" + errors + "] exceed limit [" + maxErrors
+ "]\n" + errorDataStr + errMsg);
}

if(maxErrorRatio != null){
long numRead = accumulatorCollector.getAccumulatorValue(Metrics.NUM_READS);
if(numRead >= 1) {
errorRatio = (double) errors / numRead;
}

Preconditions.checkArgument(errorRatio <= maxErrorRatio, "WritingRecordError: error writing record ratio [" + errorRatio + "] exceed limit [" + maxErrorRatio
+ "]\n" + errorDataStr + errMsg);
}
}

从计数器收集器accumulatorCollector中获取错误数errors和错误比例(错误数/读取数)errorRatio = (**double**) errors / numRead。如果超过最大错误数maxErrors或者最大错误比例maxErrorRatio,抛出异常。
由于上面的检测是在写之前,对于最后一次写,还是有可能失败的,所以写完之后,又有一次检测,也是调用方法com.dtstack.flinkx.writer.ErrorLimiter#acquire。具体是在方法com.dtstack.flinkx.outputformat.BaseRichOutputFormat#checkErrorLimit中,由com.dtstack.flinkx.outputformat.BaseRichOutputFormat#close方法调用。
至此,错误检测机制说明白了。那为什么没有生效呢?可能的原因就是检测条件不成立,即maxErrors为 null 或者 0;maxErrorRatio 为 null。顺着这个思路找到这两个值初始化的源头,com.dtstack.flinkx.mongodb.writer.MongodbWriter#writeData

1
2
3
4
5
6
7
8
9
10
11
12
13
public DataStreamSink<?> writeData(List<DataStream<Row>> dataSets) {
MongodbOutputFormatBuilder builder = new MongodbOutputFormatBuilder();

builder.setMongodbConfig(mongodbConfig);
builder.setColumns(columns);
builder.setMonitorUrls(monitorUrls);
builder.setErrors(errors);
builder.setDirtyPath(dirtyPath);
builder.setDirtyHadoopConfig(dirtyHadoopConfig);
builder.setSrcCols(srcCols);

return createOutput(dataSets, builder.finish());
}

看到这里处理了 errors,没有处理errorRatio。而errors是从配置中获取的,取值为 0。如此两个检测条件都不成立,所以任务没有失败。

1
2
3
4
"errorLimit": {
"percentage": 0.01,
"record": 0
},

处理

在方法com.dtstack.flinkx.mongodb.writer.MongodbWriter#writeData中设置errorRatio 或者在创建配置时将errorLimit.record设置为 1。