Python分布式异步任务队列库-Celery学习-02

上一篇文章讲述了 Celery 基本的使用方法,但是显然 Celery 的配置直接写在任务中并不是很方便。
这里我们可以选择把 Celery 的设置拆分为单独的 py 文件,这样后续管理也会更加方便。
那么这里我们就直接上代码了。

创建任务之前我们首先需要一个 Celery app 对象,为了后续更加直观我选择单独使用一个文件来创建 app 对象。这里命名为: celery_project_app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from celery import Celery

app = Celery(
"celery_app_name",
# include中包含的是任务所在的文件名
include=["celery_worker_tasks"]
)

# config_from_object顾名思义,从指定的文件中加载设置
app.config_from_object("celery_project_settings")

if __name__ == "__main__":
pass

Celery 通用的设置比较多,也可以单独列一个文件。这里我们写入到
celery_project_settings.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from kombu import Queue
import re


# 使用redis 作为消息代理
# BROKER_URL 在4.0中变更为 broker_url
# BROKER_URL = "redis://127.0.0.1:6379/0"
broker_url = "redis://:@127.0.0.1:6379/0"

# 任务结果存在Redis
# CELERY_RESULT_BACKEND 在4.0中变更为 result_backend
# result_backend = "redis://127.0.0.1:6379/0"
result_backend = "redis://:@127.0.0.1:6379/0"

# 任务信息传递的类型(默认json)
# CELERY_RESULT_SERIALIZER 在4.0中变更为 result_serializer
result_serializer = "json"
# CELERY_RESULT_SERIALIZER = "json"

# 时区
timezone = "UTC"

# 任务过期秒数
# CELERY_TASK_RESULT_EXPIRES 在4.0中变更为 result_expires
# result_expires = 60 * 10
result_expires = 60 * 10

# 任务完成指定数量后自动重启
worker_max_tasks_per_child = 1000

# List of modules to import when the Celery worker starts.
imports = ('celery_worker_tasks',)

# CELERY_QUEUES 在4.0中变更为 task_queues
task_queues = (
# 定义任务队列
# 路由键以“default_task.”开头的消息都进default队列
Queue("default", routing_key="default_task.#"),
# 路由键以“low_priority_task.”开头的消息都进low_priority_task队列
Queue("low_priority_task", routing_key="low_priority_task.#"),
# 路由键以“middle_priority_task.”开头的消息都进middle_priority_task队列
Queue("middle_priority_task", routing_key="middle_priority_task.#"),
# 路由键以“higher_priority_task.”开头的消息都进higher_priority_task队列
Queue("higher_priority_task", routing_key="higher_priority_task.#"),
)

# 设置默认队列名为 default
# CELERY_TASK_DEFAULT_QUEUE 在4.0中变更为 task_default_queue
task_default_queue = "default"
# CELERY_TASK_DEFAULT_EXCHANGE 在4.0中变更为 task_default_exchange
task_default_exchange = "default"
# CELERY_TASK_DEFAULT_EXCHANGE_TYPE 在4.0中变更为 task_default_exchange_type
task_default_exchange_type = "direct"
# CELERY_TASK_DEFAULT_ROUTING_KEY 在4.0中变更为 task_default_routing_key
task_default_routing_key = "default_task.default"

# CELERY_ROUTES 在4.0中变更为 task_routes
task_routes = (
[
(
#re.compile(r"low"),
"celery_app_name.celery_worker_tasks.low_rand",
{"queue": "low_priority_task", "routing_key": "low_priority_task.rand"},
), # 将celery_worker_tasks模块中的low_rand 分配至队列 low_priority_task
(
re.compile(r"middle"),
{"queue": "middle_priority_task", "routing_key": "low_priority_task.rand"},
), # 队列匹配支持正则表达式
],
)

注:在这里我们定义了多个队列,这样做的好处是,我们可以按照任务优先级放入不同的队列中避免有些慢速任务影响其他任务执行。如果不需要这样的功能,也可以直接使用默认队列。

Celery 设置好了,剩下的就是工作者和生产者了。
按照设置里面定义的创建生产者和工作者即可。

工作者: celery_worker_tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from celery_project_app import app

import time
import random


@app.task
def low_rand(range_start, range_end):
# 模拟耗时操作
time.sleep(30)
return random.randint(range_start, range_end)


@app.task
def middle_rand(range_start, range_end):
# 模拟耗时操作
time.sleep(10)
return random.randint(range_start, range_end)


@app.task
def higher_rand(range_start, range_end):
return random.randint(range_start, range_end)


if __name__ == "__main__":
app.worker_main()

生产者为: celery_create_task.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import time
# 导入分布任务函数
from celery_worker_tasks import *


def process_task():
task_result = 0
task_object = None
try:
# 创建任务
# 设置超过60秒任务会超时
task_object = low_rand.apply_async((1,1000), expires=60)
# 等待任务结束
while not task_object.ready():
time.sleep(1)
if task_object.successful() == True:
#获取任务结果
task_result = task_object.get()
except Exception as e:
print('celery task unknow except.')
print(e)
finally:
#删除任务和结果
if task_object:
task_object.forget()
print('task return: %s' % task_result)
pass


def main():
while True:
process_task()
pass


if __name__ == '__main__':
main()
pass

最后执行起来的结果应该看起来是这样的:

1
2
3
4
5
6
7
8
9
/bin/python /opt/celery_task/celery_create_task.py
task return: 978
task return: 23
task return: 389
task return: 409
task return: 284
task return: 673
task return: 645
task return: 565

Celery大致的使用方法就是这样,下面总结一下我个人在使用过程中遇到的坑。

1、内存泄漏的问题。

撰写本文时, Celery 最新版本是4.2.1,在这个版本上调用 apply_async 会直接导致内存泄漏。此问题将会在4.3版本中修复。参见:https://github.com/celery/celery/pull/4839
除此以外还有一些其他疑似内存泄漏的地方,还需要等待后续的确认以及修复。

2、配合 redis 时行为和文档不太一致。

使用过程中,曾发现 redis 的内存占用不断增长,经过调查发现,是任务返回的数据没有被及时删除导致的。

文档原文:Backends use resources to store and transmit results. To ensure that resources are released, you must eventually call get() or forget() on EVERY AsyncResult instance returned after calling a task.

文档里说使用 backend 储存结果时,任务完成后必须使用 get 或者 forget 来释放资源。但是我个人使用中发现调用 get 并不能释放资源,仍然需要手动调用 forget 。 由于没有仔细研究,我不太清楚是不是只有搭配 redis 的时候是这样,不过如果遇到类似问题,可以尝试这样解决。