# 058. 缓存数据生产服务中的 zk 分布式锁解决方案的代码实现(一)

本节基于 zk 进行分布式锁的代码封装;

# zk 分布式锁原理简单介绍

  1. 创建一个 zk 临时 node,来模拟一个商品 id 加锁
  2. zk 会保证一个 node path 只会被创建一次,如果已经被创建,则抛出 NodeExistsException
  3. 这个时候可以去做业务操作
  4. 释放锁,则是删除这个临时 node。

当一个多个缓存服务去对同一个商品 id 加锁时,只有一个成功, 其他的则轮循等待锁被释放,获取到锁之后,对比一下商品的时间版本,较新则重建缓存,否则放弃重建

# 基于 zkClient 封装分布式锁工具

zk 分布式锁有很多种实现方式,这里演示一种最简单的,但是比较实用的分布式锁

添加依赖: compile 'org.apache.zookeeper:zookeeper:3.4.5'

zk client 初始化代码

/**
 * ${todo}
 *
 * @author : zhuqiang
 * @date : 2019/5/7 22:37
 */
public class ZooKeeperSession {
    private ZooKeeper zookeeper;
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    private ZooKeeperSession() {
        String connectString = "192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181";
        int sessionTimeout = 5000;
        try {
            // 异步连接,所以需要一个  org.apache.zookeeper.Watcher 来通知
            // 由于是异步,利用 CountDownLatch 来让构造函数等待
            zookeeper = new ZooKeeper(connectString, sessionTimeout, event -> {
                Watcher.Event.KeeperState state = event.getState();
                System.out.println("watch event:" + state);
                if (state == Watcher.Event.KeeperState.SyncConnected) {
                    System.out.println("zookeeper 已连接");
                    connectedSemaphore.countDown();
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            connectedSemaphore.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("zookeeper 初始化成功");
    }

    private static ZooKeeperSession instance = new ZooKeeperSession();

    public static ZooKeeperSession getInstance() {
        return instance;
    }

    public static void main(String[] args) {
        ZooKeeperSession instance = ZooKeeperSession.getInstance();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

运行测试之后输出信息

watch event:SyncConnected
zookeeper 已连接
zookeeper 初始化成功
1
2
3

接下来编写加锁与释放锁的逻辑

public class ZooKeeperSession {
    private ZooKeeper zookeeper;
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    private ZooKeeperSession() {
        String connectString = "192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181";
        int sessionTimeout = 5000;
        try {
            // 异步连接,所以需要一个  org.apache.zookeeper.Watcher 来通知
            // 由于是异步,利用 CountDownLatch 来让构造函数等待
            zookeeper = new ZooKeeper(connectString, sessionTimeout, event -> {
                Watcher.Event.KeeperState state = event.getState();
                System.out.println("watch event:" + state);
                if (state == Watcher.Event.KeeperState.SyncConnected) {
                    System.out.println("zookeeper 已连接");
                    connectedSemaphore.countDown();
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            connectedSemaphore.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("zookeeper 初始化成功");
    }

    /**
     * 获取分布式锁
     */
    public void acquireDistributedLock(Long productId) {
        String path = "/product-lock-" + productId;
        byte[] data = "".getBytes();
        try {
            // 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
            // EPHEMERAL:客户端被断开时,该节点自动被删除
            zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("获取锁成功 product[id=" + productId + "]");
        } catch (Exception e) {
            e.printStackTrace();
            // 如果锁已经被创建,那么将异常
            // 循环等待锁的释放
            int count = 0;
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(20);
                    // 休眠 20 毫秒后再次尝试创建
                    zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (Exception e1) {
                    e1.printStackTrace();
                    count++;
                    continue;
                }
                System.out.println("获取锁成功 product[id=" + productId + "] 尝试了 " + count + " 次.");
                break;
            }
        }
    }

    /**
     * 释放分布式锁
     */
    public void releaseDistributedLock(Long productId) {
        String path = "/product-lock-" + productId;
        try {
            zookeeper.delete(path, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    private static ZooKeeperSession instance = new ZooKeeperSession();

    public static ZooKeeperSession getInstance() {
        return instance;
    }

    public static void main(String[] args) throws InterruptedException {
        ZooKeeperSession instance = ZooKeeperSession.getInstance();
        CountDownLatch downLatch = new CountDownLatch(2);
        IntStream.of(1, 2).forEach(i -> new Thread(() -> {
            instance.acquireDistributedLock(1L);
            System.out.println(Thread.currentThread().getName() + " 得到锁并休眠 10 秒");
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            instance.releaseDistributedLock(1L);
            System.out.println(Thread.currentThread().getName() + " 释放锁");
            downLatch.countDown();
        }).start());
        downLatch.await();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

运行 main 测试两个线程获取锁的等待过程如下

watch event:SyncConnected
zookeeper 已连接
zookeeper 初始化成功
获取锁成功 product[id=1]
Thread-1 得到锁并休眠 10 秒

循环异常中...
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /product-lock-1
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:119)

Thread-1 释放锁
获取锁成功 product[id=1] 尝试了 285 次.
Thread-0 得到锁并休眠 10 秒
Thread-0 释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14

可以看到,日志输出,证明分布式锁已经编写成功