kafka+storm+hbase架构设计
网友 lib.csdn.net

kafka+storm+hbase架构设计:kafka作为分布式消息系统,实时消息系统,有生产者和消费者;storm作为大数据的实时处理系统;hbase是apache hadoop 的数据库,其具有高效的读写性能!
这里把kafka生产的数据作为storm的源头spout来消费,经过bolt处理把结果保存到hbase。

基础环境:这里就不介绍了!!
hadoop集群(zookeeper)
kafka集群
storm集群

1、kafka测试API(包括生产者消费者)

生产者

import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class Producer extends Thread {
   private final kafka.javaapi.producer.Producer<Integer, String> producer;
   private final String topic;
   private final Properties props = new Properties();

   public Producer(String topic) {
       props.put("serializer.class", "kafka.serializer.StringEncoder");    
       props.put("metadata.broker.list","192.168.80.20:9092,192.168.80.21:9092,192.168.80.22:9092");
       producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
       this.topic = topic;
   }

   public void run() {
       for (int i = 0; i < 2000; i++) {
           String messageStr = new String("Message_" + i);
           System.out.println("product:"+messageStr);
           producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
       }

   }

   public static void main(String[] args) {
       Producer producerThread = new Producer(KafkaProperties.topic);
       producerThread.start();
   }
}




2、消费者测试API:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class Consumer extends Thread {
   private final ConsumerConnector consumer;
   private final String topic;

   public Consumer(String topic) {
       consumer = kafka.consumer.Consumer
               .createJavaConsumerConnector(createConsumerConfig());
       this.topic = topic;
   }

   private static ConsumerConfig createConsumerConfig() {
       Properties props = new Properties();
       props.put("zookeeper.connect", KafkaProperties.zkConnect);
       props.put("group.id", KafkaProperties.groupId);
       //props.put("zookeeper.session.timeout.ms", "400");
       //props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "60000");//

       return new ConsumerConfig(props);

   }
// push消费方式,服务端推送过来。主动方式是pull
   public void run() {
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
       topicCountMap.put(topic, new Integer(1));
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
               .createMessageStreams(topicCountMap);
       KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
       ConsumerIterator<byte[], byte[]> it = stream.iterator();
       
       while (it.hasNext()){
           //逻辑处理
           System.out.println("consumer:"+new String(it.next().message()));
           
       }
           
   }

   public static void main(String[] args) {
       Consumer consumerThread = new Consumer(KafkaProperties.topic);
       consumerThread.start();
   }
}




3、定义kafka消费者的一些常量:

public interface KafkaProperties
{
 final static String zkConnect = "192.168.80.20:2181,192.168.80.20:2181,192.168.80.20:2181";
 final static  String groupId = "group";
 final static String topic = "test";
}




4、在进行项目之前准备一些hbase工具类:


import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
public interface HBaseDAO {

   public void save(Put put,String tableName) ;
   public void insert(String tableName,String rowKey,String family,String quailifer,String value) ;
   public void save(List<Put>Put ,String tableName) ;
   public Result getOneRow(String tableName,String rowKey) ;
   public List<Result> getRows(String tableName,String rowKey_like) ;
   public List<Result> getRows(String tableName, String rowKeyLike, String cols[]) ;
   public List<Result> getRows(String tableName,String startRow,String stopRow) ;
}





import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import HBaseDAO;

public class HBaseDAOImp implements HBaseDAO{

