# 169. 商品详情页动态渲染系统:消息队列架构升级之去重队列

已经做好的有如下:

  • 基础依赖服务
  • 商品服务走动态渲染系统(本章也属于动态渲染系统)
  • OneService 系统:价格服务和库存服务走 mysql + redis 双写

前面已经讲过的流程:

  1. 商品服务:增删改、并发送变更消息到 mq
  2. 数据同步服务:接收变更消息,获取原子数据更新到 redis 中,并发送维度数据变更事件
  3. 数据聚合服务: 将原子数据从 redis 中查询出来,按维度聚合后写入 redis

对这个里面的细节进行架构上的优化和升级:本次做去重队列。

# 为什么要做去重队列?

从现有架构和场景来看,数据同步服务接收到每一个原子数据变更都会发送 dim 事件给聚合服务, 短时间内一个商品修改了多次,那么数据聚合服务,将会多次从 redis 中取出数据聚合再回写。 所以做去重队列是有必要的

# 去重队列实现思路

在内存中放置一个 set 集合,来一个原子操作就放入 set 中,在一个时间窗口内(如一分钟), 清空一次 set 集合,这里就实现了多条相同维度数据合并去重的功能。

优点如下:

  1. 减少数据聚合服务的压力
  2. 减少数据聚合服务调用 redis 次数

# 业务实现

核心代码如下

 private Set<String> dimDataChangeEventSet = Collections.synchronizedSet(new HashSet<>());

// 拿商品数据来举例
 private void processProductDataChangeMessage(ProductEvent productEvent) {
    Long id = productEvent.getId();
    String eventType = productEvent.getEventType();

    if ("add".equals(eventType) || "update".equals(eventType)) {
        JSONObject dataJSONObject = JSONObject.parseObject(eshopProductService.findProductById(id));
        redisTemplate.opsForValue().set("product_" + dataJSONObject.getLong("id"), dataJSONObject.toJSONString());
    } else if ("delete".equals(eventType)) {
        redisTemplate.delete("product_" + id);
    }
    DimEvent dimEvent = new DimEvent("product", id);
    // 这里不直接发送,先放入 set 中去重
//        rabbitMQSender.send(AGGR_DATA_CHANGE_QUEUE, JSON.toJSONString(dimEvent));
    dimDataChangeEventSet.add(JSON.toJSONString(dimEvent));
    System.out.println("Product: " + id);
}

// 新开一个线程,定时去清空集合,投递消息
@PostConstruct
public void start() {
    new Thread(() -> {
        while (true) {
            // 这种方式目前肯定在并发下会出现问题,这线程和上面的线程不同步,会导致某些数据没有被处理就清空了
            // 我自己感觉会有这个问题,可能概率有点小
            if (!dimDataChangeEventSet.isEmpty()) {
                for (String dimEvent : dimDataChangeEventSet) {
                    rabbitMQSender.send(AGGR_DATA_CHANGE_QUEUE, dimEvent);
                }
                dimDataChangeEventSet.clear();
            }
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

测试:拿商品维度来举例,业务无论执行以下哪一个修改操作,都会触发一条商品维度数据的变更消息, 这里在 10 秒内执行以下三条修改操作,看最后执行了几次商品维度数据的聚合。

  • 修改商品:http://localhost:9100/product/update?id=1&name=修改Apple/苹果 iPhone X 5.8寸 国行 iphonex三网通4G 全新苹果x手机&categoryId=1&brandId=1
  • 修改商品规格:http://localhost:9100/product-specification/update?id=1&name=网络类型&value=4G全网通-修改&productId=1
  • 修改商品属性:http://localhost:9100/product-property/update?id=1&name=机身颜色&value=修改iPhoneX【5.8寸黑色】,iPhoneX【5.8寸银色】&productId=1

日志输出如下,可以看到,去重成功

ProductProperty: 1
ProductSpecification: 1
Product: 1

商品聚合:cn.mrcode.cache.eshop.dataaggrserver.rabbitmq.DimEvent@4334d1d5
1
2
3
4
5