Redission 实现 Redis Redlock 分布式锁

# Redis  /  分布式

1. 前言 -- 普通分布式锁实现

说到 Redis 分布式锁大部分人都会想到:setnx+lua,或者知道set key value NX PX milliseconds

后一种方式的核心实现命令如下:

- 获取锁(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000

- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

Java 实现代码可参考:gitee/SingleNodeRedisDistributedLock.java

这种实现方式有3大要点(也是面试概率非常高的地方)

  • set 命令要用set key value NX PX milliseconds
  • value 要具有唯一性;
  • 释放锁时要验证 value 值,不能误解锁;

事实上这类锁最大的缺点就是它加锁时只作用在一个Redis节点上(只适用于单机部署),即使 Redis 通过 sentinel 保证高可用,如果这个 master 节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

在Redis的master节点上拿到了锁;但是这个加锁的key还没有同步到slave节点;master故障,发生故障转移,slave节点升级为master节点;导致锁丢失。

于是,Redis 作者 antirez 基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock

2. Redlock 实现

redlock 算法大概是这样的:

在Redis的分布式环境中,我们假设有 N 个 Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在 N 个实例上使用与在 Redis 单实例下相同方法获取和释放锁。现在我们假设有 5 个 Redis master 节点,同时我们需要在 5 台服务器上面运行这些 Redis 实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行以下操作:

  • 1.获取当前 Unix 时间,以毫秒为单位。
  • 2.依次尝试从 5 个实例,使用相同的 key 和具有唯一性的 value(例如UUID)获取锁。当向 Redis 请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。

例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个 Redis 实例请求获取锁。

  • 3.客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
  • 4.如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
  • 5.如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

只有充分了解普通分布式锁是如何实现的,才能更好的了解Redlock分布式锁的实现,因为Redlock分布式锁的实现完全基于普通分布式锁

3. Redlock 分析

Redisson 已经对 Redlock 算法做了封装。

3.1 依赖

Maven

<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.11.0</version>
</dependency>  

Gradle

compile 'org.redisson:redisson:3.11.0'  

3.2 Java 代码示例

redission 封装的 redlock 算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似:

Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369", "127.0.0.1:6379", "127.0.0.1:6389")
		.setMasterName("masterName")
		.setPassword("password").setDatabase(0);

// Sync and Async API
RedissonClient redissonClient = Redisson.create(config);

// 还可以使用 getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
	isLock = redLock.tryLock();
	// 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
	isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
	if (isLock) {
		//TODO if get lock success, do something;
	}
} catch (Exception e) {

} finally {
	// 无论如何, 最后都要解锁
	redLock.unlock();
}

3.3 唯一 ID

实现分布式锁的一个非常重要的点就是 set 的 value 要具有唯一性,redisson 的 value 是通过UUID+threadId保证唯一性的。
入口在redissonClient.getLock("REDLOCK_KEY"),源码在ConfigSupport.javaRedissonLock.java中:

// ConfigSupport.java
public static ConnectionManager createConnectionManager(Config configCopy) {
	UUID id = UUID.randomUUID();
	// ...
}

// RedissonLock.java
protected String getLockName(long threadId) {
	return id + ":" + threadId;
}

3.4 获取锁

获取锁的代码为RedissionLock.tryLock()或者redLock.tryLock(waitTime, leaseTime, TimeUnit),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是 -1,对应的是Config.lockWatchdogTimeout = 30 * 1000,即 30 秒。

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
		// 获取锁时向 多个 redis 实例发送的命令
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
				// 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
				  // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
				  // 获取分布式锁的KEY的失效时间毫秒数
                  "return redis.call('pttl', KEYS[1]);",
					// 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

获取锁的命令中,

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;
  • ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s;
  • ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即:UUID+threadId:

3.5 释放锁

释放锁的代码为RedissionLock.unlock(),核心源码如下:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
		// 向 多个 redis 实例都执行如下命令
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
				// 如果分布式锁 KEY 不存在,那么直接返回
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
				// 如果就是当前线程占有分布式锁,那么将重入次数减1
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
				// 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
					// 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
				// 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

4.Redlock分布式锁

