03_分布式锁
2024-05-23 21:40:17 47 举报
分布式锁源码剖析curator和Redission
作者其他创作
大纲/内容
lockCount-1 0
同一个线程或客户端,再次加读锁,不互斥
获取锁成功
返回null
Redis
是
timeoutSetName
同一个线程或客户端,再次加写锁,不互斥
删除锁key
收到前一个节点删除消息后,watch回调方法中执行notifyAll(),唤醒Objectwait()的所有线程
直接return
删除刷新map标识
锁A先上了一把锁之后锁B开始上锁
这个key中的表示当前线程的LockName是否存在
添加监听器
第3把锁进来,获取上一把锁的超时时间,如果小于当前时间,就在queue和set中删除上一把锁的记录。然后将自己入queue,加set
最终减到0
S
watchDog遍历所有的锁并续有效时间
通过pttl返回当前锁key的剩余时间ttl
释放锁1、删除自己的watcher2、删除当前锁path3、删除当前thread对应的LockData
3
/my/locks/leases/_c_8d946b48-807e-46df-8c27-78d7e20edc3f-lease-0000000001
第2把锁进来,看了看有锁,入队,加set
另一个线程或客户端,再次加读锁/写锁,互斥
2.1 加锁(hset) key为foo,值为hash类型的json拼装的键lockName值为1foo{ \"0154e246-53a5-41b3-aba2-5e79277caabe:1\":\"1\"}2.2 设置key的过期时间(pexpire)默认lock过期时间为30000毫秒
Redlock
UUID02:Thread02
存在元素且不是当前UUID:ThreadId
释放锁
根据当前线程获取LockData
重新进入获取锁循环
超时,加锁失败 返回false
返回nil或者剩余时间(参考上锁流程)
2
pexpire 设置锁key的过期时间为30000毫秒
根据当前线程获取LockData threadData.get(currentThread);
返回剩余时间ttl
先创建leads临时节点(一个lead代表一个线程持有的一把锁)
Y
返回 null ,拿到锁,启动watchDog
公平锁
先获取一把前置锁(同上锁流程)InterProcessSemaphoreV2#acquire()
对前一个path增加watcher,监听前一个锁的path是否还存在(锁A的path)client.getData().usingWatcher(watcher).forPath(previousSequencePath);
死循环 while true不断尝试获取锁递减time
设置需要几个线程执行完,才能执行latch.trySetCount(2);latch.await();
synchronized上锁括号
记录当前时间current
抛异常not locked by current thread by node id
N
lpop 弹出threadsQueueName第一个元素
当前是否=1
否
leads是否 = 3
将key递减1decrby semaphore 1
进入死循环
del 这个锁key,也就是foo
重新给这个key设置30000毫秒的过期时间
再次加读锁,不互斥
持续加锁流程
写锁lua脚本创建锁的hash结构
get semaphore获取到一个当前的值
不刷新剩余时间
获取到锁返回true
N/2+1 master实例上成功上锁,就说明Redlock上锁成功了。其实Redlock是继承自RedissonMultiLock的,只是把那个允许加锁失败的数量从0改为 locks.size()/2 + 1 个,呵呵ps:在构建RedissonRedLock的时候,传入的locks中的lockName在通过hash后尽可能是分散在所有实例上
计算slot从16384中计算出这个anyLock锁应该存储在哪个node上CommandAsyncExecutor#evalWriteAsync()
是否存在刷新时间的map标识expirationRenewalMap
刷新这个锁的剩余时间为最大剩余时间
LockName:ConnectionManager(UUID )+ threadIdeg: 0154e246-53a5-41b3-aba2-5e79277caabe:1
最后主动过期key
读锁lua脚本创建锁的hash结构
2.1 加重入锁(hincrby)判断是否存在key为foo,并且包含\"0154e246-53a5-41b3-aba2-5e79277caabe:1\"的对象。如果包含,就给这个对象+1foo{ \"0154e246-53a5-41b3-aba2-5e79277caabe:1\":\"2\"}2.2 设置key的过期时间(pexpire)默认lock过期时间为30000毫秒
否拿到锁
再次加写锁,互斥
返回pttl anyLock,导致加锁失败,不断的陷入死循环不断的重试
表示有当前线程上了N把锁当前lockData.lockCount +1(目的是上重入锁)
Semaphore
LockInternalsDriver#createsTheLock()创建父节点client.create().creatingParentContainersIfNeeded()创建临时有序子节点,也就是我们实际的锁node.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);最终锁node:/my/locks/_c_0a-e0d-lock-0000001
发布消息
判断现在是否超时?
先加写锁后加了读锁
锁key的值累加1foo UUID:ThreadId 2(重入锁的情况)
M
返回0,保持锁
拿到锁跳出死循环
计算整体剩余时间
curator 公平互斥锁 by ZK
是否可以获取元素
发布unlockMessage消息到指定的channel中publish redisson_lock__channel_foo 0
多重锁
获取第list[i]把锁
释放前置lock
上锁请求
不存在 返回 0
获取队列第一个元素threadsQueueName
RCountDownLatch锁
返回1,释放锁
返回true拿到锁
不断循环,直到拿到锁
找到
while true 死循环获取锁key是否 0
删除锁
公平锁简单上锁流程
LUAdecr anyCountDownLatch递减1
hlen剩余读锁是否大于1
Mutil多重锁
服务器宕机
添加监听器,监听leads锁的path,如果path有变动,会立刻尝试重新
private final Watcher watcher = new Watcher(){ @Override public void process(WatchedEvent event) { notifyFromWatcher(); 唤醒所有wait()的线程 }};
从timeoutSetName中获取队列第一个元素的超时时间
threadsQueueName
for 循环获取所有锁
其他需要上锁的线程需要等待这个剩余时间
其他等待在这把锁上的线程之前订阅过这个channel,监听到0之后就可以尝试去获取foo的锁了
加锁流程
ttl = 获取当前锁key的剩余时间
拿锁失败返回0无限等待
返回nil,外部直接上锁(参考上锁流程)
当前临时节点在已经排序好的list中是否是第0个StandardLockInternalsDriver#getsTheLock()
第一个锁进来,不入队也不加set但是会将锁加上也就是hset 一个锁key:foo
请求上锁,只要有2个实例上锁成功,就成功
UUID02:Thread0210:00:10
watch
客户端宕机自动释放锁
semaphore锁
创建semaphore
说明这个等待的锁请求已经过期了
只是读锁
UUID02:Thread03
删除临时节点client.delete().guaranteed().forPath(ourPath);
直接获取锁(同加锁逻辑)
构造LUA脚本RedissonLock#tryLockInnerAsync()
返回因为已经给当前线程的锁 -1 了
其实就是semaphore的变种,就是将semaphore的数量设置为1即可
time是否过期
其他线程过来加锁
Redis cluster
rpush 在threadsQueueName队尾加入元素rpush redisson_lock_queue:{anyLock} UUID:theadId
全是读锁的情况
while true循环获取锁
hset设置锁key的值foo UUID:ThreadId 1
重入锁
读写锁RReadWriteLock
如果存在“foo”
放入localData中
申请公平锁
internalLockLoop()
如果存在多把锁,就递减1 直至减到0
设置刷新map标识
是否有值
报错
释放锁foo
存在
上锁 path=\"/my/locks\"lock.acquire();InterProcessMutex#internalLock
线程A获取锁semaphore.acquire();
将这些节点排序(公平锁的体现)getSortedChildren()
timeout = ttl + 当前时间 + 5000ms
值依然 0
UUID02:Thread01
1
返回 ttl
(锁key不存在)且(队列threadsQueueName第一个元素不存在或者 队列threadsQueueName第一个元素=当前lockName )
根据slot找到目标nodegetNodeSource()
锁A
2.3 (pttl)返回当前key剩余的时间对象ttlRemainingFuture
将lockName对应的值 - 1
2、执行LUA脚本
超时时间<=当前时间
剩余读锁将mode设置为read
tryLock
1、创建过期时间time2、记录当前时间current
目的是大家依次排队获取semaphore
加锁成功
是 继续循环
拿到foo(指定名称)的lockRedissonLock
zrem 从timeoutSetName集合中删除lockName对应的数据
ttl = 当前元素的剩余时间 - 当前时间
非可重入锁
等待在这里Object.wait()
LUA设置大小 3 只允许3个线程同时获取锁set semaphore 3
调用LUA刷新时间
拿到其中一个最长的剩余时间
获取锁
创建TimerTask每10000毫秒执行一次,也就是说:如果一个lock一直被某个线程占用,那么他的过期时间会一直保持在20~30秒之间
如果获取失败或者超时return false并且释放掉已经获取的其他锁
返回 1
设置锁的path
获取锁失败后要删除掉刚才创建的临时节点finally{ deleteOurPath(ourPath); }
1、计算slot
等待那个持有读锁的线程来释放锁
zadd 在timeoutSetName增加记录zadd redisson_lock_timeout:{anyLock} timeout UUID:ThreadId
key是否存在
Redis中是否存在\"foo\"
pttl anyLock,返回一个anyLock的剩余生存时间
hincrby 读锁-1hincrby anyLock UUID_01:threadId_01 -1
2.3 返回null
设置整个锁的超时时间为:锁的个数*1500ms
添加调度器 watchDog不断循环刷新key的过期时间
其他线程或客户端
创建临时有序节点LockInternals#attemptLock()
获取threadsQueueName第一个元素
UUID02:Thread0310:00:15
锁互斥
MultiLock,依次遍历获取每个锁,阻塞直到获取每个锁为止,然后返回true如果过程中有报错,依次释放已经获取到的锁,然后返回false
锁key的值UUID:ThreadId为1
获取锁path下的所有子节点list也就是\"/my/locks/\"下的所有锁节点
可重入锁会多次-1,直到减为0
是即上重入锁
获取前置lockinternalAcquire1Lease()lock.acquire();
同一个线程或客户端
执行countDownlatch1.countDown();
TryLock
是否同一个线程
其实就是将所有普通锁放到List locks中循环获取每个锁
0 条评论
下一页