# 074. 基于双重 zookeeper 分布式锁完成分布式并行缓存预热的代码开发

# 并行缓存预热思路

  1. 服务启动的时候,进行缓存预热

  2. 从 zk 中读取 taskid 列表

  3. 依次遍历每个 taskid,尝试获取分布式锁,如果获取不到,快速报错,不要等待,因为说明已经有其他服务实例在预热了

  4. 直接尝试获取下一个 taskid 的分布式锁

  5. 即使获取到了分布式锁,也要检查一下这个 taskid 的预热状态,如果已经被预热过了,就不再预热了

    预热状态,也是一个 node path 来存储的,每个 task 一个状态节点

  6. 执行预热操作,遍历 productid 列表,查询数据,然后写 ehcache 和 redis

  7. 预热完成后,设置 taskid 对应的预热状态

# 服务启动启动预热与 spring 实例工具类封装

由于需要在缓存预热的线程中使用缓存服务进行存储,这里需要封装一个 spring bean 获取工具类

package cn.mrcode.cachepdp.eshop.cache;

import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import cn.mrcode.cachepdp.eshop.cache.prewarm.CachePrewarmThread;

/**
 * @author : zhuqiang
 * @date : 2019/5/25 15:58
 */
 @Component
public class InitListener implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        ServletContext servletContext = sce.getServletContext();
        WebApplicationContext webApplicationContext = WebApplicationContextUtils.getWebApplicationContext(servletContext);
        SpringContextUtil.setWebApplicationContext(webApplicationContext);

        new CachePrewarmThread().start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class SpringContextUtil {
    private static WebApplicationContext context;

    public static WebApplicationContext getWebApplicationContext() {
        return context;
    }

    public static void setWebApplicationContext(WebApplicationContext webApplicationContext) {
        context = webApplicationContext;
    }
}
1
2
3
4
5
6
7
8
9
10
11

# zk 工具类的改造

在思路里面提到了,需要快速失败的一个加锁方式,还有写入/获取数据的方法。 在缓存服务里面的 zk 工具类还没有这样的功能,对这个进行改造

由于之前写过,这些代码都是体力活了,不想贴上来了