   HConnection hTablePool = null;
   public HBaseDAOImp()
   {
       Configuration conf = new Configuration();
       conf.set("hbase.zookeeper.quorum","192.168.80.20,192.168.80.21,192.168.80.22");
       conf.set("hbase.rootdir", "hdfs://cluster/hbase");
       try {
           hTablePool = HConnectionManager.createConnection(conf) ;
       } catch (IOException e) {
           e.printStackTrace();
       }
   }
   @Override
   public void save(Put put, String tableName) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       try {
           table = hTablePool.getTable(tableName) ;
           table.put(put) ;
           
       } catch (Exception e) {
           e.printStackTrace() ;
       }finally{
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
   }
   @Override
   public void insert(String tableName, String rowKey, String family,
           String quailifer, String value) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       try {
           table = hTablePool.getTable(tableName) ;
           Put put = new Put(rowKey.getBytes());
           put.add(family.getBytes(), quailifer.getBytes(), value.getBytes()) ;
           table.put(put);
       } catch (Exception e) {
           e.printStackTrace();
       }finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
   }
   
   @Override
   public void save(List<Put> Put, String tableName) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       try {
           table = hTablePool.getTable(tableName) ;
           table.put(Put) ;
       }
       catch (Exception e) {
           // TODO: handle exception
       }finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       
   }


   @Override
   public Result getOneRow(String tableName, String rowKey) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       Result rsResult = null;
       try {
           table = hTablePool.getTable(tableName) ;
           Get get = new Get(rowKey.getBytes()) ;
           rsResult = table.get(get) ;
       } catch (Exception e) {
           e.printStackTrace() ;
       }
       finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       return rsResult;
   }

   @Override
   public List<Result> getRows(String tableName, String rowKeyLike) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       List<Result> list = null;
       try {
           table = hTablePool.getTable(tableName) ;
           PrefixFilter filter = new PrefixFilter(rowKeyLike.getBytes());
           Scan scan = new Scan();
           scan.setFilter(filter);
           ResultScanner scanner = table.getScanner(scan) ;
           list = new ArrayList<Result>() ;
           for (Result rs : scanner) {
               list.add(rs) ;
           }
       } catch (Exception e) {
           e.printStackTrace() ;
       }
       finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       return list;
   }
   
   public List<Result> getRows(String tableName, String rowKeyLike ,String cols[]) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       List<Result> list = null;
       try {
           table = hTablePool.getTable(tableName) ;
           PrefixFilter filter = new PrefixFilter(rowKeyLike.getBytes());
           Scan scan = new Scan();
           for (int i = 0; i < cols.length; i++) {
               scan.addColumn("cf".getBytes(), cols[i].getBytes()) ;
           }
           scan.setFilter(filter);
           ResultScanner scanner = table.getScanner(scan) ;
           list = new ArrayList<Result>() ;
           for (Result rs : scanner) {
               list.add(rs) ;
           }
       } catch (Exception e) {
           e.printStackTrace() ;
       }
       finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       return list;
   }
   public List<Result> getRows(String tableName,String startRow,String stopRow)
   {
       HTableInterface table = null;
       List<Result> list = null;
       try {
           table = hTablePool.getTable(tableName) ;
           Scan scan = new Scan() ;
           scan.setStartRow(startRow.getBytes()) ;
           scan.setStopRow(stopRow.getBytes()) ;
           ResultScanner scanner = table.getScanner(scan) ;
           list = new ArrayList<Result>() ;
           for (Result rsResult : scanner) {
               list.add(rsResult) ;
           }
           
       }catch (Exception e) {
           e.printStackTrace() ;
       }
       finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       return list;
   }
   
   public static void main(String[] args) {
       // TODO Auto-generated method stub
       HBaseDAO dao = new HBaseDAOImp();
       List<Put> list = new ArrayList<Put>();
       Put put = new Put("aa".getBytes());
       put.add("cf".getBytes(), "name".getBytes(), "zhangsan".getBytes()) ;
       list.add(put) ;
//        dao.save(put, "test") ;
       put.add("cf".getBytes(), "addr".getBytes(), "beijing".getBytes()) ;
       list.add(put) ;
       put.add("cf".getBytes(), "age".getBytes(), "30".getBytes()) ;
       list.add(put) ;
       put.add("cf".getBytes(), "tel".getBytes(), "13567882341".getBytes()) ;
       list.add(put) ;
       
       dao.save(list, "test");
//        dao.save(put, "test") ;
//        dao.insert("test", "testrow", "cf", "age", "35") ;
//        dao.insert("test", "testrow", "cf", "cardid", "12312312335") ;
//        dao.insert("test", "testrow", "cf", "tel", "13512312345") ;
       
   }

}




