# 049. zookeeper + kafka 集群的安装部署以及如何简单使用的介绍
# 回顾
多级缓存的架构主要用来解决的问题是:时效高低数据的维度存储
时效性不高的数据,比如一些商品的基本信息,如果发生了变更,假设在 5 分钟之后再更新到页面中, 供用户观察到,也是 ok 的,那么我们采取的是异步更新缓存的策略
时效性要求很高的数据,如库存,采取的是数据库 + 缓存双写的技术方案,也解决了双写的一致性的问题
上面这两条可能直接看觉得好像差不多的,这里忽略了一个解释,对于页面来说,需要静态的生成页面, 这个过程可能稍微耗时一些,而对于双写来说则快太多了,它不负责页面渲染等工作,只需要把缓存数据更新即可
缓存数据生产服务,监听一个消息队列,然后数据源服务(商品信息管理服务)发生了数据变更之后, 就将数据变更的消息推送到消息队列中
缓存数据生产服务可以去消费到这个数据变更的消息,然后根据消息的指示提取一些参数, 然后调用对应的数据源服务的接口拉取数据,这个时候一般是从 mysql 库中拉去的
消息队列这里采用的是 kafka,这里选择 kafka 另外一个原因: 后面我们还要用 zookeeper 来解决缓存的分布式并发更新的问题(如分布式锁解决)
而 kafka 集群是依赖 zookeeper 集群,所以先搭建 zookeeper 集群,再搭建 kafka 集群
zookeeper + kafka 的集群,都至少是三节点
# 老师旁白
我工作的时候,很多项目是跟大数据相关的,当然也有很多是纯 java 系统的架构,最近用 kafka 用得比较多
kafka 比较简单易用,讲课来说,很方便
解释一下,我们当然是不可能对课程中涉及的各种技术都深入浅出的讲解的了,比如 kafka,花上 20个小时给你讲解一下,不可能的
所以说呢,在这里,一些技术的组合,用什么都 ok
笑傲江湖中的风清扬,手中无剑胜有剑,还有任何东西都可以当做兵器,哪怕是一根草也可以
搞技术,kafka 和 activemq 肯定有区别,但是说,在有些场景下,其实可能没有那么大的区分度,kafka 和 activemq 其实是一样的
生产者+消费者的场景,kafka + activemq 都 ok
涉及的这种架构,对时效性要求高和时效性要求低的数据,分别采取什么技术方案?数据库+缓存双写一致性?异步+多级缓存架构?大缓存的维度化拆分?
你要关注的,是一些架构上的东西和思想,而不是具体的什么 mq 的使用
作为一名架构师,需要宏观的思考,通盘去考虑整个架构,还有未来的技术规划,业务的发展方向, 架构的演进方向和路线,站在一个 java 高级工程师的角度来思考的话,就是这么去落地实现, 这个侧重点不一样
把课程里讲解的各种技术方案组合成、修改成你需要的适合你的业务的缓存架构
WARNING
建议不要跟着老师的版本走,如果你一开始所有的版本都跟着走的话,就可以
否则,请安装最新版的 kafka,因为 kafka 的客户端与 linux 服务端通信的兼容性问题很麻烦, 且老版本的配置在 boot 2.x 中不兼容,如果手工安装的话(也就是本笔记跟着老师视频走的), 会出现 debug 这个消费者的时候,debug 异常,不知道什么原因
# zookeeper 集群安装
版本:zookeeper-3.4.5.tar.gz
安装机器:eshop-01、eshop-02、eshop-03
cd /usr/local
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
tar -zxvf zookeeper-3.4.5.tar.gz
# 配置环境变量,原来 .bashrc 才是环境变量的配置处
vi ~/.bashrc
# 内容新增
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.4.5
export PATH=$PATH:$ZOOKEEPER_HOME/bin
# 刷新环境变量配置
source ~/.bashrc
# 修改配置文件
cd /usr/local/zookeeper-3.4.5/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# 涉及到的内容如下
dataDir=/usr/local/zookeeper-3.4.5/data
server.0=eshop-cache01:2888:3888
server.1=eshop-cache02:2888:3888
server.2=eshop-cache03:2888:3888
# data
mkdir /usr/local/zookeeper-3.4.5/data
# 编辑集群 id,给一个 0,从 0 开始起
# vi 一个不存在的文件,保存后就存在了
cd /usr/local/zookeeper-3.4.5/data
vi myid
#
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
一台搞定之后,使用 scp 命令把环境变量和 zookper 复制到 02 和 03 上
# scp --help 中查看支持同时 scp 到多个机器,但是这里尝试一部分成功,一部分失败,所以分开
scp ~/.bashrc root@eshop-cache02:~/
scp ~/.bashrc root@eshop-cache03:~/
scp -r /usr/local/zookeeper-3.4.5 root@eshop-cache02:/usr/local
scp -r /usr/local/zookeeper-3.4.5 root@eshop-cache03:/usr/local
2
3
4
5
复制之后,需要修改的地方有:myid ,分别修改为 1 和 2,唯一即可
zookper 启动与检查;三台机器都执行操作
# 由于之前配置了环境变量,这里可以任意目录执行
zkServer.sh start
# 检查 ZooKeeper 状态:一个 leader,两个 follower
zkServer.sh status
# jps:检查三个节点是否都有 QuromPeerMain 进程
jps
2
3
4
5
6
# scala 安装
kafka 依赖 scala ,所以需要先安装
cd /usr/local
wget https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.tgz
tar -zxvf scala-2.11.12.tgz
# 配置环境变量,原来 .bashrc 才是环境变量的配置处
vi ~/.bashrc
# 内容新增
export SCALA_HOME=/usr/local/scala-2.11.12
export PATH=$PATH:$SCALA_HOME/bin
# 刷新环境变量配置
source ~/.bashrc
# 查看是否已经配置好
scala -version
scp ~/.bashrc root@eshop-cache02:~/
scp ~/.bashrc root@eshop-cache03:~/
scp -r /usr/local/scala-2.11.12 root@eshop-cache02:/usr/local/
scp -r /usr/local/scala-2.11.12 root@eshop-cache03:/usr/local/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# kafka 集群
版本:kafka_2.9.2-0.8.1.1.tgz,该版本是 2014 年发布的,很老的一个版本了
cd /usr/local
wget https://archive.apache.org/dist/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar -zxvf kafka_2.9.2-0.8.1.1.tgz
# 配置环境变量,原来 .bashrc 才是环境变量的配置处
vi ~/.bashrc
# 内容新增
export KAFKA_HOME=/usr/local/kafka_2.9.2-0.8.1.1
export PATH=$PATH:$KAFKA_HOME/bin
# 刷新环境变量配置
source ~/.bashrc
2
3
4
5
6
7
8
9
10
11
配置 kafka
vi /usr/local/kafka_2.9.2-0.8.1.1/config/server.properties
# 涉及到的内容如下
broker.id # 类似 myid。每台机器唯一即可
zookeeper.connect=192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181
2
3
4
安装 slf4j 日志
- slf4j-1.7.6.zip 包解压后
- slf4j-nop-1.7.6.jar 包放在 kafka/libs 目录下
解决 kafka 启动 报错 Unrecognized VM option 'UseCompressedOops' 问题
vi /usr/local/kafka_2.9.2-0.8.1.1/bin/kafka-run-class.sh
# 去掉 -XX:+UseCompressedOops 即可
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
2
3
4
5
6
启动命令
cd /usr/local/kafka_2.9.2-0.8.1.1
nohup bin/kafka-server-start.sh config/server.properties &
# 查看是否启动是否报错
cat nohup.out
2
3
4
单机启动没有问题后,可以按照之前的方法把环境变量和 kafka 目录 copy 到另外两台机器
scp ~/.bashrc root@eshop-cache02:~/
scp ~/.bashrc root@eshop-cache03:~/
scp -r /usr/local/kafka_2.9.2-0.8.1.1 root@eshop-cache02:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1 root@eshop-cache03:/usr/local/
# 记得修改每台机器的 broker.id 。修改之后才能启动成功
2
3
4
5
6
7
检查集群状态
使用 jps 检查启动是否成功
使用基本命令检查kafka是否搭建成功
cd /usr/local/kafka_2.9.2-0.8.1.1
# 创建测试的 topic,名称为 test
bin/kafka-topics.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic test --replication-factor 1 --partitions 1 --create
# 生产者和消费者是一个命令行互动程序,所以需要开启两个终端连接
# 在生产者程序中输入字符串发送后,会在消费者上显示出来
# 创建一个生产者
bin/kafka-console-producer.sh --broker-list 192.168.99.170:9092,192.168.99.171:9092,192.168.99.172:9092 --topic test
# 创建一个消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic test --from-beginning
2
3
4
5
6
7
8
9
10
11
12
13
14
15