从0到1编码实现Redis分布式锁原创
问: 不是有redission等现成工具吗?咋不用?
答: 不,我就想自己写一个!
陈建斌说 : 你这个男的怎么回事 ?!
有的同学,就是这么尿性。也能理解,不自己弄一下,怎么能理解透彻,那就一起来搞一下呗!
使用场景和选型
分布式多节点的部署方式,使得共享变量有可能被同时操作,遇到有数据一致性要求的情况,就需要采取全局锁定的措施来保障并发操作下的一致性要求,如,库存扣减操作、同一个商品的上下架和更新操作等等。
常见的,分布式锁采用Zookeeper
和Redis
来实现。怎么取舍呢?
zookeeper | redis | |
加锁原理 | 创建节点,节点已存在时创建失败 | 插入数据,数据已存在则设置失败 |
过期保护 | 节点类型为临时节点,断连删除 | 设置过期时间,到期删除 |
优点 | 在加锁失败时,zk的注册通知更优雅 | 速度快,性能高 |
缺点 | 只有leader负责写,然后通知flower,性能较差 | 抢锁失败时,需要自旋循环尝试 |
生产环境下,性能往往被优先考虑,相比较各自的优缺点,综合考虑,我们一般更倾向于redis。
从0到1 实现分布式锁
step1: 加锁 和 解锁的基础能力构建
Jedis.set(key, value, params)
👏🏻
这个2.6之后新增的加强版set命令是真不错,解决了加锁时设置锁超时时间的原子诉求,防止服务宕机导致的死锁~
(1) 一个具有加锁解锁功能的分布式锁对象,最少要有 jedis客户端
、 对应的redis key
、 锁超时时间
:
//构建分布式锁对象
public class DistributedLock {
private Jedis jedis;
private String lockName;
private long lockExpireSecond;
public DistributedLock(Jedis jedis, String lockName, long lockExpireSecond) {
this.jedis = jedis;
this.lockName = lockName;
this.lockExpireSecond = lockExpireSecond;
}
}
(2) 利用jedis
提供的SetParams
,对NX
, PX
在jedis.set
操作中一次性的原子的完成设置:
public void lock() throws BizException {
String lockResult = null;
try {
//设置 NX PX 参数
SetParams params = new SetParams();
params.nx();
params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
//执行加锁 , value 暂定 为固定字符串
lockResult = this.jedis.set(this.lockName, "lockValue", params);
} catch (Exception e) {
LOG.error("lock error",e);
}
if ("OK".equals(lockResult)) {
LOG.debug("locked success,lockName:{}",lockName);
} else {
throw new BizException("Get lock failed.");
}
}
(3) 用jedis.del
命令完成解锁:
public boolean unlock() {
boolean unlockResult=false;
try {
this.jedis.del(this.lockName);
unlockResult=true;
}catch (Exception e){
LOG.error("unLock error",e);
}
return unlockResult;
}
step2: 加锁失败直接结束? 希望多试几次
从上面的构造函数和lock()
实现,发现当前实现属于一锤子买卖,不成功便成仁。这其实不太满足我们的生产需求,很多场景下,业务执行速度是很快的,只要稍微等一等,就可以。那怎么办?
自定义重试次数和等待间隔,有限重试等待
//新增重试间隔属性
private long retryIntervalTime;
//通过构造方法初始化重试间隔
public DistributedLock(Jedis jedis, String lockName, long lockExpireSecond, long retryIntervalTime) {
...略
this.retryIntervalTime = retryIntervalTime;
}
//新增入参,加锁超时时间
public void lock(long timeout,TimeUnit unit) throws TimeoutException {
String lockResult = null;
try {
//设置 NX PX 参数
SetParams params = new SetParams();
params.nx();
params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
//加锁开始时间
long startTime=System.nanoTime();
//循环有限等待
while (!"OK".equals(lockResult=this.jedis.set(this.lockName, "lockValue", params))&&!isTimeout(startTime,unit.toNanos(timeout))){
Thread.sleep(retryIntervalTime);
}
} catch (Exception e) {
LOG.error("lock error",e);
}
//修改抛出异常类型为超时异常
if ("OK".equals(lockResult)) {
LOG.debug("locked success,lockName:{}",lockName);
} else {
throw new TimeoutException("Get lock failed because of timeout.");
}
}
step3: 只能解自己加的锁,别人的锁不能乱动
考虑一个问题:我们为了防止加锁后机器宕机的情况,给锁设置了过期时间,以此来保障锁可以在服务节点宕机不能解锁时,也可以给后续业务提供锁操作。
上图中,因为业务执行时间的不可控(或者遇到GC等不可预期的停顿),给分布式锁带来了使用问题。
我们先看问题一:用户线程1
把 线程2
的锁释放了!怎么办呢?
加锁保存线程标识,解锁校验,非自己的锁不释放
//其他属性略,新增lockOwner标识
private String lockOwner;
//通过构造函数初始化lockOwner标识
public DistributedLock(Jedis jedis, String lockName, String lockOwner, long lockExpireSecond, long retryIntervalTime) {
...略
this.lockOwner = lockOwner;
}
public void lock(long timeout,TimeUnit unit) throws TimeoutException {
String lockResult = null;
try {
//设置 NX PX 参数
SetParams params = new SetParams();
params.nx();
params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
//加锁开始时间
long startTime=System.nanoTime();
// set时的value 改为 lockOwner
while (!"OK".equals(lockResult=this.jedis.set(this.lockName, this.lockOwner, params))&&!isTimeout(startTime,unit.toNanos(timeout))){
Thread.sleep(retryIntervalTime);
}
} catch (Exception e) {
LOG.error("lock error",e);
}
...略
}
public boolean unlock() {
boolean unlockResult=false;
try {
// 先getValue ,并和当前lockOwner匹配,匹配上才去解锁
if (this.lockOwner.equals(this.jedis.get(this.lockName))) {
this.jedis.del(this.lockName);
unlockResult = true;
}
}catch (Exception e){
LOG.error("unLock error",e);
}
return unlockResult;
}
有的同学说,这个解锁的地方,需要用lua包成原子操作。单从功能上来讲,上面的实现也是OK的,因为只有get到的结果和本身匹配,才会进行下述操作。包成lua脚本的目的,应该主要是为了减少一次传输,提高执行效率。
step4: expire时间不够产生并发冲突
也就是之前的图中的问题二: 线程1 还在执行中,锁就过期释放了,导致线程2也加锁成功,这直接导致了线程间的业务冲突。 怎么办呢?
锁持有期内,根据需要,动态延长锁的过期时间
触发锁延期的方案选型,也是个大事,jdk原生timer
、调度线程池
、netty
的Timer
都可以实现,选哪个好?
综合对比精度、资源消耗等方面,Netty
中采用时间轮算法的Timer
应该是首选,都能管理成千上万的连接、调度心跳检测,拿来搞个锁延期还不是手拿把掐?
•首先,需要构建一个全局的Timer来存储和调度任务•其次,在加锁成功之后添加定时触发任务•再次,延期操作时,需要校验当前线程是否还持有锁•最后,在解锁时,取消定时任务•注意点,任务需要循环注册 , 考虑线程被中断的情况
构建分布式锁上下文,用于存储全局时间轮调度器:
public class LockContext {
private HashedWheelTimer timer;
private LockContext(){
//时间轮参数可以从业务自己的配置获取
// long tickDuration=(Long) config.get("tickDuration");
// int tickPerWheel=(int) config.get("tickPerWheel"); //默认1024
// boolean leakDetection=(Boolean)config.get("leakDetection");
timer = new HashedWheelTimer(new DefaultThreadFactory("distributedLock-timer",true), 10, TimeUnit.MILLISECONDS, 1024, false);
}
通过构造函数,将上下文及调度器传入分布式锁对象:
public class DistributedLock {
//上下文
private LockContext context;
//当前持有的 Timer 调度对象
private volatile Timeout lockTimeout;
public DistributedLock(Jedis jedis, String lockName, String lockOwner, long lockExpireSecond, long retryIntervalTime, LockContext context) {
...其他属性略
this.context = context;
}
加锁成功之后,执行调度器注册操作:
public void lock(long timeout, TimeUnit unit) throws TimeoutException {
//...加锁 略
if ("OK".equals(lockResult)) {
LOGGER.info("locked success,lockName:{}",lockName);
try {
//注册循环延期事件
registerLoopReExpire();
}finally {
if (Thread.currentThread().isInterrupted()&&this.lockTimeout!=null){
LOGGER.warn("线程中断,定时任务取消");
this.lockTimeout.cancel();
}
}
} else {
throw new TimeoutException("Get lock failed because of timeout.");
}
}
方法registerLoopReExpire()
中是实际的任务注册和延期操作:
private void registerLoopReExpire() {
LOGGER.info("分布式锁延期任务注册");
//每次注册,都把timeout赋给当前锁对象,用于后续解锁时取消
this.lockTimeout = context.getTimer().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//校验是否还在持有锁,并延长过期时间
boolean isReExpired=reExpireLock(lockName,lockOwner);
if (isReExpired) {
//自己调自己,循环注册
registerLoopReExpire();
}else {
lockTimeout.cancel();
}
}
}, TimeUnit.SECONDS.toMillis( lockExpireSecond)/2, TimeUnit.MILLISECONDS);
LOGGER.info("分布式锁延期任务注册完成");
}
这里有几个点需要重点关注:
•newTimeout()
操作会返回一个Timeout
实体,我们需要依赖该实体来对当前任务进行管理,所以需要赋值给锁内部对象。•锁延期,需要根据lockOwner
和 lockName
来判断,持有锁才加锁,需要使用lua
方式来保证判断和执行的原子性。•执行完延期操作之后,需要根据结果进行后续处理,成功则继续注册,失败则取消当前任务。•定时任务的执行时间,应该要小于锁的过期时间,取过期时间的1/2
或1/3
或自定义传入。
来验证一下,我们设置 锁的过期时间为3秒,业务执行时间为10秒 ,执行:
可以看到,定时任务一共延期了6次,最后一次注册成功了,但是业务执行完随着解锁任务取消了。
总结和回顾
本文,我们从0到1的对分布式锁进行了编码实现。从基本能力,最后到生产环境的各种诉求基本都进行了填充和完善。
值得一提的是,除了延期功能,上述大部分能力都是经历过生产环境考验的。大家如果发现了延期功能的实现有什么问题,欢迎留言纠正,一起讨论进步。
当然,上述内容还有缺失,比如jedis
操作lua
脚本的延期实现,可重入锁的改造,由于篇幅原因就不都贴出来了,有兴趣的同学也可以按上述思路继续完善。
另外,我们的上述实现都是基于主从架构,因此,分布式锁有可能会在主从切换或者其他宕机场景出现异常,但是个人认为,用牺牲效率来保障稳定的redLock,在大部分场景下其实没有什么必要。关于这部分,其实有几个号主讲述的非常到位,可以搜来看看。
最后,当我们对照redisson的分布式锁实现,来回看我们自己的实现,其实就会发现,在主逻辑上的实现,其实是大同小异的,只是redission的在可重入、效率兼顾(netty架构的运用)等方面要更加完备。
纸上得来终觉浅~ 共勉~
推荐阅读
•高并发整体可用性:一文详解降级、限流和熔断•高并发整体可用性:大规模集群下的分片管理•高并发整体可用性:历经磨难的注册中心选型•高并发存储优化篇:诸多策略,缓存为王
欢迎关注我的公众号“Coder的技术之路”,原创技术文章第一时间推送。