获取对账数据
flinkx 1.11 任务执行过程中记录读写相关的指标数据,这里这里介绍如何将这些指标数据获取并输出到外部系统中。
FlinkX 内部将读写相关的统计数据记录到了Flink的metric 里面。
metric中的数据是如何获取的呢?BaseRichOutputFormat
和BaseRichInputFormat
有一个成员accumulatorCollector
,这个成员在open
方法里初始化,内部会启动一个线程,定时从metric获取。
现在统计数据可以获取到了,还有一个问题——与这个数据关联的作业是哪个,Flink作业运行的时候是没有这些信息的,要解决这个问题就要将作业相关的信息传给Flink。
作业相关的信息可以有
- taskId和调度时间
- applicationId(datahub 在提交作业时会记录applicationId,通过applicationId可以找到作业)
使用方案1的潜在问题是,一个taskId可能不止一个同步任务;
使用方案2的问题是,applicationId是yarn里面的概念,后续将作业运行到k8s上,这个applicationId改如何表示
有了作业信息,在输出数据写完之后,具体就是com.dtstack.flinkx.outputformat.BaseRichOutputFormat#close
方法结束的时候,将统计数据(通过accumulatorCollector
获取)和作业信息发送到外部的接口保存起来,就可以用于后续对账了。
无论采用哪个方案,传递的过程是类似的。
如何传递作业信息
在说明之前,以com.dtstack.flinkx.outputformat.BaseRichOutputFormat#monitorUrl
为例看下FlinkX中是如何传递参数的
首先,方法com.dtstack.flinkx.launcher.Launcher#main
中,解析命令行参数,提交到yarn之前
1 | argList.add("-monitor"); |
这里-monitor
参数设置的是空,会在方法com.dtstack.flinkx.launcher.perJob.FlinkPerJobUtil#buildProgram
填充
1 | if ("-monitor".equals(args[i])) { |
然后作为com.dtstack.flinkx.Main#main
的参数传入,
1 | String monitor = options.getMonitor(); |
com.dtstack.flinkx.Main#main
方法中会通过Factory方法创建BaseDataWriter
的子类,子类在初始化时,会调用基类com.dtstack.flinkx.writer.BaseDataWriter#BaseDataWriter
的构造方法
1 | public BaseDataWriter(DataTransferConfig config) { |
子类创建后,调用子类的writeData
方法,就会将上面的monitorUrls
通过com.dtstack.flinkx.outputformat.BaseRichOutputFormatBuilder#setMonitorUrls
设置到com.dtstack.flinkx.outputformat.BaseRichOutputFormat#monitorUrl
。
可以看到大致的流程是
graph LR; 命令行参数-->Launcher; Launcher-->填充; 填充-->Main; Main-->XXXWriter; Main-->YYYWriter; XXXWriter-->BaseRichOutputFormat; YYYWriter-->BaseRichOutputFormat;
这里的问题是,传递过程经过了BaseDataWriter
子类,不同的Writer子类都要在方法writeData
中调用一次。有没有办法绕过呢?
有的,可以参照龙哥上次处理jobId为null的做法:
在方法com.dtstack.flinkx.Main#main
中,将命令行参数解析到ParameterTool
的一个对象中,然后设置到Flink的运行时中
1 | ParameterTool parameterTool = ParameterTool.fromArgs(args); |
BaseRichOutputFormat
通过继承RichOutputFormat
可以获取到运行时,进而获取到参数
1 | context = (StreamingRuntimeContext) getRuntimeContext(); |
这里的两个参数
reportPath
上报接口地址appId
yarn任务ID
graph LR; 命令行参数-->Launcher; Launcher-->填充; 填充-->Main; Main-->BaseRichOutputFormat;
发送
在上面两个参数的基础上继续,通过accumulatorCollector
获取到指标数据,连同applicationId,发送到指定接口就可以了(下面代码中的日志)
1 | private void reportStatistics() { |
测试
提交作业,观察日志
1 | 2021-08-10 20:33:06,221 INFO com.dtstack.flinkx.hdfs.writer.HdfsTextOutputFormat - send result:{"numReads":91,"numWrites":91,"numErrors":0} of appId:application_1627567033654_0630 to report_path |