streamis-作业执行过程源码

记录 streamis 作业执行流程

执行流程

flowchart TD
构建单次作业 --> 提交创建引擎
提交创建引擎 --> 获取引擎信息
获取引擎信息 --> 更新作业状态
  1. 构建一次性作业

    访问接口/streamis/streamJobManager/job/execute,从数据库中获取 job 信息,封装到StreamisTransformJob实例 中,再经过一系列的转换(Transform)处理(添加 labels、source、作业配置、launchConfig)后得到一个LaunchJob实例。这个实例,会有 linkis 的 SimpleOnceJobBuilder 转成一个 SubmittableSimpleOnceJob 对象,封装了 linkis 客户端和引擎创建 action:CreateEngineConnAction,此外还会将作业要执行的 sql 作为资源上传到 HDFS 便于引擎执行时获取。

  2. 提交创建引擎

    提交上文创建的 SubmittableSimpleOnceJob,通过 linkis 客户端执行请求CreateEngineConnAction,然后在一个循环中等待引擎就绪,得到一个engineConnId。linkis 启动引擎的过程中,会创建一个FlinkCodeOnceExecutor实例的执行器,这个执行器内部会存储 yarn application id 和 作业所在 node manager 的地址。保存engineConnIdSubmittableSimpleOnceJob实例的映射到缓存onceJobs中。

  3. 获取引擎信息

    内部创建一个EngineConnOperateAction,通过 linkis 客户端请求,拿到上面的 yarn application id 和 node manger 地址,与 engineConnId、提交用户、ECM 实例 等封装一个 FlinkJobInfo 实例中。保存engineConnIdFlinkJobInfo实例的映射到缓存onceJobIdToJobInfo中。

  4. 更新作业状态

    TaskMonitorService 中有个定时任务,从数据库中获取未完成的任务,根据任务的 engineConnId从映射中拿到SubmittableSimpleOnceJob实例,内部会创建一个GetEngineConnAction,有 linkis 客户端发起请求,获取节点信息,里面包含了引擎的状态。

任务执行 API 入口

sequenceDiagram
activate JobRestfulApi
JobRestfulApi ->> JobRestfulApi : 判断用户对于作业的权限
JobRestfulApi ->> + TaskService : executeJob()
    TaskService ->> TaskService : 根据 jobId 从表 linkis_stream_job 获取 job
    TaskService ->> TaskService : 根据 jobId 从表 linkis_stream_job_version 获取最新版本 jobVersion
    TaskService ->> TaskService : 创建 task 对象,并保存到表 linkis_stream_task
    Note over TaskService : 找到满足条件的 Builder: 
StreamisFlinkTransformJobBuilder
调用 build() 方法 TaskService ->> + AbstractFlinkStreamisTransformJobBuilder : build():
调用符合条件的实现类
将 StreamJob 转成 StreamisTransformJob AbstractFlinkStreamisTransformJobBuilder ->> AbstractFlinkStreamisTransformJobBuilder : 通过 configurationService 获取任务配置
内部按照不同类型将配置分类
通过 streamJobMapper 获取版本 Note over FlinkSQLJobContentParser : 不同版本的作业,作业内容可能不一样 AbstractFlinkStreamisTransformJobBuilder ->> + FlinkSQLJobContentParser : parseTo():从作业版本中获取作业内容 Note over FlinkSQLJobContentParser : 根据任务类型获取作业内容
1. file 类型调用 getFileContent(...)
2. bml 类型调用 readFileFromBML(...) 3. sql 类型直接返回 sql 字符串 FlinkSQLJobContentParser -->> - AbstractFlinkStreamisTransformJobBuilder : 返回 StreamisSqlTransformJobContent 对象
封装了任务内容(需要执行的 SQL) AbstractFlinkStreamisTransformJobBuilder ->> AbstractFlinkStreamisTransformJobBuilder : 设置 engineConn 类型(flink-1.12.2)和运行类型(sql) AbstractFlinkStreamisTransformJobBuilder -->> - TaskService : 返回 transformJob Note over TaskService : 接下来,将 transformJob 转成 launchJob TaskService ->> + Transform : transform() Note over TaskService, Transform : 这里调用了一个 foldLeft 方法,
效果是从数组的最后一个 Transform 依次作用在 transformJob 上面,
并将结果作为下次 transform 的输入。 Transform -->> - TaskService : 返回 launchJob Note over TaskService : 接下来通过 linkisJobManager 启动 launchJob

