# 050. 基于 kafka + ehcache + redis 完成缓存数据生产服务的开发与测试
# 将 kafka 整合到 spring boot 中
这里的整合是 boot 1.x 的使用方式,也就是手工整合,由于我第一次接触 kafka, 就跟着老师的脚步走了。
增加依赖,需要与服务器上一样的版本,否则可能会有问题
compile 'org.apache.kafka:kafka_2.9.2:0.8.1.1'
创建消费者
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
*
* @author : zhuqiang
* @date : 2019/4/7 16:25
*/
public class KafkaConcusmer implements Runnable {
private final ConsumerConnector consumer;
private final String topic;
public KafkaConcusmer(String topic, CacheService cacheService) {
consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(
"192.168.99.170:2181," +
"192.168.99.171:2181," +
"192.168.99.172:2181",
"eshop-cache-group"));
this.topic = topic;
this.cacheService = cacheService;
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
@Override
public void run() {
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (final KafkaStream stream : streams) {
new Thread(new KafkaMessageProcessor(stream)).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
消费者处理线程
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
/**
* @author : zhuqiang
* @date : 2019/4/7 16:29
*/
public class KafkaMessageProcessor implements Runnable {
private KafkaStream kafkaStream;
public KafkaMessageProcessor(KafkaStream kafkaStream) {
this.kafkaStream = kafkaStream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
while (it.hasNext()) {
String message = new String(it.next().message());
System.out.println(message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
项目启动的时候开始这消费者线程
@Component
public class KafkaInit implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
new Thread(new KafkaConcusmer("eshop-message")).start();
}
}
2
3
4
5
6
7
以上代码做的工作是:对接 kafka 消息队列,监听一个 topic。等待其他服务的修改事件
# 编写业务逻辑
两种服务会发送来数据变更消息:商品信息服务和商品店铺信息服务,每个消息都包含服务名以及商品 id
接收到消息之后,根据商品 id 到对应的服务拉取数据
这一步,我们采取简化的模拟方式,就是在代码里面写死,会获取到什么数据,不去实际再写其他的服务去调用了
商品信息:id、名称、价格、图片列表、商品规格、售后信息、颜色、尺寸
商品店铺信息:其他维度
用这个维度模拟出来缓存数据维度化拆分:id、店铺名称、店铺等级、店铺好评率
分别拉取到了数据之后,将数据组织成 json 串,然后分别存储到 ehcache 中和 redis 缓存中
这里的业务逻辑代码,因为是模拟这个场景,所以事先比较简单,重要的类如下
接收到事件之后,分别处理事件
public class KafkaMessageProcessor implements Runnable {
private KafkaStream kafkaStream;
private CacheService cacheService;
private Logger log = LoggerFactory.getLogger(getClass());
public KafkaMessageProcessor(KafkaStream kafkaStream, CacheService cacheService) {
this.kafkaStream = kafkaStream;
this.cacheService = cacheService;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
while (it.hasNext()) {
String message = new String(it.next().message());
// 首先将message转换成json对象
JSONObject messageJSONObject = JSONObject.parseObject(message);
// 从这里提取出消息对应的服务的标识
String serviceId = messageJSONObject.getString("serviceId");
// 如果是商品信息服务
if ("productInfoService".equals(serviceId)) {
processProductInfoChangeMessage(messageJSONObject);
} else if ("shopInfoService".equals(serviceId)) {
processShopInfoChangeMessage(messageJSONObject);
}
}
}
/**
* 处理商品信息变更的消息
*/
private void processProductInfoChangeMessage(JSONObject messageJSONObject) {
// 提取出商品id
Long productId = messageJSONObject.getLong("productId");
// 调用商品信息服务的接口
// 直接用注释模拟:getProductInfo?productId=1,传递过去
// 商品信息服务,一般来说就会去查询数据库,去获取productId=1的商品信息,然后返回回来
String productInfoJSON = "{\"id\": 1, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1}";
ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
cacheService.saveProductInfo2LocalCache(productInfo);
log.info("获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId));
cacheService.saveProductInfo2ReidsCache(productInfo);
}
/**
* 处理店铺信息变更的消息
*/
private void processShopInfoChangeMessage(JSONObject messageJSONObject) {
// 提取出商品id
Long productId = messageJSONObject.getLong("productId");
Long shopId = messageJSONObject.getLong("shopId");
// 这里也是模拟去数据库获取到了信息
String shopInfoJSON = "{\"id\": 1, \"name\": \"小王的手机店\", \"level\": 5, \"goodCommentRate\":0.99}";
ShopInfo shopInfo = JSONObject.parseObject(shopInfoJSON, ShopInfo.class);
cacheService.saveShopInfo2LocalCache(shopInfo);
log.info("获取刚保存到本地缓存的店铺信息:" + cacheService.getShopInfoFromLocalCache(shopId));
cacheService.saveShopInfo2ReidsCache(shopInfo);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
把缓存的读写封装了到 service 中。上面通过 service 去操作缓存
@Service
public class CacheServiceImpl implements CacheService {
public static final String CACHE_NAME = "local";
@Resource
private JedisCluster jedisCluster;
/**
* 将商品信息保存到本地缓存中
*/
@CachePut(value = CACHE_NAME, key = "'key_'+#productInfo.getId()")
public ProductInfo saveLocalCache(ProductInfo productInfo) {
return productInfo;
}
/**
* 从本地缓存中获取商品信息
*/
@Cacheable(value = CACHE_NAME, key = "'key_'+#id")
public ProductInfo getLocalCache(Long id) {
return null;
}
/**
* 将商品信息保存到本地的ehcache缓存中
*/
@CachePut(value = CACHE_NAME, key = "'product_info_'+#productInfo.getId()")
public ProductInfo saveProductInfo2LocalCache(ProductInfo productInfo) {
return productInfo;
}
/**
* 从本地ehcache缓存中获取商品信息
*/
@Cacheable(value = CACHE_NAME, key = "'product_info_'+#productId")
public ProductInfo getProductInfoFromLocalCache(Long productId) {
return null;
}
/**
* 将店铺信息保存到本地的ehcache缓存中
*/
@CachePut(value = CACHE_NAME, key = "'shop_info_'+#shopInfo.getId()")
public ShopInfo saveShopInfo2LocalCache(ShopInfo shopInfo) {
return shopInfo;
}
/**
* 从本地ehcache缓存中获取店铺信息
*/
@Cacheable(value = CACHE_NAME, key = "'shop_info_'+#shopId")
public ShopInfo getShopInfoFromLocalCache(Long shopId) {
return null;
}
/**
* 将商品信息保存到redis中
*/
public void saveProductInfo2ReidsCache(ProductInfo productInfo) {
String key = "product_info_" + productInfo.getId();
jedisCluster.set(key, JSONObject.toJSONString(productInfo));
}
/**
* 将店铺信息保存到redis中
*/
public void saveShopInfo2ReidsCache(ShopInfo shopInfo) {
String key = "shop_info_" + shopInfo.getId();
jedisCluster.set(key, JSONObject.toJSONString(shopInfo));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# 测试业务逻辑
命令行创建 topic 和 producer 可以参考前一章节 中的 检查集群状态
创建一个 kafka topic
# 创建 topic ,需要和程序中的一致 :eshop-message bin/kafka-topics.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic eshop-message --replication-factor 1 --partitions 1 --create
1
2在命令行启动一个 kafka producer
# 创建一个生产者 bin/kafka-console-producer.sh --broker-list 192.168.99.170:9092,192.168.99.171:9092,192.168.99.172:9092 --topic eshop-message
1
2启动系统,消费者开始监听 kafka topic
注意:在 boot 2.1.x 中,连不上也没有日志打印。需要把几个虚拟机 hostname 映射到本地
C:\Windows\System32\drivers\etc\hosts
192.168.99.170 eshop-cache01 192.168.99.171 eshop-cache02 192.168.99.172 eshop-cache03 192.168.99.173 eshop-cache04
1
2
3
4在 producer 中,分别发送两条消息,一个是商品信息服务的消息,一个是商品店铺信息服务的消息
由于在本次模拟中,cn.mrcode.cachepdp.eshop.cache.kafka.KafkaMessageProcessor 只使用 serviceId 作为了判定,其他数据是程序中写死的,所以这里推送两条携带 serviceId 的信息即可
{"serviceId":"productInfoService","productId":"1"} {"serviceId":"shopInfoService","shopId":"1"}
1
2
3能否接收到两条消息,并模拟拉取到两条数据,同时将数据写入 ehcache 中,并写入 redis 缓存中
ehcache 通过打印日志方式来观察,redis 通过手工连接上去来查询
WARNING
由于 kafka 安装与使用感觉比较难,而且又是老版本的 kafka, 在 boot 2.x 中使用问题很多,不能 debug 等问题
虽然有效果了,但是不能 debug,感觉这个消费者的代码很薄弱