I. Scenario: When a Spark Streaming application exits unexpectedly, producers keep pushing data into Kafka. Because Kafka consumption defaults to the latest offset, the records produced while the application was down are skipped on restart, i.e. lost. To avoid this, we need to record the offset of each consumed batch so that the next run can check it and resume reading from the recorded position.
II. Environment: Kafka 0.9.0, Spark 1.6.0, JDK 1.7, Scala 2.10.5, IntelliJ IDEA 16
III. Implementation:
1. Add the Spark and Kafka dependencies (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ngaa</groupId>
<artifactId>test-my</artifactId>
<version>1.0-SNAPSHOT</version>
<inceptionYear>2008</inceptionYear>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!--add maven release-->
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<!-- Scala version -->
<scala.version>2.10.5</scala.version>
<!-- Scala version on the test machine -->
<test.scala.version>2.11.7</test.scala.version>
<jackson.version>2.3.0</jackson.version>
<!-- slf4j version -->
<slf4j-version>1.7.20</slf4j-version>
<!--cdh-spark-->
<spark.cdh.version>1.6.0-cdh5.8.0</spark.cdh.version>
<spark.streaming.cdh.version>1.6.0-cdh5.8.0</spark.streaming.cdh.version>
<kafka.spark.cdh.version>1.6.0-cdh5.8.0</kafka.spark.cdh.version>
<!--cdh-hadoop-->
<hadoop.cdh.version>2.6.0-cdh5.8.0</hadoop.cdh.version>
<!-- httpclient must be compatible with the Hadoop version bundled in CDH (see /opt/cloudera/parcels/CDH/lib/hadoop/lib) -->
<httpclient.version>4.2.5</httpclient.version>
<!-- httpcore -->
<httpcore.version>4.2.5</httpcore.version>
<!-- fastjson -->
<fastjson.version>1.1.39</fastjson.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
<!-- Repository for the CDH dependency jars -->
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!--httpclient-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<!--http core-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
<!--slf4j-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-version}</version>
</dependency>
<!--hadoop-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.cdh.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.cdh.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.cdh.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--spark scala-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Spark Streaming and Kafka integration -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.streaming.cdh.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${kafka.spark.cdh.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- Spark assembly jar from a local path on the Windows development machine -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-assembly_2.10</artifactId>
<version>${spark.cdh.version}</version>
<scope>system</scope>
<systemPath>D:/crt_send_document/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar</systemPath>
</dependency>
<!-- Spark assembly jar from a local path on the Linux test machine -->
<!--<dependency>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-assembly_2.10</artifactId>-->
<!--<version>${spark.cdh.version}</version>-->
<!--<scope>system</scope>-->
<!--<systemPath>/opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar-->
<!--</systemPath>-->
<!--</dependency>-->
<!-- Spark assembly jar from the central repository -->
<!--<dependency>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-assembly_2.10</artifactId>-->
<!--<version>${spark.cdh.version}</version>-->
<!--</dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-web-proxy -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<version>2.6.0-cdh5.8.0</version>
</dependency>
</dependencies>
<!-- Maven packaging -->
<build>
<finalName>test-my</finalName>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.7</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
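With the assembly plugin configured above, the job can be packaged with:

mvn clean package

This should produce target/test-my-jar-with-dependencies.jar (the finalName set in the pom plus the jar-with-dependencies classifier); the command and jar name are inferred from the pom settings rather than stated explicitly here.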
2. Create the test class:
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
/**
* Created by yangjf on 2016/12/18
* Update date:
* Time: 11:10
* Description: read Kafka data starting from specified offsets
* Result of Test:
* Command:
* Email: jifei.yang@ngaa.com.cn
*/
object ReadBySureOffsetTest {
val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)
def main(args: Array[String]) {
// Set log levels to reduce console noise
Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)
Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
logger.info("Main program starts: consuming Kafka from specified offsets")
if (args.length < 1) {
System.err.println("Usage: ReadBySureOffsetTest <checkpointDirectory>; your arguments were " + args.mkString(","))
logger.info("Main program exits abnormally")
System.exit(1)
}
// e.g. hdfs://hadoop1:8020/user/root/spark/checkpoint
val Array(checkpointDirectory) = args
logger.info("Checkpoint directory: " + checkpointDirectory)
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => {
createContext(checkpointDirectory)
})
logger.info("Starting the streaming context")
ssc.start()
ssc.awaitTermination()
}
def createContext(checkpointDirectory: String): StreamingContext = {
// Configuration
val brokers = "hadoop3:9092,hadoop4:9092"
val topics = "20161218a"
// Batch interval in seconds (the default is 5; here we use 8)
val split_rdd_time = 8
// Create the context
val sparkConf = new SparkConf()
.setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")
.set("spark.app.id", "streaming_kafka")
val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))
ssc.checkpoint(checkpointDirectory)
// Create a direct Kafka stream with the brokers and topics
val topicsSet: Set[String] = topics.split(",").toSet
// Kafka configuration parameters
val kafkaParams: Map[String, String] = Map[String, String](
"metadata.broker.list" -> brokers,
"group.id" -> "apple_sample",
"serializer.class" -> "kafka.serializer.StringEncoder"
// "auto.offset.reset" -> "largest" // start from the latest offset (default)
// "auto.offset.reset" -> "smallest" // start from the earliest offset
// (the new consumer API in Kafka 0.9+ uses the values "earliest", "latest" and "none" instead)
)
/**
* Read Kafka data starting from the specified offsets.
* Note: the direct stream can achieve exactly-once semantics, but only when the
* output operation is idempotent or transactional; otherwise delivery is at-least-once.
* With explicit start offsets, reading resumes from where the previous run of the
* streaming program stopped.
*/
val offsetList = List((topics, 0, 22753623L), (topics, 1, 327041L)) // (topic, partition, starting offset) triples
val fromOffsets = setFromOffsets(offsetList) // build the fromOffsets map
val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) // map each MessageAndMetadata to (topic, message)
//Consume from the specified offsets with the direct (receiver-less) API; for details, see
//"http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"
val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
//Process the data
messages.foreachRDD(mess => {
//Grab the offset ranges of this batch
val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
mess.foreachPartition(lines => {
// Look up this partition's offset range once per partition, not once per record
val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
logger.info("++++++++++++++++++++++++++++++ record the offsets here +++++++++++++++++++++++++++++++++++++++")
logger.info(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
lines.foreach(line => {
logger.info("+++++++++++++++++++++++++++++++ process the record here ++++++++++++++++++++++++++++++++++++++")
logger.info("The kafka line is " + line)
})
})
})
ssc
}
//Build the Map of TopicAndPartition -> offset
def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
var fromOffsets: Map[TopicAndPartition, Long] = Map()
for (offset <- list) {
val tp = TopicAndPartition(offset._1, offset._2) // topic and partition
fromOffsets += (tp -> offset._3) // starting offset
}
fromOffsets
}
}
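The class above only logs the offset ranges; to actually resume from the last consumed position after a restart, the offsets must be persisted somewhere external (ZooKeeper, HBase, HDFS, a database, or a plain file). Below is a minimal file-based sketch, not part of the original code: OffsetStore, its path argument, and the topic,partition,offset line format are all illustrative assumptions.

import java.io.{File, PrintWriter}
import scala.io.Source
import org.apache.spark.streaming.kafka.OffsetRange

object OffsetStore {
  // Persist one "topic,partition,untilOffset" line per partition (hypothetical format).
  def save(path: String, ranges: Array[OffsetRange]): Unit = {
    val writer = new PrintWriter(new File(path))
    try ranges.foreach(o => writer.println(s"${o.topic},${o.partition},${o.untilOffset}"))
    finally writer.close()
  }
  // Read the offsets back in the (topic, partition, offset) shape expected by setFromOffsets.
  def load(path: String): List[(String, Int, Long)] =
    Source.fromFile(path).getLines().map { line =>
      val Array(topic, partition, offset) = line.split(",")
      (topic, partition.toInt, offset.toLong)
    }.toList
}

On startup, OffsetStore.load(path) could replace the hardcoded offsetList, and OffsetStore.save(path, offsetsList) could be called at the end of each foreachRDD block; note that both calls run on the driver, which is where the offsetRanges array lives.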
IV. References:
1. Spark API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
2. Kafka configuration reference: http://kafka.apache.org/documentation.html#configuration
3. Kafka SimpleConsumer example: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
4. Spark Streaming + Kafka integration (offset handling): http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html
5. Kafka consumer API (0.9.0) javadoc: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html