kafka+storm+hbase架构设计

来源:lib.csdn.net 作者:网友

kafka+storm+hbase架构设计:kafka作为分布式消息系统,实时消息系统,有生产者和消费者;storm作为大数据的实时处理系统;hbase是apache hadoop 的数据库,其具有高效的读写性能!
这里把kafka生产的数据作为storm的源头spout来消费,经过bolt处理把结果保存到hbase。

基础环境:这里就不介绍了!!
hadoop集群(zookeeper)
kafka集群
storm集群

1、kafka测试API(包括生产者消费者)

生产者

import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class Producer extends Thread {
   private final kafka.javaapi.producer.Producer<Integer, String> producer;
   private final String topic;
   private final Properties props = new Properties();

   public Producer(String topic) {
       props.put("serializer.class", "kafka.serializer.StringEncoder");    
       props.put("metadata.broker.list","192.168.80.20:9092,192.168.80.21:9092,192.168.80.22:9092");
       producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
       this.topic = topic;
   }

   public void run() {
       for (int i = 0; i < 2000; i++) {
           String messageStr = new String("Message_" + i);
           System.out.println("product:"+messageStr);
           producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
       }

   }

   public static void main(String[] args) {
       Producer producerThread = new Producer(KafkaProperties.topic);
       producerThread.start();
   }
}




2、消费者测试API:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class Consumer extends Thread {
   private final ConsumerConnector consumer;
   private final String topic;

   public Consumer(String topic) {
       consumer = kafka.consumer.Consumer
               .createJavaConsumerConnector(createConsumerConfig());
       this.topic = topic;
   }

   private static ConsumerConfig createConsumerConfig() {
       Properties props = new Properties();
       props.put("zookeeper.connect", KafkaProperties.zkConnect);
       props.put("group.id", KafkaProperties.groupId);
       //props.put("zookeeper.session.timeout.ms", "400");
       //props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "60000");//

       return new ConsumerConfig(props);

   }
// push消费方式,服务端推送过来。主动方式是pull
   public void run() {
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
       topicCountMap.put(topic, new Integer(1));
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
               .createMessageStreams(topicCountMap);
       KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
       ConsumerIterator<byte[], byte[]> it = stream.iterator();
       
       while (it.hasNext()){
           //逻辑处理
           System.out.println("consumer:"+new String(it.next().message()));
           
       }
           
   }

   public static void main(String[] args) {
       Consumer consumerThread = new Consumer(KafkaProperties.topic);
       consumerThread.start();
   }
}




3、定义kafka消费者的一些常量:

public interface KafkaProperties
{
 final static String zkConnect = "192.168.80.20:2181,192.168.80.20:2181,192.168.80.20:2181";
 final static  String groupId = "group";
 final static String topic = "test";
}




4、在进行项目之前准备一些hbase工具类:


import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
public interface HBaseDAO {

   public void save(Put put,String tableName) ;
   public void insert(String tableName,String rowKey,String family,String quailifer,String value) ;
   public void save(List<Put>Put ,String tableName) ;
   public Result getOneRow(String tableName,String rowKey) ;
   public List<Result> getRows(String tableName,String rowKey_like) ;
   public List<Result> getRows(String tableName, String rowKeyLike, String cols[]) ;
   public List<Result> getRows(String tableName,String startRow,String stopRow) ;
}





import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import HBaseDAO;

public class HBaseDAOImp implements HBaseDAO{

