# 049. zookeeper + kafka 集群的安装部署以及如何简单使用的介绍

# 回顾

多级缓存的架构主要用来解决的问题是:时效高低数据的维度存储

时效性不高的数据,比如一些商品的基本信息,如果发生了变更,假设在 5 分钟之后再更新到页面中, 供用户观察到,也是 ok 的,那么我们采取的是异步更新缓存的策略

时效性要求很高的数据,如库存,采取的是数据库 + 缓存双写的技术方案,也解决了双写的一致性的问题

上面这两条可能直接看觉得好像差不多的,这里忽略了一个解释,对于页面来说,需要静态的生成页面, 这个过程可能稍微耗时一些,而对于双写来说则快太多了,它不负责页面渲染等工作,只需要把缓存数据更新即可

缓存数据生产服务,监听一个消息队列,然后数据源服务(商品信息管理服务)发生了数据变更之后, 就将数据变更的消息推送到消息队列中

缓存数据生产服务可以去消费到这个数据变更的消息,然后根据消息的指示提取一些参数, 然后调用对应的数据源服务的接口拉取数据,这个时候一般是从 mysql 库中拉去的

消息队列这里采用的是 kafka,这里选择 kafka 另外一个原因: 后面我们还要用 zookeeper 来解决缓存的分布式并发更新的问题(如分布式锁解决)

而 kafka 集群是依赖 zookeeper 集群,所以先搭建 zookeeper 集群,再搭建 kafka 集群

zookeeper + kafka 的集群,都至少是三节点

# 老师旁白

我工作的时候,很多项目是跟大数据相关的,当然也有很多是纯 java 系统的架构,最近用 kafka 用得比较多

kafka 比较简单易用,讲课来说,很方便

解释一下,我们当然是不可能对课程中涉及的各种技术都深入浅出的讲解的了,比如 kafka,花上 20个小时给你讲解一下,不可能的

所以说呢,在这里,一些技术的组合,用什么都 ok

笑傲江湖中的风清扬,手中无剑胜有剑,还有任何东西都可以当做兵器,哪怕是一根草也可以

搞技术,kafka 和 activemq 肯定有区别,但是说,在有些场景下,其实可能没有那么大的区分度,kafka 和 activemq 其实是一样的

生产者+消费者的场景,kafka + activemq 都 ok

涉及的这种架构,对时效性要求高和时效性要求低的数据,分别采取什么技术方案?数据库+缓存双写一致性?异步+多级缓存架构?大缓存的维度化拆分?

你要关注的,是一些架构上的东西和思想,而不是具体的什么 mq 的使用

作为一名架构师,需要宏观的思考,通盘去考虑整个架构,还有未来的技术规划,业务的发展方向, 架构的演进方向和路线,站在一个 java 高级工程师的角度来思考的话,就是这么去落地实现, 这个侧重点不一样

把课程里讲解的各种技术方案组合成、修改成你需要的适合你的业务的缓存架构

WARNING

建议不要跟着老师的版本走,如果你一开始所有的版本都跟着走的话,就可以

否则,请安装最新版的 kafka,因为 kafka 的客户端与 linux 服务端通信的兼容性问题很麻烦, 且老版本的配置在 boot 2.x 中不兼容,如果手工安装的话(也就是本笔记跟着老师视频走的), 会出现 debug 这个消费者的时候,debug 异常,不知道什么原因

# zookeeper 集群安装

版本:zookeeper-3.4.5.tar.gz

安装机器:eshop-01、eshop-02、eshop-03

cd /usr/local
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
tar -zxvf zookeeper-3.4.5.tar.gz

# 配置环境变量,原来 .bashrc 才是环境变量的配置处
vi ~/.bashrc
# 内容新增
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.4.5
export PATH=$PATH:$ZOOKEEPER_HOME/bin
# 刷新环境变量配置
source ~/.bashrc

# 修改配置文件
cd /usr/local/zookeeper-3.4.5/conf
cp zoo_sample.cfg zoo.cfg

vi zoo.cfg
# 涉及到的内容如下
dataDir=/usr/local/zookeeper-3.4.5/data
server.0=eshop-cache01:2888:3888
server.1=eshop-cache02:2888:3888
server.2=eshop-cache03:2888:3888

# data
mkdir /usr/local/zookeeper-3.4.5/data
# 编辑集群 id,给一个 0,从 0 开始起
# vi 一个不存在的文件,保存后就存在了
cd /usr/local/zookeeper-3.4.5/data
vi myid
#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

