我们异步任务一般会想到什么 celery celery 最终实现效果就是异步任务以及定时任务 那我们先来看看celery底层是什么 broker、backend、生产者、消费者、还有celery来调度任务 消息队列 用来存储任务 以及调度任务 那我们也可以使用 queue来实现异步任务

有兴趣戳这里

celery 使用

celery 中文手册

Demo

util_queue.py

手写模拟celery

import os
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mydjango.settings')
django.setup()
import redis

from user.models import User
import logging

logger = logging.getLogger(__name__)


# 进阶用法
class Asynchronization:
    def __init__(self, key, **redis_kwargs):
        # __私有方法
        self.__db = redis.Redis(**redis_kwargs)
        # 设置key
        self.key = key

    def size(self):
        # 返回列表长度
        return self.__db.llen(self.key)

    def put(self, item):
        # 右侧进入列表

        self.__db.rpush(self.key, item)

    def pop(self):
        # 左侧出  形成队列 先进先出
        item = self.__db.lpop(self.key)

        return item


import time
from send_email import send_email_task

# 实例化
q = Asynchronization("mykey")

# 循环添加
# for i in range(5):
#
#     q.put(i)
#     time.sleep(1)


# 导入线程模块
import threading


def dojob():
    # 函数内实例化 不冲突
    q = Asynchronization("mykey")
    # 无限循环
    while 1:
        # 删除左侧第一个 也就是陷进去的那一个
        result = q.pop()
        try:
            email = User.objects.filter(pk=result.decode()).first().email
            # print(email)
            # logger.info(email)
            send_email_task(email)
        except Exception as e:
            pass
            # logger.error(e)
        # 判断是否为空
        if not result:
            # 删除完毕 跳出循环
            break
        time.sleep(1)


def inform_user():
    user = User.objects.all()

    for i in user:
        email = i.email
        q.put(i.id)
    for index in range(5):
        # 定义线程
        thread = threading.Thread(target=dojob)
        # 启动
        thread.start()

# inform_user()
# print(q.size())

然后我们在要使用的地方进行调用即可