   HConnection hTablePool = null;
   public HBaseDAOImp()
   {
       Configuration conf = new Configuration();
       conf.set("hbase.zookeeper.quorum","192.168.80.20,192.168.80.21,192.168.80.22");
       conf.set("hbase.rootdir", "hdfs://cluster/hbase");
       try {
           hTablePool = HConnectionManager.createConnection(conf) ;
       } catch (IOException e) {
           e.printStackTrace();
       }
   }
   @Override
   public void save(Put put, String tableName) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       try {
           table = hTablePool.getTable(tableName) ;
           table.put(put) ;
           
       } catch (Exception e) {
           e.printStackTrace() ;
       }finally{
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
   }
   @Override
   public void insert(String tableName, String rowKey, String family,
           String quailifer, String value) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       try {
           table = hTablePool.getTable(tableName) ;
           Put put = new Put(rowKey.getBytes());
           put.add(family.getBytes(), quailifer.getBytes(), value.getBytes()) ;
           table.put(put);
       } catch (Exception e) {
           e.printStackTrace();
       }finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
   }
   
   @Override
   public void save(List<Put> Put, String tableName) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       try {
           table = hTablePool.getTable(tableName) ;
           table.put(Put) ;
       }
       catch (Exception e) {
           // TODO: handle exception
       }finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       
   }


   @Override
   public Result getOneRow(String tableName, String rowKey) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       Result rsResult = null;
       try {
           table = hTablePool.getTable(tableName) ;
           Get get = new Get(rowKey.getBytes()) ;
           rsResult = table.get(get) ;
       } catch (Exception e) {
           e.printStackTrace() ;
       }
       finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       return rsResult;
   }

   @Override
   public List<Result> getRows(String tableName, String rowKeyLike) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       List<Result> list = null;
       try {
           table = hTablePool.getTable(tableName) ;
           PrefixFilter filter = new PrefixFilter(rowKeyLike.getBytes());
           Scan scan = new Scan();
           scan.setFilter(filter);
           ResultScanner scanner = table.getScanner(scan) ;
           list = new ArrayList<Result>() ;
           for (Result rs : scanner) {
               list.add(rs) ;
           }
       } catch (Exception e) {
           e.printStackTrace() ;
       }
       finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       return list;
   }
   
   public List<Result> getRows(String tableName, String rowKeyLike ,String cols[]) {
       // TODO Auto-generated method stub
       HTableInterface table = null;
       List<Result> list = null;
       try {
           table = hTablePool.getTable(tableName) ;
           PrefixFilter filter = new PrefixFilter(rowKeyLike.getBytes());
           Scan scan = new Scan();
           for (int i = 0; i < cols.length; i++) {
               scan.addColumn("cf".getBytes(), cols[i].getBytes()) ;
           }
           scan.setFilter(filter);
           ResultScanner scanner = table.getScanner(scan) ;
           list = new ArrayList<Result>() ;
           for (Result rs : scanner) {
               list.add(rs) ;
           }
       } catch (Exception e) {
           e.printStackTrace() ;
       }
       finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       return list;
   }
   public List<Result> getRows(String tableName,String startRow,String stopRow)
   {
       HTableInterface table = null;
       List<Result> list = null;
       try {
           table = hTablePool.getTable(tableName) ;
           Scan scan = new Scan() ;
           scan.setStartRow(startRow.getBytes()) ;
           scan.setStopRow(stopRow.getBytes()) ;
           ResultScanner scanner = table.getScanner(scan) ;
           list = new ArrayList<Result>() ;
           for (Result rsResult : scanner) {
               list.add(rsResult) ;
           }
           
       }catch (Exception e) {
           e.printStackTrace() ;
       }
       finally
       {
           try {
               table.close() ;
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       return list;
   }
   
   public static void main(String[] args) {
       // TODO Auto-generated method stub
       HBaseDAO dao = new HBaseDAOImp();
       List<Put> list = new ArrayList<Put>();
       Put put = new Put("aa".getBytes());
       put.add("cf".getBytes(), "name".getBytes(), "zhangsan".getBytes()) ;
       list.add(put) ;
//        dao.save(put, "test") ;
       put.add("cf".getBytes(), "addr".getBytes(), "beijing".getBytes()) ;
       list.add(put) ;
       put.add("cf".getBytes(), "age".getBytes(), "30".getBytes()) ;
       list.add(put) ;
       put.add("cf".getBytes(), "tel".getBytes(), "13567882341".getBytes()) ;
       list.add(put) ;
       
       dao.save(list, "test");
//        dao.save(put, "test") ;
//        dao.insert("test", "testrow", "cf", "age", "35") ;
//        dao.insert("test", "testrow", "cf", "cardid", "12312312335") ;
//        dao.insert("test", "testrow", "cf", "tel", "13512312345") ;
       
   }

}




下面正式编写简单项目代码

1)实现写kafka生产者:

