# 071. Building the Storm + Kafka topology for real-time product access counting

The code in this section follows this plan:

We already wrote a Storm "hello world" earlier; here we add the business code on top of that template.

  1. Write a spout that consumes from Kafka
  2. Write a bolt that parses the access log and extracts the product id
  3. Write a bolt that counts accesses per product

# Project build.gradle

This is business code now, so it lives in a new project, which needs Kafka and a few other dependencies.

```groovy
// compile sources with UTF-8 encoding
tasks.withType(JavaCompile) {
    options.encoding = "UTF-8"
}
version = '1.0.0'
sourceCompatibility = '1.7'

dependencies {
    // storm-core should not be bundled into the jar.
    // Note: running directly in IDEA with compileOnly fails (dependency not found),
    // while packaging with compile and submitting to Storm also fails,
    // so switch between the two scopes depending on whether you are developing or packaging.
//    compileOnly 'org.apache.storm:storm-core:1.1.0'
    compile 'org.apache.storm:storm-core:1.1.0'
    compile 'commons-collections:commons-collections:3.2.1'
    compile 'com.alibaba:fastjson:1.1.43'
    compile ('org.apache.kafka:kafka_2.9.2:0.8.1.1'){
        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
    }
    compile ('org.apache.zookeeper:zookeeper:3.4.5'){
        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
    }
}

jar {
    manifestContentCharset 'utf-8'
    metadataCharset 'utf-8'
    manifest {
        attributes(
                "Manifest-Version": 1.0,
                "Main-Class": "cn.mrcode.cachepdp.eshop.storm.HotProductTopology")
    }
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
}
```

# HotProductTopology code

```java
package cn.mrcode.cachepdp.eshop.storm;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedTransferQueue;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Spout that consumes data from Kafka
 *
 * @author : zhuqiang
 * @date : 2019/5/22 23:01
 */
public class AccessLogConsumerSpout extends BaseRichSpout {
    private LinkedTransferQueue<String> queue;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        queue = new LinkedTransferQueue();
        this.collector = collector;
        startKafka();
    }

    @Override
    public void nextTuple() {
        try {
            // LinkedTransferQueue is used because:
            // the Kafka thread's transfer() blocks until a take() happens here,
            // so the pipeline reflects the real rate at which this spout can consume from Kafka
            // instead of consuming without bound and piling messages up in memory
            String message = queue.take();
            collector.emit(new Values(message));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }

    private ConsumerConnector consumer;
    private String topic;

    private void startKafka() {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(
                "192.168.99.170:2181," +
                        "192.168.99.171:2181," +
                        "192.168.99.172:2181",
                "eshop-cache-group"));
        this.topic = "access-log";
        new Thread(new Runnable() {
            @Override
            public void run() {
                Map<String, Integer> topicCountMap = new HashMap<>();
                topicCountMap.put(topic, 1);
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
                List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

                for (final KafkaStream stream : streams) {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> next = it.next();
                        String message = new String(next.message());
                        try {
                            queue.transfer(message);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }).start();
    }

    private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }
}
```
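
The handoff behaviour the comments above rely on is easy to verify in isolation. The following standalone sketch (the class name `TransferQueueDemo` is made up for illustration and is not part of the project) shows that `transfer()` blocks the producing thread until another thread calls `take()`, which is exactly how the Kafka consumer thread is throttled by `nextTuple()`:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TransferQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        final LinkedTransferQueue<String> queue = new LinkedTransferQueue<String>();

        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("producer: handing off a message...");
                    queue.transfer("log-line"); // blocks until someone takes it
                    System.out.println("producer: handed off");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        TimeUnit.SECONDS.sleep(2);                       // the producer stays blocked here
        System.out.println("consumer: " + queue.take()); // unblocks the producer
        producer.join();
    }
}
```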

```java
package cn.mrcode.cachepdp.eshop.storm;

import com.alibaba.fastjson.JSONObject;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Bolt that parses the access log and extracts the product id
 *
 * @author : zhuqiang
 * @date : 2019/5/22 23:26
 */
public class LogParseBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String message = input.getStringByField("message");
        JSONObject jsonObject = JSONObject.parseObject(message);
        // "uri_args":{"productId":"1","shopId":"1"}
        JSONObject uri_args = jsonObject.getJSONObject("uri_args");
        Long productId = uri_args.getLong("productId");
        if (productId != null) {
            collector.emit(new Values(productId));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("productId"));
    }
}
```
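
As a quick sanity check of the parsing logic, here is a minimal standalone sketch. The class name `LogParseDemo` and the full message string are assumptions based on the `uri_args` fragment shown in the bolt; the real log reported by the application-layer nginx carries more fields:

```java
import com.alibaba.fastjson.JSONObject;

public class LogParseDemo {
    public static void main(String[] args) {
        // assumed message shape, built from the "uri_args" fragment above
        String message = "{\"uri_args\":{\"productId\":\"1\",\"shopId\":\"1\"}}";
        JSONObject json = JSONObject.parseObject(message);
        Long productId = json.getJSONObject("uri_args").getLong("productId");
        System.out.println("productId = " + productId); // prints: productId = 1
    }
}
```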

```java
package cn.mrcode.cachepdp.eshop.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.trident.util.LRUMap;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

/**
 * Bolt that counts accesses per product in an in-memory LRU map
 *
 * @author : zhuqiang
 * @date : 2019/5/22 23:29
 */
public class ProductCountBolt extends BaseRichBolt {
    private LRUMap<Long, Long> countMap = new LRUMap(100);

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void execute(Tuple input) {
        Long productId = input.getLongByField("productId");
        Long count = countMap.get(productId);
        if (count == null) {
            count = 0L;
        }
        countMap.put(productId, ++count);
        System.out.println("商品 " + productId + ",次数 " + countMap.get(productId));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
```
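
The counting bolt deliberately keeps only the hottest entries: `LRUMap` evicts the least recently used keys once its capacity is exceeded, so cold products drop out while hot products keep their counters. A tiny sketch of that behaviour (the class name `LruCountDemo` is made up, and a capacity of 3 is used instead of 100 so the eviction is visible):

```java
import org.apache.storm.trident.util.LRUMap;

public class LruCountDemo {
    public static void main(String[] args) {
        LRUMap<Long, Long> countMap = new LRUMap<Long, Long>(3); // tiny capacity for the demo
        for (long productId = 1; productId <= 5; productId++) {
            countMap.put(productId, 1L);
        }
        // the two least recently used keys (1 and 2) have been evicted
        System.out.println(countMap.keySet()); // expected: [3, 4, 5]
    }
}
```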

```java
package cn.mrcode.cachepdp.eshop.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import java.util.concurrent.TimeUnit;

/**
 * Topology definition: wires the Kafka spout and the two bolts together
 *
 * @author : zhuqiang
 * @date : 2019/5/22 22:58
 */
public class HotProductTopology {
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException, InterruptedException {
        // Build the topology, i.e. manually define the processing flow.
        // Once it is submitted to the Storm cluster, Storm decides
        // on which machines the defined topology actually runs.
        TopologyBuilder builder = new TopologyBuilder();
        // id, spout, parallelism
        builder.setSpout(AccessLogConsumerSpout.class.getSimpleName(),
                new AccessLogConsumerSpout(), 2);
        builder.setBolt(LogParseBolt.class.getSimpleName(),
                new LogParseBolt(), 5)
                .setNumTasks(5)
                .shuffleGrouping(AccessLogConsumerSpout.class.getSimpleName());
        builder.setBolt(ProductCountBolt.class.getSimpleName(),
                new ProductCountBolt(), 5)
                .setNumTasks(5)
                .fieldsGrouping(LogParseBolt.class.getSimpleName(), new Fields("productId"));

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            // running from the command line: submit the topology to the Storm cluster
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("HotProductTopology", conf, builder.createTopology());
            TimeUnit.SECONDS.sleep(60);
            cluster.shutdown();
        }
    }
}
```

# Testing the code

  1. Run HotProductTopology locally first
  2. Visit: http://eshop-cache03/product?method=product&productId=1&shopId=1

The request is then distributed across the two application-layer nginx instances, which report the access log to Kafka.

Observe the printed output: each access adds one more line


```
product 1, count 1
product 1, count 2
product 1, count 3
```