streamis-作业执行过程源码
记录 streamis 作业执行流程
执行流程
flowchart TD 构建单次作业 --> 提交创建引擎 提交创建引擎 --> 获取引擎信息 获取引擎信息 --> 更新作业状态
构建一次性作业
访问接口
/streamis/streamJobManager/job/execute,从数据库中获取 job 信息,封装到StreamisTransformJob实例 中,再经过一系列的转换(Transform)处理(添加 labels、source、作业配置、launchConfig)后得到一个LaunchJob实例。这个实例,会有 linkis 的SimpleOnceJobBuilder转成一个SubmittableSimpleOnceJob对象,封装了 linkis 客户端和引擎创建 action:CreateEngineConnAction,此外还会将作业要执行的 sql 作为资源上传到 HDFS 便于引擎执行时获取。提交创建引擎
提交上文创建的
SubmittableSimpleOnceJob,通过 linkis 客户端执行请求CreateEngineConnAction,然后在一个循环中等待引擎就绪,得到一个engineConnId。linkis 启动引擎的过程中,会创建一个FlinkCodeOnceExecutor实例的执行器,这个执行器内部会存储 yarn application id 和 作业所在 node manager 的地址。保存engineConnId和SubmittableSimpleOnceJob实例的映射到缓存onceJobs中。获取引擎信息
内部创建一个
EngineConnOperateAction,通过 linkis 客户端请求,拿到上面的 yarn application id 和 node manger 地址,与engineConnId、提交用户、ECM 实例 等封装一个FlinkJobInfo实例中。保存engineConnId和FlinkJobInfo实例的映射到缓存onceJobIdToJobInfo中。更新作业状态
类
TaskMonitorService中有个定时任务,从数据库中获取未完成的任务,根据任务的engineConnId从映射中拿到SubmittableSimpleOnceJob实例,内部会创建一个GetEngineConnAction,有 linkis 客户端发起请求,获取节点信息,里面包含了引擎的状态。
任务执行 API 入口
sequenceDiagram
activate JobRestfulApi
JobRestfulApi ->> JobRestfulApi : 判断用户对于作业的权限
JobRestfulApi ->> + TaskService : executeJob()
TaskService ->> TaskService : 根据 jobId 从表 linkis_stream_job 获取 job
TaskService ->> TaskService : 根据 jobId 从表 linkis_stream_job_version 获取最新版本 jobVersion
TaskService ->> TaskService : 创建 task 对象,并保存到表 linkis_stream_task
Note over TaskService : 找到满足条件的 Builder:
StreamisFlinkTransformJobBuilder
调用 build() 方法
TaskService ->> + AbstractFlinkStreamisTransformJobBuilder : build():
调用符合条件的实现类
将 StreamJob 转成 StreamisTransformJob
AbstractFlinkStreamisTransformJobBuilder ->> AbstractFlinkStreamisTransformJobBuilder : 通过 configurationService 获取任务配置
内部按照不同类型将配置分类
通过 streamJobMapper 获取版本
Note over FlinkSQLJobContentParser : 不同版本的作业,作业内容可能不一样
AbstractFlinkStreamisTransformJobBuilder ->> + FlinkSQLJobContentParser : parseTo():从作业版本中获取作业内容
Note over FlinkSQLJobContentParser : 根据任务类型获取作业内容
1. file 类型调用 getFileContent(...)
2. bml 类型调用 readFileFromBML(...) 3. sql 类型直接返回 sql 字符串
FlinkSQLJobContentParser -->> - AbstractFlinkStreamisTransformJobBuilder : 返回 StreamisSqlTransformJobContent 对象
封装了任务内容(需要执行的 SQL)
AbstractFlinkStreamisTransformJobBuilder ->> AbstractFlinkStreamisTransformJobBuilder : 设置 engineConn 类型(flink-1.12.2)和运行类型(sql)
AbstractFlinkStreamisTransformJobBuilder -->> - TaskService : 返回 transformJob
Note over TaskService : 接下来,将 transformJob 转成 launchJob
TaskService ->> + Transform : transform()
Note over TaskService, Transform : 这里调用了一个 foldLeft 方法,
效果是从数组的最后一个 Transform 依次作用在 transformJob 上面,
并将结果作为下次 transform 的输入。
Transform -->> - TaskService : 返回 launchJob
Note over TaskService : 接下来通过 linkisJobManager 启动 launchJob
通过反射获取 LinkisJobManager 接口的实现类,
接口中有个 getName 方法。
内部会构建一个 name -> 实现类的映射。
这里根据 name=simpleFlink 获取到的实现类是 SimpleFlinkJobManager
Note over TaskService : 具体见“启动LaunchJob”
TaskService -->> - JobRestfulApi : 执行结束返回
note over JobRestfulApi : 返回成功消息
deactivate JobRestfulApi
Transform 的作用是给 launchJob 添加 labels、source、作业配置、launchConfig。
classDiagram
class Transform {
<>
+ transform(...)
}
Transform<|..StreamisJobContentTransform
Transform<|..ConfigTransform
Transform<|..LabelsStreamisCodeTransform
Transform<|..SourceTransform
Transform<|..FlinkJarStreamisStartupParamsTransform
Transform<|..LaunchConfigTransform
class StreamisJobContentTransform {
<>
+ transform(...)
+ transformJobContent(...)
}
StreamisJobContentTransform<|..SqlStreamisJobContentTransform
StreamisJobContentTransform<|..FlinkJarStreamisJobContentTransform
class ConfigTransform {
<>
+ transform(...)
+ transform(...)
}
ConfigTransform<|..FlinkCheckpointConfigTransform
ConfigTransform<|..ResourceConfigTransform
class ResourceConfigTransform{
+ transform(...)
}
ResourceConfigTransform<|..ExtraConfigTransform
LaunchJob 示例
1 | LaunchJob( |
launchConfig 由于在 toString 方法中没有实现,所以日志中不会打印
启动 LaunchJob
启动 LaunchJob 内部先是构建了一个单次作业 onceJob,再提交
com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.manager.FlinkJobManager#launch
1 | override def launch(job: LaunchJob): String = { |
构建单次作业
sequenceDiagram
activate TaskService
TaskService ->> + FlinkJobManager : launch()
FlinkJobManager ->> FlinkJobManager : 判断标签 engineType 是否 flink 开头
FlinkJobManager ->> + SimpleFlinkJobManager : buildOnceJob():
构建单次作业
SimpleFlinkJobManager ->> SimpleFlinkJobManager : 初始化 SimpleOnceJobBuilder 对象
rect rgb(121, 216, 206)
SimpleFlinkJobManager ->> + SimpleOnceJobBuilder : build()
SimpleOnceJobBuilder ->> SimpleOnceJobBuilder :
1. 校验 labels/jobContent 是否为空
2. 设置 params/source
3. 配置启动参数
4. TODO 还有些细节要看下
activate SimpleOnceJobBuilder
Note right of SimpleOnceJobBuilder : 调用方法 getOnceExecutorContent()
SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 构建 OnceExecutorContent 对象,并初始化
SimpleOnceJobBuilder ->> + OnceExecutorContentUtils : contentToMap() :
将 onceExecutorContent
转成 contentMap
OnceExecutorContentUtils -->> - SimpleOnceJobBuilder : 返回 contentMap
SimpleOnceJobBuilder ->> + HttpBmlClient : uploadResource():
json 序列化 contentMap 对象上传
HttpBmlClient -->> - SimpleOnceJobBuilder : 返回响应 response 包含资源 ID 和版本
SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 将 response 封装成 BmlResource
SimpleOnceJobBuilder ->> + OnceExecutorContentUtils : resourceToValue() :
将 BmlResource 转成资源 ID
OnceExecutorContentUtils -->> - SimpleOnceJobBuilder : 返回资源 ID
deactivate SimpleOnceJobBuilder
SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 初始化 CreateEngineConnAction.Builder 对象
SimpleOnceJobBuilder ->> + CreateEngineConnAction.Builder : build()
CreateEngineConnAction.Builder -->> - SimpleOnceJobBuilder : 返回对象 createEngineConnAction
SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 封装一个 SubmittableSimpleOnceJob 实例
Note over SimpleOnceJobBuilder : 这个实例中包含有 linkisClient,以及 createEngineConnAction
linkisCLient 内部有个成员 dwsHttpClient 后续与 linkis 接口的交互通过它执行
SimpleOnceJobBuilder -->> - SimpleFlinkJobManager : 返回实例
end
SimpleFlinkJobManager -->> - FlinkJobManager : 返回实例 onceJob
Note over FlinkJobManager : 执行 onceJob.submit() 方法
具体见“提交创建引擎”
FlinkJobManager -->> - TaskService : 返回 linkisJobId
deactivate TaskService
createEngineConnAction.getRequestPayload
1 | { |
接下来,会将上面的 SubmittableSimpleOnceJob 通过 linkis 客户端提交给 linkis,linkis 则会创建一个 EngineConn。
获取引擎信息
sequenceDiagram
activate FlinkJobManager
FlinkJobManager ->> + AbstractSubmittableLinkisJob : submit() 提交 onceJob
AbstractSubmittableLinkisJob ->> + SubmittableSimpleOnceJob : doSubmit()
Note over AbstractSubmittableLinkisJob : 这里与之前创建的 SubmittableSimpleOnceJob 对上了
SubmittableSimpleOnceJob ->> + LinkisManagerClientImpl : createEngineConn()
Note over LinkisManagerClientImpl : 这个方法内部会发起 http 请求 linkis 接口
返回一个 CreateEngineConnResult 实例
LinkisManagerClientImpl -->> - SubmittableSimpleOnceJob : 返回 nodeInfo
SubmittableSimpleOnceJob ->> SubmittableSimpleOnceJob : 从 nodeInfo 中获取 serviceInstance、ticketId、ecmServiceInstance、lastEngineConnState
SubmittableSimpleOnceJob ->> SimpleOnceJob : initOnceOperatorActions():在数组 operatorActions 中添加一个 action
这个 action 给 OnceJobOperator 设置 user/ticketId/serviceInstance/linkisManagerClient
Note over SimpleOnceJob : TODO action 的作用
rect rgb(100, 118, 135)
alt 调用方法 isCompleted 判断状态 lastEngineConnState 未完成 且 未启动
Note over SimpleOnceJob : isCompleted() 方法中在对于 engineConn 结束和运行
分别回调 onJobFlinished / onJobRunning
rect rgb(255, 230, 204)
loop 状态 未完成 且 未启动
SubmittableSimpleOnceJob ->> + SimpleOnceJob : isCompleted
SimpleOnceJob ->> + OnceJob : getNodeInfo() :通过 linkis 客户端获取
ServiceInstance 对应的 EngineConn 信息
OnceJob -->> - SimpleOnceJob : 返回 EngineConnNode
SimpleOnceJob -->> - SubmittableSimpleOnceJob : 从 EngineConnNode 获取状态,返回 isCompleted() 结果作为状态
end
end
SubmittableSimpleOnceJob ->> + SimpleOnceJob : transformToId()
SimpleOnceJob -->> - SubmittableSimpleOnceJob : 设置 engineConnId
else
SubmittableSimpleOnceJob ->> + SimpleOnceJob : transformToId()
SimpleOnceJob -->> - SubmittableSimpleOnceJob : 设置 engineConnId
end
end
SubmittableSimpleOnceJob -->> - AbstractSubmittableLinkisJob : 无返回值
Note over AbstractSubmittableLinkisJob : 具体见“提交创建引擎后回调”
AbstractSubmittableLinkisJob -->> - FlinkJobManager : 无返回值
FlinkJobManager ->> FlinkJobManager : 以 engineConnId 作为 key
onceJob 作为 value
放到缓存 onceJobs 中
Note over FlinkJobManager : 具体见“通知引擎执行作业”
deactivate FlinkJobManager
这里逻辑就是通过 linkis 客户端向 linkis 发送一个 createEngineConnAction 用于创建一个 engineConn,然后一直等待直到这个 engineConn 结束或者 运行起来。
这里请求的 linkis 接口地址中路径是 linkisManager/createEngineConn,对应的执行逻辑入口是 org.apache.linkis.manager.am.restful.EngineRestfulApi#createEngineConn
AbstractSubmittableLinkisJob 在提交之后,还有个指标 jobMetrics 和回调(onJobSubmitted()),见下文。
engineConnId 的格式
1 | engineConnId = s"${ticketId.length}_${serviceInstance.getApplicationName.length}_${ticketId}${serviceInstance.getApplicationName}${serviceInstance.getInstance}" |
nodeInfo 中包含的信息,看下日志,engineConnId 几乎包含了。
1 | EngineConn created with status Running, the nodeInfo is {ecmServiceInstance={instance=10-177-198-114.os |
linkis 客户端请求创建EngineConn
org.apache.linkis.computation.client.once.simple.SubmittableSimpleOnceJob#doSubmit
1 | override protected def doSubmit(): Unit = { |
其中linkisManagerClient.createEngineConn(createEngineConnAction)最终会调用到org.apache.linkis.httpclient.AbstractHttpClient#execute,这里的逻辑在源码-linkis 之BML物料上传与下载有做介绍。
请求的接口路径是这样确定的
1 | protected def prepareReq(requestAction: HttpAction): HttpRequestBase = { |
对于CreateEngineConnAction,getURL方法实现是
1 | trait DWSHttpAction { |
url 是由前缀和后缀两部分组成
前缀是
"/api/" + getRestType + "/" + dwsVersion,其中 restType 是rest_j,dwsVersion由配置项wds.linkis.web.version指定,默认值是v1后缀
suffixURLs在CreateEngineConnAction中有重载1
2
3
4class CreateEngineConnAction extends POSTAction with LinkisManagerAction {
override def getRequestPayload: String = DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads)
override def suffixURLs: Array[String] = Array("linkisManager", "createEngineConn")
}
综上,请求的接口地址路径是api/rest_j/v1/linkisManager/createEngineConn。这个地址在 linkis 的处理入口是org.apache.linkis.manager.am.restful.EngineRestfulApi#createEngineConn
1 |
|
此外还有些有意思的地方,比如
响应类型,是根据请求的 url 来确定:在响应对象上定义了一个注解,注解值是对应的请求 url,通过反射获取到 url 和 响应类的映射关系后,从映射关系中获取响应类型。例如,
1
2("/api/rest_j/v\\d+/linkisManager/createEngineConn")
class CreateEngineConnResult extends GetEngineConnResult
提交创建引擎后回调
sequenceDiagram
activate AbstractSubmittableLinkisJob
AbstractSubmittableLinkisJob ->> + LinkisJobMetrics :
Note over TaskService, LinkisJobMetrics : 创建 LinkisJobMetrics 对象,
参数是上面创建的 engineConnId
LinkisJobMetrics -->> - AbstractSubmittableLinkisJob : 返回 jobMetrics
AbstractSubmittableLinkisJob ->> + AbstractLinkisJob : getJobListeners():获取作业监听器
Note over AbstractLinkisJob : 这里的监听器应该是空的
方法 addJobListener 貌似没有被调用
AbstractLinkisJob -->> - AbstractSubmittableLinkisJob : 返回监听器
Note over AbstractSubmittableLinkisJob : 调用监听器的 onJobSubmitted 方法
deactivate AbstractSubmittableLinkisJob
获取引擎信息
sequenceDiagram
activate FlinkJobManager
FlinkJobManager ->> FlinkJobManager : 将 onceJob 保存到缓存 onceJobs 中
FlinkJobManager ->> + SimpleFlinkJobManager : createJobInfo()
SimpleFlinkJobManager ->> + OnceJob : getNodeInfo : 获取 engineConn 信息
OnceJob -->> - SimpleFlinkJobManager : 返回 nodeInfo
SimpleFlinkJobManager ->> SimpleFlinkJobManager : 创建一个 FlinkJobInfo 对象 jobInfo
activate SimpleFlinkJobManager
Note right of SimpleFlinkJobManager : 调用方法 fatchApplicationInfo()
获取 yarn 作业信息
Note right of SimpleFlinkJobManager : 根据 operator 名字获取 operator
EngineConnApplicationInfoOperator
Note right of SimpleFlinkJobManager : 将前面设置的 action 依次作用到
EngineConnApplicationInfoOperator
rect rgb(121, 216, 206)
loop 可以重试
SimpleFlinkJobManager ->> + OnceJobOperator : apply()
OnceJobOperator ->> OnceJobOperator : 构建 EngineConnOperateAction.Builder 对象
Note over OnceJobOperator : 从这个 builder 的类型
可以看出它是用来构建 EngineConn 执行操作的构建器
OnceJobOperator ->> + EngineConnOperateAction.Builder : build()
EngineConnOperateAction.Builder -->> - OnceJobOperator : 返回 EngineConnOperateAction 实例
engineConnOperateAction
OnceJobOperator ->> + LinkisManagerClientImpl : executeEngineConnOperation()
Note over LinkisManagerClientImpl : 通过 linkis 客户端,提交 engineConnOperateAction
这里引擎开始执行任务
LinkisManagerClientImpl -->> - OnceJobOperator : 返回执行结果 result,结果对象中有 applicationId,applicationUrl
OnceJobOperator ->> + EngineConnApplicationInfoOperator : resultToObject():将返回的结果 result 转成 ApplicationInfo
EngineConnApplicationInfoOperator -->> - OnceJobOperator : 返回 ApplicationInfo 对象(包含:applicationId、applicationUrl、queue)
OnceJobOperator -->> - SimpleFlinkJobManager :返回 applicationInfo 对象
end
end
SimpleFlinkJobManager ->> SimpleFlinkJobManager : 从 applicationInfo 对象中获取
applcaitionId,applicationUrl 设置 jobInfo
deactivate SimpleFlinkJobManager
SimpleFlinkJobManager ->> SimpleFlinkJobManager : jobInfo 设置 resource
SimpleFlinkJobManager -->> - FlinkJobManager : 返回 jobInfo
FlinkJobManager ->> FlinkJobManager : 存放到缓存 onceJobIdToJobInfo
返回 engineConnId
deactivate FlinkJobManager
这里请求的 linkis 接口地址中路径是 linkisManager/executeEngineConnOperation,对应的执行逻辑入口是 org.apache.linkis.manager.am.restful.EngineRestfulApi#executeEngineConnOperation
更新作业状态
sequenceDiagram
activate TaskService
activate TaskService
Note right of TaskService : 调用 updateStreamTaskStatus()
这个方法内部会判断 task
是否在缓存 onceJobIdToJobInfo 中
TaskService ->> + FlinkJobManager : getJobInfo() :
从缓存 onceJobIdToJobInfo 获取 jobInfo
FlinkJobManager ->> + SimpleFlinkJobManager : getStatus() :
从缓存 onceJobs 中获取 simpleOnceJob
SimpleFlinkJobManager ->> + SimpleOnceJob : isCompleted() :调用
Note over SimpleOnceJob : 这个方法内部会调用 linkis 客户端获取 engineConn 状态
SimpleOnceJob -->> - SimpleFlinkJobManager : 返回状态
SimpleFlinkJobManager -->> - FlinkJobManager : 返回状态 status
Note over FlinkJobManager : jobInfo 设置状态 status
FlinkJobManager -->> - TaskService : 返回 jobInfo
Note right of TaskService : 更新 task 的 更新时间和状态
task 状态结束,jobInfo 中 completedMsg 非空
task 设置错误消息 errDesc
jobInfo json 序列化保存到 task 的 linkisJobInfo
deactivate TaskService
Note right of TaskService : 最后更新数据表 linkis_stream_task
deactivate TaskService
此外类TaskMonitorService中会启动定时任务调用com.webank.wedatasphere.streamis.jobmanager.manager.service.TaskService$#updateStreamTaskStatus来获取状态。