# 071. Building the Storm + Kafka topology for real-time product-access counting
The code in this section follows this outline:

We already built a Storm hello-world earlier; here the business code is added on top of that template.

- Write a spout that consumes from Kafka
- Write a bolt that parses the access log and extracts the product id
- Write a bolt that counts accesses per product
# Project build.gradle

This is business code now, so it lives in a new project and needs its own dependencies (Kafka and friends).
```groovy
// Specify the compile encoding
tasks.withType(JavaCompile) {
    options.encoding = "UTF-8"
}

version = '1.0.0'
sourceCompatibility = '1.7'

dependencies {
    // Do not bundle this dependency when packaging.
    // Note: running directly in IDEA with compileOnly fails because the dependency cannot be found,
    // while packaging with compile and submitting to Storm also fails,
    // so mind which scope you use for packaging vs. development.
    // compileOnly 'org.apache.storm:storm-core:1.1.0'
    compile 'org.apache.storm:storm-core:1.1.0'
    compile 'commons-collections:commons-collections:3.2.1'
    compile 'com.alibaba:fastjson:1.1.43'
    compile('org.apache.kafka:kafka_2.9.2:0.8.1.1') {
        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
    }
    compile('org.apache.zookeeper:zookeeper:3.4.5') {
        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
    }
}

jar {
    manifestContentCharset 'utf-8'
    metadataCharset 'utf-8'
    manifest {
        attributes(
                "Manifest-Version": 1.0,
                "Main-Class": "cn.mrcode.cachepdp.eshop.storm.HotProductTopology")
    }
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
}
```
# HotProductTopology implementation

First, AccessLogConsumerSpout pulls access-log messages from Kafka:
```java
package cn.mrcode.cachepdp.eshop.storm;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedTransferQueue;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Spout that consumes access-log data from Kafka
 *
 * @author : zhuqiang
 * @date : 2019/5/22 23:01
 */
public class AccessLogConsumerSpout extends BaseRichSpout {
    private LinkedTransferQueue<String> queue;
    private SpoutOutputCollector collector;

    private ConsumerConnector consumer;
    private String topic;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        queue = new LinkedTransferQueue<>();
        this.collector = collector;
        startKafka();
    }

    @Override
    public void nextTuple() {
        try {
            // LinkedTransferQueue is used on purpose:
            // transfer() blocks the Kafka consumer thread until a take() runs here,
            // so the queue reflects how fast the topology actually consumes messages
            // instead of buffering them in memory without limit.
            String message = queue.take();
            collector.emit(new Values(message));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }

    private void startKafka() {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(
                "192.168.99.170:2181," +
                        "192.168.99.171:2181," +
                        "192.168.99.172:2181",
                "eshop-cache-group"));
        this.topic = "access-log";
        new Thread(new Runnable() {
            @Override
            public void run() {
                Map<String, Integer> topicCountMap = new HashMap<>();
                topicCountMap.put(topic, 1);
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
                List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
                for (final KafkaStream stream : streams) {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> next = it.next();
                        String message = new String(next.message());
                        try {
                            queue.transfer(message);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }).start();
    }

    private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }
}
```
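The transfer/take pairing is what throttles consumption: `queue.transfer()` blocks the Kafka consumer thread until `nextTuple()` takes the element. A minimal, Storm-independent sketch of that behavior (the class name and timings are just for illustration):

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TransferQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        final LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    // transfer() only returns once a consumer has taken the element
                    queue.transfer("msg-1");
                    System.out.println("handed off after ~" + (System.currentTimeMillis() - start) + " ms");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();

        // Simulate a slow consumer: the producer thread stays blocked for ~2 seconds
        TimeUnit.SECONDS.sleep(2);
        System.out.println("took: " + queue.take());
    }
}
```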
Next, LogParseBolt parses each message and emits the product id:

```java
package cn.mrcode.cachepdp.eshop.storm;

import com.alibaba.fastjson.JSONObject;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Bolt that parses the access-log JSON and extracts the product id
 *
 * @author : zhuqiang
 * @date : 2019/5/22 23:26
 */
public class LogParseBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String message = input.getStringByField("message");
        JSONObject jsonObject = JSONObject.parseObject(message);
        // "uri_args":{"productId":"1","shopId":"1"}
        JSONObject uri_args = jsonObject.getJSONObject("uri_args");
        Long productId = uri_args.getLong("productId");
        if (productId != null) {
            collector.emit(new Values(productId));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("productId"));
    }
}
```
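The exact shape of the log message depends on what the application-tier nginx reports; assuming it looks like the `uri_args` comment above, a quick standalone check of the fastjson parsing (the sample string and class name are made up for illustration):

```java
import com.alibaba.fastjson.JSONObject;

public class LogParseDemo {
    public static void main(String[] args) {
        // Hypothetical message; only the uri_args/productId/shopId part is taken from the bolt above
        String message = "{\"uri_args\":{\"productId\":\"1\",\"shopId\":\"1\"}}";
        JSONObject jsonObject = JSONObject.parseObject(message);
        JSONObject uriArgs = jsonObject.getJSONObject("uri_args");
        Long productId = uriArgs.getLong("productId"); // fastjson converts the string "1" to 1L
        System.out.println("productId = " + productId);
    }
}
```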
Then ProductCountBolt keeps a rolling count per product:

```java
package cn.mrcode.cachepdp.eshop.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.trident.util.LRUMap;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

/**
 * Bolt that counts accesses per product, keeping only the hottest entries in an LRU map
 *
 * @author : zhuqiang
 * @date : 2019/5/22 23:29
 */
public class ProductCountBolt extends BaseRichBolt {
    private LRUMap<Long, Long> countMap = new LRUMap<>(100);

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        Long productId = input.getLongByField("productId");
        Long count = countMap.get(productId);
        if (count == null) {
            count = 0L;
        }
        countMap.put(productId, ++count);
        System.out.println("product " + productId + ", count " + countMap.get(productId));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
```
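As far as I can tell from the Storm 1.x source, `org.apache.storm.trident.util.LRUMap` is a `LinkedHashMap` in access order that drops the least recently used entry once the capacity is exceeded, so the bolt only keeps counters for roughly the 100 most recently accessed products. A tiny sketch of that eviction behavior (the capacity of 3 is just for illustration):

```java
import org.apache.storm.trident.util.LRUMap;

public class LruMapDemo {
    public static void main(String[] args) {
        LRUMap<Long, Long> countMap = new LRUMap<>(3);
        countMap.put(1L, 10L);
        countMap.put(2L, 20L);
        countMap.put(3L, 30L);
        countMap.get(1L);      // touch product 1 so it becomes most recently used
        countMap.put(4L, 40L); // over capacity: the least recently used key (2) should be evicted
        System.out.println(countMap.keySet()); // expected: [3, 1, 4]
    }
}
```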
Finally, HotProductTopology wires everything together and supports both local and cluster submission:

```java
package cn.mrcode.cachepdp.eshop.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import java.util.concurrent.TimeUnit;

/**
 * Topology that wires the Kafka spout and the two bolts together
 *
 * @author : zhuqiang
 * @date : 2019/5/22 22:58
 */
public class HotProductTopology {
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException, InterruptedException {
        // Build the topology, i.e. define the processing pipeline by hand.
        // After it is submitted to the Storm cluster, Storm decides on which
        // machines the topology you defined here actually runs.
        TopologyBuilder builder = new TopologyBuilder();
        // id, spout, parallelism
        builder.setSpout(AccessLogConsumerSpout.class.getSimpleName(),
                new AccessLogConsumerSpout(), 2);
        builder.setBolt(LogParseBolt.class.getSimpleName(),
                new LogParseBolt(), 5)
                .setNumTasks(5)
                .shuffleGrouping(AccessLogConsumerSpout.class.getSimpleName());
        builder.setBolt(ProductCountBolt.class.getSimpleName(),
                new ProductCountBolt(), 5)
                .setNumTasks(5)
                .fieldsGrouping(LogParseBolt.class.getSimpleName(), new Fields("productId"));

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            // Started from the command line: submit to the Storm cluster
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            // Local mode for development and testing
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("HotProductTopology", conf, builder.createTopology());
            TimeUnit.SECONDS.sleep(60);
            cluster.shutdown();
        }
    }
}
```
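One detail worth calling out: `shuffleGrouping` spreads tuples randomly across the LogParseBolt tasks, while `fieldsGrouping(..., new Fields("productId"))` routes every tuple with the same productId to the same ProductCountBolt task, so each task's LRUMap sees the complete count for the products it owns. Conceptually the routing behaves like a hash of the grouped field modulo the task count; the sketch below is only an approximation of that idea, not Storm's actual grouper code:

```java
public class FieldsGroupingSketch {
    // Approximation only: same productId -> same task index, which is the property
    // ProductCountBolt relies on for its per-task counters to be meaningful.
    static int chooseTask(Long productId, int numTasks) {
        return Math.abs(productId.hashCode()) % numTasks;
    }

    public static void main(String[] args) {
        int numTasks = 5; // matches setNumTasks(5) for ProductCountBolt
        System.out.println(chooseTask(1L, numTasks)); // productId 1 ...
        System.out.println(chooseTask(1L, numTasks)); // ... always lands on the same task index
        System.out.println(chooseTask(2L, numTasks)); // a different product may go to another task
    }
}
```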
# Testing the code

- First run HotProductTopology locally
- Then hit the URL: http://eshop-cache03/product?method=product&productId=1&shopId=1

The request is distributed across the two application-tier nginx instances, which report the access to Kafka.

Watch the console output: every access prints one more line, for example:

```
product 1, count 1
product 1, count 2
product 1, count 3
```