在开发日常中我们使用redis的场景越来越多,一般为缓存或者消息队列这一类的,今天我们介绍下redis分布式锁的应用,使用场景为处理高并发。

redis事物介绍

  • redis事物是可以一次执行多个命令,本质是一组命令的集合。
  • 一个事务中的所有命令都会序列化,按顺序串行化的执行而不会被其他命令插入
  • 作用:一个队列中,一次性、顺序性、排他性的执行一系列命令

multi 指令基本使用

  • 下面指令演示了一个完整的事物过程,所有指令在exec前不执行,而是缓存在服务器的一个事物队列中
  • 服务器一旦收到exec指令才开始执行事物队列,执行完毕后一次性返回所有结果
  • 因为redis是单线程的,所以不必担心自己在执行队列是被打断,可以保证这样的“原子性”
     注:redis事物在遇到指令失败后,后面的指令会继续执行

setnx lock:codehole true
…. do something critical ….
del lock:codehole

demo

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import uuid
import time

# 1.初始化连接函数
def get_conn(host,port=6379):
    rs = redis.Redis(host=host, port=port)
    return rs

# 2. 构建redis锁
def acquire_lock(rs, lock_name, expire_time=10):
    '''
    rs: 连接对象
    lock_name: 锁标识
    acquire_time: 过期超时时间
    return -> False 获锁失败 or True 获锁成功
    '''
    identifier = str(uuid.uuid4())
    end = time.time() + expire_time
    while time.time() < end:
        # 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
        if rs.setnx(lock_name, identifier): # 尝试取得锁
            return identifier
        time.sleep(0.001)
        return False

# 3. 释放锁
def release_lock(rs, lockname, identifier):
    '''
    rs: 连接对象
    lockname: 锁标识
    identifier: 锁的value值,用来校验
    '''
    pipe = rs.pipeline(True)
    try:
        pipe.watch(lockname)
        if rs.get(lockname).decode() == identifier:  # 防止其他进程同名锁被误删
            pipe.multi()           # 开启事务
            pipe.delete(lockname)
            pipe.execute()
            return True            # 删除锁
        pipe.unwatch()              # 取消事务
    except Exception as e:
        pass
    return False                    # 删除失败


'''在业务函数中使用上面的锁'''
def sale(rs):
    start = time.time()            # 程序启动时间
    with rs.pipeline() as p:
        '''
        通过管道方式进行连接
        多条命令执行结束,一次性获取结果
        '''
        while True:
            lock = acquire_lock(rs, 'lock')
            if not lock: # 持锁失败
                continue
            try:
                count = int(rs.get('apple')) # 取量
                p.set('apple', count-1)      # 减量
                p.execute()
                print('当前库存量: %s' % count)
                break
            finally:
                release_lock(rs, 'lock', lock)
        print('[time]: %.2f' % (time.time() - start))

rs = redis.Redis(host='127.0.0.1', port=6379)      # 连接redis
rs.set('apple',1000)                               # # 首先在redis中设置某商品apple 对应数量value值为1000
sale(rs)

优化:给分布式锁加超时时间防止死锁

def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10):
    '''
    rs: 连接对象
    lock_name: 锁标识
    acquire_time: 过期超时时间
    locked_time: 锁的有效时间
    return -> False 获锁失败 or True 获锁成功
    '''
    identifier = str(uuid.uuid4())
    end = time.time() + expire_time
    while time.time() < end:
        # 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
        if rs.setnx(lock_name, identifier): # 尝试取得锁
            # print('锁已设置: %s' % identifier)
            rs.expire(lock_name, locked_time)
            return identifier
        time.sleep(.001)
    return False

简单案例

def test_lock(request):
    # apache压力测试
    # ab -c 200 -n 1000 http://localhost:8000/pay/testlock/
    res = User.objects.get(pk=1)
    try:
        # 加锁
        lock = r6.setnx(str(res.username), "lock")
        # 给锁设置过期时间 防止宕机
        r6.expire(str(res.username), 5)
        # 判断是否有锁  有则为False 不执行
        if lock:
            if res.balance >= 1:
                with connection.cursor() as c:
                    # 原生SQL语句 orm会有全局解释器锁GIL
                    c.execute('update 用户 set balance =balance -1 where id=1')
                return HttpResponse("ok")
            else:
                return HttpResponse("么得钱")
    finally:
        # 释放锁
        r6.delete(str(res.username))
        return HttpResponse("ok")

使用工具apache压力测试

一次200 一共1000 后边是路由地址

ab -c 200 -n 1000 http://localhost:8000/pay/testlock/