获取对账数据

flinkx 1.11 任务执行过程中记录读写相关的指标数据,这里这里介绍如何将这些指标数据获取并输出到外部系统中。


FlinkX 内部将读写相关的统计数据记录到了Flink的metric 里面。

metric中的数据是如何获取的呢?BaseRichOutputFormatBaseRichInputFormat 有一个成员accumulatorCollector,这个成员在open方法里初始化,内部会启动一个线程,定时从metric获取。

现在统计数据可以获取到了,还有一个问题——与这个数据关联的作业是哪个,Flink作业运行的时候是没有这些信息的,要解决这个问题就要将作业相关的信息传给Flink。

作业相关的信息可以有

  1. taskId和调度时间
  2. applicationId(datahub 在提交作业时会记录applicationId,通过applicationId可以找到作业)

使用方案1的潜在问题是,一个taskId可能不止一个同步任务;

使用方案2的问题是,applicationId是yarn里面的概念,后续将作业运行到k8s上,这个applicationId改如何表示

有了作业信息,在输出数据写完之后,具体就是com.dtstack.flinkx.outputformat.BaseRichOutputFormat#close方法结束的时候,将统计数据(通过accumulatorCollector获取)和作业信息发送到外部的接口保存起来,就可以用于后续对账了。

无论采用哪个方案,传递的过程是类似的。

如何传递作业信息

在说明之前,以com.dtstack.flinkx.outputformat.BaseRichOutputFormat#monitorUrl为例看下FlinkX中是如何传递参数的

首先,方法com.dtstack.flinkx.launcher.Launcher#main中,解析命令行参数,提交到yarn之前

1
2
3
argList.add("-monitor");
argList.add("");
PerJobSubmitter.submit(launcherOptions, new JobGraph(), argList.toArray(new String[0]));

这里-monitor参数设置的是空,会在方法com.dtstack.flinkx.launcher.perJob.FlinkPerJobUtil#buildProgram填充

1
2
3
4
if ("-monitor".equals(args[i])) {
args[i + 1] = monitorUrl;
break;
}

然后作为com.dtstack.flinkx.Main#main的参数传入,

1
2
3
4
5
String monitor = options.getMonitor();
DataTransferConfig config = DataTransferConfig.parse(job);
if(StringUtils.isNotEmpty(monitor)) {
config.setMonitorUrls(monitor);
}

com.dtstack.flinkx.Main#main方法中会通过Factory方法创建BaseDataWriter的子类,子类在初始化时,会调用基类com.dtstack.flinkx.writer.BaseDataWriter#BaseDataWriter的构造方法

1
2
3
4
public BaseDataWriter(DataTransferConfig config) {
this.monitorUrls = config.getMonitorUrls();
//...
}

子类创建后,调用子类的writeData方法,就会将上面的monitorUrls通过com.dtstack.flinkx.outputformat.BaseRichOutputFormatBuilder#setMonitorUrls设置到com.dtstack.flinkx.outputformat.BaseRichOutputFormat#monitorUrl

可以看到大致的流程是

graph LR;
    命令行参数-->Launcher;
    Launcher-->填充;
    填充-->Main;
    Main-->XXXWriter;
    Main-->YYYWriter;
    XXXWriter-->BaseRichOutputFormat;    
    YYYWriter-->BaseRichOutputFormat;

这里的问题是,传递过程经过了BaseDataWriter子类,不同的Writer子类都要在方法writeData中调用一次。有没有办法绕过呢?

有的,可以参照龙哥上次处理jobId为null的做法:

在方法com.dtstack.flinkx.Main#main中,将命令行参数解析到ParameterTool的一个对象中,然后设置到Flink的运行时中

1
2
ParameterTool parameterTool = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(parameterTool);

BaseRichOutputFormat通过继承RichOutputFormat可以获取到运行时,进而获取到参数

1
2
3
4
context = (StreamingRuntimeContext) getRuntimeContext();
ParameterTool parameterTool = (ParameterTool) context.getExecutionConfig().getGlobalJobParameters();
reportPath = parameterTool.get("reportPath", null);
appId = parameterTool.get("appId", null);

这里的两个参数

  • reportPath 上报接口地址
  • appId yarn任务ID
graph LR;
    命令行参数-->Launcher;
    Launcher-->填充;
    填充-->Main;
    Main-->BaseRichOutputFormat;

发送

在上面两个参数的基础上继续,通过accumulatorCollector获取到指标数据,连同applicationId,发送到指定接口就可以了(下面代码中的日志)

1
2
3
4
5
6
7
8
9
10
private void reportStatistics() {
if (taskNumber == 0 && StringUtils.isNotEmpty(reportPath) && StringUtils.isNotEmpty(appId)) {
long numReads = accumulatorCollector.getAccumulatorValue(Metrics.NUM_READS);
long numWrites = accumulatorCollector.getAccumulatorValue(Metrics.NUM_WRITES);
long numErrors = accumulatorCollector.getAccumulatorValue(Metrics.NUM_ERRORS);
String result = String.format("{\"numReads\":%d,\"numWrites\":%d,\"numErrors\":%d}",
numReads, numWrites, numErrors);
LOG.info("send result:{} of appId:{} to {}", result, appId, reportPath);
}
}

测试

提交作业,观察日志

1
2021-08-10 20:33:06,221 INFO  com.dtstack.flinkx.hdfs.writer.HdfsTextOutputFormat           - send result:{"numReads":91,"numWrites":91,"numErrors":0} of appId:application_1627567033654_0630 to report_path