# 166. 商品详情页动态渲染系统:基于 Spring Cloud 开发数据同步服务

创建一个新服务 eshop-datasync-service(端口 9106),即数据同步服务。它负责接收各种原子数据的变更消息,并做如下处理:

  1. 通过 Spring Cloud Feign 调用 eshop-product-service 服务的各种接口,获取数据
  2. 将原子数据在 redis 中进行增删改
  3. 将维度数据变化消息写入 rabbitmq 中另外一个 queue,供数据聚合服务来消费

维度分类(这里维度分类是为了后面的聚合服务聚合用的):

  • brand
  • category
  • product_intro
  • product

核心实现

package cn.mrcode.cache.eshop.datasyncserver.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import cn.mrcode.cache.eshop.datasyncserver.service.EshopProductService;

/**
 * Listens on the {@code data-change-queue} queue for atomic data-change messages.
 *
 * <p>For every recognized message the receiver:
 * <ol>
 *   <li>on {@code add}/{@code update}: loads the current record through {@link EshopProductService}
 *       and stores its JSON in Redis;</li>
 *   <li>on {@code delete}: removes the corresponding Redis key;</li>
 *   <li>publishes a {@link DimEvent} to {@link #AGGR_DATA_CHANGE_QUEUE}.</li>
 * </ol>
 */
@Component
@RabbitListener(queues = "data-change-queue") // earlier queue names are normalized to this hyphenated style too
public class DataChangeQueueReceiver {
    @Autowired
    private EshopProductService eshopProductService;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    private RabbitMQSender rabbitMQSender;

    /**
     * Name of the queue that receives dimension-change events (the data aggregation queue).
     */
    static final String AGGR_DATA_CHANGE_QUEUE = "aggr-data-change-queue";

    /**
     * Entry point for incoming messages: parses the JSON payload into a {@link ProductEvent}
     * and dispatches on its data type.
     *
     * @param message JSON text of a {@link ProductEvent}
     */
    @RabbitHandler
    public void process(String message) {
        ProductEvent productEvent = JSON.parseObject(message, ProductEvent.class);
        // Read the data type first; it selects the handler.
        String dataType = productEvent.getDataType();
        switch (dataType) {
            case "brand":
                processBrandDataChangeMessage(productEvent);
                break;
            case "category":
                processCategoryDataChangeMessage(productEvent);
                break;
            case "product":
                processProductDataChangeMessage(productEvent);
                break;
            case "product_intro":
                processProductIntroDataChangeMessage(productEvent);
                break;
            case "product_property":
                processProductPropertyDataChangeMessage(productEvent);
                break;
            case "product_specification":
                processProductSpecificationDataChangeMessage(productEvent);
                break;
            default:
                // Unrecognized data types are ignored: nothing is written and no event is published.
                break;
        }
    }

    /**
     * Handles a brand change: refreshes or removes {@code brand_<id>} and publishes a
     * {@code brand} dimension event carrying the event's id.
     */
    private void processBrandDataChangeMessage(ProductEvent productEvent) {
        Long id = productEvent.getId();
        String eventType = productEvent.getEventType();

        if (isAddOrUpdate(eventType)) {
            // Fetch the record through the product service client (Feign-based per the original note).
            JSONObject dataJSONObject = JSONObject.parseObject(eshopProductService.findBrandById(id));
            redisTemplate.opsForValue().set("brand_" + dataJSONObject.getLong("id"), dataJSONObject.toJSONString());
        } else if ("delete".equals(eventType)) {
            redisTemplate.delete("brand_" + id);
        }
        publishDimensionChange("brand", id);
    }

    /**
     * Handles a category change: refreshes or removes {@code category_<id>} and publishes a
     * {@code category} dimension event carrying the event's id.
     */
    private void processCategoryDataChangeMessage(ProductEvent productEvent) {
        Long id = productEvent.getId();
        String eventType = productEvent.getEventType();

        if (isAddOrUpdate(eventType)) {
            JSONObject dataJSONObject = JSONObject.parseObject(eshopProductService.findCategoryById(id));
            redisTemplate.opsForValue().set("category_" + dataJSONObject.getLong("id"), dataJSONObject.toJSONString());
        } else if ("delete".equals(eventType)) {
            redisTemplate.delete("category_" + id);
        }
        publishDimensionChange("category", id);
    }