通过反射获取 LinkisJobManager 接口的实现类,
接口中有个 getName 方法。
内部会构建一个 name -> 实现类的映射。
这里根据 name=simpleFlink 获取到的实现类是 SimpleFlinkJobManager Note over TaskService : 具体见“启动LaunchJob” TaskService -->> - JobRestfulApi : 执行结束返回 note over JobRestfulApi : 返回成功消息 deactivate JobRestfulApi

Transform 的作用是给 launchJob 添加 labels、source、作业配置、launchConfig。

classDiagram
class Transform {
<>
+ transform(...)
}
Transform<|..StreamisJobContentTransform
Transform<|..ConfigTransform
Transform<|..LabelsStreamisCodeTransform
Transform<|..SourceTransform
Transform<|..FlinkJarStreamisStartupParamsTransform
Transform<|..LaunchConfigTransform



class StreamisJobContentTransform {
<>
+ transform(...)
+ transformJobContent(...)
}
StreamisJobContentTransform<|..SqlStreamisJobContentTransform
StreamisJobContentTransform<|..FlinkJarStreamisJobContentTransform

class ConfigTransform {
<>
+ transform(...)
+ transform(...)
}
ConfigTransform<|..FlinkCheckpointConfigTransform
ConfigTransform<|..ResourceConfigTransform


class ResourceConfigTransform{
+ transform(...)
}
ResourceConfigTransform<|..ExtraConfigTransform

LaunchJob 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
LaunchJob(
submitUser: hadoop,
labels: {
userCreator=hadoop-Streamis, engineType=flink-1.12.2, engineConnMode=once
},
jobContent: {
code=SELECT 'linkis flink engine test!!!', runType=sql
},
params: {
configuration={
startup={
wds.linkis.flink.taskmanager.num=null,
wds.linkis.flink.jobmanager.cpus=null,
wds.linkis.flink.taskmanager.memory=null,
wds.linkis.flink.custom=null,
wds.linkis.flink.taskManager.cpus=null,
wds.linkis.flink.jobmanager.memory=null
}
}
},
source: {
workspace=null, project=demo, job=demo_flink_00
}
)

launchConfig 由于在 toString 方法中没有实现,所以日志中不会打印

启动 LaunchJob

启动 LaunchJob 内部先是构建了一个单次作业 onceJob,再提交

com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.manager.FlinkJobManager#launch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
override def launch(job: LaunchJob): String = {
job.getLabels.get(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY) match {
case engineConnType: String =>
if(!engineConnType.toLowerCase.startsWith(FlinkJobManager.FLINK_ENGINE_CONN_TYPE))
throw new FlinkJobLaunchErrorException(30401, s"Only ${FlinkJobManager.FLINK_ENGINE_CONN_TYPE} job is supported to be launched to Linkis, but $engineConnType is found.")
case _ => throw new FlinkJobLaunchErrorException(30401, s"Not exists ${LabelKeyUtils.ENGINE_TYPE_LABEL_KEY}, StreamisJob cannot be submitted to Linkis successfully.")
}
val onceJob = buildOnceJob(job)
onceJob.submit()
onceJobs synchronized onceJobs.put(onceJob.getId, onceJob)
val linkisJobInfo = Utils.tryCatch(createJobInfo(onceJob, job)){ t =>
error(s"${job.getSubmitUser} create jobInfo failed, now stop this EngineConn ${onceJob.getId}.")
stop(onceJob)
throw t
}
onceJobs synchronized onceJobIdToJobInfo.put(onceJob.getId, linkisJobInfo)
onceJob.getId
}

构建单次作业

sequenceDiagram
activate TaskService
TaskService ->> + FlinkJobManager : launch()
    
    FlinkJobManager ->> FlinkJobManager : 判断标签 engineType 是否 flink 开头
    FlinkJobManager ->> + SimpleFlinkJobManager : buildOnceJob():
