# 050. 基于 kafka + ehcache + redis 完成缓存数据生产服务的开发与测试

# 将 kafka 整合到 spring boot 中

这里的整合是 boot 1.x 的使用方式,也就是手工整合,由于我第一次接触 kafka, 就跟着老师的脚步走了。

增加依赖,需要与服务器上一样的版本,否则可能会有问题

compile 'org.apache.kafka:kafka_2.9.2:0.8.1.1'
1

创建消费者

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 *
 * @author : zhuqiang
 * @date : 2019/4/7 16:25
 */
public class KafkaConcusmer implements Runnable {

    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConcusmer(String topic, CacheService cacheService) {
      consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(
              "192.168.99.170:2181," +
                      "192.168.99.171:2181," +
                      "192.168.99.172:2181",
              "eshop-cache-group"));
      this.topic = topic;
      this.cacheService = cacheService;
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        for (final KafkaStream stream : streams) {
            new Thread(new KafkaMessageProcessor(stream)).start();
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

消费者处理线程

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * @author : zhuqiang
 * @date : 2019/4/7 16:29
 */
public class KafkaMessageProcessor implements Runnable {

    private KafkaStream kafkaStream;

    public KafkaMessageProcessor(KafkaStream kafkaStream) {
        this.kafkaStream = kafkaStream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
        while (it.hasNext()) {
            String message = new String(it.next().message());
            System.out.println(message);
        }
    }

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

项目启动的时候开始这消费者线程

@Component
public class KafkaInit implements ApplicationRunner {
    @Override
    public void run(ApplicationArguments args) throws Exception {
        new Thread(new KafkaConcusmer("eshop-message")).start();
    }
}
1
2
3
4
5
6
7

以上代码做的工作是:对接 kafka 消息队列,监听一个 topic。等待其他服务的修改事件

# 编写业务逻辑

  1. 两种服务会发送来数据变更消息:商品信息服务和商品店铺信息服务,每个消息都包含服务名以及商品 id

  2. 接收到消息之后,根据商品 id 到对应的服务拉取数据

    这一步,我们采取简化的模拟方式,就是在代码里面写死,会获取到什么数据,不去实际再写其他的服务去调用了

  3. 商品信息:id、名称、价格、图片列表、商品规格、售后信息、颜色、尺寸

  4. 商品店铺信息:其他维度

    用这个维度模拟出来缓存数据维度化拆分:id、店铺名称、店铺等级、店铺好评率

  5. 分别拉取到了数据之后,将数据组织成 json 串,然后分别存储到 ehcache 中和 redis 缓存中

这里的业务逻辑代码,因为是模拟这个场景,所以事先比较简单,重要的类如下

接收到事件之后,分别处理事件

public class KafkaMessageProcessor implements Runnable {

    private KafkaStream kafkaStream;
    private CacheService cacheService;
    private Logger log = LoggerFactory.getLogger(getClass());

    public KafkaMessageProcessor(KafkaStream kafkaStream, CacheService cacheService) {
        this.kafkaStream = kafkaStream;
        this.cacheService = cacheService;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
        while (it.hasNext()) {
            String message = new String(it.next().message());

            // 首先将message转换成json对象
            JSONObject messageJSONObject = JSONObject.parseObject(message);

            // 从这里提取出消息对应的服务的标识
            String serviceId = messageJSONObject.getString("serviceId");

            // 如果是商品信息服务
            if ("productInfoService".equals(serviceId)) {
                processProductInfoChangeMessage(messageJSONObject);
            } else if ("shopInfoService".equals(serviceId)) {
                processShopInfoChangeMessage(messageJSONObject);
            }
        }
    }

    /**
     * 处理商品信息变更的消息
     */
    private void processProductInfoChangeMessage(JSONObject messageJSONObject) {
        // 提取出商品id
        Long productId = messageJSONObject.getLong("productId");

        // 调用商品信息服务的接口
        // 直接用注释模拟:getProductInfo?productId=1,传递过去
        // 商品信息服务,一般来说就会去查询数据库,去获取productId=1的商品信息,然后返回回来

        String productInfoJSON = "{\"id\": 1, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1}";
        ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
        cacheService.saveProductInfo2LocalCache(productInfo);
        log.info("获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId));
        cacheService.saveProductInfo2ReidsCache(productInfo);
    }

    /**
     * 处理店铺信息变更的消息
     */
    private void processShopInfoChangeMessage(JSONObject messageJSONObject) {
        // 提取出商品id
        Long productId = messageJSONObject.getLong("productId");
        Long shopId = messageJSONObject.getLong("shopId");
        // 这里也是模拟去数据库获取到了信息

        String shopInfoJSON = "{\"id\": 1, \"name\": \"小王的手机店\", \"level\": 5, \"goodCommentRate\":0.99}";
        ShopInfo shopInfo = JSONObject.parseObject(shopInfoJSON, ShopInfo.class);
        cacheService.saveShopInfo2LocalCache(shopInfo);
        log.info("获取刚保存到本地缓存的店铺信息:" + cacheService.getShopInfoFromLocalCache(shopId));
        cacheService.saveShopInfo2ReidsCache(shopInfo);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

把缓存的读写封装了到 service 中。上面通过 service 去操作缓存

@Service
public class CacheServiceImpl implements CacheService {
    public static final String CACHE_NAME = "local";

    @Resource
    private JedisCluster jedisCluster;

    /**
     * 将商品信息保存到本地缓存中
     */
    @CachePut(value = CACHE_NAME, key = "'key_'+#productInfo.getId()")
    public ProductInfo saveLocalCache(ProductInfo productInfo) {
        return productInfo;
    }

    /**
     * 从本地缓存中获取商品信息
     */
    @Cacheable(value = CACHE_NAME, key = "'key_'+#id")
    public ProductInfo getLocalCache(Long id) {
        return null;
    }

    /**
     * 将商品信息保存到本地的ehcache缓存中
     */
    @CachePut(value = CACHE_NAME, key = "'product_info_'+#productInfo.getId()")
    public ProductInfo saveProductInfo2LocalCache(ProductInfo productInfo) {
        return productInfo;
    }

    /**
     * 从本地ehcache缓存中获取商品信息
     */
    @Cacheable(value = CACHE_NAME, key = "'product_info_'+#productId")
    public ProductInfo getProductInfoFromLocalCache(Long productId) {
        return null;
    }

    /**
     * 将店铺信息保存到本地的ehcache缓存中
     */
    @CachePut(value = CACHE_NAME, key = "'shop_info_'+#shopInfo.getId()")
    public ShopInfo saveShopInfo2LocalCache(ShopInfo shopInfo) {
        return shopInfo;
    }

    /**
     * 从本地ehcache缓存中获取店铺信息
     */
    @Cacheable(value = CACHE_NAME, key = "'shop_info_'+#shopId")
    public ShopInfo getShopInfoFromLocalCache(Long shopId) {
        return null;
    }

    /**
     * 将商品信息保存到redis中
     */
    public void saveProductInfo2ReidsCache(ProductInfo productInfo) {
        String key = "product_info_" + productInfo.getId();
        jedisCluster.set(key, JSONObject.toJSONString(productInfo));
    }

    /**
     * 将店铺信息保存到redis中
     */
    public void saveShopInfo2ReidsCache(ShopInfo shopInfo) {
        String key = "shop_info_" + shopInfo.getId();
        jedisCluster.set(key, JSONObject.toJSONString(shopInfo));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

# 测试业务逻辑

命令行创建 topic 和 producer 可以参考前一章节 中的 检查集群状态

  1. 创建一个 kafka topic

    # 创建 topic ,需要和程序中的一致 :eshop-message
    bin/kafka-topics.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic eshop-message --replication-factor 1 --partitions 1 --create
    
    1
    2
  2. 在命令行启动一个 kafka producer

    # 创建一个生产者
    bin/kafka-console-producer.sh --broker-list 192.168.99.170:9092,192.168.99.171:9092,192.168.99.172:9092 --topic eshop-message
    
    1
    2
  3. 启动系统,消费者开始监听 kafka topic

    注意:在 boot 2.1.x 中,连不上也没有日志打印。需要把几个虚拟机 hostname 映射到本地

    C:\Windows\System32\drivers\etc\hosts

    192.168.99.170 eshop-cache01
    192.168.99.171 eshop-cache02
    192.168.99.172 eshop-cache03
    192.168.99.173 eshop-cache04
    
    1
    2
    3
    4
  4. 在 producer 中,分别发送两条消息,一个是商品信息服务的消息,一个是商品店铺信息服务的消息

    由于在本次模拟中,cn.mrcode.cachepdp.eshop.cache.kafka.KafkaMessageProcessor 只使用 serviceId 作为了判定,其他数据是程序中写死的,所以这里推送两条携带 serviceId 的信息即可

    {"serviceId":"productInfoService","productId":"1"}
    
    {"serviceId":"shopInfoService","shopId":"1"}
    
    1
    2
    3
  5. 能否接收到两条消息,并模拟拉取到两条数据,同时将数据写入 ehcache 中,并写入 redis 缓存中

  6. ehcache 通过打印日志方式来观察,redis 通过手工连接上去来查询

WARNING

由于 kafka 安装与使用感觉比较难,而且又是老版本的 kafka, 在 boot 2.x 中使用问题很多,不能 debug 等问题

虽然有效果了,但是不能 debug,感觉这个消费者的代码很薄弱