storm+kafka集成简单应用
网友 CSDN博客

基本场景是应用出现错误,发送日志到kafka的某个topic,storm订阅该topic,然后进行后续处理。场景非常简单,但是在学习过程中,遇到一个奇怪的异常情况:使用KafkaSpout读取topic数据时,没有向ZK写offset数据,致使每次都从头开始读取。纠结了两天,终于碰巧找到原因:应该使用BaseBasicBolt作为bolt的父类,而不是BaseRichBolt

基本订阅 :

基本场景:订阅kafka的某个topic,然后在读取的消息前加上自定义的字符串,然后写回到kafka另外一个topic。  从Kafka读取数据的Spout使用storm.kafka.KafkaSpout,向Kafka写数据的Bolt使用storm.kafka.bolt.KafkaBolt。中间进行进行数据处理的Bolt定义为TopicMsgBolt。

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.topology.IBasicBolt;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.utils.Utils;

import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.ZkHosts;

import storm.kafka.bolt.KafkaBolt;

import java.util.Properties;

public class TopicMsgTopology {

    public static void main(String[] args) throws Exception {

        // 配置Zookeeper地址

        BrokerHosts brokerHosts = new ZkHosts("localhost:2181");

        // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字

        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1""/topology/root1""topicMsgTopology");

        // 配置KafkaBolt中的kafka.broker.properties

        Config conf = new Config();

        Properties props = new Properties();

        // 配置Kafka broker地址

        props.put("metadata.broker.list""localhost:9092");

        // serializer.class为消息的序列化类

        props.put("serializer.class""kafka.serializer.StringEncoder");

        conf.put("kafka.broker.properties", props);

        // 配置KafkaBolt生成的topic

        conf.put("topic""msgTopic2");

        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("msgKafkaSpout"new KafkaSpout(spoutConfig));

        builder.setBolt("msgSentenceBolt", (IBasicBolt) new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout");

        builder.setBolt("msgKafkaBolt"new KafkaBolt<String, Integer>()).shuffleGrouping("msgSentenceBolt");

        if (args.length == 0) {

            String topologyName = "kafkaTopicTopology";

            LocalCluster cluster = new LocalCluster();

            cluster.submitTopology(topologyName, conf, builder.createTopology());

            Utils.sleep(100000);

            cluster.killTopology(topologyName);

            cluster.shutdown();

        else {

            conf.setNumWorkers(1);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

        }

    }

}

storm.kafka.ZkHosts构造方法的参数是zookeeper标准配置地址的形式

storm.kafka.SpoutConfig构造方法第一个参数为上述的storm.kafka.ZkHosts对象,第二个为待订阅的topic名称,java培训的第三个参数zkRoot为写读取topic时的偏移量offset数据的节点(zk node),第四个参数为该节点上的次级节点名(有个地方说这个是spout的id)。  backtype.storm.Config对象是配置storm的topology(拓扑)所需要的基础配置。  backtype.storm.spout.SchemeAsMultiScheme的构造方法输入的参数是订阅kafka数据的处理参数,这里的MessageScheme是自定义的,代码如下:

import backtype.storm.spout.Scheme;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;

import java.util.List;

public class MessageScheme implements Scheme {

    private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

    @Override

    public List<Object> deserialize(byte[] ser) {

        try {

            String msg = new String(ser, "UTF-8");

            logger.info("get one message is {}", msg);

            return new Values(msg);

        catch (UnsupportedEncodingException ignored) {

            return null;

        }

    }

    @Override

    public Fields getOutputFields() {

        return new Fields("msg");

    }

}

MessageScheme类中getOutputFields方法是KafkaSpout向后发送tuple(storm传输数据的最小结构)的名字,需要与接收数据的Bolt中统一(在这个例子中可以不统一,因为后面直接取第0条数据,但是在wordCount的那个例子中就需要统一了)。  TopicMsgBolt类是从storm.kafka.KafkaSpout接收数据的Bolt,对接收到的数据进行处理,然后向后传输给storm.kafka.bolt.KafkaBolt。

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class TopicMsgBolt extends BaseBasicBolt {

    private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class);

    @Override

    public void execute(Tuple input, BasicOutputCollector collector) {

        String word = (String) input.getValue(0);

        String out = "Message got is '" + word + "'!";

        logger.info("out={}", out);

        collector.emit(new Values(out));

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("message"));

    }

}

此处需要特别注意的是,要使用backtype.storm.topology.base.BaseBasicBolt对象作为父类,否则不会在zk记录偏移量offset数据。 需要编写的代码已完成,接下来就是在搭建好的storm、kafka中进行测试:

# 创建topic./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic msgTopic1

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic msgTopic2

接下来需要分别对msgTopic1、msgTopic2启动producer(生产者)与consumer(消费者)

# 对msgTopic1启动producer,用于发送数据 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic msgTopic1 

# 对msgTopic2启动consumer,用于查看发送数据的处理结果 ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic msgTopic2 --from-beginning

执行storm的jar命令运行程序:

storm jar stormkafka.jar stormkafka1.TopicMsgTopology

待对应的worker启动好之后,就可以在msgTopic1的producer对应终端输入数据,然后在msgTopic2的consumer对应终端查看输出结果了。  

有几点需要注意的:  必须先创建msgTopic1、msgTopic2两个topic; 定义的bolt必须使用BaseBasicBolt作为父类,不能够使用BaseRichBolt,否则无法记录偏移量; zookeeper最好使用至少三个节点的分布式模式或伪分布式模式,否则会出现一些异常情况; 在整个storm下,spout、bolt的id必须唯一,否则会出现异常。 TopicMsgBolt类作为storm.kafka.bolt.KafkaBolt前的最后一个Bolt,需要将输出数据名称定义为message,否则KafkaBolt无法接收数据。


CIO之家 www.ciozj.com 公众号:imciow
关联的文档
也许您喜欢