Celery 介绍

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:

  • 异步任务:将耗时的操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音频处理等等
  • 做一个定时任务,比如每天定时执行爬虫爬取指定内容
  • 还可以使用celery实现简单的分布式爬虫系统等等

Celery有以下优点:

  • 简单:Celery 易于使用和维护,并且它 不需要配置文件 ,并且配置和使用还是比较简单的(后面会讲到配置文件可以有)
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  • 快速:单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟在亚毫秒级
  • 灵活: Celery 几乎所有部分都可以扩展或单独使用,各个部分可以自定义。

执行流程:

1

组成模块:

  • 任务模块 Task
    • 用来创建异步任务,或者定时任务
  • 消息中间件 Broker
    • 用来调度生产者创建的任务,并将其放入消息队列(celery本身不提供消息队列, 官方推荐RabbitMQ 和 Redis 等)
  • 任务执行单元 Worker
    • 监控消息队列里的任务并执行。
  • 结果存储 Backend
    • 用于存储任务执行的结果。

初识Celery

安装Celery

我们直接通过pip来安装

1
$ pip install celery

创建tasks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 导入Celery对象
from celery import Celery
# 配置消息中间件的地址,推荐用redis或者RabbitMQ
# 这里我们用redis
# redis://密码@地址:端口号/db
broker='redis://127.0.0.1:6379/1'
# 配置结果存放地址
backend = 'redis://127.0.0.1:6379/2'
# 实例化
# 第一个参数为命名,可以随便取
# 第二个参数为broker
# 第三个参数为backend
app = Celery('test',broker=broker,backend=backend)

# 创建一个task函数
@app.task
def add(x,y):
return x+y

运行worker

在刚才创建的目录下

1
2
3
# 运行worker

$ celery -A tasks worker --loglevel=info

成功后会输入如下信息

image-20210119220136204

调用任务

打开ipython

1
2
3
4
5
6
# 导入任务
In[3]: from tasks import add
# 运行任务
In[4]: add.delay(1,2)
# 输出结果ID
Out[4]: <AsyncResult: 070956a9-0791-43c6-9bb6-55af6460cc37>

我们可以看到我们启动的worker已经把结果计算完了

image-20210119221336068

我们还可以去redis中查看

image-20210119221812325

获取结果

那么我们如何获取到执行的结果呢?

我们可以

1
2
3
from celery.result import AsyncResult
res=AsyncResult("070956a9-0791-43c6-9bb6-55af6460cc37") # 参数为task id
res.result