Python分布式异步任务队列库-Celery学习-01

最近在项目上遇到了单机性能瓶颈,故趁此机会准备让项目中耗时的部分升级为分布式处理。考虑如果用C++来进行开发的话周期太长,而且很容易出现问题,于是乎初步选择了 Python 和Celery库来解决问题。

注:本文将以Celery 4.X版本为基础。

1、Celery 简介

它是一个异步任务调度工具,用户使用 Celery 产生任务,借用中间人来传递任务,任务执行单元从中间人那里消费任务。任务执行单元可以单机部署,也可以分布式部署,因此 Celery 是一个高可用的生产者消费者模型的异步任务队列。你可以将你的任务交给 Celery 处理,也可以让 Celery 自动按 crontab 那样去自动调度任务,然后去做其他事情,你可以随时查看任务执行的状态,也可以让 Celery 执行完成后自动把执行结果告诉你。

2、Celery 基本概念

任务生产者 :调用Celery提供的API,函数,装饰器而产生任务并交给任务队列的都是任务生产者。

任务执行单元(Worker):Worker是Celery提供的任务执行的单元,Worker并发的运行在分布式的系统节点中

中间人(Broker):Celery 中通常使用中间人(Broker)在客户端和工作者(Worker)之间传递。在任务生产者向队列添加消息后Broker把消息传递给 Worker。

后端(Backend):Celery 中用于储存Worker执行完毕后的结果。

官方给出的实现Broker的工具包括:

NameStatusMonitoringRemote Control
RabbitMQStableYesYes
RedisStableYesYes
Amazon SQSStableNoNo
ZookeeperExperimentalNoNo

更多信息可参阅:http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

3、Celery 安装

正常来说安装是很简单的只需要pip install一下就可以了。

pip install celery[librabbitmq,redis,auth,msgpack]

#注:在 Python3.7 上 celery 4.2.0 之前的版本async模块和关键字async起了冲突,所以可能会提示错误。
#from . import async, base 
#SyntaxError: invalid syntax
#详情参见 : https://github.com/celery/celery/issues/4500

#解决方案:使用github上的最新版本
pip install --upgrade https://github.com/celery/celery/archive/master.zip

4、第一个Celery应用程序

这里我们选择使用 Redis 作为中间人和后端。

由于Redis使用的非常广泛,介绍也很多,这里不单独讨论了。

4.1、Worker

作为分布式的程序当然需要有具体干活的代码,Worker就是这个干活的代码。

这里直接贴出代码,可直接保存为add_tasks.py。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from celery import Celery
import time
import os
import platform


def CeleryPlatformSet():
    #使用platform来判断操作系统类型
    sysstr = platform.system()
    if(sysstr =="Windows"):
        os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
    elif(sysstr == "Linux"):
        pass
    else:
        pass

CeleryPlatformSet()

#'redis://:[email protected]:port/db'
app = Celery('tasks', broker='redis://:[email protected]:6379/0',backend ='redis://:[email protected]:6379/0' )

@app.task
def add(x, y):
    #模拟耗时操作
    time.sleep(30)
    s = x + y
    print("sum is " + str(s))
    return s

if __name__ == '__main__':
    app.worker_main()

这个例子里添加了一个最基本的工作函数add,用来计算两个参数的和。

唯一需要注意的一点是,由于Windows下没有fork函数,所以如果Celery是在Windows下运行需要设置环境变量”FORKED_BY_MULTIPROCESSING”为1.

cmd下可以使用命令进行设置 (关闭后失效)
set FORKED_BY_MULTIPROCESSING=1

这里例子中直接使用了os包自动根据操作系统来设置。

启动worker可以使用命令celery -A add_tasks worker -l info

或者:python add_tasks .py worker

如果有疑问可以使用 celery –help 命令来查看celery命令的帮助文档。

4.2、生产者

有了完成任务的工作者当然也还需要一个负责添加任务的生产者。

下面是生产者的代码。命名为start_tasks.py。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

#导入任务函数add
from add_tasks import add 
import time


def main():
    #delay是异步调用,不会阻塞,程序会立即往下运行
    result = add.delay(12,12) 
    # 循环检查任务是否执行完毕
    while not result.ready():
        print(time.strftime('%H:%M:%S'))
        #每秒只检查一次
        time.sleep(1)
    #判断任务是否成功执行
    print(result.successful())
    #获取任务的返回结果
    print(result.get())

    
if __name__ == '__main__':
    main()

代码很简单,注释也写的比较清楚,这里不做过多的解释。保存为文件后可直接运行,通过输出我们可以观察到执行的过程和结果。
如果需要更多的工作者,可以把worker拷贝到更多计算机并行运行,需要注意的是IP是否正确,redis这样的中间人是否能够被其他计算机正确访问。

友情提示:redis默认无密码,直接允许公网访问有极大风险,可能会直接导致服务器被入侵。请务必设置一个强大难以破解的密码并设置防火墙阻止他人访问。

python3 pip更换源

[global]
index-url = http://mirrors.aliyun.com/pypi/simple/

[install]
trusted-host=mirrors.aliyun.com

再次吐槽出国带宽
Ubuntu直接更新包或者python的pip安装更新都慢到死,这里记录一下更换pip源的方法

来源是参考pip官方手册
https://pip.pypa.io/en/latest/user_guide/#configuration

On Unix the default configuration file is: $HOME/.config/pip/pip.conf which respects the XDG_CONFIG_HOME environment variable.
On Mac OS X the configuration file is $HOME/Library/Application Support/pip/pip.conf.
On Windows the configuration file is %APPDATA%\pip\pip.ini.

On Unix and Mac OS X the configuration file is: $HOME/.pip/pip.conf
On Windows the configuration file is: %HOME%\pip\pip.ini

这些地方设置pip.conf是可以接受的
在我这里是C:\Users\USERNAME\pip\pip.ini
我推荐使用阿里云的mirrors

[global]
index-url = http://mirrors.aliyun.com/pypi/simple/

[install]
trusted-host=mirrors.aliyun.com