Kafka's Two Consumer APIs
Source: CSDN blog

Kafka provides two consumer APIs:

  1. The high-level Consumer API

  2. The SimpleConsumer API     

The first is a highly abstracted consumer API that is simple and convenient to use (a minimal sketch of it follows the list below), but certain special requirements push us down to the second, lower-level API. So first, here is what the SimpleConsumer API lets us do:

  • Read a message multiple times

  • Consume only a subset of a partition's messages in a single processing pass

  • Add transaction management to guarantee that a message is processed once and only once
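
For contrast, the high-level API covers the common read path in a few lines and tracks offsets automatically in ZooKeeper. Below is a minimal, untested sketch against the legacy 0.8 kafka.consumer Java API; the class name, ZooKeeper address, group id, and topic are placeholders:


import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class HighLevelConsumerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.1.54:2181"); // placeholder ZooKeeper address
        props.put("group.id", "demo-group");                 // placeholder consumer group
        props.put("auto.offset.reset", "smallest");          // start from the earliest available offset

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // Ask for one stream for the topic; offsets are committed for us in ZooKeeper.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("20161121a", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("20161121a").get(0).iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            System.out.println(msg.offset() + ": " + new String(msg.message(), "UTF-8"));
        }
        // connector.shutdown() would stop the background fetcher threads.
    }
}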

What are the drawbacks of using SimpleConsumer?

  • Your application must track offset values itself

  • Your application must find the lead broker of the target topic partition

  • Your application must handle broker changes (for example, leader re-election)

Steps for using SimpleConsumer:

  1. Find out which of the active brokers is the leader of the target topic partition

  2. Find the replica brokers of the target topic partition

  3. Build the request

  4. Send the request and fetch the data

  5. Handle lead-broker changes

Code example 1 (find each partition's leader, fetch its latest offset, and sum the offsets):


package com.ngaa.spark.create;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

import java.util.*;
import java.util.Map.Entry;

/**
 * Created by yangjf on 2016/12/18
 * Update date:
 * Time: 11:00
 * Description: fetch the latest offset of every partition of a Kafka topic, and sum them
 * Result of test: passed
 * Command:
 * Email: jifei.yang@ngaa.com.cn
 */
public class KafkaOffsetTools {
    public KafkaOffsetTools() {
    }

    public static void main(String[] args) {
        String topic = "20161121a";      // topic name
        String seed = "192.168.1.54";    // address of any one broker (used as a seed)
        int port = 9092;                 // broker port

        List<String> seeds = new ArrayList<String>();
        seeds.add(seed);
        KafkaOffsetTools kot = new KafkaOffsetTools();

        // Map of partition id -> partition metadata (including the current leader)
        TreeMap<Integer, PartitionMetadata> metadatas = kot.findLeader(seeds, port, topic);

        long sum = 0;  // use long: offsets can exceed Integer.MAX_VALUE

        for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) {
            int partition = entry.getKey();
            String leadBroker = entry.getValue().leader().host();
            String clientName = "Client_" + topic + "_" + partition;
            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000,
                    64 * 1024, clientName);
            long readOffset = getLastOffset(consumer, topic, partition,
                    kafka.api.OffsetRequest.LatestTime(), clientName);
            sum += readOffset;
            System.out.println("partition: " + partition + ", latest offset: " + readOffset);
            consumer.close();
        }
        // Equals the total number of messages only if retention has not deleted any yet
        System.out.println("sum of latest offsets: " + sum);
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic,
                                     int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
                partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Ask for at most one offset before `whichTime` (LatestTime -> the log-end offset)
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
                whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
                clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching offset data from the broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    // Find the leader of every partition of the topic
    private TreeMap<Integer, PartitionMetadata> findLeader(List<String> a_seedBrokers, int a_port, String a_topic) {
        TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
                        "leaderLookup" + new Date().getTime());
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        map.put(part.partitionId(), part);
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        return map;
    }
}
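
Run against a live 0.8.x cluster, the tool prints one line per partition followed by the sum; for a three-partition topic the output has this shape (values illustrative):

partition: 0, latest offset: 1042
partition: 1, latest offset: 998
partition: 2, latest offset: 1011
sum of latest offsets: 3051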


Code example 2 (read messages from one partition with SimpleConsumer, handling invalid offsets and leader changes):


import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * Created by yangjf on 2016/12/18
 * Update date:
 * Time: 10:44
 * Description: SimpleConsumer API example; reads messages from a partition, starting at the earliest offset
 * Result of test: not tested
 * Command:
 * Email: jifei.yang@ngaa.com.cn
 */
public class SampleKafkaConsumer {
    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SampleKafkaConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public static void main(String[] args) {
        SampleKafkaConsumer example = new SampleKafkaConsumer();
        long maxReads = 3;              // maximum number of messages to read
        String topic = "20161218a";     // topic to subscribe to
        int partition = 0;              // partition to read
        // Seed broker addresses; each entry may be an IP or a hostname
        List<String> seeds = new ArrayList<String>();
        seeds.add("hadoop1");
        seeds.add("hadoop2");
        seeds.add("hadoop3");
        int port = 9092;                // broker port
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // Fetch metadata for the target topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For the simple case, ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }

                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                // No new messages yet; back off briefly before polling again
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * Find a new leader broker after the old one becomes unavailable.
     *
     * @param a_oldLeader
     * @param a_topic
     * @param a_partition
     * @param a_port
     * @return the host of the new leader
     * @throws Exception if no new leader can be found
     */
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through, if the leader hasn't changed, give
                // ZooKeeper a second to recover;
                // second time, assume the broker did recover before failover,
                // or it was a non-broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            // Remember the replicas so findNewLeader can query them after a failure
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
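
A note on dependencies: the kafka.api.* and kafka.javaapi.* classes used in both examples ship in the legacy Scala core artifact (for example org.apache.kafka:kafka_2.10:0.8.2.x; the exact coordinates should match your broker version), not in the newer kafka-clients jar.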


Official example: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

