我们异步任务一般会想到什么 celery
celery 最终实现效果就是异步任务以及定时任务
那我们先来看看celery底层是什么
broker、backend、生产者、消费者、还有celery来调度任务
消息队列 用来存储任务 以及调度任务
那我们也可以使用 queue来实现异步任务
celery 使用
celery 中文手册
Demo
util_queue.py
手写模拟celery
import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mydjango.settings')
django.setup()
import redis
from user.models import User
import logging
logger = logging.getLogger(__name__)
# 进阶用法
class Asynchronization:
def __init__(self, key, **redis_kwargs):
# __私有方法
self.__db = redis.Redis(**redis_kwargs)
# 设置key
self.key = key
def size(self):
# 返回列表长度
return self.__db.llen(self.key)
def put(self, item):
# 右侧进入列表
self.__db.rpush(self.key, item)
def pop(self):
# 左侧出 形成队列 先进先出
item = self.__db.lpop(self.key)
return item
import time
from send_email import send_email_task
# 实例化
q = Asynchronization("mykey")
# 循环添加
# for i in range(5):
#
# q.put(i)
# time.sleep(1)
# 导入线程模块
import threading
def dojob():
# 函数内实例化 不冲突
q = Asynchronization("mykey")
# 无限循环
while 1:
# 删除左侧第一个 也就是陷进去的那一个
result = q.pop()
try:
email = User.objects.filter(pk=result.decode()).first().email
# print(email)
# logger.info(email)
send_email_task(email)
except Exception as e:
pass
# logger.error(e)
# 判断是否为空
if not result:
# 删除完毕 跳出循环
break
time.sleep(1)
def inform_user():
user = User.objects.all()
for i in user:
email = i.email
q.put(i.id)
for index in range(5):
# 定义线程
thread = threading.Thread(target=dojob)
# 启动
thread.start()
# inform_user()
# print(q.size())
然后我们在要使用的地方进行调用即可
觉得文章写的不错,可以请喝杯咖啡
- Post link: https://yanxiang.wang/python%E5%AE%9E%E7%8E%B0%E9%98%9F%E5%88%97%E6%89%A7%E8%A1%8C%E5%BC%82%E6%AD%A5%E4%BB%BB%E5%8A%A1/
- Copyright Notice: All articles in this blog are licensed under unless otherwise stated.