streamis-作业执行过程源码
记录 streamis 作业执行流程
执行流程
flowchart TD 构建单次作业 --> 提交创建引擎 提交创建引擎 --> 获取引擎信息 获取引擎信息 --> 更新作业状态
构建一次性作业
访问接口
/streamis/streamJobManager/job/execute
,从数据库中获取 job 信息,封装到StreamisTransformJob
实例 中,再经过一系列的转换(Transform
)处理(添加 labels、source、作业配置、launchConfig)后得到一个LaunchJob
实例。这个实例,会有 linkis 的SimpleOnceJobBuilder
转成一个SubmittableSimpleOnceJob
对象,封装了 linkis 客户端和引擎创建 action:CreateEngineConnAction
,此外还会将作业要执行的 sql 作为资源上传到 HDFS 便于引擎执行时获取。提交创建引擎
提交上文创建的
SubmittableSimpleOnceJob
,通过 linkis 客户端执行请求CreateEngineConnAction
,然后在一个循环中等待引擎就绪,得到一个engineConnId
。linkis 启动引擎的过程中,会创建一个FlinkCodeOnceExecutor
实例的执行器,这个执行器内部会存储 yarn application id 和 作业所在 node manager 的地址。保存engineConnId
和SubmittableSimpleOnceJob
实例的映射到缓存onceJobs
中。获取引擎信息
内部创建一个
EngineConnOperateAction
,通过 linkis 客户端请求,拿到上面的 yarn application id 和 node manger 地址,与engineConnId
、提交用户、ECM 实例 等封装一个FlinkJobInfo
实例中。保存engineConnId
和FlinkJobInfo
实例的映射到缓存onceJobIdToJobInfo
中。更新作业状态
类
TaskMonitorService
中有个定时任务,从数据库中获取未完成的任务,根据任务的engineConnId
从映射中拿到SubmittableSimpleOnceJob
实例,内部会创建一个GetEngineConnAction
,有 linkis 客户端发起请求,获取节点信息,里面包含了引擎的状态。
任务执行 API 入口
sequenceDiagram activate JobRestfulApi JobRestfulApi ->> JobRestfulApi : 判断用户对于作业的权限 JobRestfulApi ->> + TaskService : executeJob() TaskService ->> TaskService : 根据 jobId 从表 linkis_stream_job 获取 job TaskService ->> TaskService : 根据 jobId 从表 linkis_stream_job_version 获取最新版本 jobVersion TaskService ->> TaskService : 创建 task 对象,并保存到表 linkis_stream_task Note over TaskService : 找到满足条件的 Builder:
StreamisFlinkTransformJobBuilder
调用 build() 方法 TaskService ->> + AbstractFlinkStreamisTransformJobBuilder : build():
调用符合条件的实现类
将 StreamJob 转成 StreamisTransformJob AbstractFlinkStreamisTransformJobBuilder ->> AbstractFlinkStreamisTransformJobBuilder : 通过 configurationService 获取任务配置
内部按照不同类型将配置分类
通过 streamJobMapper 获取版本 Note over FlinkSQLJobContentParser : 不同版本的作业,作业内容可能不一样 AbstractFlinkStreamisTransformJobBuilder ->> + FlinkSQLJobContentParser : parseTo():从作业版本中获取作业内容 Note over FlinkSQLJobContentParser : 根据任务类型获取作业内容
1. file 类型调用 getFileContent(...)
2. bml 类型调用 readFileFromBML(...) 3. sql 类型直接返回 sql 字符串 FlinkSQLJobContentParser -->> - AbstractFlinkStreamisTransformJobBuilder : 返回 StreamisSqlTransformJobContent 对象
封装了任务内容(需要执行的 SQL) AbstractFlinkStreamisTransformJobBuilder ->> AbstractFlinkStreamisTransformJobBuilder : 设置 engineConn 类型(flink-1.12.2)和运行类型(sql) AbstractFlinkStreamisTransformJobBuilder -->> - TaskService : 返回 transformJob Note over TaskService : 接下来,将 transformJob 转成 launchJob TaskService ->> + Transform : transform() Note over TaskService, Transform : 这里调用了一个 foldLeft 方法,
效果是从数组的最后一个 Transform 依次作用在 transformJob 上面,
并将结果作为下次 transform 的输入。 Transform -->> - TaskService : 返回 launchJob Note over TaskService : 接下来通过 linkisJobManager 启动 launchJob
通过反射获取 LinkisJobManager 接口的实现类,
接口中有个 getName 方法。
内部会构建一个 name -> 实现类的映射。
这里根据 name=simpleFlink 获取到的实现类是 SimpleFlinkJobManager Note over TaskService : 具体见“启动LaunchJob” TaskService -->> - JobRestfulApi : 执行结束返回 note over JobRestfulApi : 返回成功消息 deactivate JobRestfulApi
Transform 的作用是给 launchJob 添加 labels、source、作业配置、launchConfig。
classDiagram class Transform { <> + transform(...) } Transform<|..StreamisJobContentTransform Transform<|..ConfigTransform Transform<|..LabelsStreamisCodeTransform Transform<|..SourceTransform Transform<|..FlinkJarStreamisStartupParamsTransform Transform<|..LaunchConfigTransform class StreamisJobContentTransform { < > + transform(...) + transformJobContent(...) } StreamisJobContentTransform<|..SqlStreamisJobContentTransform StreamisJobContentTransform<|..FlinkJarStreamisJobContentTransform class ConfigTransform { < > + transform(...) + transform(...) } ConfigTransform<|..FlinkCheckpointConfigTransform ConfigTransform<|..ResourceConfigTransform class ResourceConfigTransform{ + transform(...) } ResourceConfigTransform<|..ExtraConfigTransform
LaunchJob 示例
1 | LaunchJob( |
launchConfig 由于在 toString 方法中没有实现,所以日志中不会打印
启动 LaunchJob
启动 LaunchJob 内部先是构建了一个单次作业 onceJob,再提交
com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.manager.FlinkJobManager#launch
1 | override def launch(job: LaunchJob): String = { |
构建单次作业
sequenceDiagram activate TaskService TaskService ->> + FlinkJobManager : launch() FlinkJobManager ->> FlinkJobManager : 判断标签 engineType 是否 flink 开头 FlinkJobManager ->> + SimpleFlinkJobManager : buildOnceJob():
构建单次作业 SimpleFlinkJobManager ->> SimpleFlinkJobManager : 初始化 SimpleOnceJobBuilder 对象 rect rgb(121, 216, 206) SimpleFlinkJobManager ->> + SimpleOnceJobBuilder : build() SimpleOnceJobBuilder ->> SimpleOnceJobBuilder :
1. 校验 labels/jobContent 是否为空
2. 设置 params/source
3. 配置启动参数
4. TODO 还有些细节要看下 activate SimpleOnceJobBuilder Note right of SimpleOnceJobBuilder : 调用方法 getOnceExecutorContent() SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 构建 OnceExecutorContent 对象,并初始化 SimpleOnceJobBuilder ->> + OnceExecutorContentUtils : contentToMap() :
将 onceExecutorContent
转成 contentMap OnceExecutorContentUtils -->> - SimpleOnceJobBuilder : 返回 contentMap SimpleOnceJobBuilder ->> + HttpBmlClient : uploadResource():
json 序列化 contentMap 对象上传 HttpBmlClient -->> - SimpleOnceJobBuilder : 返回响应 response 包含资源 ID 和版本 SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 将 response 封装成 BmlResource SimpleOnceJobBuilder ->> + OnceExecutorContentUtils : resourceToValue() :
将 BmlResource 转成资源 ID OnceExecutorContentUtils -->> - SimpleOnceJobBuilder : 返回资源 ID deactivate SimpleOnceJobBuilder SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 初始化 CreateEngineConnAction.Builder 对象 SimpleOnceJobBuilder ->> + CreateEngineConnAction.Builder : build() CreateEngineConnAction.Builder -->> - SimpleOnceJobBuilder : 返回对象 createEngineConnAction SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 封装一个 SubmittableSimpleOnceJob 实例 Note over SimpleOnceJobBuilder : 这个实例中包含有 linkisClient,以及 createEngineConnAction
linkisCLient 内部有个成员 dwsHttpClient 后续与 linkis 接口的交互通过它执行 SimpleOnceJobBuilder -->> - SimpleFlinkJobManager : 返回实例 end SimpleFlinkJobManager -->> - FlinkJobManager : 返回实例 onceJob Note over FlinkJobManager : 执行 onceJob.submit() 方法
具体见“提交创建引擎” FlinkJobManager -->> - TaskService : 返回 linkisJobId deactivate TaskService
createEngineConnAction.getRequestPayload
1 | { |
接下来,会将上面的 SubmittableSimpleOnceJob 通过 linkis 客户端提交给 linkis,linkis 则会创建一个 EngineConn。
获取引擎信息
sequenceDiagram activate FlinkJobManager FlinkJobManager ->> + AbstractSubmittableLinkisJob : submit() 提交 onceJob AbstractSubmittableLinkisJob ->> + SubmittableSimpleOnceJob : doSubmit() Note over AbstractSubmittableLinkisJob : 这里与之前创建的 SubmittableSimpleOnceJob 对上了 SubmittableSimpleOnceJob ->> + LinkisManagerClientImpl : createEngineConn() Note over LinkisManagerClientImpl : 这个方法内部会发起 http 请求 linkis 接口
返回一个 CreateEngineConnResult 实例 LinkisManagerClientImpl -->> - SubmittableSimpleOnceJob : 返回 nodeInfo SubmittableSimpleOnceJob ->> SubmittableSimpleOnceJob : 从 nodeInfo 中获取 serviceInstance、ticketId、ecmServiceInstance、lastEngineConnState SubmittableSimpleOnceJob ->> SimpleOnceJob : initOnceOperatorActions():在数组 operatorActions 中添加一个 action
这个 action 给 OnceJobOperator 设置 user/ticketId/serviceInstance/linkisManagerClient Note over SimpleOnceJob : TODO action 的作用 rect rgb(100, 118, 135) alt 调用方法 isCompleted 判断状态 lastEngineConnState 未完成 且 未启动 Note over SimpleOnceJob : isCompleted() 方法中在对于 engineConn 结束和运行
分别回调 onJobFlinished / onJobRunning rect rgb(255, 230, 204) loop 状态 未完成 且 未启动 SubmittableSimpleOnceJob ->> + SimpleOnceJob : isCompleted SimpleOnceJob ->> + OnceJob : getNodeInfo() :通过 linkis 客户端获取
ServiceInstance 对应的 EngineConn 信息 OnceJob -->> - SimpleOnceJob : 返回 EngineConnNode SimpleOnceJob -->> - SubmittableSimpleOnceJob : 从 EngineConnNode 获取状态,返回 isCompleted() 结果作为状态 end end SubmittableSimpleOnceJob ->> + SimpleOnceJob : transformToId() SimpleOnceJob -->> - SubmittableSimpleOnceJob : 设置 engineConnId else SubmittableSimpleOnceJob ->> + SimpleOnceJob : transformToId() SimpleOnceJob -->> - SubmittableSimpleOnceJob : 设置 engineConnId end end SubmittableSimpleOnceJob -->> - AbstractSubmittableLinkisJob : 无返回值 Note over AbstractSubmittableLinkisJob : 具体见“提交创建引擎后回调” AbstractSubmittableLinkisJob -->> - FlinkJobManager : 无返回值 FlinkJobManager ->> FlinkJobManager : 以 engineConnId 作为 key
onceJob 作为 value
放到缓存 onceJobs 中 Note over FlinkJobManager : 具体见“通知引擎执行作业” deactivate FlinkJobManager
这里逻辑就是通过 linkis 客户端向 linkis 发送一个 createEngineConnAction
用于创建一个 engineConn
,然后一直等待直到这个 engineConn
结束或者 运行起来。
这里请求的 linkis 接口地址中路径是 linkisManager/createEngineConn
,对应的执行逻辑入口是 org.apache.linkis.manager.am.restful.EngineRestfulApi#createEngineConn
AbstractSubmittableLinkisJob
在提交之后,还有个指标 jobMetrics 和回调(onJobSubmitted()
),见下文。
engineConnId 的格式
1 | engineConnId = s"${ticketId.length}_${serviceInstance.getApplicationName.length}_${ticketId}${serviceInstance.getApplicationName}${serviceInstance.getInstance}" |
nodeInfo 中包含的信息,看下日志,engineConnId 几乎包含了。
1 | EngineConn created with status Running, the nodeInfo is {ecmServiceInstance={instance=10-177-198-114.os |
linkis 客户端请求创建EngineConn
org.apache.linkis.computation.client.once.simple.SubmittableSimpleOnceJob#doSubmit
1 | override protected def doSubmit(): Unit = { |
其中linkisManagerClient.createEngineConn(createEngineConnAction)
最终会调用到org.apache.linkis.httpclient.AbstractHttpClient#execute
,这里的逻辑在源码-linkis 之BML物料上传与下载有做介绍。
请求的接口路径是这样确定的
1 | protected def prepareReq(requestAction: HttpAction): HttpRequestBase = { |
对于CreateEngineConnAction
,getURL
方法实现是
1 | trait DWSHttpAction { |
url 是由前缀和后缀两部分组成
前缀是
"/api/" + getRestType + "/" + dwsVersion
,其中 restType 是rest_j
,dwsVersion
由配置项wds.linkis.web.version
指定,默认值是v1
后缀
suffixURLs
在CreateEngineConnAction
中有重载1
2
3
4class CreateEngineConnAction extends POSTAction with LinkisManagerAction {
override def getRequestPayload: String = DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads)
override def suffixURLs: Array[String] = Array("linkisManager", "createEngineConn")
}
综上,请求的接口地址路径是api/rest_j/v1/linkisManager/createEngineConn
。这个地址在 linkis 的处理入口是org.apache.linkis.manager.am.restful.EngineRestfulApi#createEngineConn
1 |
|
此外还有些有意思的地方,比如
响应类型,是根据请求的 url 来确定:在响应对象上定义了一个注解,注解值是对应的请求 url,通过反射获取到 url 和 响应类的映射关系后,从映射关系中获取响应类型。例如,
1
2"/api/rest_j/v\\d+/linkisManager/createEngineConn") (
class CreateEngineConnResult extends GetEngineConnResult
提交创建引擎后回调
sequenceDiagram activate AbstractSubmittableLinkisJob AbstractSubmittableLinkisJob ->> + LinkisJobMetrics : Note over TaskService, LinkisJobMetrics : 创建 LinkisJobMetrics 对象,
参数是上面创建的 engineConnId LinkisJobMetrics -->> - AbstractSubmittableLinkisJob : 返回 jobMetrics AbstractSubmittableLinkisJob ->> + AbstractLinkisJob : getJobListeners():获取作业监听器 Note over AbstractLinkisJob : 这里的监听器应该是空的
方法 addJobListener 貌似没有被调用 AbstractLinkisJob -->> - AbstractSubmittableLinkisJob : 返回监听器 Note over AbstractSubmittableLinkisJob : 调用监听器的 onJobSubmitted 方法 deactivate AbstractSubmittableLinkisJob
获取引擎信息
sequenceDiagram activate FlinkJobManager FlinkJobManager ->> FlinkJobManager : 将 onceJob 保存到缓存 onceJobs 中 FlinkJobManager ->> + SimpleFlinkJobManager : createJobInfo() SimpleFlinkJobManager ->> + OnceJob : getNodeInfo : 获取 engineConn 信息 OnceJob -->> - SimpleFlinkJobManager : 返回 nodeInfo SimpleFlinkJobManager ->> SimpleFlinkJobManager : 创建一个 FlinkJobInfo 对象 jobInfo activate SimpleFlinkJobManager Note right of SimpleFlinkJobManager : 调用方法 fatchApplicationInfo()
获取 yarn 作业信息 Note right of SimpleFlinkJobManager : 根据 operator 名字获取 operator
EngineConnApplicationInfoOperator Note right of SimpleFlinkJobManager : 将前面设置的 action 依次作用到
EngineConnApplicationInfoOperator rect rgb(121, 216, 206) loop 可以重试 SimpleFlinkJobManager ->> + OnceJobOperator : apply() OnceJobOperator ->> OnceJobOperator : 构建 EngineConnOperateAction.Builder 对象 Note over OnceJobOperator : 从这个 builder 的类型
可以看出它是用来构建 EngineConn 执行操作的构建器 OnceJobOperator ->> + EngineConnOperateAction.Builder : build() EngineConnOperateAction.Builder -->> - OnceJobOperator : 返回 EngineConnOperateAction 实例
engineConnOperateAction OnceJobOperator ->> + LinkisManagerClientImpl : executeEngineConnOperation() Note over LinkisManagerClientImpl : 通过 linkis 客户端,提交 engineConnOperateAction
这里引擎开始执行任务 LinkisManagerClientImpl -->> - OnceJobOperator : 返回执行结果 result,结果对象中有 applicationId,applicationUrl OnceJobOperator ->> + EngineConnApplicationInfoOperator : resultToObject():将返回的结果 result 转成 ApplicationInfo EngineConnApplicationInfoOperator -->> - OnceJobOperator : 返回 ApplicationInfo 对象(包含:applicationId、applicationUrl、queue) OnceJobOperator -->> - SimpleFlinkJobManager :返回 applicationInfo 对象 end end SimpleFlinkJobManager ->> SimpleFlinkJobManager : 从 applicationInfo 对象中获取
applcaitionId,applicationUrl 设置 jobInfo deactivate SimpleFlinkJobManager SimpleFlinkJobManager ->> SimpleFlinkJobManager : jobInfo 设置 resource SimpleFlinkJobManager -->> - FlinkJobManager : 返回 jobInfo FlinkJobManager ->> FlinkJobManager : 存放到缓存 onceJobIdToJobInfo
返回 engineConnId deactivate FlinkJobManager
这里请求的 linkis 接口地址中路径是 linkisManager/executeEngineConnOperation
,对应的执行逻辑入口是 org.apache.linkis.manager.am.restful.EngineRestfulApi#executeEngineConnOperation
更新作业状态
sequenceDiagram activate TaskService activate TaskService Note right of TaskService : 调用 updateStreamTaskStatus()
这个方法内部会判断 task
是否在缓存 onceJobIdToJobInfo 中 TaskService ->> + FlinkJobManager : getJobInfo() :
从缓存 onceJobIdToJobInfo 获取 jobInfo FlinkJobManager ->> + SimpleFlinkJobManager : getStatus() :
从缓存 onceJobs 中获取 simpleOnceJob SimpleFlinkJobManager ->> + SimpleOnceJob : isCompleted() :调用 Note over SimpleOnceJob : 这个方法内部会调用 linkis 客户端获取 engineConn 状态 SimpleOnceJob -->> - SimpleFlinkJobManager : 返回状态 SimpleFlinkJobManager -->> - FlinkJobManager : 返回状态 status Note over FlinkJobManager : jobInfo 设置状态 status FlinkJobManager -->> - TaskService : 返回 jobInfo Note right of TaskService : 更新 task 的 更新时间和状态
task 状态结束,jobInfo 中 completedMsg 非空
task 设置错误消息 errDesc
jobInfo json 序列化保存到 task 的 linkisJobInfo deactivate TaskService Note right of TaskService : 最后更新数据表 linkis_stream_task deactivate TaskService
此外类TaskMonitorService
中会启动定时任务调用com.webank.wedatasphere.streamis.jobmanager.manager.service.TaskService$#updateStreamTaskStatus
来获取状态。