Skip to content

单节点执行

韩数 edited this page Dec 6, 2023 · 2 revisions

单节点执行功能版本要求: bamboo-pipeline >= 3.29.0

配置

1. 配置单节点执行增强包

INSTALLED_APPS += (
  ...
  "pipeline.contrib.plugin_execute"
  ...
)

2. 执行 migrate

python manage.py migrate plugin_execute
  1. settings配置单节点执行的执行队列
# 不配置默认值为 `plugin_execute`
PLUGIN_EXECUTE_QUEUE = "default"

4. 启动单节点执行worker

python manage.py celery worker -Q plugin_execute -l info 

节点状态说明

单节点执行的状态机在bamboo-engine 的基础上做了简化,保留四种状态,分别是:

  • READY 准备执行
  • RUNNING 正在执行
  • FINISHED 结束
  • FAILED 失败

API 说明

启动某个插件

from pipeline.contrib.plugin_execute import api

# 插件 code
component_code = "sleep_timer"

# 插件 version
version = "legacy"

# 插件 inputs, 对应 data.get_one_of_inputs("bk_timing")
inputs = {
	"bk_timing": 10	  
}

# 插件 contexts, 会被 parent_data 引用到 parent_data.inputs.project_id、
contexts = {
    "project_id": 1
}

# 会注入到 service 对象的熟悉,可以通过 self 拿到,例如 self.name
runtime_attrs = {
	"name": "hhh"
}

result = api.run(component_code, version, inputs, contexts, runtime_attrs)
task_id = result.data

查询任务的状态

from pipeline.contrib.plugin_execute import api

task_id = 1

result = api.get_state(task_id)

state = result.data

state 的返回示例:

{  
	"task_id": 1,  
	"state": "FINISHED",  
	"component_code": "sleep_timer",  
	"version": "legacy",  
	"invoke_count": 1,  
	"inputs": {  
		"bk_timing": 10  
	},  
	"outputs": {},  
	"contexts": {  
		"project_id": 1  
	},  
	"callback_data": {},
	"runtime_attrs": {  
		"name": "hhhh"  
	},  
	"create_at": "创建时间",  
	"finish_at": "完成时间",  
}

回调执行任务

from pipeline.contrib.plugin_execute import api
task_id = 1
callback_data = {}
result = api.callback(task_id, callback_data)

强制失败任务

只有正在运行的任务才允许强制失败

from pipeline.contrib.plugin_execute import api
task_id = 1
result = api.forced_fail(task_id)

重试任务

只允许重试处于失败状态中的任务,重试 invoke_count 会+1。

from pipeline.contrib.plugin_execute import api

task_id = 1

# 插件 inputs, 对应 data.get_one_of_inputs("bk_timing")
inputs = {
	"bk_timing": 10	  
}

# 插件 contexts, 会被 parent_data 引用到 parent_data.inputs.project_id、
contexts = {
    "project_id": 1
}

# 会注入到 service 对象的熟悉,可以通过 self 拿到,例如 self.name
runtime_attrs = {
	"name": "hhh"
}

result = api.retry(task_id, inputs, contexts, runtime_attrs)