import java.util.Properties;
import java.util.Random;
import DateFmt;
import backtype.storm.utils.Utils;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer extends Thread {
   private final kafka.javaapi.producer.Producer<Integer, String> producer;
   private final String topic;
   private final Properties props = new Properties();

   public Producer(String topic) {
       props.put("serializer.class", "kafka.serializer.StringEncoder");// 字符串消息
       props.put("metadata.broker.list", "192.168.80.20:9092,192.168.80.21:9092,192.168.80.22:9092");
       producer = new kafka.javaapi.producer.Producer<Integer, String>( new ProducerConfig(props));
       this.topic = topic;
   }

   public void run() {
       // order_id,order_amt,create_time,area_id
       Random random = new Random();
       String[] order_amt = { "10.10", "20.10", "30.10","40.0", "60.10" };
       String[] area_id = { "1","2","3","4","5" };
       
       int i =0 ;
       while(true) {
           i ++ ;
           String messageStr = i+"\t"+order_amt[random.nextInt(5)]+"\t"+DateFmt.getCountDate(null, DateFmt.date_long)+"\t"+area_id[random.nextInt(5)] ;
           System.out.println("product:"+messageStr);
           producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
           //Utils.sleep(1000) ;

       }

   }

   public static void main(String[] args) {
       Producer producerThread = new Producer(KafkaProperties.topic);
       producerThread.start();
   }
}




2)这里用到其他时间转换工具类:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {

   public static final String date_long = "yyyy-MM-dd HH:mm:ss" ;
   public static final String date_short = "yyyy-MM-dd" ;
   
   public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);
   
   public static String getCountDate(String date,String patton)
   {
       SimpleDateFormat sdf = new SimpleDateFormat(patton);
       Calendar cal = Calendar.getInstance();
       if (date != null) {
           try {
               cal.setTime(sdf.parse(date)) ;
           } catch (ParseException e) {
               e.printStackTrace();
           }
       }
       return sdf.format(cal.getTime());
   }
   
   public static String getCountDate(String date,String patton,int step)
   {
       SimpleDateFormat sdf = new SimpleDateFormat(patton);
       Calendar cal = Calendar.getInstance();
       if (date != null) {
           try {
               cal.setTime(sdf.parse(date)) ;
           } catch (ParseException e) {
               e.printStackTrace();
           }
       }
       cal.add(Calendar.DAY_OF_MONTH, step) ;
       return sdf.format(cal.getTime());
   }
   
   public static Date parseDate(String dateStr) throws Exception
   {
       return sdf.parse(dateStr);
   }
   
   public static void main(String[] args) throws Exception{
   System.out.println(DateFmt.getCountDate(null, DateFmt.date_short));
   //System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short));

   //System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01")));
   }

}




3)下面写项目中的kafka消费comsumer:


这里把消费者的消费的数据保存到一个有顺序的队列里!(为了作为storm spout数据的来源)--------------非常重要哦!!!!!!

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.productor.KafkaProperties;

public class OrderConsumer extends Thread {
   private final ConsumerConnector consumer;
   private final String topic;

   private Queue<String> queue = new ConcurrentLinkedQueue<String>() ;//有序队列
   
   public OrderConsumer(String topic) {
       consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
       this.topic = topic;
   }

   private static ConsumerConfig createConsumerConfig() {
       Properties props = new Properties();
       props.put("zookeeper.connect", KafkaProperties.zkConnect);
       props.put("group.id", KafkaProperties.groupId);
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");//zookeeper offset偏移量

       return new ConsumerConfig(props);

   }
// push消费方式,服务端推送过来。主动方式是pull
   public void run() {
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
       topicCountMap.put(topic, new Integer(1));
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
               .createMessageStreams(topicCountMap);
       KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
       ConsumerIterator<byte[], byte[]> it = stream.iterator();
       
       while (it.hasNext()){
           //逻辑处理
           System.out.println("consumer:"+new String(it.next().message()));
           queue.add(new String(it.next().message())) ;
           System.err.println("队列----->"+queue);
       }
           
   }

   public Queue<String> getQueue()
   {
       return queue ;
   }
   
   public static void main(String[] args) {
       OrderConsumer consumerThread = new OrderConsumer(KafkaProperties.Order_topic);
       consumerThread.start();
     
   }
}




