# 070. 基于 nginx+lua 完成商品详情页访问流量实时上报 kafka 的开发

本章节要实现的就是:在 nginx 这一层,接收到访问请求的时候,就把请求的流量上报发送给 kafka, 这样的话,storm 才能去消费 kafka 中的实时的访问日志,然后去进行缓存热数据的统计

使用到的 lua 工具包:lua-resty-kafka (opens new window)

# 安装 lua-resty-kafka

nginx 三台的作用

  • eshop-01:应用层
  • eshop-02:应用层
  • eshop-03:分发层

我们需要在 01 和 02 应用层上装上该依赖,并编写上报脚本

cd /usr/local/
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
yum install -y unzip
unzip master.zip
# resty 目录下是 kafka 目录,其实就是讲 kafka 目录放到 lualib 中去
cp -rf /usr/local/lua-resty-kafka-master/lib/resty/ /usr/hello/lualib
# 加载依赖包,其实后续写完脚本之后也需要 reload 的
/usr/servers/nginx/sbin/nginx -s reload
1
2
3
4
5
6
7
8

# 脚本编写

/usr/hello/lua/product.lua 中增加这段逻辑

提示:这种工具类的核心写法,在该工具官网 github 中有示例

该段逻辑由于比较独立,可以放在 product.lua 顶部。

local cjson = require("cjson")
-- 引用之前安装的工具包
local producer = require("resty.kafka.producer")

local broker_list = {
  { host = "192.168.99.170", port = 9092 },  
  { host = "192.168.99.171", port = 9092 },  
  { host = "192.168.99.172", port = 9092 }
}

-- 定义日志信息
local log_json = {}
log_json["headers"] = ngx.req.get_headers()  
log_json["uri_args"] = ngx.req.get_uri_args()  
log_json["body"] = ngx.req.read_body()  
log_json["http_version"] = ngx.req.http_version()  
log_json["method"] =ngx.req.get_method()
log_json["raw_reader"] = ngx.req.raw_header()  
log_json["body_data"] = ngx.req.get_body_data()

-- 序列化为一个字符串
local message = cjson.encode(log_json);  

-- local offset, err = p:send("test", key, message)
-- 这里的 key 只是作为消息路由分区使用,kafka 中的概念
local productId = ngx.req.get_uri_args()["productId"]
-- 异步发送信息
local async_producer = producer:new(broker_list, { producer_type = "async" })   
local ok, err = async_producer:send("access-log", productId, message)  

if not ok then  
    ngx.log(ngx.ERR, "kafka send err:", err)  
    return  
end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

记得需要 /usr/servers/nginx/sbin/nginx -s reload

# kafka topic 创建与消费显示

详细内容可参考之前的内容

cd /usr/local/kafka_2.9.2-0.8.1.1
# 创建测试的 topic,名称为 access-log
bin/kafka-topics.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic access-log --replication-factor 1 --partitions 1 --create
# 创建一个消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic access-log --from-beginning
1
2
3
4
5

# 测试脚本是否达到正常效果

记得后端缓存服务需要启动,nginx 本地缓存是有过期时间的,过期后就会去请求后端服务了

访问地址:http://eshop-cache03/product?method=product&productId=1&shopId=1

页面能正常看到商品信息,但是 kafka consumer 无信息

# 查看 nginx 的错误日志发现
tail -f /usr/servers/nginx/logs/error.log

2019/05/07 20:14:49 [error] 9888#0: [lua] producer.lua:258: buffered messages send to kafka err: no resolver defined to resolve "eshop-cache01", retryable: true, topic: access-log, partition_id: 0, length: 1, context: ngx.timer, client: 192.168.99.172, server: 0.0.0.0:80

1
2
3
4
5

TIP

经过实战排错,resolver 8.8.8.8; 可以不配置,只需要修改 kafka 配置文件配置项 advertised.host.name = 对应机器 ip 即可

解决方法:

vi /usr/servers/nginx/conf/nginx.conf

在 http 部分,加入 resolver 8.8.8.8;
1
2
3

再次尝试发现日志变更了

2019/05/07 20:20:55 [error] 9891#0: [lua] producer.lua:258: buffered messages send to kafka err: eshop-cache01 could not be resolved (3: Host not found), retryable: true, topic: access-log, partition_id: 0, length: 1, context: ngx.timer, client: 192.168.99.172, server: 0.0.0.0:80
1

可以看到日志,的确是去解析了,但是这个是我们本地自定义的肯定解析不到,那么这个问题是哪里的问题呢?

我懒一点,视频中说到,需要更改 kafka 的配置文件,让用本机 ip 而不是 hostName

vi /usr/local/kafka_2.9.2-0.8.1.1/config/server.properties

# 默认是 hostname,更改为自己机器的 ip 地址
#advertised.host.name=<hostname routable by clients>
advertised.host.name = 192.168.99.170
1
2
3
4
5

再重启 kafka

[root@eshop-cache01 lua]# jps
12698 Jps
12310 logviewer
1576 Kafka

kill -9 1576

cd /usr/local/kafka_2.9.2-0.8.1.1
nohup bin/kafka-server-start.sh config/server.properties &
# 查看是否启动是否报错
cat nohup.out
1
2
3
4
5
6
7
8
9
10
11

再次访问,发现能接受到信息了

[root@eshop-cache01 kafka_2.9.2-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic access-log --from-beginning
{"method":"GET","http_version":1.1,"raw_reader":"GET \/product?productId=1&shopId=1 HTTP\/1.1\r\nHost: 192.168.99.171\r\nUser-Agent: lua-resty-http\/0.13 (Lua) ngx_lua\/9014\r\n\r\n","uri_args":{"productId":"1","shopId":"1"},"headers":{"host":"192.168.99.171","user-agent":"lua-resty-http\/0.13 (Lua) ngx_lua\/9014"}}
1
2