flinkx 1.7 jar 上传逻辑

之前在使用 flinkx 1.7 时,将插件的 jar 在 flink 的 lib 目录中也存放了一份,这样每次提交作业,用不上的 jar 也会上传,记录如何优化这个上传过程。

flinkx 1.7 任务启动过程中要上传的 jar 分两部分

  1. flink lib 目录下的 jar,又细分成两部分 flink-dist 和 非 flink-dist,以及具体的 flinkx 插件 jar 包
  2. flinkx 插件目录下的 jar,又细分成 flinkx.jar (对应 flinkx-core 模块)和具体插件的 jar

在具体的上传过程中,
flink-dist 单独上传

1
2
3
4
5
6
7
8
Path remotePathJar = setupSingleLocalResource(
"flink.jar",
fs,
appId,
flinkJarPath,
localResources,
homeDir,
"");

其他的 jar 作业 user jar 一起上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (jobGraph != null) {
// add the user code jars from the provided JobGraph
// 这里所说的 user code jars 就是 flinkx plugin 目录下的 flinkx.jar
for (org.apache.flink.core.fs.Path path : jobGraph.getUserJars()) {
userJarFiles.add(new File(path.toUri()));
}
}

if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) {
userClassPaths = uploadAndRegisterFiles(
userJarFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);
} else {
userClassPaths = Collections.emptyList();
}

userJarFiles 的初始化是在com.dtstack.flinkx.launcher.perjob.PerJobClusterClientBuilder#createPerJobClusterDescriptor,这里将 flink lib 目录下除 flink-dist jar 之外的所有 jar 放到了 userJarFiles

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
List<URL> classpaths = new ArrayList<>();
if (flinkJarPath != null) {
File[] jars = new File(flinkJarPath).listFiles();

for (File file : jars){
if (file.toURI().toURL().toString().contains("flink-dist")){
clusterDescriptor.setLocalJarPath(new Path(file.toURI().toURL().toString()));
} else {
classpaths.add(file.toURI().toURL());
}
}

} else {
throw new RuntimeException("The Flink jar path is null");
}

clusterDescriptor.setProvidedUserJarFiles(classpaths);

这个过程中存在问题是,所有的插件 jar 在 flink lib 目录下都存在了一份,提交的作业时不管有没有使用到都会上传。

理想的状态是,任务使用了哪些插件,就上传这些插件对应的 jar。

任务使用了哪些插件,可以从任务配置文件中解析得到。这个解析过程,在com.dtstack.flinkx.launcher.Launcher#analyzeUserClasspath方法中已经完成,保存在clusterSpecification.classpaths 中。并且在后续获取 PackagedProgramJobGraph 对象时,又保存到了各自对象的 classpaths 变量中。

一个处理的方法是:

  1. 将插件包从 flink lib 目录下删除
  2. 上面上传 userJarFiles 表示的 jar 包之前,从 jobGraph 中获取了 flinkx.jar,可以做一个类似的操作,即:将 jobGraph.classpath 中的 jar (就是任务需要的插件包)一并添加到 userJarFiles 中,待后续上传。
  3. 还有个方法,同事给出的,是在 com.dtstack.flinkx.launcher.perjob.PerJobClusterClientBuilder#createPerJobClusterDescriptor方法中设置 classpaths 时,从 clusterSpefication 中获取解析出的插件包添加到 classpaths 中。

如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<URL> classpaths = new ArrayList<>();
clusterSpecification.getClasspaths().forEach(jar->classpaths.add(jar));
if (flinkJarPath != null) {
File[] jars = new File(flinkJarPath).listFiles();

for (File file : jars){
if (file.toURI().toURL().toString().contains("flink-dist")){
clusterDescriptor.setLocalJarPath(new Path(file.toURI().toURL().toString()));
} else {
classpaths.add(file.toURI().toURL());
}
}

} else {
throw new RuntimeException("The Flink jar path is null");
}

clusterDescriptor.setProvidedUserJarFiles(classpaths);