下面正式编写简单项目代码

1)实现写kafka生产者:

import java.util.Properties;
import java.util.Random;
import DateFmt;
import backtype.storm.utils.Utils;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer extends Thread {
   private final kafka.javaapi.producer.Producer<Integer, String> producer;
   private final String topic;
   private final Properties props = new Properties();

   public Producer(String topic) {
       props.put("serializer.class", "kafka.serializer.StringEncoder");// 字符串消息
       props.put("metadata.broker.list", "192.168.80.20:9092,192.168.80.21:9092,192.168.80.22:9092");
       producer = new kafka.javaapi.producer.Producer<Integer, String>( new ProducerConfig(props));
       this.topic = topic;
   }

   public void run() {
       // order_id,order_amt,create_time,area_id
       Random random = new Random();
       String[] order_amt = { "10.10", "20.10", "30.10","40.0", "60.10" };
       String[] area_id = { "1","2","3","4","5" };
       
       int i =0 ;
       while(true) {
           i ++ ;
           String messageStr = i+"\t"+order_amt[random.nextInt(5)]+"\t"+DateFmt.getCountDate(null, DateFmt.date_long)+"\t"+area_id[random.nextInt(5)] ;
           System.out.println("product:"+messageStr);
           producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
           //Utils.sleep(1000) ;

       }

   }

   public static void main(String[] args) {
       Producer producerThread = new Producer(KafkaProperties.topic);
       producerThread.start();
   }
}




2)这里用到其他时间转换工具类:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {

   public static final String date_long = "yyyy-MM-dd HH:mm:ss" ;
   public static final String date_short = "yyyy-MM-dd" ;
   
   public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);
   
   public static String getCountDate(String date,String patton)
   {
       SimpleDateFormat sdf = new SimpleDateFormat(patton);
       Calendar cal = Calendar.getInstance();
       if (date != null) {
           try {
               cal.setTime(sdf.parse(date)) ;
           } catch (ParseException e) {
               e.printStackTrace();
           }
       }
       return sdf.format(cal.getTime());
   }
   
   public static String getCountDate(String date,String patton,int step)
   {
       SimpleDateFormat sdf = new SimpleDateFormat(patton);
       Calendar cal = Calendar.getInstance();
       if (date != null) {
           try {
               cal.setTime(sdf.parse(date)) ;
           } catch (ParseException e) {
               e.printStackTrace();
           }
       }
       cal.add(Calendar.DAY_OF_MONTH, step) ;
       return sdf.format(cal.getTime());
   }
   
   public static Date parseDate(String dateStr) throws Exception
   {
       return sdf.parse(dateStr);
   }
   
   public static void main(String[] args) throws Exception{
   System.out.println(DateFmt.getCountDate(null, DateFmt.date_short));
   //System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short));

   //System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01")));
   }

}




3)下面写项目中的kafka消费comsumer:


这里把消费者的消费的数据保存到一个有顺序的队列里!(为了作为storm spout数据的来源)--------------非常重要哦!!!!!!

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.productor.KafkaProperties;

public class OrderConsumer extends Thread {
   private final ConsumerConnector consumer;
   private final String topic;

   private Queue<String> queue = new ConcurrentLinkedQueue<String>() ;//有序队列
   
   public OrderConsumer(String topic) {
       consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
       this.topic = topic;
   }

   private static ConsumerConfig createConsumerConfig() {
       Properties props = new Properties();
       props.put("zookeeper.connect", KafkaProperties.zkConnect);
       props.put("group.id", KafkaProperties.groupId);
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");//zookeeper offset偏移量

       return new ConsumerConfig(props);

   }
// push消费方式,服务端推送过来。主动方式是pull
   public void run() {
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
       topicCountMap.put(topic, new Integer(1));
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
               .createMessageStreams(topicCountMap);
       KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
       ConsumerIterator<byte[], byte[]> it = stream.iterator();
       
       while (it.hasNext()){
           //逻辑处理
           System.out.println("consumer:"+new String(it.next().message()));
           queue.add(new String(it.next().message())) ;
           System.err.println("队列----->"+queue);
       }
           
   }