4)下面开始写storm部分包括spout和bolt

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import kafka.consumers.OrderConsumer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class OrderBaseSpout implements IRichSpout {

   String topic = null;
   public OrderBaseSpout(String topic)
   {
       this.topic = topic ;
   }
   /**
    * 公共基类spout
    */
   private static final long serialVersionUID = 1L;
   Integer TaskId = null;
   SpoutOutputCollector collector = null;
   Queue<String> queue = new ConcurrentLinkedQueue<String>() ;
   
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       // TODO Auto-generated method stub

       declarer.declare(new Fields("order")) ;
   }

   public void nextTuple() {
       // TODO Auto-generated method stub
       if (queue.size() > 0) {
           String str = queue.poll() ;
           //进行数据过滤
           System.err.println("TaskId:"+TaskId+";  str="+str);
           collector.emit(new Values(str)) ;
       }
   }

   public void open(Map conf, TopologyContext context,
           SpoutOutputCollector collector) {
       this.collector = collector ;
       TaskId = context.getThisTaskId() ;
//        Thread.currentThread().getId()
       OrderConsumer consumer = new OrderConsumer(topic) ;
       consumer.start() ;
       queue = consumer.getQueue() ;
   }

   
   public void ack(Object msgId) {
       // TODO Auto-generated method stub
       
   }

   
   public void activate() {
       // TODO Auto-generated method stub
       
   }

   
   public void close() {
       // TODO Auto-generated method stub
       
   }

   
   public void deactivate() {
       // TODO Auto-generated method stub
       
   }

   
   public void fail(Object msgId) {
       // TODO Auto-generated method stub
       
   }

   
   public Map<String, Object> getComponentConfiguration() {
       // TODO Auto-generated method stub
       return null;
   }
}





storm有了源头数据数据,该如何处理呢?下面要根据自己公司业务逻辑进行处理,我这里只是简单处理,只是为了把流程走完整而已!
下面有3个bolt:

import java.util.Map;
import DateFmt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AreaFilterBolt implements IBasicBolt {

   private static final long serialVersionUID = 1L;

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("area_id","order_amt","order_date"));
       
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
       // TODO Auto-generated method stub
       return null;
   }

   @Override
   public void prepare(Map stormConf, TopologyContext context) {
       // TODO Auto-generated method stub
       
   }

   @Override
   public void execute(Tuple input, BasicOutputCollector collector) {
       String order = input.getString(0);
       if(order != null){
           String[] orderArr = order.split("\\t");
           // ared_id,order_amt,create_time
           collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));
       System.out.println("--------------》"+orderArr[3]+orderArr[1]);
       }
       
   }

   @Override
   public void cleanup() {
       // TODO Auto-generated method stub
       
   }

}







import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import HBaseDAO;
import HBaseDAOImp;
import DateFmt;

public class AreaAmtBolt  implements IBasicBolt{


   private static final long serialVersionUID = 1L;
   Map <String,Double> countsMap = null ;
   String today = null;
   HBaseDAO dao = null;
   
   @Override
   public void cleanup() {
       //???
       countsMap.clear() ;
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("date_area","amt")) ;    
       
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
       return null;
   }

   @Override
   public void prepare(Map stormConf, TopologyContext context) {
       countsMap = new HashMap<String, Double>() ;
       dao = new HBaseDAOImp() ;
       //根据hbase里初始值进行初始化countsMap
       today = DateFmt.getCountDate(null, DateFmt.date_short);
       countsMap = this.initMap(today, dao);
       for(String key:countsMap.keySet())
       {
           System.err.println("key:"+key+"; value:"+countsMap.get(key));
       }
   }

   @Override
   public void execute(Tuple input, BasicOutputCollector collector) {
       if (input != null) {
           String area_id = input.getString(0) ;
           double order_amt = 0.0;
           //order_amt = input.getDouble(1) ;
           try {
               order_amt = Double.parseDouble(input.getString(1)) ;
           } catch (Exception e) {
               System.out.println(input.getString(1)+":---------------------------------");
               e.printStackTrace() ;
           }
           
           String order_date = input.getStringByField("order_date") ;
           
           if (! order_date.equals(today)) {
               //跨天处理
               countsMap.clear() ;
           }
           
           Double count = countsMap.get(order_date+"_"+area_id) ;
           if (count == null) {
               count = 0.0 ;
           }
           count += order_amt ;
           countsMap.put(order_date+"_"+area_id, count) ;
           System.err.println("areaAmtBolt:"+order_date+"_"+area_id+"="+count);
           collector.emit(new Values(order_date+"_"+area_id,count)) ;
           System.out.println("***********"+order_date+"_"+area_id+count);
       }
       
   }
   
   public Map<String, Double> initMap(String rowKeyDate, HBaseDAO dao)
   {
       Map <String,Double> countsMap = new HashMap<String, Double>() ;
       List<Result> list = dao.getRows("area_order", rowKeyDate, new String[]{"order_amt"});
       for(Result rsResult : list)
       {
           String rowKey = new String(rsResult.getRow());
           for(KeyValue keyValue : rsResult.raw())
           {
               if("order_amt".equals(new String(keyValue.getQualifier())))
               {
                   countsMap.put(rowKey, Double.parseDouble(new String(keyValue.getValue()))) ;
                   break;
               }
           }
       }
       
       return countsMap;
       
   }
   



}





