# 073. 基于 storm+zookeeper 完成热门商品列表的分段存储

分段存储的思路:

  1. 每个 task 启动时,将自己的 task id 存储至 zk 中的 hot-product-task-list 节点
  2. 每个 task 每计算完一次 top n,就将自己的热门商品列表存储到 zk 的 hot-product-task-{task id} 节点中(例如 task id 为 5 时,节点为 /hot-product-task-5)

# 改造 zk 工具类

  • 改造了分布式锁的获取与释放:锁节点的 path 改为通过参数传递,不再写死在代码中
  • 新增了获取/写入节点数据的方法
public class ZooKeeperSession {
    private ZooKeeper zookeeper;
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    private ZooKeeperSession() {
        String connectString = "192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181";
        int sessionTimeout = 5000;
        try {
            // 异步连接,所以需要一个  org.apache.zookeeper.Watcher 来通知
            // 由于是异步,利用 CountDownLatch 来让构造函数等待
            zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    Watcher.Event.KeeperState state = event.getState();
                    System.out.println("watch event:" + state);
                    if (state == Watcher.Event.KeeperState.SyncConnected) {
                        System.out.println("zookeeper 已连接");
                        connectedSemaphore.countDown();
                    }
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            connectedSemaphore.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("zookeeper 初始化成功");
    }

    /**
     * 获取分布式锁
     */
    public void acquireDistributedLock(String path) {
        byte[] data = "".getBytes();
        try {
            // 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
            // EPHEMERAL:客户端被断开时,该节点自动被删除
            zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("获取锁成功 [path=" + path + "]");
        } catch (Exception e) {
            e.printStackTrace();
            // 如果锁已经被创建,那么将异常
            // 循环等待锁的释放
            int count = 0;
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(20);
                    // 休眠 20 毫秒后再次尝试创建
                    zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (Exception e1) {
//                    e1.printStackTrace();
                    count++;
                    continue;
                }
                System.out.println("获取锁成功 [path=" + path + "] 尝试了 " + count + " 次.");
                break;
            }
        }
    }