一台搞定之后,使用 scp 命令把环境变量和 zookper 复制到 02 和 03 上

# scp --help 中查看支持同时 scp 到多个机器,但是这里尝试一部分成功,一部分失败,所以分开
scp ~/.bashrc root@eshop-cache02:~/
scp ~/.bashrc root@eshop-cache03:~/
scp -r /usr/local/zookeeper-3.4.5 root@eshop-cache02:/usr/local
scp -r /usr/local/zookeeper-3.4.5 root@eshop-cache03:/usr/local
1
2
3
4
5

复制之后,需要修改的地方有:myid ,分别修改为 1 和 2,唯一即可

zookper 启动与检查;三台机器都执行操作

# 由于之前配置了环境变量,这里可以任意目录执行
zkServer.sh start
# 检查 ZooKeeper 状态:一个 leader,两个 follower
zkServer.sh status
# jps:检查三个节点是否都有 QuromPeerMain 进程
jps
1
2
3
4
5
6

# scala 安装

kafka 依赖 scala ,所以需要先安装

cd /usr/local
wget https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.tgz
tar -zxvf scala-2.11.12.tgz

# 配置环境变量,原来 .bashrc 才是环境变量的配置处
vi ~/.bashrc
# 内容新增
export SCALA_HOME=/usr/local/scala-2.11.12
export PATH=$PATH:$SCALA_HOME/bin
# 刷新环境变量配置
source ~/.bashrc
# 查看是否已经配置好
scala -version

scp ~/.bashrc root@eshop-cache02:~/
scp ~/.bashrc root@eshop-cache03:~/
scp -r /usr/local/scala-2.11.12 root@eshop-cache02:/usr/local/
scp -r /usr/local/scala-2.11.12 root@eshop-cache03:/usr/local/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# kafka 集群

版本:kafka_2.9.2-0.8.1.1.tgz,该版本是 2014 年发布的,很老的一个版本了

cd /usr/local
wget https://archive.apache.org/dist/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar -zxvf kafka_2.9.2-0.8.1.1.tgz

# 配置环境变量,原来 .bashrc 才是环境变量的配置处
vi ~/.bashrc
# 内容新增
export KAFKA_HOME=/usr/local/kafka_2.9.2-0.8.1.1
export PATH=$PATH:$KAFKA_HOME/bin
# 刷新环境变量配置
source ~/.bashrc
1
2
3
4
5
6
7
8
9
10
11

配置 kafka

vi /usr/local/kafka_2.9.2-0.8.1.1/config/server.properties
# 涉及到的内容如下
broker.id  # 类似 myid。每台机器唯一即可
zookeeper.connect=192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181
1
2
3
4

安装 slf4j 日志

  • slf4j-1.7.6.zip 包解压后
  • slf4j-nop-1.7.6.jar 包放在 kafka/libs 目录下

解决 kafka 启动 报错 Unrecognized VM option 'UseCompressedOops' 问题

vi /usr/local/kafka_2.9.2-0.8.1.1/bin/kafka-run-class.sh

# 去掉 -XX:+UseCompressedOops 即可
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
  KAFKA_JVM_PERFORMANCE_OPTS="-server  -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
1
2
3
4
5
6

启动命令

cd /usr/local/kafka_2.9.2-0.8.1.1
nohup bin/kafka-server-start.sh config/server.properties &
# 查看是否启动是否报错
cat nohup.out
1
2
3
4

单机启动没有问题后,可以按照之前的方法把环境变量和 kafka 目录 copy 到另外两台机器

scp ~/.bashrc root@eshop-cache02:~/
scp ~/.bashrc root@eshop-cache03:~/

scp -r /usr/local/kafka_2.9.2-0.8.1.1 root@eshop-cache02:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1 root@eshop-cache03:/usr/local/

# 记得修改每台机器的 broker.id 。修改之后才能启动成功
1
2
3
4
5
6
7

检查集群状态

使用 jps 检查启动是否成功

使用基本命令检查kafka是否搭建成功
cd /usr/local/kafka_2.9.2-0.8.1.1

# 创建测试的 topic,名称为 test
bin/kafka-topics.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic test --replication-factor 1 --partitions 1 --create

# 生产者和消费者是一个命令行互动程序,所以需要开启两个终端连接
# 在生产者程序中输入字符串发送后,会在消费者上显示出来
# 创建一个生产者
bin/kafka-console-producer.sh --broker-list 192.168.99.170:9092,192.168.99.171:9092,192.168.99.172:9092 --topic test

# 创建一个消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic test --from-beginning
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15