本文探讨了在 celery 中处理动态创建子任务并等待其完成的挑战,尤其是在传统 celery 编排(如 `chain` 或 `chord`)不适用的场景。由于 celery 的内置编排机制要求任务签名在创建时已知,对于运行时动态生成的子任务,需要一种自定义的解决方案。文章提供了一种基于手动收集子任务 id 和轮询其状态的实现方法,以确保父任务在所有动态子任务完成后才继续执行。
在构建复杂的异步任务流时,Celery 提供了强大的编排工具,如 chain、group 和 chord。然而,当业务逻辑需要在父任务执行过程中动态生成并调度子任务,并且父任务必须等待所有这些动态子任务完成后才能继续时,传统的编排方式便显得力不从心。本文将深入探讨这一问题,并提供一种实用的解决方案。
设想一个场景:一个主任务负责从外部 API 分页获取数据。每获取一页数据,都需要触发一个子任务来处理该页数据并写入数据库。由于 API 响应时间不确定,以及处理和写入数据库的时间可能较长,我们希望将每页数据的处理卸载到独立的子任务中,以提高整体的墙钟时间效率。关键在于,主任务必须确保所有这些动态生成的数据库写入子任务完成后,才能执行下一个顶层操作,以维护数据完整性。
Celery 的 chain、group 和 chord 等编排原语,其核心设计理念是基于预先定义的任务签名(signature)。这意味着,当一个 chain 或 chord 被创建时,它所包含的所有任务及其依赖关系都必须是已知的。
因此,对于在父任务运行时动态创建子任务并要求父任务等待其完成的需求,Celery 的原生编排机制无法直接满足。我们需要一种手动管理依赖关系的方法。
解决此问题的核心思路是:在父任务中动态创建子任务时,收集这些子任务的唯一标识符(AsyncResult.id)。然后,父任务进入一个循环,周期性地检查这些子任务的执行状态,直到所有子任务都完成(成功或失败),父任务才能继续执行后续逻辑。
这种方法将任务间的同步控制从 Celery 的编排层下放到了应用程序代码层。
,轮询循环终止。以下是一个详细的 Python 示例,演示了如何在 Celery 中实现动态子任务的等待机制。
首先,假设我们定义了两个 Celery 任务:一个主任务 task_dummy_task1 和一个子任务 task_dummy_subtask,以及一个用于创建子任务的中间函数。
import time
from celery import Celery, Task
from celery.result import AsyncResult
from typing import List, Tuple
# 假设 app 已经配置好,broker 和 backend 都已设置
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
# 模拟一个 JobMaster 类用于日志记录,实际应用中替换为您的日志系统
class JobMaster:
def __init__(self, job_id, job_title):
self.job_id = job_id
self.job_title = job_title
@staticmethod
def get_job(job_id: int, job_title: str) -> Tuple['JobMaster', int]:
# 实际应用中可能从数据库获取或创建 Job 实例
if job_id is None:
job_id = int(time.time()) # 简单模拟一个 ID
return JobMaster(job_id, job_title), job_id
def log_message(self, log_message: str, status=None, job_score=None):
print(f"[{self.job_title} - {self.job_id}] {log_message}")
# 假设 consts 包含状态常量
class consts:
IN_PROGRESS = "IN_PROGRESS"
COMPLETED = "COMPLETED"
ERRORS_FOUND = "ERRORS_FOUND"
@app.task(bind=True)
def task_dummy_subtask(self: Task, parent_task_name: str, job_id: int = None):
job, _ = JobMaster.get_job(job_id, job_title="dummy subtask")
job.log_message(log_message=f"Entered subtask for {parent_task_name}. Simulating work...")
time.sleep(2) # 模拟耗时操作
job.log_message(log_message=f"Finished subtask for {parent_task_name}.")
return f"Subtask {parent_task_name} completed successfully."
def intermediary_dummy_subtask_function(parent_task_name: str, job_id: int) -> AsyncResult:
job, _ = JobMaster.get_job(job_id, job_title="intermediary task")
job.log_message(
log_message=f"Intermediary function for {parent_task_name} has been reached, will now make a task")
# 注意:add_to_parent=True 仅用于在结果后端建立父子关系,不影响阻塞行为
r = task_dummy_subtask.apply_async(kwargs={"parent_task_name": parent_task_name, "job_id": job_id},
add_to_parent=True)
return r
def wait_for_tasks_to_complete(async_ids: List[str], job_id: int = None, msg: str = None, timeout: int = 300):
"""
等待一组 Celery 任务完成。
Args:
async_ids: 需要等待的子任务 ID 列表。
job_id: 关联的作业 ID,用于日志记录。
msg: 等待过程中的提示信息。
timeout: 最长等待时间(秒)。
"""
job, _ = JobMaster.get_job(job_id, job_title="waiting for tasks")
job.log_message(log_message=f"等待 {len(async_ids)} 个任务完成, {msg}", status=consts.IN_PROGRESS)
job.log_message(log_message=f"任务ID: {async_ids}", status=consts.IN_PROGRESS)
remaining_ids = list(async_ids) # 复制一份,因为会修改
start_time = time.time()
while remaining_ids and (time.time() - start_time < timeout):
# 遍历剩余任务,检查状态
tasks_to_remove = []
for async_id in remaining_ids:
result = app.AsyncResult(async_id)
status = result.status
if status == "SUCCESS":
returned_value = result.result
job.log_message(log_message=f"任务 {async_id} 状态: SUCCESS, 返回值: {returned_value}")
tasks_to_remove.append(async_id)
elif status in ("FAILURE", "REVOKED", "RETRY"):
job.log_message(log_message=f"任务 {async_id} 状态: {status}. 错误信息: {result.traceback if status == 'FAILURE' else 'N/A'}", status=consts.ERRORS_FOUND)
tasks_to_remove.append(async_id) # 视为完成,但可能需要进一步处理错误
# else: PENDING, STARTED 等状态,继续等待
# 移除已完成的任务
for tid in tasks_to_remove:
if tid in remaining_ids: # 避免重复移除或并发问题
remaining_ids.remove(tid)
if not remaining_ids:
job.log_message(log_message="所有任务均已完成。", status=consts.COMPLETED, job_score=100)
return
job.log_message(log_message=f"仍有 {len(remaining_ids)} 个任务待完成。")
time.sleep(1) # 每秒检查一次
# 超时处理
job.log_message(
log_message=f"等待超时 ({timeout}s)。仍有 {len(remaining_ids)} 个任务未完成。",
status=consts.ERRORS_FOUND,
job_score=100
)
@app.task(bind=True)
def task_dummy_task1(self: Task, part_number: int, job_id: int = None):
job, job_id = JobMaster.get_job(job_id, job_title="dummy task")
sleeping_duration = 1 # 模拟一些前期工作
subtask_ids = []
job.log_message(log_message=f"进入主任务 1,模拟前期工作 {sleeping_duration} 秒。")
# 直接创建子任务
job.log_message(log_message="主任务 1: 创建子任务 a")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_a", "job_id": job_id}, add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="主任务 1: 创建子任务 b")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_b", "job_id": job_id}, add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="主任务 1: 创建子任务 c")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_c", "job_id": job_id}, add_to_parent=True)
subtask_ids.append(subtask.id)
# 通过中间函数创建子任务
job.log_message(log_message="主任务 1: 通过中间函数创建子任务 d")
subtask = intermediary_dummy_subtask_function(parent_task_name="task1_d", job_id=job_id)
subtask_ids.append(subtask.id)
job.log_message(log_message="主任务 1: 通过中间函数创建子任务 e")
subtask = intermediary_dummy_subtask_function(parent_task_name="task1_e", job_id=job_id)
subtask_ids.append(subtask.id)
time.sleep(sleeping_duration) # 模拟主任务在创建子任务后继续做一些工作
# 等待所有子任务完成
job.log_message(log_message=f"主任务 1: 开始等待 {len(subtask_ids)} 个子任务完成。")
wait_for_tasks_to_complete(async_ids=subtask_ids, job_id=job_id,
msg="等待所有动态子任务完成", timeout=60) # 设置一个合理的超时时间
job.log_message(log_message="主任务 1: 所有子任务已完成或超时,继续执行主任务的后续逻辑。")
return part_number
# 如何调用主任务 (在另一个脚本或 Celery worker 启动后)
# if __name__ == '__main__':
# task_dummy_task1.delay(part_number=123)task_dummy_task1 (父任务):
wait_for_tasks_to_complete (等待函数):
intermediary_dummy_subtask_function (中间函数):
当 Celery 的静态编排原语无法满足动态创建子任务并等待其完成的需求时,手动收集子任务 ID 并实现轮询等待机制是一个有效且直接的解决方案。这种方法虽然会在父任务执行期间阻塞 Worker 进程,但在许多需要严格顺序和数据完整性的场景中是可接受的。在实际应用中,应根据业务需求和系统负载,合理配置轮询间隔和超时时间,并完善错误处理逻辑,以确保系统的健壮性和效率。
# python
# redis
# app
# 工具
# 后端
# ai
# 异步任务
# red
# elif
相关文章:
如何快速搭建自助建站会员专属系统?
宝塔建站教程:一键部署配置流程与SEO优化实战指南
广平建站公司哪家专业可靠?如何选择?
Python lxml的etree和ElementTree有什么区别
成都网站制作公司哪家好,四川省职工服务网是做什么用?
建站之星免费版是否永久可用?
在线流程图制作网站手机版,谁能推荐几个好的CG原画资源网站么?
建站之星客服服务时间及联系方式如何?
,怎么用自己头像做动态表情包?
,想在网上投简历,哪几个网站比较好?
如何用PHP快速搭建高效网站?分步指南
网站制作服务平台,有什么网站可以发布本地服务信息?
建站之星下载版如何获取与安装?
建站之星图片链接生成指南:自助建站与智能设计教程
如何在阿里云ECS服务器部署织梦CMS网站?
宠物网站制作html代码,有没有专门介绍宠物如何养的网站啊?
Avalonia如何实现跨窗口通信 Avalonia窗口间数据传递
如何通过网站建站时间优化SEO与用户体验?
武汉网站制作费用多少,在武汉武昌,建面100平方左右的房子,想装暖气片,费用大概是多少啊?
官网自助建站平台指南:在线制作、快速建站与模板选择全解析
如何用搬瓦工VPS快速搭建个人网站?
建站之星安装步骤有哪些常见问题?
详解一款开源免费的.NET文档操作组件DocX(.NET组件介绍之一)
制作门户网站的参考文献在哪,小说网站怎么建立?
,制作一个手机app网站要多少钱?
如何在建站宝盒中设置产品搜索功能?
陕西网站制作公司有哪些,陕西凌云电器有限公司官网?
装修招标网站设计制作流程,装修招标流程?
建站主机是否等同于虚拟主机?
如何快速生成橙子建站落地页链接?
道歉网站制作流程,世纪佳缘致歉小吴事件,相亲网站身份信息伪造该如何稽查?
网页设计与网站制作内容,怎样注册网站?
如何访问已购建站主机并解决登录问题?
学生网站制作软件,一个12岁的学生写小说,应该去什么样的网站?
mc皮肤壁纸制作器,苹果平板怎么设置自己想要的壁纸我的世界?
建站中国必看指南:CMS建站系统+手机网站搭建核心技巧解析
制作旅游网站html,怎样注册旅游网站?
c# 在高并发场景下,委托和接口调用的性能对比
建站主机是什么?如何选择适合的建站主机?
建站VPS推荐:2025年高性能服务器配置指南
如何在IIS服务器上快速部署高效网站?
,网页ppt怎么弄成自己的ppt?
如何通过多用户协作模板快速搭建高效企业网站?
再谈Python中的字符串与字符编码(推荐)
如何获取PHP WAP自助建站系统源码?
在线ppt制作网站有哪些,请推荐几个好的课件下载的网站?
制作网站的公司有哪些,做一个公司网站要多少钱?
微信小程序 input输入框控件详解及实例(多种示例)
营销式网站制作方案,销售哪个网站招聘效果最好?
如何用虚拟主机快速搭建网站?详细步骤解析
*请认真填写需求信息,我们会在24小时内与您取得联系。