Python分布式异步任务队列库-Celery学习-01

最近在项目上遇到了单机性能瓶颈,故趁此机会准备让项目中耗时的部分升级为分布式处理。考虑如果用 C++ 来进行开发的话周期太长,而且很容易出现问题,于是乎初步选择了 Python 和 Celery 库来解决问题。

注:本文将以Celery 4.X版本为基础。

Celery 简介

它是一个异步任务调度工具,用户使用 Celery 产生任务,借用中间人来传递任务,任务执行单元从中间人那里消费任务。任务执行单元可以单机部署,也可以分布式部署,因此 Celery 是一个高可用的生产者消费者模型的异步任务队列。你可以将你的任务交给 Celery 处理,也可以让 Celery 自动按 crontab 那样去自动调度任务,然后去做其他事情,你可以随时查看任务执行的状态,也可以让 Celery 执行完成后自动把执行结果告诉你。

Celery 基本概念

任务生产者 :调用 Celery 提供的 API ,函数,装饰器而产生任务并交给任务队列的都是任务生产者。

任务执行单元(Worker): Worker 是 Celery 提供的任务执行的单元, Worker 并发的运行在分布式的系统节点中

中间人(Broker):Celery 中通常使用中间人(Broker)在客户端和工作者(Worker)之间传递。在任务生产者向队列添加消息后 Broker 把消息传递给 Worker。

后端(Backend):Celery 中用于储存 Worker 执行完毕后的结果。

官方给出的实现 Broker 的工具包括:

Name Status Monitoring Remote Control
RabbitMQ Stable Yes Yes
Redis Stable Yes Yes
Amazon SQS Stable No No
Zookeeper Experimental No No

更多信息可参阅:http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

Celery 安装

正常来说安装是很简单的只需要pip install一下就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pip install celery[librabbitmq,redis,auth,msgpack]

#注:在 Python3.7 上 celery 4.2.0 之前的版本async模块和关键字async起了冲突,所以可能会提示错误。
#from . import async, base
#SyntaxError: invalid syntax
#详情参见 : https://github.com/celery/celery/issues/4500

#注:celery 4.3之前的版本存在内存泄漏的问题。
#如果遇到此类问题,建议反馈等待修复后使用开发者版本。
#解决方案:使用github上的最新版本(开发者版本)
pip install --upgrade --force https://github.com/celery/celery/zipball/master#egg=celery
pip install --upgrade --force https://github.com/celery/billiard/zipball/master#egg=billiard
pip install --upgrade --force https://github.com/celery/py-amqp/zipball/master#egg=amqp
pip install --upgrade --force https://github.com/celery/kombu/zipball/master#egg=kombu
pip install --upgrade --force https://github.com/celery/vine/zipball/master#egg=vine

第一个Celery应用程序

这里我们选择使用 Redis 作为中间人和后端。

由于 Redis 使用的非常广泛,介绍也很多,这里不单独讨论了。

1、Worker

作为分布式的程序当然需要有具体干活的代码, Worker 就是这个干活的代码。

这里直接贴出代码,可直接保存为 add_tasks.py 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from celery import Celery
import time
import os
import platform


def CeleryPlatformSet():
#使用platform来判断操作系统类型
sysstr = platform.system()
if(sysstr =="Windows"):
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
elif(sysstr == "Linux"):
pass
else:
pass

CeleryPlatformSet()

#'redis://:[email protected]:port/db'
app = Celery('tasks', broker='redis://:[email protected]:6379/0',backend ='redis://:[email protected]:6379/0' )

@app.task
def add(x, y):
#模拟耗时操作
time.sleep(30)
s = x + y
print("sum is " + str(s))
return s

if __name__ == '__main__':
app.worker_main()

这个例子里添加了一个最基本的工作函数 add ,用来计算两个参数的和。

唯一需要注意的一点是,由于 Windows 下没有 fork 函数,所以如果Celery是在 Windows 下运行需要设置环境变量”FORKED_BY_MULTIPROCESSING”为1.

cmd下可以使用命令进行设置 (关闭后失效)

1
set FORKED_BY_MULTIPROCESSING=1

这里例子中直接使用了 os 包自动根据操作系统来设置。

启动worker可以使用命令

celery -A add_tasks worker -l info

或者:

python add_tasks .py worker

如果有疑问可以使用 celery –help 命令来查看 celery 命令的帮助文档。

2、生产者

有了完成任务的工作者当然也还需要一个负责添加任务的生产者。

下面是生产者的代码。命名为 start_tasks.py 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

#导入任务函数add
from add_tasks import add
import time


def main():
#delay是异步调用,不会阻塞,程序会立即往下运行
result = add.delay(12,12)
# 循环检查任务是否执行完毕
while not result.ready():
print(time.strftime('%H:%M:%S'))
#每秒只检查一次
time.sleep(1)
#判断任务是否成功执行
print(result.successful())
#获取任务的返回结果
print(result.get())


if __name__ == '__main__':
main()

代码很简单,注释也写的比较清楚,这里不做过多的解释。保存为文件后可直接运行,通过输出我们可以观察到执行的过程和结果。
如果需要更多的工作者,可以把 worker 拷贝到更多计算机并行运行,需要注意的是 IP 是否正确, redis 这样的中间人是否能够被其他计算机正确访问。

友情提示:redis默认无密码,直接允许公网访问有极大风险,可能会直接导致服务器被入侵。请务必设置一个强大难以破解的密码并设置防火墙阻止他人访问。