以单机模式Redis架构为例:

Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.29.1.180:5378")
        .setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);

Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.29.1.180:5379")
        .setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);

Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.29.1.180:5380")
        .setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);

String resourceName = "REDLOCK";
RLock lock1 = redissonClient1.getLock(resourceName);
RLock lock2 = redissonClient2.getLock(resourceName);
RLock lock3 = redissonClient3.getLock(resourceName);

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
    isLock = redLock.tryLock(500, 30000, TimeUnit.MILLISECONDS);
    System.out.println("isLock = "+isLock);
    if (isLock) {
        //TODO if get lock success, do something;
        Thread.sleep(30000);
    }
} catch (Exception e) {
} finally {
    // 无论如何, 最后都要解锁
    System.out.println("");
    redLock.unlock();
}

最核心的变化就是RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);,因为此处是以三个节点为例。
那么如果是哨兵模式呢?需要搭建3个,或者5个sentinel模式集群。。 那么如果是集群模式呢?需要搭建3个,或者5个cluster模式集群。。

4.1 RedissonRedLock 实现原理

RedissonRedLock 是 RedissonMultiLock 的子类,所以调用 tryLock 方法时,事实上调用了RedissonMultiLock 的 tryLock 方法,精简源码如下:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 实现要点之允许加锁失败节点限制
    int failedLocksLimit = failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
    // 实现要点之遍历所有节点通过EVAL命令执行lua加锁
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            // 对节点尝试加锁
            lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
        } catch (RedisConnectionClosedException|RedisResponseTimeoutException e) {
            // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // 抛出异常表示获取锁失败
            lockAcquired = false;
        }
        
        if (lockAcquired) {
            // 成功获取锁集合
            acquiredLocks.add(lock);
        } else {
            // 如果达到了允许加锁失败节点限制,那么break,即此次Redlock加锁失败
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }               
        }
    }
    return true;
}

4.2 总结

逻辑并不复杂, 但是通过记录客户端ID和线程ID来唯一标识线程, 实现重入功能, 通过 pub/sub 功能来减少空转。

  • 优点: 实现了Lock的大部分功能, 提供了特殊情况方法(如:强制解锁, 判断当前线程是否已经获取锁, 超时强制解锁等功能), 可重入, 减少重试.

  • 缺点: 使用依赖Redisson, 而Redisson依赖netty, 如果简单使用, 引入了较多的依赖, pub/sub 的实时性需要测试, 没有监控等功能, 查问题麻烦, 统计功能也没有。

5. dlock:百度分布式 Redis 锁

DLock是由Java实现的,一套高效高可靠的分布式锁方案。 使用Redis存储锁,通过 Lua 脚本进行原子性锁操作, 实现了基于 Redis 过期机制的 lease,并提供了一种基于变种 CLH 队列的进程级锁竞争模型。

6. 基于 Zookeeper 实现分布式锁

对于 ZK 来说,实现分布式锁的核心是临时顺序节点

临时:表示在客户端创建某节点后,如果客户端经过一段时间跟服务端之间失去了心跳,说明客户端已经掉线了,那么这个节点就会被自动删除(这一点跟 Redis key 的过期时间类似);

顺序:表示是在一个 node 下面生成的子节点是按顺序的,每个子节点都有一个唯一编号,并且这个编号是按顺序自增的。

临时顺序节点再加上 ZK 的监听机制就可以实现分布式锁了,Curator 是一个 ZK 的开源客户端,也提供了分布式锁的实现。

适用于顺序执行的程序,大体思路就是创建临时序列节点,找出最小的序列节点,获取分布式锁,程序执行完成之后此序列节点消失,通过 watch 来监控节点的变化,从剩下的节点的找到最小的序列节点,获取分布式锁,执行相应处理,依次类推……

PS:zk 强制关闭时候,通知会有延迟。但是使用 close() 方法关闭的时候,延迟小。

优点:

  1. ZK 本身就是集群部署,避免单机故障;
  2. 顺序节点所以不用考虑过期时间设置问题;

缺点:

  1. 实现较为复杂;
  2. 非缓存机制,大量频繁创建删除节点会影响 ZK 集群性能;

参考

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×