# 060. Code Implementation of the zk Distributed Lock Solution in the Cache Data Production Service (Part 3)

Starting the project to test it, an error appears: two StaticLoggerBinder classes are found, coming from two different jars:

```
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/SoftwareDevelop/Repository/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.25/110cefe2df103412849d72ef7a67e4e91e4266b4/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/SoftwareDevelop/Repository/caches/modules-2/files-2.1/ch.qos.logback/logback-classic/1.2.3/7c4f3c474fb2c041d8028740440937705ebb473a/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.slf4j.impl.Log4jLoggerFactory loaded from file:/E:/SoftwareDevelop/Repository/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.25/110cefe2df103412849d72ef7a67e4e91e4266b4/slf4j-log4j12-1.7.25.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.slf4j.impl.Log4jLoggerFactory
	at org.springframework.util.Assert.instanceCheckFailed(Assert.java:655)
```

Here we need to exclude the slf4j-log4j12 binding that comes in through zookeeper.

# Resolving the Conflict & Inspecting the Dependency Tree with Gradle

Run dependencyInsight to see where slf4j-log4j12 comes from:

```
H:\dev\project\mrcode\cache-pdp\eshop-cache>gradle dependencyInsight --dependency slf4j-log4j12

> Task :eshop-cache:dependencyInsight
org.slf4j:slf4j-log4j12:1.7.25 (selected by rule)
   variant "runtime" [
      org.gradle.status = release (not requested)
      Requested attributes not found in the selected variant:
         org.gradle.usage  = java-api
   ]

org.slf4j:slf4j-log4j12:1.6.1 -> 1.7.25
   variant "runtime" [
      org.gradle.status = release (not requested)
      Requested attributes not found in the selected variant:
         org.gradle.usage  = java-api
   ]
\--- org.apache.zookeeper:zookeeper:3.4.5
     +--- compileClasspath
     +--- org.apache.kafka:kafka_2.9.2:0.8.1.1
     |    \--- compileClasspath
     \--- com.101tec:zkclient:0.3
          \--- org.apache.kafka:kafka_2.9.2:0.8.1.1 (*)

(*) - dependencies omitted (listed previously)

A web-based, searchable dependency report is available by adding the --scan option.

BUILD SUCCESSFUL in 7s
1 actionable task: 1 executed
```

The report shows that slf4j-log4j12 is referenced by zookeeper, and zookeeper is in turn referenced by kafka, so the exclusion is needed in two places:

```groovy
compile('org.apache.kafka:kafka_2.9.2:0.8.1.1') {
    exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
compile('org.apache.zookeeper:zookeeper:3.4.5') {
    exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
```

# Testing the Cache Rebuild Flow

First, delete the data in Redis; log in and delete it with the commands from the earlier Redis chapter:

```
redis-cli -h eshop-cache02 -p 7003
eshop-cache02:7003> del product_info_1
```

Then start the eshop-cache service.

Visit http://localhost:6002/getProductInfo?productId=1 and check the log output:

```
2019-05-14 22:22:38.618  INFO 11616 --- [nio-6002-exec-1] c.m.c.e.c.controller.CacheController     : 从 redis 中获取商品信息
2019-05-14 22:22:38.643  INFO 11616 --- [nio-6002-exec-1] c.m.c.e.c.controller.CacheController     : 从 ehcache 中获取商品信息
2019-05-14 22:22:45.276  INFO 11616 --- [p-cache03:2181)] org.apache.zookeeper.ClientCnxn          : Session establishment complete on server eshop-cache03/192.168.99.172:2181, sessionid = 0x26a8e8df54a0001, negotiated timeout = 5000
watch event:SyncConnected
zookeeper 已连接
zookeeper 初始化成功
获取锁成功 product[id=1]
2019-05-14 22:22:45.295  INFO 11616 --- [      Thread-14] c.m.c.e.cache.controller.RebuildCache    : 缓存重建成功:cn.mrcode.cachepdp.eshop.cache.model.ProductInfo@1ce9e40a
```
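
The read path traced by these log lines can be sketched as follows. Only CacheController, getProductInfo, ProductInfo, and RebuildCache are confirmed by the logs and stack traces; the helper method names and RebuildCache.push are assumptions for illustration:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the read-through order seen in the log: redis -> ehcache -> rebuild.
// Helper names and RebuildCache.push are assumptions, not the course's exact code.
public class CacheReadFlowSketch {
    private static final Logger log = LoggerFactory.getLogger(CacheReadFlowSketch.class);

    public ProductInfo getProductInfo(Long productId) {
        log.info("从 redis 中获取商品信息");
        ProductInfo productInfo = getFromRedis(productId);
        if (productInfo == null) {
            log.info("从 ehcache 中获取商品信息");
            productInfo = getFromEhcache(productId);
        }
        if (productInfo == null) {
            // Both cache layers missed: load from the source of truth, then hand
            // the result to the rebuild thread, which writes the caches back
            // under the zk distributed lock (hence the "获取锁成功" log line).
            productInfo = loadFromDataSource(productId);
            RebuildCache.push(productInfo);
        }
        return productInfo;
    }

    private ProductInfo getFromRedis(Long productId) { return null; }   // redis lookup, stubbed
    private ProductInfo getFromEhcache(Long productId) { return null; } // ehcache lookup, stubbed
    private ProductInfo loadFromDataSource(Long productId) { return new ProductInfo(); } // db call, stubbed
}
```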

Visiting it again, only one log line is printed:

```
2019-05-14 22:23:52.328  INFO 11616 --- [nio-6002-exec-4] c.m.c.e.c.controller.CacheController     : 从 redis 中获取商品信息
```

# Testing Cache Rebuild on Product Info Change

That is, the logic in KafkaMessageProcessor.

This test requires manually pushing a message into Kafka to trigger the consumer logic; use the console producer:

```
bin/kafka-console-producer.sh --broker-list 192.168.99.170:9092,192.168.99.171:9092,192.168.99.172:9092 --topic eshop-message

{"serviceId":"productInfoService","productId":"1"}
```

The log output:

```
watch event:SyncConnected
zookeeper 已连接
zookeeper 初始化成功
获取锁成功 product[id=1]
2019-05-14 22:37:32.884  INFO 6692 --- [      Thread-29] c.m.c.e.c.kafka.KafkaMessageProcessor    : 获取刚保存到本地缓存的商品信息:cn.mrcode.cachepdp.eshop.cache.model.ProductInfo@37582749
获取锁成功 product[id=1]
2019-05-14 22:37:38.648  INFO 6692 --- [      Thread-29] c.m.c.e.c.kafka.KafkaMessageProcessor    : 数据未变更过
```
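
The consumer logic exercised here can be sketched from the message format and the log lines. Fastjson for JSON parsing, the helper method names, ZooKeeperSession.getInstance/releaseDistributedLock, and a Date-typed modifiedTime on ProductInfo are all assumptions; the flow (save to the local cache first, then write through to redis under the zk lock, skipping unchanged data) matches the logs:

```java
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of KafkaMessageProcessor's handling of the message injected above.
// The two INFO lines match the log; everything stubbed is an assumption.
public class KafkaMessageProcessorSketch {
    private static final Logger log = LoggerFactory.getLogger(KafkaMessageProcessorSketch.class);

    public void processMessage(String message) {
        JSONObject json = JSONObject.parseObject(message);
        // dispatch on serviceId, e.g. {"serviceId":"productInfoService","productId":"1"}
        if ("productInfoService".equals(json.getString("serviceId"))) {
            processProductInfoChangeMessage(json.getLong("productId"));
        }
    }

    private void processProductInfoChangeMessage(Long productId) {
        // 1. Pull the latest product info from the source service (stubbed here)
        ProductInfo productInfo = loadProductInfo(productId);

        // 2. Save it into the local ehcache first
        saveLocalCache(productInfo);
        log.info("获取刚保存到本地缓存的商品信息:" + getLocalCache(productId));

        // 3. Write through to redis, serialized by the zk distributed lock
        ZooKeeperSession zk = ZooKeeperSession.getInstance();
        zk.acquireDistributedLock(productId);
        try {
            ProductInfo existing = getRedisCache(productId);
            if (existing != null
                    && !existing.getModifiedTime().before(productInfo.getModifiedTime())) {
                log.info("数据未变更过"); // cached copy is at least as new: skip the write
            } else {
                saveRedisCache(productInfo);
            }
        } finally {
            zk.releaseDistributedLock(productId);
        }
    }

    private ProductInfo loadProductInfo(Long id) { return new ProductInfo(); } // service call, stubbed
    private void saveLocalCache(ProductInfo p) { /* ehcache put */ }
    private ProductInfo getLocalCache(Long id) { return null; }                // ehcache get, stubbed
    private ProductInfo getRedisCache(Long id) { return null; }                // redis get, stubbed
    private void saveRedisCache(ProductInfo p) { /* redis set */ }
}
```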

# Testing the Two Flows Together

  1. Add a sleep after the Kafka consumer acquires the lock, to simulate a slow operation (sketched right after this list)
  2. Delete the Redis cache and restart the project
  3. Trigger the Kafka flow first
  4. Then visit http://localhost:6002/getProductInfo?productId=1
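
Step 1 can be as simple as holding the lock and sleeping inside the Kafka handler; the exact placement in KafkaMessageProcessor is an assumption, but the log message matches the output below:

```java
// Simulate a slow rebuild while holding the lock, so the request-path rebuild
// has to wait (produces the "kafka 休眠 10 秒" line in the log below).
zk.acquireDistributedLock(productId);
log.info("kafka 休眠 10 秒");
try {
    Thread.sleep(10_000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
```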

The resulting log:

```
watch event:SyncConnected
zookeeper 已连接
zookeeper 初始化成功
获取锁成功 product[id=1]
2019-05-14 22:44:56.225  INFO 9376 --- [      Thread-30] c.m.c.e.c.kafka.KafkaMessageProcessor    : kafka 休眠 10 秒
2019-05-14 22:44:59.640 TRACE 9376 --- [nio-6002-exec-1] o.s.web.servlet.DispatcherServlet        : GET "/getProductInfo?productId=1", parameters={masked}, headers={masked} in DispatcherServlet 'dispatcherServlet'
2019-05-14 22:44:59.649 TRACE 9376 --- [nio-6002-exec-1] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped to public cn.mrcode.cachepdp.eshop.cache.model.ProductInfo cn.mrcode.cachepdp.eshop.cache.controller.CacheController.getProductInfo(java.lang.Long)
2019-05-14 22:44:59.672 TRACE 9376 --- [nio-6002-exec-1] .w.s.m.m.a.ServletInvocableHandlerMethod : Arguments: [1]
2019-05-14 22:44:59.682  INFO 9376 --- [nio-6002-exec-1] c.m.c.e.c.controller.CacheController     : 从 redis 中获取商品信息
2019-05-14 22:44:59.705  INFO 9376 --- [nio-6002-exec-1] c.m.c.e.c.controller.CacheController     : 从 ehcache 中获取商品信息
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /product-lock-1
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
	at cn.mrcode.cachepdp.eshop.cache.ZooKeeperSession.acquireDistributedLock(ZooKeeperSession.java:58)
	at cn.mrcode.cachepdp.eshop.cache.controller.RebuildCache.lambda$start$0(RebuildCache.java:58)
	at cn.mrcode.cachepdp.eshop.cache.controller.RebuildCache$$Lambda$433/1623148876.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:745)

2019-05-14 22:45:06.243  INFO 9376 --- [      Thread-30] c.m.c.e.c.kafka.KafkaMessageProcessor    : 获取刚保存到本地缓存的商品信息:cn.mrcode.cachepdp.eshop.cache.model.ProductInfo@9fbc9fb
获取锁成功 product[id=1] 尝试了 180 次.
2019-05-14 22:45:06.293  INFO 9376 --- [      Thread-13] c.m.c.e.cache.controller.RebuildCache    : 此次数据版本落后,放弃重建
```

From the log above we can see:

  1. During the sleep, the cache rebuild operation was triggered from the request path
  2. The rebuild kept waiting for the lock to be released, finally acquiring it after 180 attempts
  3. After acquiring the lock, it found the data written by the Kafka thread was newer than its own (the timestamps are actually identical; it is a logging quirk), so it abandoned writing its data into redis
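
The lock behaviour can be reconstructed from the stack trace and log lines: ZooKeeperSession.acquireDistributedLock creates an ephemeral znode such as /product-lock-1, and a NodeExistsException means another holder exists, so it retries in a loop. A minimal sketch, assuming a fixed retry interval (the log shows roughly 180 attempts in about 10 seconds) and assuming the release method deletes the znode:

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Sketch of ZooKeeperSession's lock methods, reconstructed from the stack trace
// and log lines; the retry interval and exact error handling are assumptions.
public class ZooKeeperSessionSketch {
    private ZooKeeper zookeeper; // connected session, established during init

    public void acquireDistributedLock(Long productId) {
        String path = "/product-lock-" + productId;
        try {
            // an ephemeral znode acts as the lock; it disappears if the session dies
            zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("获取锁成功 product[id=" + productId + "]");
        } catch (Exception e) {
            e.printStackTrace(); // the NodeExistsException seen in the log above
            int count = 0;
            while (true) {
                try {
                    Thread.sleep(50); // assumed interval; ~180 attempts in ~10 s
                    zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (Exception e2) {
                    count++;
                    continue; // still held by someone else, keep trying
                }
                System.out.println("获取锁成功 product[id=" + productId + "] 尝试了 " + count + " 次.");
                break;
            }
        }
    }

    public void releaseDistributedLock(Long productId) {
        try {
            // deleting the znode releases the lock; version -1 matches any version
            zookeeper.delete("/product-lock-" + productId, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```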

# Summary

# Key Ideas

Cache rebuilding happens in two places:

  1. After the underlying service's data changes (passive)
  2. After every cache layer has expired (active)

One is active and one is passive, but their execution logic is identical, so a single queue could handle both rebuild paths, as sketched below.

Cache rebuilding relies heavily on the "zk distributed lock" to serialize multiple instances/operations, preventing stale data from overwriting newer data.
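
A minimal sketch of that single-queue idea, assuming the project's ProductInfo (with an id and a Date-typed modifiedTime) and the ZooKeeperSession sketched above; both the Kafka consumer and the request path would push into the same queue:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of a unified rebuild queue: the passive path (change message) and the
// active path (full cache miss) both push here, and one worker rebuilds under
// the zk lock, discarding stale versions ("此次数据版本落后,放弃重建").
public class RebuildCacheQueue {
    private static final BlockingQueue<ProductInfo> QUEUE = new ArrayBlockingQueue<>(1000);

    public static void push(ProductInfo productInfo) {
        try {
            QUEUE.put(productInfo);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void start() {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    ProductInfo productInfo = QUEUE.take();
                    ZooKeeperSession zk = ZooKeeperSession.getInstance();
                    zk.acquireDistributedLock(productInfo.getId());
                    try {
                        ProductInfo existing = getRedisCache(productInfo.getId());
                        if (existing != null
                                && existing.getModifiedTime().after(productInfo.getModifiedTime())) {
                            System.out.println("此次数据版本落后,放弃重建");
                        } else {
                            saveRedisCache(productInfo);
                        }
                    } finally {
                        zk.releaseDistributedLock(productInfo.getId());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }

    private ProductInfo getRedisCache(Long id) { return null; } // redis get, stubbed
    private void saveRedisCache(ProductInfo p) { /* redis set */ }
}
```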

# Open Questions

The Kafka consumer here still cannot be debugged; as soon as I hit a breakpoint, the thread hangs.

Also, some details of this logic are not rigorous. For example, the data is fetched from the database first and only then does the distributed lock come into play, so couldn't a flood of requests all hit the database before the lock is ever acquired?

I am only raising a small question about the course here; for now the main thing is to follow the course's line of thinking.