# 060. 缓存数据生产服务中的 zk 分布式锁解决方案的代码实现(三)
启动项目来测试,会发现报错了。找到两个 StaticLoggerBinder,来自两个 jar 中;
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/SoftwareDevelop/Repository/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.25/110cefe2df103412849d72ef7a67e4e91e4266b4/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/SoftwareDevelop/Repository/caches/modules-2/files-2.1/ch.qos.logback/logback-classic/1.2.3/7c4f3c474fb2c041d8028740440937705ebb473a/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.slf4j.impl.Log4jLoggerFactory loaded from file:/E:/SoftwareDevelop/Repository/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.25/110cefe2df103412849d72ef7a67e4e91e4266b4/slf4j-log4j12-1.7.25.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.slf4j.impl.Log4jLoggerFactory
at org.springframework.util.Assert.instanceCheckFailed(Assert.java:655)
1
2
3
4
5
6
7
2
3
4
5
6
7
这里排除 zookper 中的
# gradle 解决冲突&查找依赖树
gradle dependencyInsight --dependency slf4j-log4j12
H:\dev\project\mrcode\cache-pdp\eshop-cache>gradle dependencyInsight --dependency slf4j-log4j12
> Task :eshop-cache:dependencyInsight
org.slf4j:slf4j-log4j12:1.7.25 (selected by rule)
variant "runtime" [
org.gradle.status = release (not requested)
Requested attributes not found in the selected variant:
org.gradle.usage = java-api
]
org.slf4j:slf4j-log4j12:1.6.1 -> 1.7.25
variant "runtime" [
org.gradle.status = release (not requested)
Requested attributes not found in the selected variant:
org.gradle.usage = java-api
]
\--- org.apache.zookeeper:zookeeper:3.4.5
+--- compileClasspath
+--- org.apache.kafka:kafka_2.9.2:0.8.1.1
| \--- compileClasspath
\--- com.101tec:zkclient:0.3
\--- org.apache.kafka:kafka_2.9.2:0.8.1.1 (*)
(*) - dependencies omitted (listed previously)
A web-based, searchable dependency report is available by adding the --scan option.
BUILD SUCCESSFUL in 7s
1 actionable task: 1 executed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
可以看到 在 zk 里面有引用,然后 kafka 里面有引用 zookper。所以需要排除两个地方
compile ('org.apache.kafka:kafka_2.9.2:0.8.1.1'){
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
compile ('org.apache.zookeeper:zookeeper:3.4.5'){
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
1
2
3
4
5
6
2
3
4
5
6
# 测试缓存重建流程
首先先删除 redis 中的数据,通过之前的 redis 章节中的命令登录并删除
redis-cli -h eshop-cache02 -p 7003
eshop-cache02:7003> del product_info_1
1
2
2
再启动 eshop-cache 服务;
访问:http://localhost:6002/getProductInfo?productId=1
,查看打印日志
2019-05-14 22:22:38.618 INFO 11616 --- [nio-6002-exec-1] c.m.c.e.c.controller.CacheController : 从 redis 中获取商品信息
2019-05-14 22:22:38.643 INFO 11616 --- [nio-6002-exec-1] c.m.c.e.c.controller.CacheController : 从 ehcache 中获取商品信息
2019-05-14 22:22:45.276 INFO 11616 --- [p-cache03:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server eshop-cache03/192.168.99.172:2181, sessionid = 0x26a8e8df54a0001, negotiated timeout = 5000
watch event:SyncConnected
zookeeper 已连接
zookeeper 初始化成功
获取锁成功 product[id=1]
2019-05-14 22:22:45.295 INFO 11616 --- [ Thread-14] c.m.c.e.cache.controller.RebuildCache : 缓存重建成功:cn.mrcode.cachepdp.eshop.cache.model.ProductInfo@1ce9e40a
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
再次访问,只有一条日志了
2019-05-14 22:23:52.328 INFO 11616 --- [nio-6002-exec-4] c.m.c.e.c.controller.CacheController : 从 redis 中获取商品信息
1
# 测试商品信息变更缓存重建
也就是 KafkaMessageProcessor 中的逻辑
这个测试需要手动往 kafka 中植入一条数据,触发这个消费逻辑
手动植入数据参考 这里
bin/kafka-console-producer.sh --broker-list 192.168.99.170:9092,192.168.99.171:9092,192.168.99.172:9092 --topic eshop-message
{"serviceId":"productInfoService","productId":"1"}
1
2
3
2
3
日志打印
watch event:SyncConnected
zookeeper 已连接
zookeeper 初始化成功
获取锁成功 product[id=1]
2019-05-14 22:37:32.884 INFO 6692 --- [ Thread-29] c.m.c.e.c.kafka.KafkaMessageProcessor : 获取刚保存到本地缓存的商品信息:cn.mrcode.cachepdp.eshop.cache.model.ProductInfo@37582749
获取锁成功 product[id=1]
2019-05-14 22:37:38.648 INFO 6692 --- [ Thread-29] c.m.c.e.c.kafka.KafkaMessageProcessor : 数据未变更过
1
2
3
4
5
6
7
2
3
4
5
6
7
# 测试两个流程
- 增加 kafka 获取锁后的休眠,模拟耗时操作
- 删除 redis 缓存,重启项目后
- 先触发 kafka
- 再访问
http://localhost:6002/getProductInfo?productId=1
watch event:SyncConnected
zookeeper 已连接
zookeeper 初始化成功
获取锁成功 product[id=1]
2019-05-14 22:44:56.225 INFO 9376 --- [ Thread-30] c.m.c.e.c.kafka.KafkaMessageProcessor : kafka 休眠 10 秒
2019-05-14 22:44:59.640 TRACE 9376 --- [nio-6002-exec-1] o.s.web.servlet.DispatcherServlet : GET "/getProductInfo?productId=1", parameters={masked}, headers={masked} in DispatcherServlet 'dispatcherServlet'
2019-05-14 22:44:59.649 TRACE 9376 --- [nio-6002-exec-1] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped to public cn.mrcode.cachepdp.eshop.cache.model.ProductInfo cn.mrcode.cachepdp.eshop.cache.controller.CacheController.getProductInfo(java.lang.Long)
2019-05-14 22:44:59.672 TRACE 9376 --- [nio-6002-exec-1] .w.s.m.m.a.ServletInvocableHandlerMethod : Arguments: [1]
2019-05-14 22:44:59.682 INFO 9376 --- [nio-6002-exec-1] c.m.c.e.c.controller.CacheController : 从 redis 中获取商品信息
2019-05-14 22:44:59.705 INFO 9376 --- [nio-6002-exec-1] c.m.c.e.c.controller.CacheController : 从 ehcache 中获取商品信息
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /product-lock-1
at org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
at cn.mrcode.cachepdp.eshop.cache.ZooKeeperSession.acquireDistributedLock(ZooKeeperSession.java:58)
at cn.mrcode.cachepdp.eshop.cache.controller.RebuildCache.lambda$start$0(RebuildCache.java:58)
at cn.mrcode.cachepdp.eshop.cache.controller.RebuildCache$$Lambda$433/1623148876.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
2019-05-14 22:45:06.243 INFO 9376 --- [ Thread-30] c.m.c.e.c.kafka.KafkaMessageProcessor : 获取刚保存到本地缓存的商品信息:cn.mrcode.cachepdp.eshop.cache.model.ProductInfo@9fbc9fb
获取锁成功 product[id=1] 尝试了 180 次.
2019-05-14 22:45:06.293 INFO 9376 --- [ Thread-13] c.m.c.e.cache.controller.RebuildCache : 此次数据版本落后,放弃重建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
上面的日志可以看到:
- 在休眠过程中,触发了缓存重建操作
- 缓存重建一直在等待锁的释放,最后尝试了 180 次,才获取到
- 获取到之后,发现 kafka 线程重建缓存后的数据比自己的新(其实是一样的时间,日志打印问题),所以放弃了往 redis 中放入数据的操作
# 小结
# 思想总结
缓存重建出现在两个地方:
- 当基础服务信息变更之后(被动)
- 当所有缓存失效之后(主动)
一个主动一个被动,他们的执行逻辑都相同,其实可以使用一个队列逻辑来处理缓存重建
缓存重建重要依赖「zk 分布式锁」让多个实例/操作 串行化起来。避免脏数据覆盖新数据
# 疑问
此处的 kafka 调试还是不能 debug。一 debug 线程就卡卡住;
另外这个逻辑细节有些地方是不严谨的,比如从数据库获取数据,再用分布式锁去, 那么会不会出现在分布式锁之前很多请求全部打到数据库中去了呢?
我这里只是提出来课程中的一点疑问,现在最主要的还是要跟着课程思路走