sparkstreaming从指定offset处消费Kafka数据

来源:CSDN博客 作者:网友

一、情景:当Spark streaming程序意外退出时,数据仍然再往Kafka中推送,然而由于Kafka默认是从latest的offset读取,这会导致数据丢失。为了避免数据丢失,那么我们需要记录每次消费的offset,以便下次检查并且从指定的offset开始读取

二、环境:kafka-0.9.0、spark-1.6.0、jdk-1.7、Scala-2.10.5、idea16

三、实现代码:

      1、引入spark和kafka的相关依赖包


<?xml version="1.0" encoding="UTF-8"?>

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xmlns="http://maven.apache.org/POM/4.0.0"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>


    <groupId>com.ngaa</groupId>

    <artifactId>test-my</artifactId>

    <version>1.0-SNAPSHOT</version>

    <inceptionYear>2008</inceptionYear>

    <properties>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!--add  maven release-->

        <maven.compiler.source>1.7</maven.compiler.source>

        <maven.compiler.target>1.7</maven.compiler.target>

        <encoding>UTF-8</encoding>

        <!--scala版本-->

        <scala.version>2.10.5</scala.version>

        <!--测试机器上的scala版本-->

        <test.scala.version>2.11.7</test.scala.version>


        <jackson.version>2.3.0</jackson.version>

        <!--slf4j版本-->

        <slf4j-version>1.7.20</slf4j-version>

        <!--cdh-spark-->

        <spark.cdh.version>1.6.0-cdh5.8.0</spark.cdh.version>

        <spark.streaming.cdh.version>1.6.0-cdh5.8.0</spark.streaming.cdh.version>

        <kafka.spark.cdh.version>1.6.0-cdh5.8.0</kafka.spark.cdh.version>

        <!--cdh-hadoop-->

        <hadoop.cdh.version>2.6.0-cdh5.8.0</hadoop.cdh.version>

        <!--http client必需要兼容CDH中的hadoop版本(cd /opt/cloudera/parcels/CDH/lib/hadoop/lib)-->

        <httpclient.version>4.2.5</httpclient.version>


        <!--http copre-->

        <httpcore.version>4.2.5</httpcore.version>

        <!--fastjson-->

        <fastjson.version>1.1.39</fastjson.version>


    </properties>


    <repositories>

        <repository>

            <id>scala-tools.org</id>

            <name>Scala-Tools Maven2 Repository</name>

            <url>http://scala-tools.org/repo-releases</url>

        </repository>

        <!--配置依赖库地址(用于加载CDH依赖的jar包) -->

        <repository>

            <id>cloudera</id>

            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

        </repository>

    </repositories>


    <pluginRepositories>

        <pluginRepository>

            <id>scala-tools.org</id>

            <name>Scala-Tools Maven2 Repository</name>

            <url>http://scala-tools.org/repo-releases</url>

        </pluginRepository>

    </pluginRepositories>


    <dependencies>


        <!--fastjson-->

        <dependency>

            <groupId>com.alibaba</groupId>

            <artifactId>fastjson</artifactId>

            <version>${fastjson.version}</version>

        </dependency>

        <!--httpclient-->

        <dependency>

            <groupId>org.apache.httpcomponents</groupId>

            <artifactId>httpclient</artifactId>

            <version>${httpclient.version}</version>

        </dependency>


        <!--http core-->

        <dependency>

            <groupId>org.apache.httpcomponents</groupId>

            <artifactId>httpcore</artifactId>

            <version>${httpcore.version}</version>

        </dependency>


        <!--slf4j-->

        <dependency>

            <groupId>org.slf4j</groupId>

            <artifactId>slf4j-log4j12</artifactId>

            <version>${slf4j-version}</version>

        </dependency>

        <!--hadoop-->

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>${hadoop.cdh.version}</version>

            <exclusions>

                <exclusion>

                    <groupId>javax.servlet</groupId>

                    <artifactId>*</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-common</artifactId>

            <version>${hadoop.cdh.version}</version>

            <exclusions>

                <exclusion>

                    <groupId>javax.servlet</groupId>

                    <artifactId>*</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-hdfs</artifactId>

            <version>${hadoop.cdh.version}</version>

            <exclusions>

                <exclusion>

                    <groupId>javax.servlet</groupId>

                    <artifactId>*</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <!--spark scala-->

        <dependency>

            <groupId>org.scala-lang</groupId>

            <artifactId>scala-library</artifactId>

            <version>${scala.version}</version>

        </dependency>

        <dependency>

            <groupId>com.fasterxml.jackson.core</groupId>

            <artifactId>jackson-databind</artifactId>

            <version>${jackson.version}</version>

        </dependency>


        <!--spark streaming和kafka的相关包-->

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-streaming_2.10</artifactId>

            <version>${spark.streaming.cdh.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-streaming-kafka_2.10</artifactId>

            <version>${kafka.spark.cdh.version}</version>

        </dependency>

        <dependency>

            <groupId>junit</groupId>

            <artifactId>junit</artifactId>

            <version>4.12</version>

            <scope>test</scope>

        </dependency>


        <!--引入windows本地库的spark包-->

        <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-assembly_2.10</artifactId>

        <version>${spark.cdh.version}</version>

        <scope>system</scope>

        <systemPath>D:/crt_send_document/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar</systemPath>

        </dependency>


        <!--引入测试环境linux本地库的spark包-->

        <!--<dependency>-->

            <!--<groupId>org.apache.spark</groupId>-->

            <!--<artifactId>spark-assembly_2.10</artifactId>-->

            <!--<version>${spark.cdh.version}</version>-->

            <!--<scope>system</scope>-->

            <!--<systemPath>/opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar-->

            <!--</systemPath>-->

        <!--</dependency>-->


        <!--引入中央仓库的spark包-->

        <!--<dependency>-->

        <!--<groupId>org.apache.spark</groupId>-->

        <!--<artifactId>spark-assembly_2.10</artifactId>-->

        <!--<version>${spark.cdh.version}</version>-->

        <!--</dependency>-->


        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-web-proxy -->

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-yarn-server-web-proxy</artifactId>

            <version>2.6.0-cdh5.8.0</version>

        </dependency>


    </dependencies>


    <!--maven打包-->

    <build>

        <finalName>test-my</finalName>

        <sourceDirectory>src/main/scala</sourceDirectory>

        <testSourceDirectory>src/test/scala</testSourceDirectory>

        <plugins>

            <plugin>

                <groupId>org.scala-tools</groupId>

                <artifactId>maven-scala-plugin</artifactId>

                <version>2.15.2</version>

                <executions>

                    <execution>

                        <goals>

                            <goal>compile</goal>

                            <goal>testCompile</goal>

                        </goals>

                    </execution>

                </executions>

                <configuration>

                    <scalaVersion>${scala.version}</scalaVersion>

                    <args>

                        <arg>-target:jvm-1.7</arg>

                    </args>

                </configuration>

            </plugin>

            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-eclipse-plugin</artifactId>

                <configuration>

                    <downloadSources>true</downloadSources>

                    <buildcommands>

                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>

                    </buildcommands>

                    <additionalProjectnatures>

                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>

                    </additionalProjectnatures>

                    <classpathContainers>

                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>

                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>

                    </classpathContainers>

                </configuration>

            </plugin>

            <plugin>

                <artifactId>maven-assembly-plugin</artifactId>

                <configuration>

                    <descriptorRefs>

                        <descriptorRef>jar-with-dependencies</descriptorRef>

                    </descriptorRefs>

                    <archive>

                        <manifest>

                            <mainClass></mainClass>

                        </manifest>

                    </archive>

                </configuration>

                <executions>

                    <execution>

                        <id>make-assembly</id>

                        <phase>package</phase>

                        <goals>

                            <goal>single</goal>

                        </goals>

                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

    <reporting>

        <plugins>

            <plugin>

                <groupId>org.scala-tools</groupId>

                <artifactId>maven-scala-plugin</artifactId>

                <configuration>

                    <scalaVersion>${scala.version}</scalaVersion>

                </configuration>

            </plugin>

        </plugins>

    </reporting>


</project>



   2、新建测试类


import kafka.common.TopicAndPartition

import kafka.message.MessageAndMetadata

import kafka.serializer.StringDecoder

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkConf, TaskContext}

