flinkx 写 s3

介绍 flinkx 1.11 写 aws s3 过程中遇到的问题及处理。

背景

在新加坡部署集成服务,写 hdfs 的作业执行结束后,在 S3 上面看不到数据。

分析&解决

写 S3 有两种方式

  1. 使用社区的 hadoop-aws 模块,实现一个 HDFS 文件系统:S3AFileSystem
  2. 使用 aws 的 emrfs,实现一个 HDFS 文件系统:EmrFileSystem

考虑到这个问题之前在欧盟出现过,是通过 hadoop-aws 这个模块写的。所以这次先按照这个思路走。

flinkx 1.7 与 1.11 有些区别,1.7 中使用的是 flink shade 后的 hadoop 包,1.11 中使用的是未 shade 的 hadoop 包。

image-20220406145535986

org.apache.hadoop.fs.FileSystem 可以看到依赖的是 hadoop 2.7.3 版本,所以在 flinkx-hdfs/flinkx-hdfs-writer/pom.xml 中引入依赖 2.7.3 版本的 hadoop-aws

1
2
3
4
5
<dependency>
<artifactId>hadoop-aws</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>2.7.3</version>
</dependency>

保持版本一致,否则会出现各种问题

重新打包,提交任务,任务成功执行。检查 S3 发现没有数据。接下来,打开日志,查看 hadoop-aws 模块的运行。日志打开方法如下:

修改 /home/service/datahub/flink-1.11-ostream/conf 中的 log4j.properties

1
log4j.logger.org.apache.hadoop=DEBUG

重新执行任务,从日志大致看到,hadoop-aws 写文件的过程大致是,先写到 node manager 一个临时的目录,例如/mnt/var/lib/hadoop/tmp/s3a/output-6052406643142588357.tmp,然后在刷新的时候,将文件上传到 S3。

从日志中看到,在执行完com.dtstack.flinkx.outputformat.BaseFileOutputFormat#flushData后,hadoop-aws 将对象上传到 s3 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2022-04-02 06:42:51,900 INFO  com.dtstack.flinkx.hdfs.writer.HdfsTextOutputFormat           - Close current text stream, write data size:[1563]
2022-04-02 06:42:51,900 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'warehouse/compass_2/dataland_test_db.db/test_yl_0325/.data/0.12c6031f80ede3449f01b72916912c0f.0' closed. Now beginning upload
2022-04-02 06:42:51,900 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Minimum upload part size: 104857600 threshold 2147483647

2022-04-02 06:42:51,946 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Getting path status for /warehouse/compass_2/dataland_test_db.db/test_yl_0325/.data (warehouse/compass_2/dataland_test_db.db/test_yl_0325/.data)
2022-04-02 06:42:52,241 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Found path as directory (with /): 0/1
2022-04-02 06:42:52,241 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Summary: warehouse/compass_2/dataland_test_db.db/test_yl_0325/.data/0.12c6031f80ede3449f01b72916912c0f.0 1543

2022-04-02 06:42:52,241 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Getting path status for /warehouse/compass_2/dataland_test_db.db/test_yl_0325 (warehouse/compass_2/dataland_test_db.db/test_yl_0325)
2022-04-02 06:42:52,287 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Found file (with /): fake directory
2022-04-02 06:42:52,287 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Deleting fake directory warehouse/compass_2/dataland_test_db.db/test_yl_0325/

2022-04-02 06:42:52,312 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Getting path status for /warehouse/compass_2/dataland_test_db.db (warehouse/compass_2/dataland_test_db.db)
2022-04-02 06:42:52,384 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Found path as directory (with /): 1/0
2022-04-02 06:42:52,384 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Prefix: warehouse/compass_2/dataland_test_db.db/cx_iceberg_sg_1/

2022-04-02 06:42:52,384 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Getting path status for /warehouse/compass_2 (warehouse/compass_2)
2022-04-02 06:42:52,451 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Found path as directory (with /): 1/0
2022-04-02 06:42:52,451 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Prefix: warehouse/compass_2/dataland_test_db.db/

2022-04-02 06:42:52,451 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Getting path status for /warehouse (warehouse)
2022-04-02 06:42:52,533 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Found path as directory (with /): 1/0
2022-04-02 06:42:52,534 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Prefix: warehouse/Strategy_data/

2022-04-02 06:42:52,534 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'warehouse/compass_2/dataland_test_db.db/test_yl_0325/.data/0.12c6031f80ede3449f01b72916912c0f.0' upload complete

但是为什么 S3 上就是没有呢?

这个问题需要 AWS 帮查看,没有继续看。

hadoop-aws 这个方法走不通了,那 emrfs 该怎么使用呢?

Flink 对 S3 提供了支持,具体见文档 Amazon S3,这里提到了两个文件系统插件的实现,一个是 flink-s3-fs-prestoflink-s3-fs-hadoop。插件的使用方式是

1
2
mkdir ./plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-1.11.3.jar ./plugins/s3-fs-hadoop/

中间有把flink-s3-fs-hadoop-1.11.3.jar 放到了 lib 下,同时把 flink-shaded-hadoop-2-uber-2.6.5-10.0.jar 删掉了,报错如下

1
java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.runtime.entrypoint.parser.CommandLineOptions

来自Flink ON YARN 报错及解决方案

提交作业,执行失败,报错如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.setAllowNullValueProperties(Z)V
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initializeConfiguration(EmrFileSystem.java:128)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:96)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
at com.dtstack.flinkx.util.FileSystemUtil.getFileSystem(FileSystemUtil.java:62)
at com.dtstack.flinkx.hdfs.writer.BaseHdfsOutputFormat.openSource(BaseHdfsOutputFormat.java:354)
at com.dtstack.flinkx.outputformat.BaseFileOutputFormat.openInternal(BaseFileOutputFormat.java:103)
at com.dtstack.flinkx.hdfs.writer.BaseHdfsOutputFormat.openInternal(BaseHdfsOutputFormat.java:111)
at com.dtstack.flinkx.outputformat.BaseRichOutputFormat.open(BaseRichOutputFormat.java:258)
at com.dtstack.flinkx.streaming.api.functions.sink.DtOutputFormatSinkFunction.open(DtOutputFormatSinkFunction.java:89)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)

方法 org.apache.hadoop.conf.Configuration.setAllowNullValueProperties 找不到。

从这里也看到使用的文件系统是 EmrFileSystem

反编译了 flink-shaded-hadoop-2-uber-2.6.5-10.0.jar,确实没有上述方法

image-20220406153313924

查阅文档 Hadoop Integration,flink 从 1.11 版本之后,不再提供官方的 flink-shaded-hadoop-2-uber,不过可以手动从这里下载

确认 hadoop 版本 , 2.8.5 版本,下载 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar,与此同时,将 flink 改为使用社区提供的 1.11.3 版本,代替我们自己编译的 ostream-1.11 版本。

1
2
3
4
5
6
7
[amy@ip-10-1-42-72 ~]$ hadoop version
Hadoop 2.8.5-amzn-5
Subversion git@aws157git.com:/pkg/Aws157BigTop -r a3b61461af0d6b4d981c915b0a1f342464987aaa
Compiled by ec2-user on 2019-12-14T09:05Z
Compiled with protoc 2.5.0
From source with checksum fad06c90f0f460226b0a91c6c1926d4
This command was run using /usr/lib/hadoop/hadoop-common-2.8.5-amzn-5.jar

重新提交作业执行成功,S3 也能看到数据。

后面遇到作业看不到日志的问题,解决办法是将原有 flink 目录下的 conf 目录整体拷贝到新目录。