# 059. 缓存数据生产服务中的 zk 分布式锁解决方案的代码实现(二)
# 主动更新
缓存生产服务接收基础信息更改事件的时候,有一个操作是更新本地缓存和 redis 中的缓存, 这个场景下也存可能存在并发冲突情况。所以这里也可以使用分布式锁来保证数据错乱问题
cn.mrcode.cachepdp.eshop.cache.kafka.KafkaMessageProcessor#processProductInfoChangeMessage
回顾下现在的实现代码。以商品为例,来展示怎么使用分布式锁
/**
* 处理商品信息变更的消息
*/
private void processProductInfoChangeMessage(JSONObject messageJSONObject) {
// 提取出商品id
Long productId = messageJSONObject.getLong("productId");
// 调用商品信息服务的接口
// 直接用注释模拟:getProductInfo?productId=1,传递过去
// 商品信息服务,一般来说就会去查询数据库,去获取productId=1的商品信息,然后返回回来
String productInfoJSON = "{\"id\": 1, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1}";
ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
cacheService.saveProductInfo2LocalCache(productInfo);
log.info("获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId));
cacheService.saveProductInfo2ReidsCache(productInfo);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
使用分布式锁之后
private void processProductInfoChangeMessage(JSONObject messageJSONObject) {
// 提取出商品id
Long productId = messageJSONObject.getLong("productId");
// 增加了一个 modifyTime 字段,来比较数据修改先后顺序
String productInfoJSON = "{\"id\": 1, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1," +
"\"modifyTime\":\"2019-05-13 22:00:00\"}";
ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
// 加锁
ZooKeeperSession zks = ZooKeeperSession.getInstance();
zks.acquireDistributedLock(productId);
try {
// 先获取一次 redis ,防止其他实例已经放入数据了
ProductInfo existedProduct = cacheService.getProductInfoOfReidsCache(productId);
if (existedProduct != null) {
// 判定通过消息获取到的数据版本和 redis 中的谁最新
Date existedModifyTime = existedProduct.getModifyTime();
Date modifyTime = productInfo.getModifyTime();
// 如果本次获取到的修改时间大于 redis 中的,那么说明此数据是最新的,可以放入 redis 中
if (modifyTime.after(existedModifyTime)) {
cacheService.saveProductInfo2LocalCache(productInfo);
log.info("最新数据覆盖 redis 中的数据:" + cacheService.getProductInfoFromLocalCache(productId));
cacheService.saveProductInfo2ReidsCache(productInfo);
}
} else {
// redis 中没有数据,直接放入
cacheService.saveProductInfo2LocalCache(productInfo);
log.info("获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId));
cacheService.saveProductInfo2ReidsCache(productInfo);
}
} finally {
// 最后释放锁
zks.releaseDistributedLock(productId);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 缓存重建
回顾下重建的地方
/**
* 这里的代码别看着奇怪,简单回顾下之前的流程: 1. nginx 获取 redis 缓存 2. 获取不到再获取服务的堆缓存(也就是这里的 ecache) 3.
* 还获取不到就需要去数据库获取并重建缓存
*/
@RequestMapping("/getProductInfo")
@ResponseBody
public ProductInfo getProductInfo(Long productId) {
ProductInfo productInfo = cacheService.getProductInfoOfReidsCache(productId);
log.info("从 redis 中获取商品信息");
if (productInfo == null) {
productInfo = cacheService.getProductInfoFromLocalCache(productId);
log.info("从 ehcache 中获取商品信息");
}
if (productInfo == null) {
// 两级缓存中都获取不到数据,那么就需要从数据源重新拉取数据,重建缓存
// 但是这里暂时不讲
log.info("缓存重建 商品信息");
}
return productInfo;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
如上代码,都获取不到数据的时候,就需要从数据库读取数据进行重建。
第一版思路:
- 从数据库读取数据
- 队列异步重建
- 返回第一步的数据
下面来实现下这个代码(先不考虑该思路是否有问题)
cn.mrcode.cachepdp.eshop.cache.controller.RebuildCache
/**
* 缓存重建;一个队列对应一个消费线程
*
* @author : zhuqiang
* @date : 2019/5/14 21:06
*/
@Component
public class RebuildCache {
private Logger log = LoggerFactory.getLogger(getClass());
private ArrayBlockingQueue<ProductInfo> queue = new ArrayBlockingQueue<>(100);
private CacheService cacheService;
public RebuildCache(CacheService cacheService) {
this.cacheService = cacheService;
start();
}
public void put(ProductInfo productInfo) {
try {
queue.put(productInfo);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public ProductInfo take() {
try {
return queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
// 启动一个线程来消费
private void start() {
new Thread(() -> {
while (true) {
try {
ProductInfo productInfo = queue.take();
Long productId = productInfo.getId();
ZooKeeperSession zks = ZooKeeperSession.getInstance();
zks.acquireDistributedLock(productId);
try {
// 先获取一次 redis ,防止其他实例已经放入数据了
ProductInfo existedProduct = cacheService.getProductInfoOfReidsCache(productId);
if (existedProduct != null) {
// 判定通过消息获取到的数据版本和 redis 中的谁最新
Date existedModifyTime = existedProduct.getModifyTime();
Date modifyTime = productInfo.getModifyTime();
// 如果本次获取到的修改时间大于 redis 中的,那么说明此数据是最新的,可以放入 redis 中
if (modifyTime.after(existedModifyTime)) {
cacheService.saveProductInfo2LocalCache(productInfo);
log.info("最新数据覆盖 redis 中的数据:" + cacheService.getProductInfoFromLocalCache(productId));
cacheService.saveProductInfo2ReidsCache(productInfo);
} else {
log.info("此次数据版本落后,放弃重建");
}
} else {
// redis 中没有数据,直接放入
cacheService.saveProductInfo2LocalCache(productInfo);
log.info("缓存重建成功" + cacheService.getProductInfoFromLocalCache(productId));
cacheService.saveProductInfo2ReidsCache(productInfo);
}
} finally {
// 最后释放锁
zks.releaseDistributedLock(productId);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
controller 中使用该队列
/**
* 这里的代码别看着奇怪,简单回顾下之前的流程: 1. nginx 获取 redis 缓存 2. 获取不到再获取服务的堆缓存(也就是这里的 ecache) 3.
* 还获取不到就需要去数据库获取并重建缓存
*/
@RequestMapping("/getProductInfo")
@ResponseBody
public ProductInfo getProductInfo(Long productId) {
ProductInfo productInfo = cacheService.getProductInfoOfReidsCache(productId);
log.info("从 redis 中获取商品信息");
if (productInfo == null) {
productInfo = cacheService.getProductInfoFromLocalCache(productId);
log.info("从 ehcache 中获取商品信息");
}
if (productInfo == null) {
// 两级缓存中都获取不到数据,那么就需要从数据源重新拉取数据,重建缓存
// 假设这里从数据库中获取的数据
String productInfoJSON = "{\"id\": 1, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1," +
"\"modifyTime\":\"2019-05-13 22:00:00\"}";
productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
rebuildCache.put(productInfo);
}
return productInfo;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23