CallbackData DoesNotExist 错误原因深度分析
问题现象
在调用 ticket_update
和 ticket_submit
API 时,Celery 出现错误:
ERROR: Task pipeline.eri.celery.tasks.schedule raised unexpected:
DoesNotExist('CallbackData matching query does not exist.')File "pipeline/eri/imp/data.py", line 269, in get_callback_datadata_model = DBCallbackData.objects.get(id=data_id)
而使用内部的 handle
接口时,却不会出现这个问题。
根本原因分析
1. Django 事务隔离导致的数据不可见问题
问题核心:@transaction.atomic
装饰器
原始 ticket_update.py
的问题代码:
class TicketHandleView(BaseView):@transaction.atomic # ⚠️ 问题源头def handle(self, input: TicketUpdateInputModel):ticket = Ticket.objects.get(id=input.ticket_id)task_id = self._get_current_task_id(ticket)handle_data = {"id": input.ticket_id,"task_id": task_id,"operator": input.operator,"action": {...},"form_data": input.form_data,}handle_bus(handle_data, ticket) # 在事务中调用return TicketUpdateOutputModel(result=True)
内部 handle
接口的正常代码(viewsets.py):
@action(methods=["POST"], detail=True)
def handle(self, request, *args, **kwargs): # ✅ 没有 @transaction.atomicdata = request.dataticket = self.get_object()form_data = data.get("form_data", {})# 数据验证data_validator = DataValidator(...)is_valid, error_message = data_validator.validate()if not is_valid:raise ValueError(error_message)handle_bus(data, ticket) # 不在显式事务中return Response()
2. 执行流程差异
✅ 正常流程(内部 handle 接口):
1. API 请求进入└─ handle() 方法(无 @transaction.atomic)└─ handle_bus(data, ticket)└─ TicketActionActuator.run()├─ task.activate() # 激活任务,状态:ENABLED → ACTIVE│ └─ 写入数据库(立即提交,因为没有外层事务)│└─ controller.throw_boundary_event()└─ bamboo_engine_api.callback()├─ 创建 CallbackData 记录│ └─ 写入数据库(立即提交)│└─ 发送 Celery 异步任务:pipeline.eri.celery.tasks.schedule2. Celery Worker 接收任务(异步)└─ Engine.schedule()└─ runtime.get_callback_data(callback_data_id)└─ DBCallbackData.objects.get(id=data_id) # ✅ 能找到数据└─ 执行活动回调└─ Activity.on_message()└─ task.complete() # 完成任务,状态:ACTIVE → COMPLETED
时序图:
API Thread Database Celery Worker| | ||--handle_bus()----------->| || activate task | ||------------------------->|[COMMIT] || | ||--throw_boundary_event()->| || create CallbackData | ||------------------------->|[COMMIT] || | ||--send celery task--------|--------------------->>|| | ||<-return to client | || | || | get_callback_data || |<-----------------------|| | ✅ Data Exists! || |----------------------->|
❌ 错误流程(带 @transaction.atomic 的 API):
1. API 请求进入└─ @transaction.atomic 开启事务 [TX-1 START]└─ handle() 方法└─ handle_bus(data, ticket)└─ TicketActionActuator.run()├─ task.activate() # 激活任务│ └─ 写入数据库(但未提交,在 TX-1 中)⚠️│└─ controller.throw_boundary_event()└─ bamboo_engine_api.callback()├─ 创建 CallbackData 记录│ └─ 写入数据库(但未提交,在 TX-1 中)⚠️│└─ 发送 Celery 异步任务 ⚡└─ handle() 方法返回└─ @transaction.atomic 事务提交 [TX-1 COMMIT] ⏱️ 2. Celery Worker 接收任务(可能在事务提交前就开始执行)└─ Engine.schedule()└─ runtime.get_callback_data(callback_data_id)└─ DBCallbackData.objects.get(id=data_id) # ❌ 找不到数据!└─ 抛出异常:DoesNotExist
时序图(竞态条件):
API Thread Database Celery Worker| | ||[TX-1 START] | ||--handle_bus()----------->| || activate task | ||------------------------->|[PENDING in TX-1] || | ||--throw_boundary_event()->| || create CallbackData | ||------------------------->|[PENDING in TX-1] ⚠️ || | ||--send celery task--------|--------------------->>|| | || | ⚡ RACE CONDITION| | || | get_callback_data || |<-----------------------|| | ❌ Data NOT Visible! || | (Still in TX-1) || | ||<-return | ||[TX-1 COMMIT]------------>|[COMMIT] ⏰ Too Late! || | |
3. Django 事务隔离级别
Django 默认使用数据库的隔离级别(通常是 READ COMMITTED
):
- READ COMMITTED: 事务只能读取已提交的数据
- 在
@transaction.atomic
块中创建的数据,在事务提交前对其他数据库连接不可见 - Celery Worker 使用的是独立的数据库连接,无法看到未提交的数据
关键点:
# API Thread (Connection 1)
with transaction.atomic(): # TX-1callback_data = DBCallbackData.objects.create(...) # 创建但未提交send_celery_task(callback_data.id) # 发送任务ID# TX-1 尚未提交# Celery Worker (Connection 2) - 几乎同时执行
callback_data = DBCallbackData.objects.get(id=...) # ❌ 看不到 Connection 1 的未提交数据!
4. 为什么异步调度会有问题?
Bamboo Engine 使用 异步 Celery 任务 来处理工作流调度:
# controller.py: throw_boundary_event()
def throw_boundary_event(...):event = Event(event_type=event_type, data=data, meta=meta or {})result = bamboo_engine_api.callback(self.runtime,node_id,version,event.dict(), # 这里会创建 CallbackData)
bamboo_engine_api.callback() 内部流程:
- 创建
CallbackData
记录(存储事件数据) - 发送 Celery 任务
pipeline.eri.celery.tasks.schedule
- 立即返回(不等待任务完成)
问题:
- Celery 任务可能在几毫秒内就被 Worker 接收并开始执行
- 如果此时 API 的
@transaction.atomic
还没提交,CallbackData 记录对 Worker 不可见 - 导致
DoesNotExist
异常
解决方案
✅ 方案 1:移除 @transaction.atomic(推荐)
修改后的代码:
class TicketHandleView(BaseView):# ❌ 移除 @transaction.atomicdef handle(self, input: TicketUpdateInputModel):ticket = Ticket.objects.get(id=input.ticket_id)task_id = self._get_current_task_id(ticket)handle_data = {"id": input.ticket_id,"task_id": task_id,"operator": input.operator,"action": {...},"form_data": input.form_data,}# 数据验证data_validator = DataValidator(...)is_valid, error_message = data_validator.validate()if not is_valid:raise ValueError(error_message)handle_bus(handle_data, ticket) # ✅ 不在事务中,数据立即提交return TicketUpdateOutputModel(result=True)
优点:
- 与内部
handle
接口行为一致 - 避免事务隔离问题
- 数据库操作立即提交,Celery Worker 能及时看到数据
注意:
handle_bus
内部已经有适当的事务管理TicketActionActuator._activate_task()
使用了@transaction.atomic
来保护任务激活操作- 不需要在 API 层面额外包裹事务
⚠️ 方案 2:使用 transaction.on_commit()(复杂)
如果必须使用 @transaction.atomic
,可以延迟 Celery 任务发送:
@transaction.atomic
def handle(self, input: TicketUpdateInputModel):...handle_bus(handle_data, ticket)# 确保在事务提交后才处理异步任务transaction.on_commit(lambda: post_process_if_needed())return TicketUpdateOutputModel(result=True)
问题:
- 这需要修改 Bamboo Engine 的回调机制,工作量大
- 不是标准做法,可能引入其他问题
对比总结
维度 | 内部 handle 接口 | 错误的 API (有 @transaction.atomic) |
---|---|---|
事务控制 | 无外层事务 | 有 @transaction.atomic |
数据提交时机 | 立即提交 | 方法返回后才提交 |
CallbackData 可见性 | Worker 能立即看到 | Worker 可能看不到(竞态) |
Celery 任务状态 | 正常执行 | DoesNotExist 异常 |
任务完成状态 | ACTIVE → COMPLETED | 卡在 ACTIVE(因为 Worker 失败) |
最佳实践建议
-
不要在调用
handle_bus()
的 API 层使用@transaction.atomic
handle_bus
内部已有事务管理- 外层事务会导致数据不可见问题
-
如果需要事务保护,在业务逻辑层实现
- 在
handle_bus
之前完成数据验证 - 让
handle_bus
内部的事务自然提交
- 在
-
异步任务的数据一定要在发送任务前提交
- Celery Worker 使用独立数据库连接
- 必须确保数据已提交才能被读取
-
参考现有的成功实现
- 内部
handle
接口是最佳实践 - 新的 OpenAPI 应该遵循相同的模式
- 内部
修复验证
修复后的行为应该是:
- ✅ API 调用立即返回成功
- ✅ 任务状态正确从 ENABLED → ACTIVE
- ✅ Celery Worker 能找到 CallbackData
- ✅ 异步回调执行
task.complete()
- ✅ 任务状态最终变为 COMPLETED
- ✅ 工单流程继续流转
附录:相关代码位置
- 问题代码:
backend/bk_itsm/core/openapi/ticket_update.py
- 正确实现:
backend/bk_itsm/core/views/ticket/viewsets.py:handle()
- 工作流控制器:
backend/bk_itsm/core/services/workflow_engine/controller.py
- 任务执行器:
backend/bk_itsm/core/scenarios/ticket_actuator/action.py
- Bamboo Engine 回调:
pipeline/eri/imp/data.py
结论: 移除 API 层的 @transaction.atomic
装饰器,让数据库操作自然提交,避免事务隔离导致的竞态条件。