之前在使用 flinkx 1.7 时,将插件的 jar 在 flink 的 lib 目录中也存放了一份,这样每次提交作业,用不上的 jar 也会上传,记录如何优化这个上传过程。
flinkx 1.7 任务启动过程中要上传的 jar 分两部分
flink lib 目录下的 jar,又细分成两部分 flink-dist 和 非 flink-dist,以及具体的 flinkx 插件 jar 包
flinkx 插件目录下的 jar,又细分成 flinkx.jar (对应 flinkx-core 模块)和具体插件的 jar
在具体的上传过程中, flink-dist 单独上传
1 2 3 4 5 6 7 8 Path remotePathJar = setupSingleLocalResource( "flink.jar" , fs, appId, flinkJarPath, localResources, homeDir, "" );
其他的 jar 作业 user jar 一起上传
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 if (jobGraph != null ) { for (org.apache.flink.core.fs.Path path : jobGraph.getUserJars()) { userJarFiles.add(new File (path.toUri())); } } if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) { userClassPaths = uploadAndRegisterFiles( userJarFiles, fs, homeDir, appId, paths, localResources, envShipFileList); } else { userClassPaths = Collections.emptyList(); }
userJarFiles
的初始化是在com.dtstack.flinkx.launcher.perjob.PerJobClusterClientBuilder#createPerJobClusterDescriptor
,这里将 flink lib 目录下除 flink-dist jar 之外的所有 jar 放到了 userJarFiles
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 List<URL> classpaths = new ArrayList <>(); if (flinkJarPath != null ) { File[] jars = new File (flinkJarPath).listFiles(); for (File file : jars){ if (file.toURI().toURL().toString().contains("flink-dist" )){ clusterDescriptor.setLocalJarPath(new Path (file.toURI().toURL().toString())); } else { classpaths.add(file.toURI().toURL()); } } } else { throw new RuntimeException ("The Flink jar path is null" ); } clusterDescriptor.setProvidedUserJarFiles(classpaths);
这个过程中存在问题是,所有的插件 jar 在 flink lib 目录下都存在了一份,提交的作业时不管有没有使用到都会上传。
理想的状态是,任务使用了哪些插件,就上传这些插件对应的 jar。
任务使用了哪些插件,可以从任务配置文件中解析得到。这个解析过程,在com.dtstack.flinkx.launcher.Launcher#analyzeUserClasspath
方法中已经完成,保存在clusterSpecification.classpaths
中。并且在后续获取 PackagedProgram
和 JobGraph
对象时,又保存到了各自对象的 classpaths
变量中。
一个处理的方法是:
将插件包从 flink lib 目录下删除
上面上传 userJarFiles
表示的 jar 包之前,从 jobGraph
中获取了 flinkx.jar
,可以做一个类似的操作,即:将 jobGraph.classpath
中的 jar (就是任务需要的插件包)一并添加到 userJarFiles
中,待后续上传。
还有个方法,同事给出的,是在 com.dtstack.flinkx.launcher.perjob.PerJobClusterClientBuilder#createPerJobClusterDescriptor
方法中设置 classpaths 时,从 clusterSpefication 中获取解析出的插件包添加到 classpaths 中。
如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 List<URL> classpaths = new ArrayList <>(); clusterSpecification.getClasspaths().forEach(jar->classpaths.add(jar)); if (flinkJarPath != null ) { File[] jars = new File (flinkJarPath).listFiles(); for (File file : jars){ if (file.toURI().toURL().toString().contains("flink-dist" )){ clusterDescriptor.setLocalJarPath(new Path (file.toURI().toURL().toString())); } else { classpaths.add(file.toURI().toURL()); } } } else { throw new RuntimeException ("The Flink jar path is null" ); } clusterDescriptor.setProvidedUserJarFiles(classpaths);