# 058. 缓存数据生产服务中的 zk 分布式锁解决方案的代码实现(一)
本节基于 zk 进行分布式锁的代码封装;
# zk 分布式锁原理简单介绍
- 创建一个 zk 临时 node,来模拟一个商品 id 加锁
- zk 会保证一个 node path 只会被创建一次,如果已经被创建,则抛出 NodeExistsException
- 这个时候可以去做业务操作
- 释放锁,则是删除这个临时 node。
当一个多个缓存服务去对同一个商品 id 加锁时,只有一个成功, 其他的则轮循等待锁被释放,获取到锁之后,对比一下商品的时间版本,较新则重建缓存,否则放弃重建
# 基于 zkClient 封装分布式锁工具
zk 分布式锁有很多种实现方式,这里演示一种最简单的,但是比较实用的分布式锁
添加依赖: compile 'org.apache.zookeeper:zookeeper:3.4.5'
zk client 初始化代码
/**
* ${todo}
*
* @author : zhuqiang
* @date : 2019/5/7 22:37
*/
public class ZooKeeperSession {
private ZooKeeper zookeeper;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeperSession() {
String connectString = "192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181";
int sessionTimeout = 5000;
try {
// 异步连接,所以需要一个 org.apache.zookeeper.Watcher 来通知
// 由于是异步,利用 CountDownLatch 来让构造函数等待
zookeeper = new ZooKeeper(connectString, sessionTimeout, event -> {
Watcher.Event.KeeperState state = event.getState();
System.out.println("watch event:" + state);
if (state == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("zookeeper 已连接");
connectedSemaphore.countDown();
}
});
} catch (IOException e) {
e.printStackTrace();
}
try {
connectedSemaphore.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("zookeeper 初始化成功");
}
private static ZooKeeperSession instance = new ZooKeeperSession();
public static ZooKeeperSession getInstance() {
return instance;
}
public static void main(String[] args) {
ZooKeeperSession instance = ZooKeeperSession.getInstance();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
运行测试之后输出信息
watch event:SyncConnected
zookeeper 已连接
zookeeper 初始化成功
1
2
3
2
3
接下来编写加锁与释放锁的逻辑
public class ZooKeeperSession {
private ZooKeeper zookeeper;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeperSession() {
String connectString = "192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181";
int sessionTimeout = 5000;
try {
// 异步连接,所以需要一个 org.apache.zookeeper.Watcher 来通知
// 由于是异步,利用 CountDownLatch 来让构造函数等待
zookeeper = new ZooKeeper(connectString, sessionTimeout, event -> {
Watcher.Event.KeeperState state = event.getState();
System.out.println("watch event:" + state);
if (state == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("zookeeper 已连接");
connectedSemaphore.countDown();
}
});
} catch (IOException e) {
e.printStackTrace();
}
try {
connectedSemaphore.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("zookeeper 初始化成功");
}
/**
* 获取分布式锁
*/
public void acquireDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
byte[] data = "".getBytes();
try {
// 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
// EPHEMERAL:客户端被断开时,该节点自动被删除
zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("获取锁成功 product[id=" + productId + "]");
} catch (Exception e) {
e.printStackTrace();
// 如果锁已经被创建,那么将异常
// 循环等待锁的释放
int count = 0;
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(20);
// 休眠 20 毫秒后再次尝试创建
zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e1) {
e1.printStackTrace();
count++;
continue;
}
System.out.println("获取锁成功 product[id=" + productId + "] 尝试了 " + count + " 次.");
break;
}
}
}
/**
* 释放分布式锁
*/
public void releaseDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.delete(path, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
private static ZooKeeperSession instance = new ZooKeeperSession();
public static ZooKeeperSession getInstance() {
return instance;
}
public static void main(String[] args) throws InterruptedException {
ZooKeeperSession instance = ZooKeeperSession.getInstance();
CountDownLatch downLatch = new CountDownLatch(2);
IntStream.of(1, 2).forEach(i -> new Thread(() -> {
instance.acquireDistributedLock(1L);
System.out.println(Thread.currentThread().getName() + " 得到锁并休眠 10 秒");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
instance.releaseDistributedLock(1L);
System.out.println(Thread.currentThread().getName() + " 释放锁");
downLatch.countDown();
}).start());
downLatch.await();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
运行 main 测试两个线程获取锁的等待过程如下
watch event:SyncConnected
zookeeper 已连接
zookeeper 初始化成功
获取锁成功 product[id=1]
Thread-1 得到锁并休眠 10 秒
循环异常中...
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /product-lock-1
at org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
Thread-1 释放锁
获取锁成功 product[id=1] 尝试了 285 次.
Thread-0 得到锁并休眠 10 秒
Thread-0 释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
可以看到,日志输出,证明分布式锁已经编写成功