用django-redis很容易实现redis分布式锁,但本文介绍用另外一种方式redis模块来实现redis锁—过期后自动解锁。
from celery_demo.celery import app
import time
from celery import Celery, platforms
# from django.core.cache import cache
import redisplatforms.C_FORCE_ROOT = True #如果报错:root权限不能运行celery 则加上这一行def acquire_lock(conn,lockname,identifier,expire=10):if conn.setnx(lockname,identifier):conn.expire(lockname,expire)return Trueelse:return False@app.task
def my_task():# flag = 'my_task'+ str(my_task.request.id)flag_key = 'test_key'value = "value"# redis_lock = cache.set(flag,'task',60,nx=True)conn = redis.StrictRedis(host='127.0.0.1',port=6379,db=7)redis_lock = acquire_lock(conn,flag_key,value,60)print("检查开始")if not redis_lock:print("task has been locked")returnprint("任务开始执行....")for i in range(20):print(i)time.sleep(1)print("任务执行结束....")return True
然后再view中调用3次:
my_task.delay()
启动celery:
然后看看redis中:
发现test_key锁定60秒后自动解锁 了