   public Queue<String> getQueue()
   {
       return queue ;
   }
   
   public static void main(String[] args) {
       OrderConsumer consumerThread = new OrderConsumer(KafkaProperties.Order_topic);
       consumerThread.start();
     
   }
}




4)下面开始写storm部分包括spout和bolt

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import kafka.consumers.OrderConsumer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class OrderBaseSpout implements IRichSpout {

   String topic = null;
   public OrderBaseSpout(String topic)
   {
       this.topic = topic ;
   }
   /**
    * 公共基类spout
    */
   private static final long serialVersionUID = 1L;
   Integer TaskId = null;
   SpoutOutputCollector collector = null;
   Queue<String> queue = new ConcurrentLinkedQueue<String>() ;
   
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       // TODO Auto-generated method stub

       declarer.declare(new Fields("order")) ;
   }

   public void nextTuple() {
       // TODO Auto-generated method stub
       if (queue.size() > 0) {
           String str = queue.poll() ;
           //进行数据过滤
           System.err.println("TaskId:"+TaskId+";  str="+str);
           collector.emit(new Values(str)) ;
       }
   }

   public void open(Map conf, TopologyContext context,
           SpoutOutputCollector collector) {
       this.collector = collector ;
       TaskId = context.getThisTaskId() ;
//        Thread.currentThread().getId()
       OrderConsumer consumer = new OrderConsumer(topic) ;
       consumer.start() ;
       queue = consumer.getQueue() ;
   }

   
   public void ack(Object msgId) {
       // TODO Auto-generated method stub
       
   }

   
   public void activate() {
       // TODO Auto-generated method stub
       
   }

   
   public void close() {
       // TODO Auto-generated method stub
       
   }

   
   public void deactivate() {
       // TODO Auto-generated method stub
       
   }

   
   public void fail(Object msgId) {
       // TODO Auto-generated method stub
       
   }

   
   public Map<String, Object> getComponentConfiguration() {
       // TODO Auto-generated method stub
       return null;
   }
}





storm有了源头数据数据,该如何处理呢?下面要根据自己公司业务逻辑进行处理,我这里只是简单处理,只是为了把流程走完整而已!
下面有3个bolt:

import java.util.Map;
import DateFmt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AreaFilterBolt implements IBasicBolt {

   private static final long serialVersionUID = 1L;

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("area_id","order_amt","order_date"));
       
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
       // TODO Auto-generated method stub
       return null;
   }

   @Override
   public void prepare(Map stormConf, TopologyContext context) {
       // TODO Auto-generated method stub
       
   }

   @Override
   public void execute(Tuple input, BasicOutputCollector collector) {
       String order = input.getString(0);
       if(order != null){
           String[] orderArr = order.split("\\t");
           // ared_id,order_amt,create_time
           collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));
       System.out.println("--------------》"+orderArr[3]+orderArr[1]);
       }
       
   }

   @Override
   public void cleanup() {
       // TODO Auto-generated method stub
       
   }

}







import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import HBaseDAO;
import HBaseDAOImp;
import DateFmt;

public class AreaAmtBolt  implements IBasicBolt{


   private static final long serialVersionUID = 1L;
   Map <String,Double> countsMap = null ;
   String today = null;
   HBaseDAO dao = null;
   