import org.apache.spark.streaming.dstream.InputDStream

import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.slf4j.LoggerFactory


/**

  * Created by yangjf on 2016/12/18

  * Update date:

  * Time: 11:10

  * Describle :从指定偏移量读取kafka数据

  * Result of Test:

  * Command:

  * Email: jifei.yang@ngaa.com.cn

  */

object ReadBySureOffsetTest {

  val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)


  def main(args: Array[String]) {

    //设置打印日志级别

    Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)

    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    logger.info("测试从指定offset消费kafka的主程序开始")

    if (args.length < 1) {

      System.err.println("Your arguments were " + args.mkString(","))

      System.exit(1)

      logger.info("主程序意外退出")

    }

    //hdfs://hadoop1:8020/user/root/spark/checkpoint

    val Array(checkpointDirectory) = args

    logger.info("checkpoint检查:" + checkpointDirectory)

    val ssc = StreamingContext.getOrCreate(checkpointDirectory,

      () => {

        createContext(checkpointDirectory)

      })

    logger.info("streaming开始启动")

    ssc.start()

    ssc.awaitTermination()

  }


  def createContext(checkpointDirectory: String): StreamingContext = {

    //获取配置

    val brokers = "hadoop3:9092,hadoop4:9092"

    val topics = "20161218a"


    //默认为5秒

    val split_rdd_time = 8

    // 创建上下文

    val sparkConf = new SparkConf()

      .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")

      .set("spark.app.id", "streaming_kafka")


    val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))


    ssc.checkpoint(checkpointDirectory)


    // 创建包含brokers和topic的直接kafka流

    val topicsSet: Set[String] = topics.split(",").toSet

    //kafka配置参数

    val kafkaParams: Map[String, String] = Map[String, String](

      "metadata.broker.list" -> brokers,

      "group.id" -> "apple_sample",

      "serializer.class" -> "kafka.serializer.StringEncoder"

//      "auto.offset.reset" -> "largest"   //自动将偏移重置为最新偏移(默认)

//      "auto.offset.reset" -> "earliest"  //自动将偏移重置为最早的偏移

//      "auto.offset.reset" -> "none"      //如果没有为消费者组找到以前的偏移,则向消费者抛出异常

    )

    /**

      * 从指定位置开始读取kakfa数据

      * 注意:由于Exactly  Once的机制,所以任何情况下,数据只会被消费一次!

      *      指定了开始的offset后,将会从上一次Streaming程序停止处,开始读取kafka数据

      */

    val offsetList = List((topics, 0, 22753623L),(topics, 1, 327041L))                          //指定topic,partition_no,offset

    val fromOffsets = setFromOffsets(offsetList)     //构建参数

    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) //构建MessageAndMetadata

   //使用高级API从指定的offset开始消费,欲了解详情,

   //请进入"http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"查看

    val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)


    //数据操作

    messages.foreachRDD(mess => {

      //获取offset集合

      val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges

      mess.foreachPartition(lines => {

        lines.foreach(line => {

          val o: OffsetRange = offsetsList(TaskContext.get.partitionId)

          logger.info("++++++++++++++++++++++++++++++此处记录offset+++++++++++++++++++++++++++++++++++++++")

          logger.info(s"${o.topic}  ${o.partition}  ${o.fromOffset}  ${o.untilOffset}")

          logger.info("+++++++++++++++++++++++++++++++此处消费数据操作++++++++++++++++++++++++++++++++++++++")

          logger.info("The kafka  line is " + line)

        })

      })

    })

    ssc

  }


  //构建Map

  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {

    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    for (offset <- list) {

      val tp = TopicAndPartition(offset._1, offset._2)//topic和分区数

      fromOffsets += (tp -> offset._3)           // offset位置

    }

    fromOffsets

  }

}