构建单次作业 SimpleFlinkJobManager ->> SimpleFlinkJobManager : 初始化 SimpleOnceJobBuilder 对象 rect rgb(121, 216, 206) SimpleFlinkJobManager ->> + SimpleOnceJobBuilder : build() SimpleOnceJobBuilder ->> SimpleOnceJobBuilder :
1. 校验 labels/jobContent 是否为空
2. 设置 params/source
3. 配置启动参数
4. TODO 还有些细节要看下 activate SimpleOnceJobBuilder Note right of SimpleOnceJobBuilder : 调用方法 getOnceExecutorContent() SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 构建 OnceExecutorContent 对象,并初始化 SimpleOnceJobBuilder ->> + OnceExecutorContentUtils : contentToMap() :
将 onceExecutorContent
转成 contentMap OnceExecutorContentUtils -->> - SimpleOnceJobBuilder : 返回 contentMap SimpleOnceJobBuilder ->> + HttpBmlClient : uploadResource():
json 序列化 contentMap 对象上传 HttpBmlClient -->> - SimpleOnceJobBuilder : 返回响应 response 包含资源 ID 和版本 SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 将 response 封装成 BmlResource SimpleOnceJobBuilder ->> + OnceExecutorContentUtils : resourceToValue() :
将 BmlResource 转成资源 ID OnceExecutorContentUtils -->> - SimpleOnceJobBuilder : 返回资源 ID deactivate SimpleOnceJobBuilder SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 初始化 CreateEngineConnAction.Builder 对象 SimpleOnceJobBuilder ->> + CreateEngineConnAction.Builder : build() CreateEngineConnAction.Builder -->> - SimpleOnceJobBuilder : 返回对象 createEngineConnAction SimpleOnceJobBuilder ->> SimpleOnceJobBuilder : 封装一个 SubmittableSimpleOnceJob 实例 Note over SimpleOnceJobBuilder : 这个实例中包含有 linkisClient,以及 createEngineConnAction
linkisCLient 内部有个成员 dwsHttpClient 后续与 linkis 接口的交互通过它执行 SimpleOnceJobBuilder -->> - SimpleFlinkJobManager : 返回实例 end SimpleFlinkJobManager -->> - FlinkJobManager : 返回实例 onceJob Note over FlinkJobManager : 执行 onceJob.submit() 方法
具体见“提交创建引擎” FlinkJobManager -->> - TaskService : 返回 linkisJobId deactivate TaskService

createEngineConnAction.getRequestPayload

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"createService": "ServiceInstance(streamis-server, 10-177-198-114.ostream-test.dgtest01:9400)",
"ignoreTimeout": true,
"description": "demo for streamis",
"properties": {
"wds.linkis.flink.taskmanager.num": null,
"wds.linkis.flink.jobmanager.cpus": null,
"wds.linkis.flink.taskmanager.memory": null,
"wds.linkis.flink.taskManager.cpus": null,
"wds.linkis.flink.custom": null,
"label.codeType": "sql",
"wds.linkis.flink.jobmanager.memory": null,
"onceExecutorContent": "resource_036360c3c48-fc89-4a86-bafc-b21a9af830eav000001"
},
"labels": {
"userCreator": "hadoop-Streamis",
"engineType": "flink-1.12.2",
"engineConnMode": "once"
},
"timeOut": 300000
}

接下来,会将上面的 SubmittableSimpleOnceJob 通过 linkis 客户端提交给 linkis,linkis 则会创建一个 EngineConn。

获取引擎信息

sequenceDiagram
activate FlinkJobManager
    FlinkJobManager ->> + AbstractSubmittableLinkisJob : submit() 提交 onceJob
    AbstractSubmittableLinkisJob ->> + SubmittableSimpleOnceJob : doSubmit()
    Note over AbstractSubmittableLinkisJob : 这里与之前创建的 SubmittableSimpleOnceJob 对上了
    SubmittableSimpleOnceJob ->> + LinkisManagerClientImpl : createEngineConn()
    Note over LinkisManagerClientImpl : 这个方法内部会发起 http 请求 linkis 接口