    /**
     * Handles a product change: refreshes or removes {@code product_<id>} and publishes a
     * {@code product} dimension event carrying the event's id.
     */
    private void processProductDataChangeMessage(ProductEvent productEvent) {
        Long id = productEvent.getId();
        String eventType = productEvent.getEventType();

        if (isAddOrUpdate(eventType)) {
            JSONObject dataJSONObject = JSONObject.parseObject(eshopProductService.findProductById(id));
            redisTemplate.opsForValue().set("product_" + dataJSONObject.getLong("id"), dataJSONObject.toJSONString());
        } else if ("delete".equals(eventType)) {
            redisTemplate.delete("product_" + id);
        }
        publishDimensionChange("product", id);
    }

    /**
     * Handles a product-intro change. The Redis entry is keyed by the owning product's id
     * ({@code product_intro_<productId>}), and a {@code product_intro} dimension event carrying
     * that product id is published.
     */
    private void processProductIntroDataChangeMessage(ProductEvent productEvent) {
        Long id = productEvent.getId();
        String eventType = productEvent.getEventType();
        // Per the original note: data related to a product is stored under the product id because the
        // aggregation side assembles the full product by productId; the sending side
        // (eshop-product-service) therefore has to include this property in its events.
        Long productId = productEvent.getProductId();

        if (isAddOrUpdate(eventType)) {
            JSONObject dataJSONObject = JSONObject.parseObject(eshopProductService.findProductIntroById(id));
            redisTemplate.opsForValue().set("product_intro_" + productId, dataJSONObject.toJSONString());
        } else if ("delete".equals(eventType)) {
            // Must use the same key as the write above (productId, not the intro's own id),
            // otherwise the cached entry is never removed.
            redisTemplate.delete("product_intro_" + productId);
        }
        publishDimensionChange("product_intro", productId);
    }

    /**
     * Handles a product-property change. The Redis entry is keyed by the owning product's id
     * ({@code product_property_<productId>}), and a {@code product} dimension event carrying
     * that product id is published.
     */
    private void processProductPropertyDataChangeMessage(ProductEvent productEvent) {
        Long id = productEvent.getId();
        String eventType = productEvent.getEventType();
        Long productId = productEvent.getProductId();

        if (isAddOrUpdate(eventType)) {
            JSONObject dataJSONObject = JSONObject.parseObject(eshopProductService.findProductPropertyById(id));
            redisTemplate.opsForValue().set("product_property_" + productId, dataJSONObject.toJSONString());
        } else if ("delete".equals(eventType)) {
            // Same key as the write above: keyed by productId, not by the property's own id.
            redisTemplate.delete("product_property_" + productId);
        }
        // NOTE: published under the "product" dimension rather than a dedicated one; the original
        // author left the reason as an open question to be revisited.
        publishDimensionChange("product", productId);
    }

    /**
     * Handles a product-specification change. The Redis entry is keyed by the owning product's id
     * ({@code product_specification_<productId>}), and a {@code product} dimension event carrying
     * that product id is published.
     */
    private void processProductSpecificationDataChangeMessage(ProductEvent productEvent) {
        Long id = productEvent.getId();
        String eventType = productEvent.getEventType();
        Long productId = productEvent.getProductId();

        if (isAddOrUpdate(eventType)) {
            JSONObject dataJSONObject = JSONObject.parseObject(eshopProductService.findProductSpecificationById(id));
            redisTemplate.opsForValue().set("product_specification_" + productId, dataJSONObject.toJSONString());
        } else if ("delete".equals(eventType)) {
            // Same key as the write above: keyed by productId, not by the specification's own id.
            redisTemplate.delete("product_specification_" + productId);
        }
        // NOTE: published under the "product" dimension rather than a dedicated one; the original
        // author left the reason as an open question to be revisited.
        publishDimensionChange("product", productId);
    }

    /**
     * Tells whether the event type requires (re)writing the Redis entry.
     *
     * @param eventType event type taken from the message; may be {@code null}
     * @return {@code true} for {@code "add"} or {@code "update"}
     */
    private static boolean isAddOrUpdate(String eventType) {
        return "add".equals(eventType) || "update".equals(eventType);
    }

    /**
     * Serializes a {@link DimEvent} to JSON and sends it to {@link #AGGR_DATA_CHANGE_QUEUE}.
     *
     * @param dimType dimension name placed in the event
     * @param id      id placed in the event
     */
    private void publishDimensionChange(String dimType, Long id) {
        DimEvent dimEvent = new DimEvent(dimType, id);
        rabbitMQSender.send(AGGR_DATA_CHANGE_QUEUE, JSON.toJSONString(dimEvent));
    }
}

测试

  1. 访问商品修改地址:http://localhost:9100/product/update?id=1&name=修改Apple/苹果 iPhone X 5.8寸 国行 iphonex三网通4G 全新苹果x手机&categoryId=1&brandId=1
  2. 观察 aggr-data-change-queue 队列中是否有消息
  3. 观察 redis 中是否存在 product_1 的 key