1. redis分布式锁的原理
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。如果设置成功,返回 1 。 设置失败,返回 0 。
基于上面这个命令,我们可以用于设计分布式锁。对于多个分布式节点,通过对同一个key执行setnx命令,如果某个节点执行setnx命令成功,那么该节点获取到分布式锁,其他节点因为该key已经存在了,所以执行setnx会失败,也就是获取锁失败,需要等待获取到锁的节点释放,才能获取到锁;
2. 上锁
利用setnx命令能够方便的判定,当前节点是否能够获取到锁;但是为了避免该节点在获取到锁之后,因为各种异常,导致该分布式锁没有释放(比如程序崩溃了),所以一般会给该key加个过期时间,这样即便获取到锁的节点崩溃了,其他节点也会在key过期之后,也能够有机会获取到锁;
所有一般会使用SET key value PX milliseconds NX 命令
127.0.0.1:6379> set test_key 11 PX 100000 NX
OK
127.0.0.1:6379> get test_key
"11"
127.0.0.1:6379> ttl test_key
(integer) 92
127.0.0.1:6379> pttl test_key
(integer) 87046
3. 解锁
解锁的话,最直接的想法就是,直接del相应的key;但是这有很大的问题,列个简单的场景
节点A获取到锁,但是因为获取到锁之后执行的逻辑很很耗时间,导致该锁的key过期了;这个时候另外一个节点B获取到锁了,然后开始处理逻辑,在B释放锁之前,A先执行完逻辑,要释放锁了,直接del key;这个时候另外一个节点C获取到锁,然后节点B和节点C就存在并发问题了
所以一般会在上锁的时候,将key的值,设置相应请求者id(requestor_id, requestor_id 可以是即时生成的uuid或者就是一个很大的随机数),然后解锁的时候,先判断下该key的值,还是不是requestor_id,如果还是的话,说明该锁还是被该节点拥有,可以执行解锁操作;否则,说明该锁已经被其他节点所获取,不能再执行解锁操作了;
伪代码如下
if redis_client.get("lock_key") == requestor_id then
redis_client.del("lock_key")
end
但是先get然后del毕竟是两次请求,没法保证其原子性;幸好redis支持通过eval命令执行lua脚本
127.0.0.1:6379> eval "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"
1 test_key 1111
(integer) 1
在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令(其实就是因为redis是单线程的,命令都是串行执行的,整个lua脚本的执行被看成是单个命令执行,所以lua脚本内部的多个redis命令的执行肯定是顺序执行的)
4. 代码示例
代码示例使用的redis_client是笔者自己写的c++版本的redis_client redis_cpp
redis_lock
/**
*
* redis_lock.hpp
*
* the distribute lock implement by redis
*
* @author : yandaren1220@126.com
* @date : 2018-12-28
*/
#ifndef __ydk_rediscpp_redis_utils_redis_lock_hpp
#define __ydk_rediscpp_redis_utils_redis_lock_hpp
#include <cstdint>
#include <string>
#include <chrono>
#include <thread>
#include <redis_cpp/detail/redis_reply_util.hpp>
#include <redis_cpp/detail/sync/redis_sync_operator.hpp>
namespace redis_cpp
{
namespace utils {
// redis distribute lock
class redis_lock
{
protected:
enum {
try_lock_interval = 100, // in milliseconds
};
protected:
detail::redis_sync_operator* m_redis_op;
std::string m_lock_key;
std::string m_lock_requestor_id;
int32_t m_key_expired_time;
int32_t m_locked_time_out;
public:
/**
* @brief
* @param op : redis_operator
* @param lock_key : the redis key
* @param requestor_id : the lock requestor
* @param key_expire_time : the key ttl in milliseconds
* @param locked_time_out : try get the lock time out( in milliseconds)
*/
redis_lock(detail::redis_sync_operator* op,
const std::string& lock_key,
const std::string& requestor_id,
int32_t key_expire_time,
int32_t locked_time_out)
: m_redis_op(op)
, m_lock_key(lock_key)
, m_lock_requestor_id(requestor_id)
, m_key_expired_time(key_expire_time)
, m_locked_time_out(locked_time_out){
}
~redis_lock() {
}
bool lock() {
int32_t timeout = m_locked_time_out;
while (timeout >= 0) {
// get lock
if (m_redis_op->setnxpx(m_lock_key.c_str(), m_lock_requestor_id, m_key_expired_time)) {
return true;
}
// sleep for a while
std::this_thread::sleep_for(std::chrono::milliseconds(try_lock_interval));
timeout -= try_lock_interval;
}
return false;
}
bool unlock() {
std::string del_lock_script =
"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
std::vector<std::string> keys;
std::vector<std::string> args;
keys.push_back(m_lock_key);
args.push_back(m_lock_requestor_id);
auto reply = m_redis_op->eval(del_lock_script.c_str(), keys, args);
return reply && reply->to_integer() == 1;
}
};
// redis distribute auto lock
class redis_auto_lock
{
protected:
bool m_locked;
redis_lock m_locker;
public:
redis_auto_lock(detail::redis_sync_operator* op,
const std::string& lock_key,
const std::string& requestor_id,
int32_t key_expire_time,
int32_t locked_time_out)
: m_locked(false)
, m_locker(op, lock_key, requestor_id, key_expire_time, locked_time_out){
m_locked = m_locker.lock();
}
~redis_auto_lock() {
if (m_locked) {
m_locker.unlock();
m_locked = false;
}
}
bool islocked() {
return m_locked;
}
};
}
}
#endif
test code
#include <iostream>
#include <strstream>
#include <vector>
#include <redis_cpp.hpp>
#include <utils/redis_lock.hpp>
#include <utility/asio_base/thread_pool.hpp>
static uint32_t get_cur_time() {
return std::chrono::duration_cast<std::chrono::duration<uint32_t, std::milli>>(
std::chrono::high_resolution_clock::now().time_since_epoch()).count();
}
void redis_lock_test() {
using namespace redis_cpp;
using namespace redis_cpp::detail;
utility::asio_base::thread_pool pool(2);
pool.start();
std::string redis_uri = "redis://foobared@127.0.0.1:6379/0";
standalone_sync_client_pool client_pool(redis_uri.c_str(), 1, 2, &pool);
base_sync_client* sync_client = &client_pool;
redis_sync_operator client(sync_client);
while (true) {
std::string lock_key;
std::string requestor_id;
int32_t expired_time;
int32_t time_out;
printf("input params:\n");
std::cin >> lock_key >> requestor_id >> expired_time >> time_out;
uint32_t start_time = get_cur_time();
printf("try start get locker: %u\n", start_time);
redis_cpp::utils::redis_auto_lock locker(&client, lock_key, requestor_id, expired_time, time_out);
uint32_t end_time = get_cur_time();
uint32_t cost = end_time - start_time;
bool locked = locker.islocked();
printf("time: %u, cost: %d, locked: %d\n", end_time, cost, locked);
}
}