当前位置: 首页 > news >正文

CallbackData错误原因分析

CallbackData DoesNotExist 错误原因深度分析

问题现象

在调用 ticket_updateticket_submit API 时,Celery 出现错误:

ERROR: Task pipeline.eri.celery.tasks.schedule raised unexpected: 
DoesNotExist('CallbackData matching query does not exist.')File "pipeline/eri/imp/data.py", line 269, in get_callback_datadata_model = DBCallbackData.objects.get(id=data_id)

而使用内部的 handle 接口时,却不会出现这个问题。


根本原因分析

1. Django 事务隔离导致的数据不可见问题

问题核心:@transaction.atomic 装饰器

原始 ticket_update.py 的问题代码:

class TicketHandleView(BaseView):@transaction.atomic  # ⚠️ 问题源头def handle(self, input: TicketUpdateInputModel):ticket = Ticket.objects.get(id=input.ticket_id)task_id = self._get_current_task_id(ticket)handle_data = {"id": input.ticket_id,"task_id": task_id,"operator": input.operator,"action": {...},"form_data": input.form_data,}handle_bus(handle_data, ticket)  # 在事务中调用return TicketUpdateOutputModel(result=True)

内部 handle 接口的正常代码(viewsets.py):

@action(methods=["POST"], detail=True)
def handle(self, request, *args, **kwargs):  # ✅ 没有 @transaction.atomicdata = request.dataticket = self.get_object()form_data = data.get("form_data", {})# 数据验证data_validator = DataValidator(...)is_valid, error_message = data_validator.validate()if not is_valid:raise ValueError(error_message)handle_bus(data, ticket)  # 不在显式事务中return Response()

2. 执行流程差异

正常流程(内部 handle 接口):

1. API 请求进入└─ handle() 方法(无 @transaction.atomic)└─ handle_bus(data, ticket)└─ TicketActionActuator.run()├─ task.activate()  # 激活任务,状态:ENABLED → ACTIVE│  └─ 写入数据库(立即提交,因为没有外层事务)│└─ controller.throw_boundary_event()└─ bamboo_engine_api.callback()├─ 创建 CallbackData 记录│  └─ 写入数据库(立即提交)│└─ 发送 Celery 异步任务:pipeline.eri.celery.tasks.schedule2. Celery Worker 接收任务(异步)└─ Engine.schedule()└─ runtime.get_callback_data(callback_data_id)└─ DBCallbackData.objects.get(id=data_id)  # ✅ 能找到数据└─ 执行活动回调└─ Activity.on_message()└─ task.complete()  # 完成任务,状态:ACTIVE → COMPLETED

时序图:

API Thread                  Database              Celery Worker|                          |                        ||--handle_bus()----------->|                        ||  activate task           |                        ||------------------------->|[COMMIT]                ||                          |                        ||--throw_boundary_event()->|                        ||  create CallbackData     |                        ||------------------------->|[COMMIT]                ||                          |                        ||--send celery task--------|--------------------->>||                          |                        ||<-return to client        |                        ||                          |                        ||                          |    get_callback_data   ||                          |<-----------------------||                          |  ✅ Data Exists!       ||                          |----------------------->|

错误流程(带 @transaction.atomic 的 API):

1. API 请求进入└─ @transaction.atomic 开启事务 [TX-1 START]└─ handle() 方法└─ handle_bus(data, ticket)└─ TicketActionActuator.run()├─ task.activate()  # 激活任务│  └─ 写入数据库(但未提交,在 TX-1 中)⚠️│└─ controller.throw_boundary_event()└─ bamboo_engine_api.callback()├─ 创建 CallbackData 记录│  └─ 写入数据库(但未提交,在 TX-1 中)⚠️│└─ 发送 Celery 异步任务 ⚡└─ handle() 方法返回└─ @transaction.atomic 事务提交 [TX-1 COMMIT] ⏱️ 2. Celery Worker 接收任务(可能在事务提交前就开始执行)└─ Engine.schedule()└─ runtime.get_callback_data(callback_data_id)└─ DBCallbackData.objects.get(id=data_id)  # ❌ 找不到数据!└─ 抛出异常:DoesNotExist

时序图(竞态条件):