import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import HBaseDAO;
import HBaseDAOImp;
public class AreaRsltBolt implements IBasicBolt
{


    private static final long serialVersionUID = 1L;
    Map <String,Double> countsMap = null ;
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        dao = new HBaseDAOImp() ;
        countsMap = new HashMap<String, Double>() ;
    }

    HBaseDAO dao = null;
    long beginTime = System.currentTimeMillis() ;
    long endTime = 0L ;
    
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String date_areaid = input.getString(0);
        double order_amt = input.getDouble(1) ;
        countsMap.put(date_areaid, order_amt) ;
        endTime = System.currentTimeMillis() ;
        if (endTime - beginTime >= 5 * 1000) {
            for(String key : countsMap.keySet())
            {
                // put into hbase
                //这里把处理结果保存到hbase中
                dao.insert("area_order", key, "cf", "order_amt", countsMap.get(key)+"") ;
                System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));
            }
        }
    }

    @Override
    public void cleanup() {
        
    }

}

最后 main方法:

import kafka.productor.KafkaProperties;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import AreaAmtBolt;
import AreaFilterBolt;
import AreaRsltBolt;
import OrderBaseSpout;

public class MYTopology {

   public static void main(String[] args) {
       TopologyBuilder builder = new TopologyBuilder();
       builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.topic), 5);
       builder.setBolt("filterblot", new AreaFilterBolt() , 5).shuffleGrouping("spout") ;
       builder.setBolt("amtbolt", new AreaAmtBolt() , 2).fieldsGrouping("filterblot", new Fields("area_id")) ;
       builder.setBolt("rsltolt", new AreaRsltBolt(), 1).shuffleGrouping("amtbolt");
       
       
       Config conf = new Config() ;
       conf.setDebug(false);
       if (args.length > 0) {
           try {
               StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
           } catch (AlreadyAliveException e) {
               e.printStackTrace();
           } catch (InvalidTopologyException e) {
               e.printStackTrace();
           }
       }else {
           //本地测试!!!!!!!!!!!!
           LocalCluster localCluster = new LocalCluster();
           localCluster.submitTopology("mytopology", conf, builder.createTopology());
       }
       
       
   }

}







到这里架构基本完成了单独学kafka、storm、hbase、这些东西不难,如何把他们整合起来,这就是不一样!!!!呵呵


相关文档推荐

SRE Copilot大语言模型智能运维框架.PDF

1741936996 王宁 5.04MB 24页 积分6

2024智算运维发展研究报告.PDF

1740033222  1.71MB 30页 积分5

腾讯大数据基于StarRocks的向量检索探索.PDF

1737425434 赵裕隆 3.48MB 34页 积分6

B站一站式大数据集群管理平台.PDF

1737421412 刘明刚 1.37MB 30页 积分6

StarRocks在爱奇艺大数据场景的实践.PDF

1737365327 林豪 3.57MB 27页 积分5

智算平台运维运营技术研究报告.PDF

1736479643  3.95MB 66页 积分5

农业农村大数据平台互联互通规范.PDF

1736163860  0.49MB 11页 积分5

工业大数据管理与治理智能制造的基座.PDF

1733702095 王宏志 3.83MB 54页 积分6

相关文章推荐

运维指标体系在银行业务的应用实践

CIO之家的朋友 CIO之家的朋友 

面向业务应用交易的IT运维监控思路

CIO之家的朋友们 张晓丹 

云计算的成本与价值概要分析

数字地平线 8小时coding 

运维85条军规

CIO之家的朋友们 ANZHIHE 

AIOps之前,运维层面能做什么

嘉为蓝鲸? 赵海兵