上一篇文章讲述了 Celery 基本的使用方法,但是显然 Celery 的配置直接写在任务中并不是很方便。
这里我们可以选择把 Celery 的设置拆分为单独的 py 文件,这样后续管理也会更加方便。
那么这里我们就直接上代码了。
创建任务之前我们首先需要一个 Celery app 对象,为了后续更加直观我选择单独使用一个文件来创建 app 对象。这里命名为: celery_project_app.py
1 | #!/usr/bin/env python3 |
Celery 通用的设置比较多,也可以单独列一个文件。这里我们写入到
celery_project_settings.py
1 | #!/usr/bin/env python3 |
注:在这里我们定义了多个队列,这样做的好处是,我们可以按照任务优先级放入不同的队列中避免有些慢速任务影响其他任务执行。如果不需要这样的功能,也可以直接使用默认队列。
Celery 设置好了,剩下的就是工作者和生产者了。
按照设置里面定义的创建生产者和工作者即可。
工作者: celery_worker_tasks.py
1 | #!/usr/bin/env python3 |
生产者为: celery_create_task.py
1 | #!/usr/bin/env python3 |
最后执行起来的结果应该看起来是这样的:
1 | /bin/python /opt/celery_task/celery_create_task.py |
Celery大致的使用方法就是这样,下面总结一下我个人在使用过程中遇到的坑。
1、内存泄漏的问题。
撰写本文时, Celery 最新版本是4.2.1,在这个版本上调用 apply_async 会直接导致内存泄漏。此问题将会在4.3版本中修复。参见:https://github.com/celery/celery/pull/4839
除此以外还有一些其他疑似内存泄漏的地方,还需要等待后续的确认以及修复。
2、配合 redis 时行为和文档不太一致。
使用过程中,曾发现 redis 的内存占用不断增长,经过调查发现,是任务返回的数据没有被及时删除导致的。
文档原文:Backends use resources to store and transmit results. To ensure that resources are released, you must eventually call get() or forget() on EVERY AsyncResult instance returned after calling a task.
文档里说使用 backend 储存结果时,任务完成后必须使用 get 或者 forget 来释放资源。但是我个人使用中发现调用 get 并不能释放资源,仍然需要手动调用 forget 。 由于没有仔细研究,我不太清楚是不是只有搭配 redis 的时候是这样,不过如果遇到类似问题,可以尝试这样解决。