# 073. 基于 storm+zookeeper 完成热门商品列表的分段存储
分段存储的思路:
- 每个 task 启动时,将自己的 task id 存储至 zk 中的 hot-product-task-list 节点
- 每个 task 每计算完一次 top n,就将自己的热门商品列表存储到 hot-product-task-{task id} 节点中(例如 task id 为 5 时,节点为 /hot-product-task-5)
# 改造 zk 工具类
- 改造了分布式锁的获取与释放:锁的 path 改为通过参数传递,不再写死在代码中
- 新增了获取/写入节点数据的方法
public class ZooKeeperSession {
private ZooKeeper zookeeper;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeperSession() {
String connectString = "192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181";
int sessionTimeout = 5000;
try {
// 异步连接,所以需要一个 org.apache.zookeeper.Watcher 来通知
// 由于是异步,利用 CountDownLatch 来让构造函数等待
zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
Watcher.Event.KeeperState state = event.getState();
System.out.println("watch event:" + state);
if (state == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("zookeeper 已连接");
connectedSemaphore.countDown();
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
try {
connectedSemaphore.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("zookeeper 初始化成功");
}
/**
* 获取分布式锁
*/
public void acquireDistributedLock(String path) {
byte[] data = "".getBytes();
try {
// 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
// EPHEMERAL:客户端被断开时,该节点自动被删除
zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("获取锁成功 [path=" + path + "]");
} catch (Exception e) {
e.printStackTrace();
// 如果锁已经被创建,那么将异常
// 循环等待锁的释放
int count = 0;
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(20);
// 休眠 20 毫秒后再次尝试创建
zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e1) {
// e1.printStackTrace();
count++;
continue;
}
System.out.println("获取锁成功 [path=" + path + "] 尝试了 " + count + " 次.");
break;
}
}
}
/**
* 释放分布式锁
*/
public void releaseDistributedLock(String path) {
try {
zookeeper.delete(path, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
/**
* 写节点数据
*/
public void setNodeData(String path, String data) {
try {
Stat exists = zookeeper.exists(path, false);
if (exists == null) {
// 节点不存在,先创建 PERSISTENT 持久连接
zookeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return;
}
zookeeper.setData(path, data.getBytes(), -1);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String getNodeData(String path) {
try {
return new String(zookeeper.getData(path, false, new Stat()));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
private static ZooKeeperSession instance = new ZooKeeperSession();
public static ZooKeeperSession getInstance() {
return instance;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# 实现分段存储后的 ProductCountBolt
package cn.mrcode.cachepdp.eshop.storm;
import com.alibaba.fastjson.JSON;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.trident.util.LRUMap;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Counts product views per task and periodically publishes this task's
 * top-N product ids to ZooKeeper.
 *
 * <p>On {@code prepare} the bolt starts a background thread that recomputes
 * the top N every {@link #TOPN_INTERVAL_MS} milliseconds and writes it to
 * {@code /hot-product-task-<taskId>}, then registers its task id in the
 * shared {@code /hot-product-task-list} node.
 *
 * <p>{@code countMap} is touched by both the executor thread ({@link #execute})
 * and the background thread, so every access is guarded by
 * {@code synchronized (countMap)}. The map itself is used as the monitor
 * because the bolt is serialized when the topology is submitted and a plain
 * {@code Object} lock field would not be serializable.
 *
 * <p>NOTE(review): the collector passed to {@code prepare} is not retained and
 * tuples are never acked; confirm the upstream spout emits tuples without
 * message ids, otherwise they will time out and be replayed.
 */
public class ProductCountBolt extends BaseRichBolt {
    /** How many hot products each task reports. */
    private static final int TOP_N = 3;
    /** Pause between two top-N rounds (6 seconds). */
    private static final long TOPN_INTERVAL_MS = 6000;
    private static final String TASK_LIST_LOCK_PATH = "/hot-product-task-list-lock";
    private static final String TASK_LIST_NODE = "/hot-product-task-list";

    private final LRUMap<Long, Long> countMap = new LRUMap<>(100);
    private ZooKeeperSession zooKeeperSession;
    private int taskId = -1;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        taskId = context.getThisTaskId();
        zooKeeperSession = ZooKeeperSession.getInstance();
        // Start the background thread that recomputes the top N periodically.
        topnStart();
        // Report this task's id into the shared task list.
        writeTaskPathToZk();
    }

    /** Starts the background thread that computes and publishes the top N. */
    private void topnStart() {
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    Utils.sleep(TOPN_INTERVAL_MS);
                    Map.Entry<Long, Long>[] top;
                    String rendered;
                    // Iterate and render under the lock: execute() mutates the map
                    // (and the values inside its entries) concurrently.
                    synchronized (countMap) {
                        top = computeTopN(countMap.entrySet(), TOP_N);
                        rendered = Arrays.toString(top);
                    }
                    System.out.println(Thread.currentThread().getName() + ":" + rendered);
                    // Publish outside the lock so a slow ZooKeeper call never
                    // blocks tuple processing.
                    writeTopnToZk(top);
                }
            }
        });
        // Do not keep the JVM alive just for this reporting loop.
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Selects the {@code n} entries with the largest values, in descending
     * order; unused trailing slots stay {@code null}.
     */
    private static Map.Entry<Long, Long>[] computeTopN(Iterable<Map.Entry<Long, Long>> entries, int n) {
        @SuppressWarnings("unchecked")
        Map.Entry<Long, Long>[] top = new Map.Entry[n];
        for (Map.Entry<Long, Long> entry : entries) {
            long value = entry.getValue();
            for (int i = 0; i < n; i++) {
                Map.Entry<Long, Long> current = top[i];
                if (current == null) {
                    top[i] = entry;
                    break;
                }
                if (value > current.getValue()) {
                    // Shift the smaller entries one slot down; the last one falls off.
                    System.arraycopy(top, i, top, i + 1, n - (i + 1));
                    top[i] = entry;
                    break;
                }
            }
        }
        return top;
    }

    /**
     * Adds this task's id to the comma-separated list stored in
     * {@code /hot-product-task-list}, unless it is already present.
     */
    private void writeTaskPathToZk() {
        // Several tasks update the same node concurrently, so the
        // read-modify-write cycle is protected by a distributed lock.
        zooKeeperSession.acquireDistributedLock(TASK_LIST_LOCK_PATH);
        try {
            String self = String.valueOf(taskId);
            String nodeData = zooKeeperSession.getNodeData(TASK_LIST_NODE);
            if (nodeData == null || "".equals(nodeData)) {
                nodeData = self;
            } else if (Arrays.asList(nodeData.split(",")).contains(self)) {
                // The list node is persistent: a restarted task is already
                // registered and must not be appended a second time.
                return;
            } else {
                nodeData += "," + self;
            }
            zooKeeperSession.setNodeData(TASK_LIST_NODE, nodeData);
        } finally {
            // Always release, otherwise the other tasks would wait for the lock
            // for as long as this session lives.
            zooKeeperSession.releaseDistributedLock(TASK_LIST_LOCK_PATH);
        }
    }

    /**
     * Stores the product ids of {@code topn} as a JSON array in this task's
     * own node, skipping empty slots.
     */
    private void writeTopnToZk(Map.Entry<Long, Long>[] topn) {
        List<Long> productIds = new ArrayList<>();
        for (Map.Entry<Long, Long> slot : topn) {
            if (slot != null) {
                productIds.add(slot.getKey());
            }
        }
        final String taskNodePath = "/hot-product-task-" + taskId;
        zooKeeperSession.setNodeData(taskNodePath, JSON.toJSONString(productIds));
    }

    @Override
    public void execute(Tuple input) {
        Long productId = input.getLongByField("productId");
        long updated;
        synchronized (countMap) {
            Long previous = countMap.get(productId);
            updated = (previous == null) ? 1L : previous + 1;
            countMap.put(productId, updated);
        }
        System.out.println("商品 " + productId + ",次数 " + updated);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110