返回一个 CreateEngineConnResult 实例 LinkisManagerClientImpl -->> - SubmittableSimpleOnceJob : 返回 nodeInfo SubmittableSimpleOnceJob ->> SubmittableSimpleOnceJob : 从 nodeInfo 中获取 serviceInstance、ticketId、ecmServiceInstance、lastEngineConnState SubmittableSimpleOnceJob ->> SimpleOnceJob : initOnceOperatorActions():在数组 operatorActions 中添加一个 action
这个 action 给 OnceJobOperator 设置 user/ticketId/serviceInstance/linkisManagerClient Note over SimpleOnceJob : TODO action 的作用 rect rgb(100, 118, 135) alt 调用方法 isCompleted 判断状态 lastEngineConnState 未完成 且 未启动 Note over SimpleOnceJob : isCompleted() 方法中在对于 engineConn 结束和运行
分别回调 onJobFlinished / onJobRunning rect rgb(255, 230, 204) loop 状态 未完成 且 未启动 SubmittableSimpleOnceJob ->> + SimpleOnceJob : isCompleted SimpleOnceJob ->> + OnceJob : getNodeInfo() :通过 linkis 客户端获取
ServiceInstance 对应的 EngineConn 信息 OnceJob -->> - SimpleOnceJob : 返回 EngineConnNode SimpleOnceJob -->> - SubmittableSimpleOnceJob : 从 EngineConnNode 获取状态,返回 isCompleted() 结果作为状态 end end SubmittableSimpleOnceJob ->> + SimpleOnceJob : transformToId() SimpleOnceJob -->> - SubmittableSimpleOnceJob : 设置 engineConnId else SubmittableSimpleOnceJob ->> + SimpleOnceJob : transformToId() SimpleOnceJob -->> - SubmittableSimpleOnceJob : 设置 engineConnId end end SubmittableSimpleOnceJob -->> - AbstractSubmittableLinkisJob : 无返回值 Note over AbstractSubmittableLinkisJob : 具体见“提交创建引擎后回调” AbstractSubmittableLinkisJob -->> - FlinkJobManager : 无返回值 FlinkJobManager ->> FlinkJobManager : 以 engineConnId 作为 key
onceJob 作为 value
放到缓存 onceJobs 中 Note over FlinkJobManager : 具体见“通知引擎执行作业” deactivate FlinkJobManager

这里逻辑就是通过 linkis 客户端向 linkis 发送一个 createEngineConnAction 用于创建一个 engineConn,然后一直等待直到这个 engineConn 结束或者 运行起来。

这里请求的 linkis 接口地址中路径是 linkisManager/createEngineConn,对应的执行逻辑入口是 org.apache.linkis.manager.am.restful.EngineRestfulApi#createEngineConn

AbstractSubmittableLinkisJob 在提交之后,还有个指标 jobMetrics 和回调(onJobSubmitted()),见下文。

engineConnId 的格式

1
2
3
4
5
6
7
8
engineConnId = s"${ticketId.length}_${serviceInstance.getApplicationName.length}_${ticketId}${serviceInstance.getApplicationName}${serviceInstance.getInstance}"

//例如:
//36_20_05443308-f227-442e-9b20-8d291e1fa7d1linkis-cg-engineconn10-177-198-114.ostream-test.dgtest01:37244
//其中:
// ticketId = 05443308-f227-442e-9b20-8d291e1fa7d1
// applicationName = linkis-cg-engineconn
// serviceInstance = 10-177-198-114.ostream-test.dgtest01:37244

nodeInfo 中包含的信息,看下日志,engineConnId 几乎包含了。

1
2
3
EngineConn created with status Running, the nodeInfo is {ecmServiceInstance={instance=10-177-198-114.os
tream-test.dgtest01:9112, applicationName=linkis-cg-engineconnmanager}, serviceInstance={instance=10-177-198-114.ostream-test.dgtest01:40698, applicationName=linkis-cg-engineconn}, nodeStatus=Running, ticketId=a3f69955-02a1-4973-9325-5b
f33f78b9d6}.

linkis 客户端请求创建EngineConn

org.apache.linkis.computation.client.once.simple.SubmittableSimpleOnceJob#doSubmit

1
2
3
4
5
override protected def doSubmit(): Unit = {
info(s"Ready to create a engineConn: ${createEngineConnAction.getRequestPayload}.")
val nodeInfo = linkisManagerClient.createEngineConn(createEngineConnAction)
//...
}

