sparkstreaming从指定offset处消费Kafka数据
网友 CSDN博客

一、情景:当Spark streaming程序意外退出时,数据仍然再往Kafka中推送,然而由于Kafka默认是从latest的offset读取,这会导致数据丢失。为了避免数据丢失,那么我们需要记录每次消费的offset,以便下次检查并且从指定的offset开始读取

二、环境:kafka-0.9.0、spark-1.6.0、jdk-1.7、Scala-2.10.5、idea16

三、实现代码:

      1、引入spark和kafka的相关依赖包


<?xml version="1.0" encoding="UTF-8"?>

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xmlns="http://maven.apache.org/POM/4.0.0"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>


    <groupId>com.ngaa</groupId>

    <artifactId>test-my</artifactId>

    <version>1.0-SNAPSHOT</version>

    <inceptionYear>2008</inceptionYear>

    <properties>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!--add  maven release-->

        <maven.compiler.source>1.7</maven.compiler.source>

        <maven.compiler.target>1.7</maven.compiler.target>

        <encoding>UTF-8</encoding>

        <!--scala版本-->

        <scala.version>2.10.5</scala.version>

        <!--测试机器上的scala版本-->

        <test.scala.version>2.11.7</test.scala.version>


        <jackson.version>2.3.0</jackson.version>

        <!--slf4j版本-->

        <slf4j-version>1.7.20</slf4j-version>

        <!--cdh-spark-->

        <spark.cdh.version>1.6.0-cdh5.8.0</spark.cdh.version>

        <spark.streaming.cdh.version>1.6.0-cdh5.8.0</spark.streaming.cdh.version>

        <kafka.spark.cdh.version>1.6.0-cdh5.8.0</kafka.spark.cdh.version>

        <!--cdh-hadoop-->

        <hadoop.cdh.version>2.6.0-cdh5.8.0</hadoop.cdh.version>

        <!--http client必需要兼容CDH中的hadoop版本(cd /opt/cloudera/parcels/CDH/lib/hadoop/lib)-->

        <httpclient.version>4.2.5</httpclient.version>


        <!--http copre-->

        <httpcore.version>4.2.5</httpcore.version>

        <!--fastjson-->

        <fastjson.version>1.1.39</fastjson.version>


    </properties>


    <repositories>

        <repository>

            <id>scala-tools.org</id>

            <name>Scala-Tools Maven2 Repository</name>

            <url>http://scala-tools.org/repo-releases</url>

        </repository>

        <!--配置依赖库地址(用于加载CDH依赖的jar包) -->

        <repository>

            <id>cloudera</id>

            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

        </repository>

    </repositories>


    <pluginRepositories>

        <pluginRepository>

            <id>scala-tools.org</id>

            <name>Scala-Tools Maven2 Repository</name>

            <url>http://scala-tools.org/repo-releases</url>

        </pluginRepository>

    </pluginRepositories>


    <dependencies>


        <!--fastjson-->

        <dependency>

            <groupId>com.alibaba</groupId>

            <artifactId>fastjson</artifactId>

            <version>${fastjson.version}</version>

        </dependency>

        <!--httpclient-->

        <dependency>

            <groupId>org.apache.httpcomponents</groupId>

            <artifactId>httpclient</artifactId>

            <version>${httpclient.version}</version>

        </dependency>


        <!--http core-->

        <dependency>

            <groupId>org.apache.httpcomponents</groupId>

            <artifactId>httpcore</artifactId>

            <version>${httpcore.version}</version>

        </dependency>


        <!--slf4j-->

        <dependency>

            <groupId>org.slf4j</groupId>

            <artifactId>slf4j-log4j12</artifactId>

            <version>${slf4j-version}</version>

        </dependency>

        <!--hadoop-->

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>${hadoop.cdh.version}</version>

            <exclusions>

                <exclusion>

                    <groupId>javax.servlet</groupId>

                    <artifactId>*</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-common</artifactId>

            <version>${hadoop.cdh.version}</version>

            <exclusions>

                <exclusion>

                    <groupId>javax.servlet</groupId>

                    <artifactId>*</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-hdfs</artifactId>

            <version>${hadoop.cdh.version}</version>

            <exclusions>

                <exclusion>

                    <groupId>javax.servlet</groupId>

                    <artifactId>*</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <!--spark scala-->

        <dependency>

            <groupId>org.scala-lang</groupId>

            <artifactId>scala-library</artifactId>

            <version>${scala.version}</version>

        </dependency>

        <dependency>

            <groupId>com.fasterxml.jackson.core</groupId>

            <artifactId>jackson-databind</artifactId>

            <version>${jackson.version}</version>

        </dependency>


        <!--spark streaming和kafka的相关包-->

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-streaming_2.10</artifactId>

            <version>${spark.streaming.cdh.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-streaming-kafka_2.10</artifactId>

            <version>${kafka.spark.cdh.version}</version>

        </dependency>

        <dependency>

            <groupId>junit</groupId>

            <artifactId>junit</artifactId>

            <version>4.12</version>

            <scope>test</scope>

        </dependency>


        <!--引入windows本地库的spark包-->

        <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-assembly_2.10</artifactId>

        <version>${spark.cdh.version}</version>

        <scope>system</scope>

        <systemPath>D:/crt_send_document/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar</systemPath>

        </dependency>


        <!--引入测试环境linux本地库的spark包-->

        <!--<dependency>-->

            <!--<groupId>org.apache.spark</groupId>-->

            <!--<artifactId>spark-assembly_2.10</artifactId>-->

            <!--<version>${spark.cdh.version}</version>-->

            <!--<scope>system</scope>-->

            <!--<systemPath>/opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar-->

            <!--</systemPath>-->

        <!--</dependency>-->


        <!--引入中央仓库的spark包-->

        <!--<dependency>-->

        <!--<groupId>org.apache.spark</groupId>-->

        <!--<artifactId>spark-assembly_2.10</artifactId>-->

        <!--<version>${spark.cdh.version}</version>-->

        <!--</dependency>-->


        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-web-proxy -->

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-yarn-server-web-proxy</artifactId>

            <version>2.6.0-cdh5.8.0</version>

        </dependency>


    </dependencies>


    <!--maven打包-->

    <build>

        <finalName>test-my</finalName>

        <sourceDirectory>src/main/scala</sourceDirectory>

        <testSourceDirectory>src/test/scala</testSourceDirectory>

        <plugins>

            <plugin>

                <groupId>org.scala-tools</groupId>

                <artifactId>maven-scala-plugin</artifactId>

                <version>2.15.2</version>

                <executions>

                    <execution>

                        <goals>

                            <goal>compile</goal>

                            <goal>testCompile</goal>

                        </goals>

                    </execution>

                </executions>

                <configuration>

                    <scalaVersion>${scala.version}</scalaVersion>

                    <args>

                        <arg>-target:jvm-1.7</arg>

                    </args>

                </configuration>

            </plugin>

            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-eclipse-plugin</artifactId>

                <configuration>

                    <downloadSources>true</downloadSources>

                    <buildcommands>

                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>

                    </buildcommands>

                    <additionalProjectnatures>

                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>

                    </additionalProjectnatures>

                    <classpathContainers>

                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>

                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>

                    </classpathContainers>

                </configuration>

            </plugin>

            <plugin>

                <artifactId>maven-assembly-plugin</artifactId>

                <configuration>

                    <descriptorRefs>

                        <descriptorRef>jar-with-dependencies</descriptorRef>

                    </descriptorRefs>

                    <archive>

                        <manifest>

                            <mainClass></mainClass>

                        </manifest>

                    </archive>

                </configuration>

                <executions>

                    <execution>

                        <id>make-assembly</id>

                        <phase>package</phase>

                        <goals>

                            <goal>single</goal>

                        </goals>

                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

    <reporting>

        <plugins>

            <plugin>

                <groupId>org.scala-tools</groupId>

                <artifactId>maven-scala-plugin</artifactId>

                <configuration>

                    <scalaVersion>${scala.version}</scalaVersion>

                </configuration>

            </plugin>

        </plugins>

    </reporting>


</project>



   2、新建测试类


import kafka.common.TopicAndPartition

import kafka.message.MessageAndMetadata

import kafka.serializer.StringDecoder

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkConf, TaskContext}