   @Override
   public void cleanup() {
       //???
       countsMap.clear() ;
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("date_area","amt")) ;    
       
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
       return null;
   }

   @Override
   public void prepare(Map stormConf, TopologyContext context) {
       countsMap = new HashMap<String, Double>() ;
       dao = new HBaseDAOImp() ;
       //根据hbase里初始值进行初始化countsMap
       today = DateFmt.getCountDate(null, DateFmt.date_short);
       countsMap = this.initMap(today, dao);
       for(String key:countsMap.keySet())
       {
           System.err.println("key:"+key+"; value:"+countsMap.get(key));
       }
   }

   @Override
   public void execute(Tuple input, BasicOutputCollector collector) {
       if (input != null) {
           String area_id = input.getString(0) ;
           double order_amt = 0.0;
           //order_amt = input.getDouble(1) ;
           try {
               order_amt = Double.parseDouble(input.getString(1)) ;
           } catch (Exception e) {
               System.out.println(input.getString(1)+":---------------------------------");
               e.printStackTrace() ;
           }
           
           String order_date = input.getStringByField("order_date") ;
           
           if (! order_date.equals(today)) {
               //跨天处理
               countsMap.clear() ;
           }
           
           Double count = countsMap.get(order_date+"_"+area_id) ;
           if (count == null) {
               count = 0.0 ;
           }
           count += order_amt ;
           countsMap.put(order_date+"_"+area_id, count) ;
           System.err.println("areaAmtBolt:"+order_date+"_"+area_id+"="+count);
           collector.emit(new Values(order_date+"_"+area_id,count)) ;
           System.out.println("***********"+order_date+"_"+area_id+count);
       }
       
   }
   
   public Map<String, Double> initMap(String rowKeyDate, HBaseDAO dao)
   {
       Map <String,Double> countsMap = new HashMap<String, Double>() ;
       List<Result> list = dao.getRows("area_order", rowKeyDate, new String[]{"order_amt"});
       for(Result rsResult : list)
       {
           String rowKey = new String(rsResult.getRow());
           for(KeyValue keyValue : rsResult.raw())
           {
               if("order_amt".equals(new String(keyValue.getQualifier())))
               {
                   countsMap.put(rowKey, Double.parseDouble(new String(keyValue.getValue()))) ;
                   break;
               }
           }
       }
       
       return countsMap;
       
   }
   



}





import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import HBaseDAO;
import HBaseDAOImp;
public class AreaRsltBolt implements IBasicBolt
{


    private static final long serialVersionUID = 1L;
    Map <String,Double> countsMap = null ;
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        dao = new HBaseDAOImp() ;
        countsMap = new HashMap<String, Double>() ;
    }

    HBaseDAO dao = null;
    long beginTime = System.currentTimeMillis() ;
    long endTime = 0L ;
    
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String date_areaid = input.getString(0);
        double order_amt = input.getDouble(1) ;
        countsMap.put(date_areaid, order_amt) ;
        endTime = System.currentTimeMillis() ;
        if (endTime - beginTime >= 5 * 1000) {
            for(String key : countsMap.keySet())
            {
                // put into hbase
                //这里把处理结果保存到hbase中
                dao.insert("area_order", key, "cf", "order_amt", countsMap.get(key)+"") ;
                System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));
            }
        }
    }

    @Override
    public void cleanup() {
        
    }

}

最后 main方法:

import kafka.productor.KafkaProperties;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import AreaAmtBolt;
import AreaFilterBolt;
import AreaRsltBolt;
import OrderBaseSpout;

public class MYTopology {

   public static void main(String[] args) {
       TopologyBuilder builder = new TopologyBuilder();
       builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.topic), 5);
       builder.setBolt("filterblot", new AreaFilterBolt() , 5).shuffleGrouping("spout") ;
       builder.setBolt("amtbolt", new AreaAmtBolt() , 2).fieldsGrouping("filterblot", new Fields("area_id")) ;
       builder.setBolt("rsltolt", new AreaRsltBolt(), 1).shuffleGrouping("amtbolt");
       
       
       Config conf = new Config() ;
       conf.setDebug(false);
       if (args.length > 0) {
           try {
               StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
           } catch (AlreadyAliveException e) {
               e.printStackTrace();
           } catch (InvalidTopologyException e) {
               e.printStackTrace();
           }
       }else {
           //本地测试!!!!!!!!!!!!
           LocalCluster localCluster = new LocalCluster();
           localCluster.submitTopology("mytopology", conf, builder.createTopology());
       }
       
       
   }

}







到这里架构基本完成了单独学kafka、storm、hbase、这些东西不难,如何把他们整合起来,这就是不一样!!!!呵呵


CIO之家 www.ciozj.com 公众号:imciow
关联的文档
也许您喜欢