    /**
     * 释放分布式锁
     */
    public void releaseDistributedLock(String path) {
        try {
            zookeeper.delete(path, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * 写节点数据
     */
    public void setNodeData(String path, String data) {
        try {
          Stat exists = zookeeper.exists(path, false);
            if (exists == null) {
                // 节点不存在,先创建 PERSISTENT 持久连接
                zookeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                return;
            }
            zookeeper.setData(path, data.getBytes(), -1);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String getNodeData(String path) {
        try {
            return new String(zookeeper.getData(path, false, new Stat()));
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    private static ZooKeeperSession instance = new ZooKeeperSession();

    public static ZooKeeperSession getInstance() {
        return instance;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112

# 实现分段存储后的 ProductCountBolt

package cn.mrcode.cachepdp.eshop.storm;

import com.alibaba.fastjson.JSON;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.trident.util.LRUMap;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Counts product views per product id and periodically publishes this task's
 * top-N product ids to ZooKeeper.
 *
 * <p>On {@link #prepare} the task appends its id to the shared
 * {@code /hot-product-task-list} node and starts a background thread that
 * recomputes the top N and stores it in {@code /hot-product-task-<taskId>}.
 *
 * <p>{@code countMap} is shared between the thread calling {@link #execute} and
 * the background thread; every access is guarded by {@code synchronized (countMap)}.
 */
public class ProductCountBolt extends BaseRichBolt {
    /** How many of the most-viewed products are published. */
    private static final int TOP_N = 3;
    /** Pause between two top-N computations, in milliseconds. */
    private static final long TOP_N_INTERVAL_MS = 6000;

    private final LRUMap<Long, Long> countMap = new LRUMap<>(100);
    private ZooKeeperSession zooKeeperSession;
    private int taskId = -1;

    /**
     * Records this task's id, obtains the ZooKeeper session, starts the top-N
     * thread and registers the task id in ZooKeeper.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        taskId = context.getThisTaskId();
        zooKeeperSession = ZooKeeperSession.getInstance();
        // Start the background thread that recomputes the top N every TOP_N_INTERVAL_MS.
        // NOTE(review): the original comment said "once per minute" but the interval is 6 s — confirm intent.
        topnStart();
        // Report this task's id into the shared task list.
        writeTaskPathToZk();
    }

    /** Starts the thread that periodically computes the top N and writes it to ZooKeeper. */
    private void topnStart() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    Utils.sleep(TOP_N_INTERVAL_MS);
                    Map.Entry<Long, Long>[] top = selectTopN(snapshotCounts(), TOP_N);
                    System.out.println(Thread.currentThread().getName() + ":" + Arrays.toString(top));
                    // Publish the result to ZooKeeper.
                    writeTopnToZk(top);
                }
            }
        }).start();
    }

    /**
     * Copies the current counts under the map's lock so the top-N computation
     * never iterates the live map while {@link #execute} is mutating it.
     *
     * @return immutable copies of the current (productId, count) entries
     */
    private List<Map.Entry<Long, Long>> snapshotCounts() {
        synchronized (countMap) {
            List<Map.Entry<Long, Long>> snapshot = new ArrayList<>(countMap.size());
            for (Map.Entry<Long, Long> entry : countMap.entrySet()) {
                snapshot.add(new AbstractMap.SimpleImmutableEntry<Long, Long>(entry));
            }
            return snapshot;
        }
    }

    /**
     * Picks the {@code n} entries with the highest counts, ordered from highest to lowest.
     *
     * @param entries (productId, count) pairs to rank
     * @param n       size of the result array
     * @return array of length {@code n}; trailing slots are {@code null} when fewer
     *         than {@code n} entries were supplied
     */
    private static Map.Entry<Long, Long>[] selectTopN(List<Map.Entry<Long, Long>> entries, int n) {
        @SuppressWarnings("unchecked") // generic array creation; only Map.Entry<Long, Long> is ever stored
        Map.Entry<Long, Long>[] top = new Map.Entry[n];
        for (Map.Entry<Long, Long> entry : entries) {
            long value = entry.getValue();
            for (int i = 0; i < top.length; i++) {
                Map.Entry<Long, Long> current = top[i];
                if (current == null) {
                    top[i] = entry;
                    break;
                }
                if (value > current.getValue()) {
                    // Shift the lower-ranked entries one slot down (dropping the last) to make room at i.
                    System.arraycopy(top, i, top, i + 1, n - (i + 1));
                    top[i] = entry;
                    break;
                }
            }
        }
        return top;
    }

    /**
     * Appends this task's id to the comma-separated list stored in
     * {@code /hot-product-task-list}, guarded by a distributed lock because
     * several tasks perform this read-modify-write concurrently.
     */
    private void writeTaskPathToZk() {
        final String lockPath = "/hot-product-task-list-lock";
        final String taskListNode = "/hot-product-task-list";
        zooKeeperSession.acquireDistributedLock(lockPath);
        try {
            String nodeData = zooKeeperSession.getNodeData(taskListNode);
            // Append to existing data, or start the list with this task's id.
            if (nodeData != null && !"".equals(nodeData)) {
                nodeData += "," + taskId;
            } else {
                nodeData = taskId + "";
            }
            zooKeeperSession.setNodeData(taskListNode, nodeData);
        } finally {
            // Always release, otherwise a failure above would leave the lock held.
            zooKeeperSession.releaseDistributedLock(lockPath);
        }
    }

    /**
     * Stores the product ids of the given top-N entries as a JSON array in
     * {@code /hot-product-task-<taskId>}.
     *
     * @param topn ranked entries; {@code null} slots are skipped
     */
    private void writeTopnToZk(Map.Entry<Long, Long>[] topn) {
        List<Long> productIds = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : topn) {
            if (entry != null) {
                productIds.add(entry.getKey());
            }
        }
        final String taskNodePath = "/hot-product-task-" + taskId;
        zooKeeperSession.setNodeData(taskNodePath, JSON.toJSONString(productIds));
    }

    /** Increments the view count of the tuple's {@code productId} field. */
    @Override
    public void execute(Tuple input) {
        Long productId = input.getLongByField("productId");
        long updated;
        // Guarded because the top-N thread snapshots the same map.
        synchronized (countMap) {
            Long count = countMap.get(productId);
            updated = (count == null) ? 1L : count + 1;
            countMap.put(productId, updated);
        }
        System.out.println("商品 " + productId + ",次数 " + updated);
    }

    /** This bolt emits no tuples, so it declares no output fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110