import org.apache.spark.streaming.dstream.InputDStream

import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.slf4j.LoggerFactory


/**

  * Created by yangjf on 2016/12/18

  * Update date:

  * Time: 11:10

  * Describle :从指定偏移量读取kafka数据

  * Result of Test:

  * Command:

  * Email: jifei.yang@ngaa.com.cn

  */

object ReadBySureOffsetTest {

  val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)


  def main(args: Array[String]) {

    //设置打印日志级别

    Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)

    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    logger.info("测试从指定offset消费kafka的主程序开始")

    if (args.length < 1) {

      System.err.println("Your arguments were " + args.mkString(","))

      System.exit(1)

      logger.info("主程序意外退出")

    }

    //hdfs://hadoop1:8020/user/root/spark/checkpoint

    val Array(checkpointDirectory) = args

    logger.info("checkpoint检查:" + checkpointDirectory)

    val ssc = StreamingContext.getOrCreate(checkpointDirectory,

      () => {

        createContext(checkpointDirectory)

      })

    logger.info("streaming开始启动")

    ssc.start()

    ssc.awaitTermination()

  }


  def createContext(checkpointDirectory: String): StreamingContext = {

    //获取配置

    val brokers = "hadoop3:9092,hadoop4:9092"

    val topics = "20161218a"


    //默认为5秒

    val split_rdd_time = 8

    // 创建上下文

    val sparkConf = new SparkConf()

      .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")

      .set("spark.app.id", "streaming_kafka")


    val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))


    ssc.checkpoint(checkpointDirectory)


    // 创建包含brokers和topic的直接kafka流

    val topicsSet: Set[String] = topics.split(",").toSet

    //kafka配置参数

    val kafkaParams: Map[String, String] = Map[String, String](

      "metadata.broker.list" -> brokers,

      "group.id" -> "apple_sample",

      "serializer.class" -> "kafka.serializer.StringEncoder"

//      "auto.offset.reset" -> "largest"   //自动将偏移重置为最新偏移(默认)

//      "auto.offset.reset" -> "earliest"  //自动将偏移重置为最早的偏移

//      "auto.offset.reset" -> "none"      //如果没有为消费者组找到以前的偏移,则向消费者抛出异常

    )

    /**

      * 从指定位置开始读取kakfa数据

      * 注意:由于Exactly  Once的机制,所以任何情况下,数据只会被消费一次!

      *      指定了开始的offset后,将会从上一次Streaming程序停止处,开始读取kafka数据

      */

    val offsetList = List((topics, 0, 22753623L),(topics, 1, 327041L))                          //指定topic,partition_no,offset

    val fromOffsets = setFromOffsets(offsetList)     //构建参数

    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) //构建MessageAndMetadata

   //使用高级API从指定的offset开始消费,欲了解详情,

   //请进入"http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"查看

    val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)


    //数据操作

    messages.foreachRDD(mess => {

      //获取offset集合

      val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges

      mess.foreachPartition(lines => {

        lines.foreach(line => {

          val o: OffsetRange = offsetsList(TaskContext.get.partitionId)

          logger.info("++++++++++++++++++++++++++++++此处记录offset+++++++++++++++++++++++++++++++++++++++")

          logger.info(s"${o.topic}  ${o.partition}  ${o.fromOffset}  ${o.untilOffset}")

          logger.info("+++++++++++++++++++++++++++++++此处消费数据操作++++++++++++++++++++++++++++++++++++++")

          logger.info("The kafka  line is " + line)

        })

      })

    })

    ssc

  }


  //构建Map

  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {

    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    for (offset <- list) {

      val tp = TopicAndPartition(offset._1, offset._2)//topic和分区数

      fromOffsets += (tp -> offset._3)           // offset位置

    }

    fromOffsets

  }

}


四、参考文档:

    1、spark API  http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$

    2、Kafka官方配置说明:http://kafka.apache.org/documentation.html#configuration

    3、Kafka SampleConsumer:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

    4、Spark streaming 消费遍历offset说明:http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html

    5、Kafka官方API说明:http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


CIO之家 www.ciozj.com 公众号:imciow
关联的文档
也许您喜欢