四、参考文档:

    1、spark API  http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$

    2、Kafka官方配置说明:http://kafka.apache.org/documentation.html#configuration

    3、Kafka SampleConsumer:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

    4、Spark streaming 消费遍历offset说明:http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html

    5、Kafka官方API说明:http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


相关文档推荐

腾讯大数据基于StarRocks的向量检索探索.PDF

1737425434 赵裕隆 3.48MB 34页 积分6

B站一站式大数据集群管理平台.PDF

1737421412 刘明刚 1.37MB 30页 积分6

StarRocks在爱奇艺大数据场景的实践.PDF

1737365327 林豪 3.57MB 27页 积分5

农业农村大数据平台互联互通规范.PDF

1736163860  0.49MB 11页 积分5

工业大数据管理与治理智能制造的基座.PDF

1733702095 王宏志 3.83MB 54页 积分6

中国出海互联网公司数据保护合规对策.PDF

1732603379  2.22MB 14页 积分4

数据跨境现状调查与分析报告.PDF

1732603016  1.98MB 53页 积分5

企业数据合规指引个人信息保护指引.PDF

1732591271  15.22MB 23页 积分5

相关文章推荐

大数据开发流程及规范

网络收集 CIO之家的朋友 

大数据技术名词解释

51CTO CIO之家的朋友 

大数据常见问题之数据倾斜

CSDN CIO之家的朋友 

详解大数据批流处理中的两大架构

数仓宝贝库 韩锐、刘驰 

什么是大数据战略以及如何构建

51CTO CIO之家的朋友 

大数据的数据治理与应用场景

51CTO CIO之家的朋友