其中linkisManagerClient.createEngineConn(createEngineConnAction)最终会调用到org.apache.linkis.httpclient.AbstractHttpClient#execute,这里的逻辑在源码-linkis 之BML物料上传与下载有做介绍。

请求的接口路径是这样确定的

1
2
3
4
5
6
7
8
9
10
protected def prepareReq(requestAction: HttpAction): HttpRequestBase = {
var realURL = ""
requestAction match {
case serverUrlAction: ServerUrlAction =>
realURL = connectUrl(serverUrlAction.serverUrl, requestAction.getURL)
case _ =>
realURL = getRequestUrl(requestAction.getURL, requestAction.getRequestBody)
}
//...
}

对于CreateEngineConnActiongetURL方法实现是

1
2
3
4
5
6
7
8
trait DWSHttpAction {
private var dwsVersion: String = _
def setDWSVersion(dwsVersion: String): Unit = this.dwsVersion = dwsVersion
def getPrefixURL: String = "/api/" + getRestType + "/" + dwsVersion
protected def getRestType: RestType = RestType.JERSEY
def suffixURLs: Array[String]
def getURL: String = getPrefixURL + "/" + suffixURLs.mkString("/")
}

url 是由前缀和后缀两部分组成

  1. 前缀是 "/api/" + getRestType + "/" + dwsVersion,其中 restType 是rest_jdwsVersion由配置项wds.linkis.web.version指定,默认值是v1

  2. 后缀suffixURLsCreateEngineConnAction中有重载

    1
    2
    3
    4
    class CreateEngineConnAction extends POSTAction with LinkisManagerAction {
    override def getRequestPayload: String = DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads)
    override def suffixURLs: Array[String] = Array("linkisManager", "createEngineConn")
    }

综上,请求的接口地址路径是api/rest_j/v1/linkisManager/createEngineConn。这个地址在 linkis 的处理入口是org.apache.linkis.manager.am.restful.EngineRestfulApi#createEngineConn

1
2
3
4
5
6
7
8
9
10
11
12
@RequestMapping(path = "/linkisManager", produces = {"application/json"})
@RestController
public class EngineRestfulApi {
//...
@RequestMapping(path = "/createEngineConn", method = RequestMethod.POST)
public Message createEngineConn( HttpServletRequest req, @RequestBody JsonNode jsonNode)
throws IOException, InterruptedException {
//...
}
//...
}

此外还有些有意思的地方,比如

  1. 响应类型,是根据请求的 url 来确定:在响应对象上定义了一个注解,注解值是对应的请求 url,通过反射获取到 url 和 响应类的映射关系后,从映射关系中获取响应类型。例如,

    1
    2
    @DWSHttpMessageResult("/api/rest_j/v\\d+/linkisManager/createEngineConn")
    class CreateEngineConnResult extends GetEngineConnResult

提交创建引擎后回调

sequenceDiagram 
activate AbstractSubmittableLinkisJob
    AbstractSubmittableLinkisJob ->> + LinkisJobMetrics : 
    Note over TaskService, LinkisJobMetrics : 创建 LinkisJobMetrics 对象, 
参数是上面创建的 engineConnId LinkisJobMetrics -->> - AbstractSubmittableLinkisJob : 返回 jobMetrics AbstractSubmittableLinkisJob ->> + AbstractLinkisJob : getJobListeners():获取作业监听器 Note over AbstractLinkisJob : 这里的监听器应该是空的
方法 addJobListener 貌似没有被调用 AbstractLinkisJob -->> - AbstractSubmittableLinkisJob : 返回监听器 Note over AbstractSubmittableLinkisJob : 调用监听器的 onJobSubmitted 方法 deactivate AbstractSubmittableLinkisJob

获取引擎信息

sequenceDiagram
activate FlinkJobManager
    FlinkJobManager ->> FlinkJobManager : 将 onceJob 保存到缓存 onceJobs 中
    FlinkJobManager ->> + SimpleFlinkJobManager : createJobInfo()
    
    SimpleFlinkJobManager ->> + OnceJob : getNodeInfo : 获取 engineConn 信息
    OnceJob -->> - SimpleFlinkJobManager : 返回 nodeInfo
    SimpleFlinkJobManager ->> SimpleFlinkJobManager : 创建一个 FlinkJobInfo 对象 jobInfo
    
    activate SimpleFlinkJobManager
    Note right of SimpleFlinkJobManager : 调用方法 fatchApplicationInfo() 
