# 169. 商品详情页动态渲染系统:消息队列架构升级之去重队列
已经做好的有如下:
- 基础依赖服务
- 商品服务走动态渲染系统(本章也属于动态渲染系统)
- OneService 系统:价格服务和库存服务走 mysql + redis 双写
前面已经讲过的流程:
- 商品服务:增删改、并发送变更消息到 mq
- 数据同步服务:接收变更消息,获取原子数据更新到 redis 中,并发送维度数据变更事件
- 数据聚合服务: 将原子数据从 redis 中查询出来,按维度聚合后写入 redis
对这个里面的细节进行架构上的优化和升级:本次做去重队列。
# 为什么要做去重队列?
从现有架构和场景来看,数据同步服务接收到每一个原子数据变更都会发送 dim 事件给聚合服务, 短时间内一个商品修改了多次,那么数据聚合服务,将会多次从 redis 中取出数据聚合再回写。 所以做去重队列是有必要的
# 去重队列实现思路
在内存中放置一个 set 集合,来一个原子操作就放入 set 中,在一个时间窗口内(如一分钟), 清空一次 set 集合,这里就实现了多条相同维度数据合并去重的功能。
优点如下:
- 减少数据聚合服务的压力
- 减少数据聚合服务调用 redis 次数
# 业务实现
核心代码如下
private Set<String> dimDataChangeEventSet = Collections.synchronizedSet(new HashSet<>());
// 拿商品数据来举例
private void processProductDataChangeMessage(ProductEvent productEvent) {
Long id = productEvent.getId();
String eventType = productEvent.getEventType();
if ("add".equals(eventType) || "update".equals(eventType)) {
JSONObject dataJSONObject = JSONObject.parseObject(eshopProductService.findProductById(id));
redisTemplate.opsForValue().set("product_" + dataJSONObject.getLong("id"), dataJSONObject.toJSONString());
} else if ("delete".equals(eventType)) {
redisTemplate.delete("product_" + id);
}
DimEvent dimEvent = new DimEvent("product", id);
// 这里不直接发送,先放入 set 中去重
// rabbitMQSender.send(AGGR_DATA_CHANGE_QUEUE, JSON.toJSONString(dimEvent));
dimDataChangeEventSet.add(JSON.toJSONString(dimEvent));
System.out.println("Product: " + id);
}
// 新开一个线程,定时去清空集合,投递消息
@PostConstruct
public void start() {
new Thread(() -> {
while (true) {
// 这种方式目前肯定在并发下会出现问题,这线程和上面的线程不同步,会导致某些数据没有被处理就清空了
// 我自己感觉会有这个问题,可能概率有点小
if (!dimDataChangeEventSet.isEmpty()) {
for (String dimEvent : dimDataChangeEventSet) {
rabbitMQSender.send(AGGR_DATA_CHANGE_QUEUE, dimEvent);
}
dimDataChangeEventSet.clear();
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
测试:拿商品维度来举例,业务无论执行以下哪一个修改操作,都会触发一条商品维度数据的变更消息, 这里在 10 秒内执行以下三条修改操作,看最后执行了几次商品维度数据的聚合。
- 修改商品:
http://localhost:9100/product/update?id=1&name=修改Apple/苹果 iPhone X 5.8寸 国行 iphonex三网通4G 全新苹果x手机&categoryId=1&brandId=1
- 修改商品规格:
http://localhost:9100/product-specification/update?id=1&name=网络类型&value=4G全网通-修改&productId=1
- 修改商品属性:
http://localhost:9100/product-property/update?id=1&name=机身颜色&value=修改iPhoneX【5.8寸黑色】,iPhoneX【5.8寸银色】&productId=1
日志输出如下,可以看到,去重成功
ProductProperty: 1
ProductSpecification: 1
Product: 1
商品聚合:cn.mrcode.cache.eshop.dataaggrserver.rabbitmq.DimEvent@4334d1d5
1
2
3
4
5
2
3
4
5