# 059. 缓存数据生产服务中的 zk 分布式锁解决方案的代码实现(二)

# 主动更新

缓存生产服务接收基础信息更改事件的时候,有一个操作是更新本地缓存和 redis 中的缓存, 这个场景下也存可能存在并发冲突情况。所以这里也可以使用分布式锁来保证数据错乱问题

cn.mrcode.cachepdp.eshop.cache.kafka.KafkaMessageProcessor#processProductInfoChangeMessage

回顾下现在的实现代码。以商品为例,来展示怎么使用分布式锁

/**
 * 处理商品信息变更的消息
 */
private void processProductInfoChangeMessage(JSONObject messageJSONObject) {
    // 提取出商品id
    Long productId = messageJSONObject.getLong("productId");

    // 调用商品信息服务的接口
    // 直接用注释模拟:getProductInfo?productId=1,传递过去
    // 商品信息服务,一般来说就会去查询数据库,去获取productId=1的商品信息,然后返回回来

    String productInfoJSON = "{\"id\": 1, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1}";
    ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
    cacheService.saveProductInfo2LocalCache(productInfo);
    log.info("获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId));
    cacheService.saveProductInfo2ReidsCache(productInfo);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

使用分布式锁之后

private void processProductInfoChangeMessage(JSONObject messageJSONObject) {
    // 提取出商品id
    Long productId = messageJSONObject.getLong("productId");
    // 增加了一个 modifyTime 字段,来比较数据修改先后顺序
    String productInfoJSON = "{\"id\": 1, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1," +
            "\"modifyTime\":\"2019-05-13 22:00:00\"}";
    ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);

    // 加锁
    ZooKeeperSession zks = ZooKeeperSession.getInstance();
    zks.acquireDistributedLock(productId);
    try {
        // 先获取一次 redis ,防止其他实例已经放入数据了
        ProductInfo existedProduct = cacheService.getProductInfoOfReidsCache(productId);
        if (existedProduct != null) {
            // 判定通过消息获取到的数据版本和 redis 中的谁最新
            Date existedModifyTime = existedProduct.getModifyTime();
            Date modifyTime = productInfo.getModifyTime();
            // 如果本次获取到的修改时间大于 redis 中的,那么说明此数据是最新的,可以放入 redis 中
            if (modifyTime.after(existedModifyTime)) {
                cacheService.saveProductInfo2LocalCache(productInfo);
                log.info("最新数据覆盖 redis 中的数据:" + cacheService.getProductInfoFromLocalCache(productId));
                cacheService.saveProductInfo2ReidsCache(productInfo);
            }
        } else {
            // redis 中没有数据,直接放入
            cacheService.saveProductInfo2LocalCache(productInfo);
            log.info("获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId));
            cacheService.saveProductInfo2ReidsCache(productInfo);
        }
    } finally {
        // 最后释放锁
        zks.releaseDistributedLock(productId);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 缓存重建

回顾下重建的地方

/**
 * 这里的代码别看着奇怪,简单回顾下之前的流程: 1. nginx 获取 redis 缓存 2. 获取不到再获取服务的堆缓存(也就是这里的 ecache) 3.
 * 还获取不到就需要去数据库获取并重建缓存
 */
@RequestMapping("/getProductInfo")
@ResponseBody
public ProductInfo getProductInfo(Long productId) {
    ProductInfo productInfo = cacheService.getProductInfoOfReidsCache(productId);
    log.info("从 redis 中获取商品信息");
    if (productInfo == null) {
        productInfo = cacheService.getProductInfoFromLocalCache(productId);
        log.info("从 ehcache 中获取商品信息");
    }
    if (productInfo == null) {
        // 两级缓存中都获取不到数据,那么就需要从数据源重新拉取数据,重建缓存
        // 但是这里暂时不讲
        log.info("缓存重建 商品信息");
    }
    return productInfo;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

如上代码,都获取不到数据的时候,就需要从数据库读取数据进行重建。

第一版思路:

  1. 从数据库读取数据
  2. 队列异步重建
  3. 返回第一步的数据

下面来实现下这个代码(先不考虑该思路是否有问题)

cn.mrcode.cachepdp.eshop.cache.controller.RebuildCache

/**
 * 缓存重建;一个队列对应一个消费线程
 *
 * @author : zhuqiang
 * @date : 2019/5/14 21:06
 */
@Component
public class RebuildCache {
    private Logger log = LoggerFactory.getLogger(getClass());
    private ArrayBlockingQueue<ProductInfo> queue = new ArrayBlockingQueue<>(100);
    private CacheService cacheService;

    public RebuildCache(CacheService cacheService) {
        this.cacheService = cacheService;
        start();
    }

    public void put(ProductInfo productInfo) {
        try {
            queue.put(productInfo);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public ProductInfo take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    // 启动一个线程来消费

    private void start() {
        new Thread(() -> {
            while (true) {
                try {
                    ProductInfo productInfo = queue.take();
                    Long productId = productInfo.getId();
                    ZooKeeperSession zks = ZooKeeperSession.getInstance();
                    zks.acquireDistributedLock(productId);
                    try {
                        // 先获取一次 redis ,防止其他实例已经放入数据了
                        ProductInfo existedProduct = cacheService.getProductInfoOfReidsCache(productId);
                        if (existedProduct != null) {
                            // 判定通过消息获取到的数据版本和 redis 中的谁最新
                            Date existedModifyTime = existedProduct.getModifyTime();
                            Date modifyTime = productInfo.getModifyTime();
                            // 如果本次获取到的修改时间大于 redis 中的,那么说明此数据是最新的,可以放入 redis 中
                            if (modifyTime.after(existedModifyTime)) {
                                cacheService.saveProductInfo2LocalCache(productInfo);
                                log.info("最新数据覆盖 redis 中的数据:" + cacheService.getProductInfoFromLocalCache(productId));
                                cacheService.saveProductInfo2ReidsCache(productInfo);
                            } else {
                                log.info("此次数据版本落后,放弃重建");
                            }
                        } else {
                            // redis 中没有数据,直接放入
                            cacheService.saveProductInfo2LocalCache(productInfo);
                            log.info("缓存重建成功" + cacheService.getProductInfoFromLocalCache(productId));
                            cacheService.saveProductInfo2ReidsCache(productInfo);
                        }
                    } finally {
                        // 最后释放锁
                        zks.releaseDistributedLock(productId);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

controller 中使用该队列

/**
 * 这里的代码别看着奇怪,简单回顾下之前的流程: 1. nginx 获取 redis 缓存 2. 获取不到再获取服务的堆缓存(也就是这里的 ecache) 3.
 * 还获取不到就需要去数据库获取并重建缓存
 */
@RequestMapping("/getProductInfo")
@ResponseBody
public ProductInfo getProductInfo(Long productId) {
    ProductInfo productInfo = cacheService.getProductInfoOfReidsCache(productId);
    log.info("从 redis 中获取商品信息");
    if (productInfo == null) {
        productInfo = cacheService.getProductInfoFromLocalCache(productId);
        log.info("从 ehcache 中获取商品信息");
    }
    if (productInfo == null) {
        // 两级缓存中都获取不到数据,那么就需要从数据源重新拉取数据,重建缓存
        // 假设这里从数据库中获取的数据
        String productInfoJSON = "{\"id\": 1, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1," +
                "\"modifyTime\":\"2019-05-13 22:00:00\"}";
        productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
        rebuildCache.put(productInfo);
    }
    return productInfo;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23