API Thread                  Database              Celery Worker|                          |                        ||[TX-1 START]              |                        ||--handle_bus()----------->|                        ||  activate task           |                        ||------------------------->|[PENDING in TX-1]       ||                          |                        ||--throw_boundary_event()->|                        ||  create CallbackData     |                        ||------------------------->|[PENDING in TX-1]  ⚠️   ||                          |                        ||--send celery task--------|--------------------->>||                          |                        ||                          |         ⚡ RACE CONDITION|                          |                        ||                          |    get_callback_data   ||                          |<-----------------------||                          |  ❌ Data NOT Visible!  ||                          |    (Still in TX-1)     ||                          |                        ||<-return                  |                        ||[TX-1 COMMIT]------------>|[COMMIT]  ⏰ Too Late!  ||                          |                        |

3. Django 事务隔离级别

Django 默认使用数据库的隔离级别(通常是 READ COMMITTED):

  • READ COMMITTED: 事务只能读取已提交的数据
  • @transaction.atomic 块中创建的数据,在事务提交前对其他数据库连接不可见
  • Celery Worker 使用的是独立的数据库连接,无法看到未提交的数据

关键点:

# API Thread (Connection 1)
with transaction.atomic():  # TX-1callback_data = DBCallbackData.objects.create(...)  # 创建但未提交send_celery_task(callback_data.id)  # 发送任务ID# TX-1 尚未提交# Celery Worker (Connection 2) - 几乎同时执行
callback_data = DBCallbackData.objects.get(id=...)  # ❌ 看不到 Connection 1 的未提交数据!

4. 为什么异步调度会有问题?

Bamboo Engine 使用 异步 Celery 任务 来处理工作流调度:

# controller.py: throw_boundary_event()
def throw_boundary_event(...):event = Event(event_type=event_type, data=data, meta=meta or {})result = bamboo_engine_api.callback(self.runtime,node_id,version,event.dict(),  # 这里会创建 CallbackData)

bamboo_engine_api.callback() 内部流程:

  1. 创建 CallbackData 记录(存储事件数据)
  2. 发送 Celery 任务 pipeline.eri.celery.tasks.schedule
  3. 立即返回(不等待任务完成)

问题:

  • Celery 任务可能在几毫秒内就被 Worker 接收并开始执行
  • 如果此时 API 的 @transaction.atomic 还没提交,CallbackData 记录对 Worker 不可见
  • 导致 DoesNotExist 异常

解决方案

方案 1:移除 @transaction.atomic(推荐)

修改后的代码:

class TicketHandleView(BaseView):# ❌ 移除 @transaction.atomicdef handle(self, input: TicketUpdateInputModel):ticket = Ticket.objects.get(id=input.ticket_id)task_id = self._get_current_task_id(ticket)handle_data = {"id": input.ticket_id,"task_id": task_id,"operator": input.operator,"action": {...},"form_data": input.form_data,}# 数据验证data_validator = DataValidator(...)is_valid, error_message = data_validator.validate()if not is_valid:raise ValueError(error_message)handle_bus(handle_data, ticket)  # ✅ 不在事务中,数据立即提交return TicketUpdateOutputModel(result=True)

优点:

  • 与内部 handle 接口行为一致
  • 避免事务隔离问题
  • 数据库操作立即提交,Celery Worker 能及时看到数据

注意:

  • handle_bus 内部已经有适当的事务管理
  • TicketActionActuator._activate_task() 使用了 @transaction.atomic 来保护任务激活操作
  • 不需要在 API 层面额外包裹事务

⚠️ 方案 2:使用 transaction.on_commit()(复杂)

如果必须使用 @transaction.atomic,可以延迟 Celery 任务发送:

@transaction.atomic
def handle(self, input: TicketUpdateInputModel):...handle_bus(handle_data, ticket)# 确保在事务提交后才处理异步任务transaction.on_commit(lambda: post_process_if_needed())return TicketUpdateOutputModel(result=True)

问题:

  • 这需要修改 Bamboo Engine 的回调机制,工作量大
  • 不是标准做法,可能引入其他问题

对比总结

维度 内部 handle 接口 错误的 API (有 @transaction.atomic)
事务控制 无外层事务 有 @transaction.atomic
数据提交时机 立即提交 方法返回后才提交
CallbackData 可见性 Worker 能立即看到 Worker 可能看不到(竞态)
Celery 任务状态 正常执行 DoesNotExist 异常
任务完成状态 ACTIVE → COMPLETED 卡在 ACTIVE(因为 Worker 失败)

最佳实践建议

  1. 不要在调用 handle_bus() 的 API 层使用 @transaction.atomic

    • handle_bus 内部已有事务管理
    • 外层事务会导致数据不可见问题
  2. 如果需要事务保护,在业务逻辑层实现

    • handle_bus 之前完成数据验证
    • handle_bus 内部的事务自然提交
  3. 异步任务的数据一定要在发送任务前提交

    • Celery Worker 使用独立数据库连接
    • 必须确保数据已提交才能被读取
  4. 参考现有的成功实现

    • 内部 handle 接口是最佳实践
    • 新的 OpenAPI 应该遵循相同的模式

修复验证

修复后的行为应该是:

  1. ✅ API 调用立即返回成功
  2. ✅ 任务状态正确从 ENABLED → ACTIVE
  3. ✅ Celery Worker 能找到 CallbackData
  4. ✅ 异步回调执行 task.complete()
  5. ✅ 任务状态最终变为 COMPLETED
  6. ✅ 工单流程继续流转

附录:相关代码位置

  • 问题代码: backend/bk_itsm/core/openapi/ticket_update.py
  • 正确实现: backend/bk_itsm/core/views/ticket/viewsets.py:handle()
  • 工作流控制器: backend/bk_itsm/core/services/workflow_engine/controller.py
  • 任务执行器: backend/bk_itsm/core/scenarios/ticket_actuator/action.py
  • Bamboo Engine 回调: pipeline/eri/imp/data.py

结论: 移除 API 层的 @transaction.atomic 装饰器,让数据库操作自然提交,避免事务隔离导致的竞态条件。

http://www.hskmm.com/?act=detail&tid=35740

相关文章:

  • 2025微弧氧化加工厂家推荐:常州华源专业表面处理技术供应商
  • hash判断两个集合是否完全相同
  • 2025滑触线实力厂家推荐,无锡宸澳电气多型号防爆安全定制!
  • 2025年GEO优化公司推荐:五大实力企业口碑榜,引领AI搜索营销新生态
  • 2025年10月全屋智能家居品牌推荐:盈趣领衔对比评测榜
  • 2025码垛机厂家推荐济南金瑞祥,全自动龙门桁架定制实力企业
  • 2025防腐工程厂家推荐:无锡华金喷涂技术领先,定制防腐解决方案
  • [LangChian] 05.结构化提示词
  • C#获取文件md5码
  • 2025年10月防腐木凉亭厂家对比评测榜:江西纳美领衔五强深度解析
  • 2025通风天窗实力厂家推荐,正鑫专业制造与定制服务保障
  • 2025年10月治鼻炎产品推荐:权威对比评测榜助您精准选购
  • git提PR时很多别人的commit,清理多余的commit
  • Visual Studio 使用小知识记录
  • 2025数控锯床厂家推荐无锡正川,专业立式锯床制造企业
  • DeepSeek-OCR:让 AI “一眼看懂” 的黑科技
  • 生成一张图,苹果logo是透明冰块,安卓小机器人撒尿到苹果logo,冲出一个豁口
  • 业务记录:登录
  • kafka2.8出现NotLeaderOrFollowerException
  • IEC 61850 ICD文件解析
  • 2025无锡新梅赛智能设备厂家推荐:全自动视觉定位点胶机专业制造商
  • 2025安全光栅厂家推荐安一光电,超薄无盲区设计守护工业安全
  • 2025石头纸设备厂家权威推荐:鼎浩包装科技环保吹塑机制造专家
  • Java面试题总结
  • 读书笔记:Oracle分区技术详解
  • 2025精密光电厂家推荐:柯依努UV固化设备专业定制,品质保障!
  • 徐老师2025新版Nodejs课程含项目实战
  • 详细介绍:isis整体知识梳理
  • Moe-ctf Misc部分题解
  • DBA必备脚本:Oracle获取正在运行SQL的字面SQL文本