获取 yarn 作业信息 Note right of SimpleFlinkJobManager : 根据 operator 名字获取 operator
EngineConnApplicationInfoOperator Note right of SimpleFlinkJobManager : 将前面设置的 action 依次作用到
EngineConnApplicationInfoOperator rect rgb(121, 216, 206) loop 可以重试 SimpleFlinkJobManager ->> + OnceJobOperator : apply() OnceJobOperator ->> OnceJobOperator : 构建 EngineConnOperateAction.Builder 对象 Note over OnceJobOperator : 从这个 builder 的类型
可以看出它是用来构建 EngineConn 执行操作的构建器 OnceJobOperator ->> + EngineConnOperateAction.Builder : build() EngineConnOperateAction.Builder -->> - OnceJobOperator : 返回 EngineConnOperateAction 实例
engineConnOperateAction OnceJobOperator ->> + LinkisManagerClientImpl : executeEngineConnOperation() Note over LinkisManagerClientImpl : 通过 linkis 客户端,提交 engineConnOperateAction
这里引擎开始执行任务 LinkisManagerClientImpl -->> - OnceJobOperator : 返回执行结果 result,结果对象中有 applicationId,applicationUrl OnceJobOperator ->> + EngineConnApplicationInfoOperator : resultToObject():将返回的结果 result 转成 ApplicationInfo EngineConnApplicationInfoOperator -->> - OnceJobOperator : 返回 ApplicationInfo 对象(包含:applicationId、applicationUrl、queue) OnceJobOperator -->> - SimpleFlinkJobManager :返回 applicationInfo 对象 end end SimpleFlinkJobManager ->> SimpleFlinkJobManager : 从 applicationInfo 对象中获取
applcaitionId,applicationUrl 设置 jobInfo deactivate SimpleFlinkJobManager SimpleFlinkJobManager ->> SimpleFlinkJobManager : jobInfo 设置 resource SimpleFlinkJobManager -->> - FlinkJobManager : 返回 jobInfo FlinkJobManager ->> FlinkJobManager : 存放到缓存 onceJobIdToJobInfo
返回 engineConnId deactivate FlinkJobManager

这里请求的 linkis 接口地址中路径是 linkisManager/executeEngineConnOperation,对应的执行逻辑入口是 org.apache.linkis.manager.am.restful.EngineRestfulApi#executeEngineConnOperation

更新作业状态

sequenceDiagram

activate TaskService 
    activate TaskService
    Note right of TaskService : 调用 updateStreamTaskStatus()
这个方法内部会判断 task
是否在缓存 onceJobIdToJobInfo 中 TaskService ->> + FlinkJobManager : getJobInfo() :
从缓存 onceJobIdToJobInfo 获取 jobInfo FlinkJobManager ->> + SimpleFlinkJobManager : getStatus() :
从缓存 onceJobs 中获取 simpleOnceJob SimpleFlinkJobManager ->> + SimpleOnceJob : isCompleted() :调用 Note over SimpleOnceJob : 这个方法内部会调用 linkis 客户端获取 engineConn 状态 SimpleOnceJob -->> - SimpleFlinkJobManager : 返回状态 SimpleFlinkJobManager -->> - FlinkJobManager : 返回状态 status Note over FlinkJobManager : jobInfo 设置状态 status FlinkJobManager -->> - TaskService : 返回 jobInfo Note right of TaskService : 更新 task 的 更新时间和状态
task 状态结束,jobInfo 中 completedMsg 非空
task 设置错误消息 errDesc
jobInfo json 序列化保存到 task 的 linkisJobInfo deactivate TaskService Note right of TaskService : 最后更新数据表 linkis_stream_task deactivate TaskService

此外类TaskMonitorService中会启动定时任务调用com.webank.wedatasphere.streamis.jobmanager.manager.service.TaskService$#updateStreamTaskStatus来获取状态。