有个项目需要用到后台任务,选了 celery,但在一个场景下遇到了问题。
需要定时收集所有 worker 的一些信息,比如 CPU 负载之类,相当于一个广播出去的任务,所有 worker 执行一下然后将结果回传,但 celery 中的 broadcast task 由于 taskid 是一样的,结果只能收到一个。
celery 提供了另外一种远程控制 worker 的方式,celery inspect xxx
,可以自定义一个inspect_command
来执行,这在命令行下是可以正常使用,但在定时任务中遇到了问题。
@inspect_command()
def resource_usage(state):
result = get_resource_usage()
result = result.__dict__
logger.warning(result)
return {"a": result}
# 这是个定时任务,比如每分钟收集一次
@shared_task()
def get_worker_stats():
from tasks import app
i = app.control.inspect()
results = i.resource_usage()
logger.info(results)
# 其他逻辑,存入 DB 等
直接执行 celery -A tasks inspect resource_usage
可以正常运行,但在定时任务中运行时会出现
AttributeError: 'Inspect' object has no attribute 'resource_usage'
这样的错误,似乎是自定义的 command 没有注册,但文档里也没有提到如何强制注册。。。
这种情况应该如何处理,如果 celery 实现不了别的库也行。