watchdog没有生效引发的bug?原创
事故现场
业务同学反馈有数据是负的。这个数量就像库存一样,最小值只能是0 。看到这个情况,一下子有点不相信了。这个地方之前是有bug的:
第一次是有线程安全问题
第二次是事务与分布式锁重叠的问题
会出现事务没提交,但分布式锁已经释放,前端连续发起多次取消时,会引发幂等问题。
当@Transactional遇到@CacheEvict,会不会先清缓存呢?
但这些已经fix了。
为什么还会出现?难道是历史数据的缘故?
查了下,发现数据是当天生成、修改的。
不是历史数据
分析
出现负数,必然是被重复扣减的原因。问题是,为什么会重复扣减?
目前的逻辑是这样的:
按照设计,是不可能出现上面的问题的。看看实际的执行情况
搜了下日志,果然是重复扣减了。
此处也有蹊跷:如果要"取消",先在弹出确认框中,选中"确认",然后前端才会发"取消"的请求。
那怎么会连续发送两次请求呢?
继续查。
根据每次请求的唯一标识traceId+日志,来核对下时间。
果然,两次扣减操作,在时间上有交叉
/**
* 第一次执行取消 1081855这条数据 的时间线。只保留了关键环节的日志
*/
2022-08-19 11:11:24[8417011098fe22][aio10]DEBUG com.lock.DistributionLockAspect-lock Key is:lock:audit:pending:1081855
2022-08-19 11:11:24[8417011098fe22][aio10]DEBUG com.lock.DistributionLockAspect-get lock on key:lock:audit:pending:1081855
2022-08-19 11:11:24[8417011098fe22][aio10]INFO c.f.r.s.i.AuditServiceImpl-取消审批开始 id 1081855
此时,线程aio10应该持有锁:lock:audit:pending:1081855
2022-08-19 11:11:36[8417011098fe22][aio10]INFO c.f.r.s.i.AuditServiceImpl-取消审批完成 id 1081855
2022-08-19 11:11:37[8417011098fe22][aio10]INFO c.f.r.b.c.AuditStatusServiceImpl-AuditCancel finish AuditId 1081855 modifiedBy userName1
/**
* 第二次执行取消 1081855这条数据 的时间线。只保留了关键环节的日志
*/
2022-08-19 11:11:35[9567611138fe22][aio7]DEBUG com.lock.DistributionLockAspect-lock Key is:lock:audit:pending:1081855
但,线程aio7,也拿到锁:lock:audit:pending:1081855。
此处与期望不一致,不应该持有锁的
2022-08-19 11:11:35[9567611138fe22][aio7]DEBUG com.lock.DistributionLockAspect-get lock on key:lock:audit:pending:1081855
2022-08-19 11:11:35[9567611138fe22][aio7]INFO c.f.r.s.i.AuditServiceImpl-取消审批开始 id 1081855
2022-08-19 11:11:49[9567611138fe22][aio7]INFO c.f.r.s.i.AuditServiceImpl-取消审批完成 id 1081855
2022-08-19 11:11:49[9567611138fe22][aio7]INFO c.f.r.b.c.AuditStatusServiceImpl-AuditCancel finish AuditId 1081855modifiedBy userName1
可以看到, 第一次取消操作的线程 aio10 执行了13s,提交扣减数据的数据库事务的时间点,大约在 2022-08-19 11:11:37
第二次取消操作的线程 aio7 执行了14s,获取分布式锁的时间点是 2022-08-19 11:11:35
这个地方就不对了:
此处与期望不一致,锁的粒度是记录ID, aio7不应该持有锁的。要等aio10执行完成【提交事务】并释放锁后,才可以
用来互斥的分布式锁呢?
从日志上看,在redis上用来互斥的信号量标识【key】也是相同的:lock:audit:pending:1081855
锁的粒度是没有问题。
那么,问题出在呢?
正在没有头绪之际,突然想起来,之前在本地测试时,同一次操作中,有3条日志,现在怎么才两条:
- 构建好互斥信号量时【redis key】,打印日志 lock Key is: ${key}
- 加锁成功时【在redis上成功创建key】,打印日志 get lock on key: ${key}
- 释放锁时【把redis上的key删除], 打印日志 release lock on key: ${key}
关键环节的日志打印好重要:
优化这段逻辑时,意识到这个基于Redisson的分布式的信号量【key】很重要,
就把debug级别的日志也放出来了,现在排查问题时,这个日志要立功了。
释放锁时日志去哪了?这个地方有异常了!
排查
检查了下,分布锁的pom版本没有变。相关的行为逻辑应该与之前的相同,还是三条日志打印。
反编译了相关的类,看了下,在释放锁时的打印日志逻辑还在
@Aspect
@Component
public class DistributionLockAspect {
@Around("@annotation(com.lock.DistributionLock)")
public void doLock(ProceedingJoinPoint joinPoint) throws Throwable {
try {
return joinPoint.proceed();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("release lock on path:" + key);
}
}
}
}
}
难道是锁的时间太短,过期了?
Redisson不是有Watchdog进行自动续期的嘛!
核对了下取消操作上分布式锁的过期时间。
目前设置了10s,相较于这次耗时13s,14s来说,的确短了 。此处需要再改下
但Redisson的自动续期,为啥没生效?看了下
@Aspect
@Component
public class DistributionLockAspect {
@Autowired
private RedissonClient redisson;
@Around("@annotation(com.lock.DistributionLock)")
public void doLock(ProceedingJoinPoint joinPoint) throws Throwable {
String key = buildLock();
RLock lock = redisson.getLock(key);
/**
* 难道这个
* org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit)
* 不会自动续期?
*/
if (lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), TimeUnit.MILLISECONDS)) {
try {
return joinPoint.proceed();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("release lock on key:" + key);
}
}
}
}
}
}
继续往下看源码。是的,这个分布式锁的api不会自动续期。
因为传了参数leaseTime【租约期限】。
Redisson原来是这样实现的:
- 如果指定了leaseTime【租约期限】,那么就不会启动Watchdog进行自动续期;
- 如果没有指定leaseTime【租约期限】,则会启动一个Watchdog每隔一段时间就对redis中的key进行续期。默认的时间间隔是10s
坑啊
那和自己写的基于redis的分布式锁有什么区别。还不如自己写个呢,至少知根知底。用个锤子Redisson
在fix之前,再一块学习下Redisson的加锁代码:
public class RedissonLock extends RedissonExpirable implements RLock {
/**
* Watchdog 每次续期时,再add的延迟的过期时间是30s
*/
public static final long LOCK_EXPIRATION_INTERVAL_SECONDS = 30;
private static final ConcurrentMap<String, Timeout> expirationRenewalMap = PlatformDependent.newConcurrentHashMap();
protected long internalLockLeaseTime = TimeUnit.SECONDS.toMillis(LOCK_EXPIRATION_INTERVAL_SECONDS);
/**
* 执行加锁操作
* @param waitTime 等待获取锁的时间
* @param leaseTime 租约时间,即持有锁的时间
* @param unit 声明等待时间的单位
* @return true:成功拿到了锁 false: 失败
* @throws InterruptedException
*/
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
/**
* 只所以实现了可重入,因为加了当前线程id的缘故
*/
final long threadId = Thread.currentThread().getId();
/**
* 关键地方:关于租约时间的逻辑
*/
Long ttl = tryAcquire(leaseTime, unit);
/**
* 把处理租约时间的耗时也从waitTime中去掉。
* 即处理租约时间的逻辑,也算在等待时间之内
*/
time -= (System.currentTimeMillis() - current);
/**
* 如果 waitTime 设置为<=0,则加锁失败
*/
if (time <= 0) {
return false;
}
/**
* 下面的代码不涉及,就不贴了
*/
}
private Long tryAcquire(long leaseTime, TimeUnit unit) {
return get(tryAcquireAsync(leaseTime, unit,Thread.currentThread().getId()));//虽然只有一行代码,但整合了两个行为
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
/**
* 如果租约时间不魔法数字 -1 ,则会使用调用方指定的时间leaseTime
*/
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
/**
* 使用默认的租约时间LOCK_EXPIRATION_INTERVAL_SECONDS,
* 即30s
*/
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,
TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
/**
* 使用WatchDog每隔10s进行约期
*/
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
/**
* 对租约进行续期,频次是租约时间的1/3 ms
* internalLockLeaseTime / 3, TimeUnit.MILLISECONDS
* @param threadId
*/
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
/**
* 通过lua脚本来实现 。
* 分布式系统中的NPC问题
* N:Network Delay,网络延迟
* P:Process Pause,进程暂停(GC)
* C:Clock Drift,时钟漂移
* 要减轻这个问题时,需要在超时时间1/3时,就去续期。
*
*/
RFuture<Boolean> future = commandExecutor.evalWriteAsync(
getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime,
getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
}
再看下, DistributionLock上的描述:
当前线程在获取到锁以后,在租约时间到期以后,会自动释放锁。
@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributionLock{
/**
*
* 获取锁的等待时间,单位是毫秒,默认为0,如果获取不到锁就立即抛出异常。
* 注意,这里是获取锁的等待时间,也就是说,如果两个并发线程试图获取同一个锁,
* 等待时间是1000毫秒的话,如果获取到锁的线程在1000毫秒内处理完毕并释放锁的话,另一个线程是可以拿到锁的,
* 所以,服务的幂等性还需要服务自身来保证。
* 由于加了@DistributedLock的方法在没拿到锁时至少会等待waitTime参数指定的时间,
* 所以,waitTime应该尽可能小,
* 以避免长时间的等待。由于拿不到锁的线程会抛出DistributedLockFailException,
* 所以,
* 如果不希望因为全局锁导致方法失败,waitTime应该有一定值,以便在持有锁的线程释放锁时当前线程可以拿到锁并继续处理。
* 反之,如果你希望拿不到锁时立即抛出异常,只要将waitTime设为0即可。
*
* @return
*/
*
* 获取锁的等待时间,单位是毫秒,默认为0,如果获取不到锁就立即抛出异常。
* 注意,这里是获取锁的等待时间,也就是说,如果两个并发线程试图获取同一个锁,
* 等待时间是1000毫秒的话,如果获取到锁的线程在1000毫秒内处理完毕并释放锁的话,另一个线程是可以拿到锁的,
* 所以,服务的幂等性还需要服务自身来保证。
* 由于加了@DistributedLock的方法在没拿到锁时至少会等待waitTime参数指定的时间,
* 所以,waitTime应该尽可能小,
* 以避免长时间的等待。由于拿不到锁的线程会抛出DistributedLockFailException,
* 所以,
* 如果不希望因为全局锁导致方法失败,waitTime应该有一定值,以便在持有锁的线程释放锁时当前线程可以拿到锁并继续处理。
* 反之,如果你希望拿不到锁时立即抛出异常,只要将waitTime设为0即可。
*
* @return
*/
long waitTime() default 0;
/**
* 锁的租约时间,单位是秒,默认3秒。
* 当前线程在获取到锁以后,在租约时间到期以后,会自动释放锁。
* 如果在租约时间到期之前,方法执行完毕了,也会释放锁。
*
* @return
*/
long leaseTime() default 3;
}
问题已经清楚了:
目前的分布式锁必须要指定租约时间【持有锁的时间】,如果超过这个时间,锁就会自动释放,不管业务是否执行完成。
Redisson的特色,WatchDog自动续期并没有使用。
拓展一下:
结合这次的问题,要是这个分布式锁可以再增加以下特性:
增加WatchDog自动续期支持。并不是每个同学对都可以精确估算出执行时间,毕竟耗时与数量是成正向相关的。
解决办法
增加leaseTime的时长。本场景,加到10分钟
小结
估算操作耗时,要悲观一点,尽可能按最差的情况去估。特别是分布锁场景,要将梳理到的最大耗时再*10。
使用第三方API时,要多花点时间确认一些猜想。譬如Redisson有WatchDog,就觉得就有这个自动续期,可能当时也看到leaseTime的描述,但有些侥幸心理,没有去翻下源码。
分布式锁小结
拆成两个关键词来解读:
分布式:针对多进程场景。在jvm域,使用synchronized 或 ReentrantLock就可以了
锁 :互斥性。譬如一把钥匙在开锁时,另一把钥匙就要等,直到第一把钥匙离开
分布式锁主要有两个作用:
- 保证数据的正确性:比如:秒杀的时候防止商品超卖,表单重复提交,接口幂等性。
- 避免数据重复处理:比如:调度任务在多台机器重复执行,缓存过期所有请求都去加载数据库。
分布式锁要具有以下这些特性:
- 互斥:同一时刻只能有一个线程获得锁。【基本特性】
- 可重入:当一个线程获取锁后,还可以再次获取这个锁,避免死锁发生。【基本特性】
- 自动续期:获取锁的同时,启动一个异步任务,每当业务执行到三分之一时间,也就是6秒中的第2秒的时候,就自动延长锁过期时间,继续延长到6秒,这样就能保证业务逻辑处理完成之前锁不会过期。
- 高可用:当小部分节点挂掉后,仍然能够对外提供服务。【基本特性】
- 高性能:要做到高并发、低延迟。【基本特性】
- 支持阻塞和非阻塞:Synchronized是阻塞的,ReentrantLock.tryLock()就是非阻塞的。【基本特性】
- 支持公平锁和非公平锁:Synchronized是非公平锁,ReentrantLock(boolean fair)可以创建公平锁。
Redisson小结
Redisson是什么
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid),属于一个基于Redis的偏应用层面的组件。
它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。
Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Redis提供丰富的原子命令,简单易用,可以解决大部分业务场景的问题。相较而言,Redisson在简单业务场景,就显得有些笨重了。
这也是Redisson不瘟不火的一个原因。
- Netty 框架:Redisson采用了基于NIO的Netty框架,不仅能作为Redis底层驱动客户端,具备提供对Redis各种组态形式的连接功能,对Redis命令能以同步发送、异步形式发送、异步流形式发送或管道形式发送的功能,LUA脚本执行处理,以及处理返回结果的功能
- 基础数据结构:将原生的Redis Hash,List,Set,String,Geo,HyperLogLog等数据结构封装为Java里大家最熟悉的映射(Map),列表(List),集(Set),通用对象桶(Object Bucket),地理空间对象桶(Geospatial Bucket),基数估计算法(HyperLogLog)等结构,分布式数据结构:这基础上还提供了分布式的多值映射(Multimap),本地缓存映射(LocalCachedMap),有序集(SortedSet),计分排序集(ScoredSortedSet),字典排序集(LexSortedSet),列队(Queue),阻塞队列(Blocking Queue),有界阻塞列队(Bounded Blocking Queue),双端队列(Deque),阻塞双端列队(Blocking Deque),阻塞公平列队(Blocking Fair Queue),延迟列队(Delayed Queue),布隆过滤器(Bloom Filter),原子整长形(AtomicLong),原子双精度浮点数(AtomicDouble),BitSet等Redis原本没有的分布式数据结构
- 分布式锁:Redisson还实现了Redis文档中提到像分布式锁Lock这样的更高阶应用场景。事实上Redisson并没有不止步于此,在分布式锁的基础上还提供了联锁(MultiLock),读写锁(ReadWriteLock),公平锁(Fair Lock),红锁(RedLock),信号量(Semaphore),可过期性信号量(PermitExpirableSemaphore)和闭锁(CountDownLatch)这些实际当中对多线程高并发应用至关重要的基本部件。正是通过实现基于Redis的高阶应用方案,使Redisson成为构建分布式系统的重要工具。
- 节点:Redisson作为独立节点可以用于独立执行其他节点发布到分布式执行服务和分布式调度服务里的远程任务。
Redisson基于redis进行了封装和加强,提供了很多功能,具体详情可以查阅官方文档:https://github.com/redisson/redisson
Redisson和Jedis、Lettuce有什么区别?
Redisson和它俩的区别主要是解决的问题不同,Redisson聚焦于提供一个分布式场景的开箱即用的解决方案,譬如分布式锁。
Jedis、Lettuce聚焦于做一个易用、高效的redis客户端。
简单来说: Redisson是更高层的抽象,Jedis和Lettuce是Redis命令的封装。
Jedis是Redis官方推出的用于通过Java连接Redis客户端的一个工具包,提供了Redis的各种命令支持
Lettuce是一种可扩展的线程安全的 Redis 客户端,通讯框架基于Netty,支持高级的 Redis 特性,比如哨兵,集群,管道,自动重新连接和Redis数据模型。Spring Boot 2.x 开始 Lettuce 已取代 Jedis 成为首选 Redis 的客户端。
Redisson是架设在Redis基础上,通讯基于Netty的综合的、新型的中间件。
Jedis把Redis命令封装好,Lettuce则进一步有了更丰富的Api,也支持集群等模式。
但是两者也都点到为止,只给了你操作Redis数据库的脚手架,而Redisson则是基于Redis、Lua和Netty建立起了成熟的分布式解决方案。