public class ZooKeeperSession {
    private ZooKeeper zookeeper;
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    private ZooKeeperSession() {
        String connectString = "192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181";
        int sessionTimeout = 5000;
        try {
            // 异步连接,所以需要一个  org.apache.zookeeper.Watcher 来通知
            // 由于是异步,利用 CountDownLatch 来让构造函数等待
            zookeeper = new ZooKeeper(connectString, sessionTimeout, event -> {
                Watcher.Event.KeeperState state = event.getState();
                System.out.println("watch event:" + state);
                if (state == Watcher.Event.KeeperState.SyncConnected) {
                    System.out.println("zookeeper 已连接");
                    connectedSemaphore.countDown();
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            connectedSemaphore.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("zookeeper 初始化成功");
    }

    /**
     * 获取分布式锁
     */
    public void acquireDistributedLock(Long productId) {
        String path = "/product-lock-" + productId;
        byte[] data = "".getBytes();
        try {
            // 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
            // EPHEMERAL:客户端被断开时,该节点自动被删除
            zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("获取锁成功 product[id=" + productId + "]");
        } catch (Exception e) {
            e.printStackTrace();
            // 如果锁已经被创建,那么将异常
            // 循环等待锁的释放
            int count = 0;
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(20);
                    // 休眠 20 毫秒后再次尝试创建
                    zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (Exception e1) {
//                    e1.printStackTrace();
                    count++;
                    continue;
                }
                System.out.println("获取锁成功 product[id=" + productId + "] 尝试了 " + count + " 次.");
                break;
            }
        }
    }

    /**
     * 释放分布式锁
     */
    public void releaseDistributedLock(Long productId) {
        try {
            String path = "/product-lock-" + productId;
            zookeeper.delete(path, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取分布式锁
     */
    public void acquireDistributedLock(String path) {
        byte[] data = "".getBytes();
        try {
            // 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
            // EPHEMERAL:客户端被断开时,该节点自动被删除
            zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("获取锁成功 [path=" + path + "]");
        } catch (Exception e) {
            e.printStackTrace();
            // 如果锁已经被创建,那么将异常
            // 循环等待锁的释放
            int count = 0;
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(20);
                    // 休眠 20 毫秒后再次尝试创建
                    zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (Exception e1) {
//                    e1.printStackTrace();
                    count++;
                    continue;
                }
                System.out.println("获取锁成功 [path=" + path + "] 尝试了 " + count + " 次.");
                break;
            }
        }
    }

    /**
     * 获取分布式锁;快速失败,不等待
     */
    public boolean acquireFastFailDistributedLock(String path) {
        byte[] data = "".getBytes();
        try {
            // 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
            // EPHEMERAL:客户端被断开时,该节点自动被删除
            zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("获取锁成功 [path=" + path + "]");
            return true;
        } catch (Exception e) {
            System.out.println("获取锁失败 [path=" + path + "]");
            return false;
        }
    }

    /**
     * 释放分布式锁
     */
    public void releaseDistributedLock(String path) {
        try {
            zookeeper.delete(path, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * 写节点数据
     */
    public void setNodeData(String path, String data) {
        try {
           Stat exists = zookeeper.exists(path, false);
            if (exists == null) {
                // 节点不存在,先创建 PERSISTENT 持久连接
                zookeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                return;
            }
            zookeeper.setData(path, data.getBytes(), -1);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String getNodeData(String path) {
        try {
            return new String(zookeeper.getData(path, false, new Stat()));
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    private static ZooKeeperSession instance = new ZooKeeperSession();

    public static ZooKeeperSession getInstance() {
        return instance;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172

# 缓存预热逻辑编写

public class CachePrewarmThread extends Thread {
    @Override
    public void run() {
        // 1. 获取 task id 列表
        ZooKeeperSession zk = ZooKeeperSession.getInstance();
        final String taskListNode = "/hot-product-task-list";
        String taskListNdeData = zk.getNodeData(taskListNode);
        if (taskListNode == null || "".equals(taskListNdeData)) {
            System.err.println("task list 为空");
            return;
        }

        CacheService cacheService = SpringContextUtil.getWebApplicationContext().getBean(CacheService.class);

        String[] taskList = taskListNdeData.split(",");
        for (String taskId : taskList) {
            final String taskNodeLockPath = "/hot-product-task-lock-" + taskId;
            // 尝试获取该节点的锁,如果获取失败,说明被其他服务预热了
            if (!zk.acquireFastFailDistributedLock(taskNodeLockPath)) {
                continue;
            }
            // 疑问:视频中为什么需要在这里对 预热数据节点加锁?
            // 获取 检查预热状态
            final String taskNodePrewarmStatePath = "/hot-product-task-prewarm-state" + taskId;
            String taskNodePrewarmState = zk.getNodeData(taskNodePrewarmStatePath);
            // 已经被预热过了
            if (taskNodePrewarmState != null && !"".equals(taskNodePrewarmState)) {
                zk.releaseDistributedLock(taskNodeLockPath);
                continue;
            }

            // 还未被预热过,读取 topn 列表,并从数据库中获取商品信息,存入缓存中
            final String taskNodePath = "/hot-product-task-" + taskId;
            String nodeData = zk.getNodeData(taskNodePath);
            if (nodeData == null && "".equals(nodeData)) {
                // 如果没有数据则不处理
                zk.releaseDistributedLock(taskNodeLockPath);
                continue;
            }

            List<Long> pids = JSON.parseArray(nodeData, Long.class);

            // 假设这里是从数据库中获取的数据
            pids.forEach(pid -> {
                ProductInfo productInfo = getProduct(pid);
                System.out.println("预热缓存信息:" + productInfo);
                cacheService.saveProductInfo2LocalCache(productInfo);
                cacheService.saveProductInfo2ReidsCache(productInfo);
            });

            // 修改预热状态
            zk.setNodeData(taskNodePrewarmStatePath, "success");
            // 释放该 task 节点的锁
            zk.releaseDistributedLock(taskNodeLockPath);
        }
    }

    private ProductInfo getProduct(Long pid) {
        String productInfoJSON = "{\"id\": " + pid + ", \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1," +
                "\"modifyTime\":\"2019-05-13 22:00:00\"}";
        return JSON.parseObject(productInfoJSON, ProductInfo.class);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

针对这个逻辑,我有几个疑问:

  1. 疑问:视频中为什么需要在这里对 预热数据节点加锁?

    在代码逻辑上来看,对该 task 节点加锁之后,也就只能是加锁的机器才能访问本任务对应的预热节点数据,为什么还需要加锁?

  2. 疑问:这里缓存存入会存在数据竞争吗?

    换句话说,在 storm 中统计中,同一个商品 id 始终只会路由到一个 task 中吗?

    有一点没有搞明白,之前课程已经说过同一个商品 id 始终只会路由到一个 task 中, 如果这个能保证,那么这里就不会存在数据竞争问题。

    相当于每个 task 中的 topn 的商品 id 都是独有的

  3. 疑问:缓存预热场景到现在也还没有明白

    缓存预热只适合在已有系统?不然预热的访问数据从何而来?

  4. 疑问:视频中把预热启动缓存的放在了 controller 中,不会与缓存重建产生冲突吗?

    这章节的思路,是项目重启的时候去异步预热缓存,如果这个时候对外开放了服务, 那么触发了缓存重建相关的操作,就会出现数据入缓存冲突的问题;

    但是我记得之前说缓存预热的场景是,先预热,预热完成后,再对外开放服务。 如果是这样的话,那么这里是没有问题的