在开发日常中我们使用redis的场景越来越多,一般为缓存或者消息队列这一类的,今天我们介绍下redis分布式锁的应用,使用场景为处理高并发。
redis事物介绍
- redis事物是可以一次执行多个命令,本质是一组命令的集合。
- 一个事务中的所有命令都会序列化,按顺序串行化的执行而不会被其他命令插入
- 作用:一个队列中,一次性、顺序性、排他性的执行一系列命令
multi 指令基本使用
- 下面指令演示了一个完整的事物过程,所有指令在exec前不执行,而是缓存在服务器的一个事物队列中
- 服务器一旦收到exec指令才开始执行事物队列,执行完毕后一次性返回所有结果
- 因为redis是单线程的,所以不必担心自己在执行队列是被打断,可以保证这样的“原子性”
注:redis事物在遇到指令失败后,后面的指令会继续执行
setnx lock:codehole true
…. do something critical ….
del lock:codehole
demo
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import uuid
import time
# 1.初始化连接函数
def get_conn(host,port=6379):
rs = redis.Redis(host=host, port=port)
return rs
# 2. 构建redis锁
def acquire_lock(rs, lock_name, expire_time=10):
'''
rs: 连接对象
lock_name: 锁标识
acquire_time: 过期超时时间
return -> False 获锁失败 or True 获锁成功
'''
identifier = str(uuid.uuid4())
end = time.time() + expire_time
while time.time() < end:
# 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
if rs.setnx(lock_name, identifier): # 尝试取得锁
return identifier
time.sleep(0.001)
return False
# 3. 释放锁
def release_lock(rs, lockname, identifier):
'''
rs: 连接对象
lockname: 锁标识
identifier: 锁的value值,用来校验
'''
pipe = rs.pipeline(True)
try:
pipe.watch(lockname)
if rs.get(lockname).decode() == identifier: # 防止其他进程同名锁被误删
pipe.multi() # 开启事务
pipe.delete(lockname)
pipe.execute()
return True # 删除锁
pipe.unwatch() # 取消事务
except Exception as e:
pass
return False # 删除失败
'''在业务函数中使用上面的锁'''
def sale(rs):
start = time.time() # 程序启动时间
with rs.pipeline() as p:
'''
通过管道方式进行连接
多条命令执行结束,一次性获取结果
'''
while True:
lock = acquire_lock(rs, 'lock')
if not lock: # 持锁失败
continue
try:
count = int(rs.get('apple')) # 取量
p.set('apple', count-1) # 减量
p.execute()
print('当前库存量: %s' % count)
break
finally:
release_lock(rs, 'lock', lock)
print('[time]: %.2f' % (time.time() - start))
rs = redis.Redis(host='127.0.0.1', port=6379) # 连接redis
rs.set('apple',1000) # # 首先在redis中设置某商品apple 对应数量value值为1000
sale(rs)
优化:给分布式锁加超时时间防止死锁
def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10):
'''
rs: 连接对象
lock_name: 锁标识
acquire_time: 过期超时时间
locked_time: 锁的有效时间
return -> False 获锁失败 or True 获锁成功
'''
identifier = str(uuid.uuid4())
end = time.time() + expire_time
while time.time() < end:
# 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
if rs.setnx(lock_name, identifier): # 尝试取得锁
# print('锁已设置: %s' % identifier)
rs.expire(lock_name, locked_time)
return identifier
time.sleep(.001)
return False
简单案例
def test_lock(request):
# apache压力测试
# ab -c 200 -n 1000 http://localhost:8000/pay/testlock/
res = User.objects.get(pk=1)
try:
# 加锁
lock = r6.setnx(str(res.username), "lock")
# 给锁设置过期时间 防止宕机
r6.expire(str(res.username), 5)
# 判断是否有锁 有则为False 不执行
if lock:
if res.balance >= 1:
with connection.cursor() as c:
# 原生SQL语句 orm会有全局解释器锁GIL
c.execute('update 用户 set balance =balance -1 where id=1')
return HttpResponse("ok")
else:
return HttpResponse("么得钱")
finally:
# 释放锁
r6.delete(str(res.username))
return HttpResponse("ok")
使用工具apache压力测试
一次200 一共1000 后边是路由地址
ab -c 200 -n 1000 http://localhost:8000/pay/testlock/
觉得文章写的不错,可以请喝杯咖啡
- Post link: https://yanxiang.wang/redis%E5%A4%84%E7%90%86%E9%AB%98%E5%B9%B6%E5%8F%91/
- Copyright Notice: All articles in this blog are licensed under unless otherwise stated.