A Simple Spark SQL Example Based on HBase
mangocool

It all started with putting learning into practice! I finally got around to studying Spark SQL. Until now I had been following the examples on the official site and various expert blogs, but after working through them I found they were all based either on local data sources or on HDFS. What I wanted was to fetch data from HBase and then run Spark SQL on it, and after days of searching I couldn't google an example of that. So I'm writing this down for my own reference; maybe it will also help other beginners who, like me, are just getting started with Spark SQL. Enough talk, let's begin!

Deployment environment: CentOS 6.6

Deployed software: Hadoop-2.7.0 + HBase-0.98.12.1 + Spark-1.3.1

Dependencies: JDK 1.7 + Scala 2.10.4

Development environment: IntelliJ IDEA (ideaIU-14.1.4)

Runtime environment: CentOS 6.6 / Windows 7

Sample code:

import java.io.Serializable
import java.util.logging.Logger
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}
object MySparkSql extends Serializable {
  case class Score(name: String, chinese: Int, english: Int, math: Int)
  val logger = Logger.getLogger(MySparkSql.getClass.getName)
  def main(args: Array[String]) {
    val jars: Array[String] = Array("D:\\workspace\\mysparksql_2.10-1.0.jar")
    System.setProperty("hadoop.home.dir", "E:\\Program Files\\hadoop-2.7.0") // required when running under Windows 7
    val sconf = new SparkConf()
      .setMaster("local")
//      .setMaster("spark://h230:7077")//在集群测试下设置,h230是我的hostname,须自行修改
      .setAppName("MySparkSql")//win7环境下设置
      .set("spark.executor.memory", "1g")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//      .setJars(jars)//在集群测试下,设置应用所依赖的jar包
    val sc = new SparkContext(sconf)
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "h230")
    conf.set("hbase.master", "h230:60000")
//    conf.addResource("/home/hadoop/SW/hbase/conf/hbase-site.xml") // alternative to the three settings above
    conf.set(TableInputFormat.INPUT_TABLE, "score")
    // Scan the whole "score" table as an RDD of (rowkey, Result) pairs
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    // Map each Result to a Score; listCells() returns cells sorted by
    // family/qualifier, so indices 0, 1, 2 must line up with the chinese,
    // english and math columns of the table
    val score = hBaseRDD.map(m => m._2.listCells()).map(c =>
      Score(new String(c.get(0).getRow()),
        new String(c.get(0).getValue).toInt,
        new String(c.get(1).getValue).toInt,
        new String(c.get(2).getValue).toInt)
    )
    score.foreach(println)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Turn the RDD of case classes into a DataFrame and register it as a temp table for SQL
    val scoreSchema = sqlContext.createDataFrame(score)
    scoreSchema.registerTempTable("score")
    var result = sqlContext.sql("SELECT count(0) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT sum(chinese) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT avg(chinese) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT sum(math) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT avg(math) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT sum(english) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT avg(english) FROM score")
    result.collect().foreach(println)
  }
}
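
Note that the mapping from Result to Score above depends entirely on the order in which listCells() returns the cells. A minimal alternative sketch that looks each column up by name instead; the column family "course" and the qualifiers "chinese"/"english"/"math" are assumptions here (they are not shown in this article), so adjust them to your actual schema:

// Hypothetical alternative to the positional mapping above: read each
// column explicitly. Family "course" and the qualifier names are assumed.
import org.apache.hadoop.hbase.util.Bytes

val family = Bytes.toBytes("course")
val scoreByName = hBaseRDD.map { case (_, result) =>
  Score(
    Bytes.toString(result.getRow),
    Bytes.toString(result.getValue(family, Bytes.toBytes("chinese"))).toInt,
    Bytes.toString(result.getValue(family, Bytes.toBytes("english"))).toInt,
    Bytes.toString(result.getValue(family, Bytes.toBytes("math"))).toInt)
}

This version keeps working even if more columns are later added to the row, since it no longer relies on positional indices.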




My build.sbt configuration:

name := "MySparkSql"

version := "1.0"

scalaVersion := "2.10.4"

autoScalaLibrary := false

libraryDependencies ++= Seq("org.apache.hadoop" % "hadoop-main" % "2.7.0",
  "org.scala-lang" % "scala-library" % "2.10.4",
  "org.scala-lang" % "scala-reflect" % "2.10.4",
  "org.scala-lang" % "scala-compiler" % "2.10.4",
  "org.scala-lang" % "scalap" % "2.10.4",
  "org.apache.thrift" % "libthrift" % "0.9.2",
  "org.apache.hbase" % "hbase-server" % "0.98.12.1-hadoop2",
  "org.apache.hbase" % "hbase-client" % "0.98.12.1-hadoop2",
  "org.apache.hbase" % "hbase-common" % "0.98.12.1-hadoop2",
  "org.apache.spark" % "spark-sql_2.10" % "1.3.1",
  "org.apache.spark" % "spark-core_2.10" % "1.3.1"
)

The HBase table 'score' used in the code was created in an earlier post: http://mangocool.com/1435117759266.html
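
For reference, here is a minimal sketch (not taken from the linked post) of creating and populating an equivalent 'score' table with the HBase 0.98 client API. The column family name "course" is an assumption; the row keys and scores match the output shown below:

// Hypothetical setup for the "score" table; the column family "course" is assumed.
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
val admin = new HBaseAdmin(conf)
if (!admin.tableExists("score")) {
  val desc = new HTableDescriptor(TableName.valueOf("score"))
  desc.addFamily(new HColumnDescriptor("course"))
  admin.createTable(desc)
}
val table = new HTable(conf, "score")
val rows = Seq(("lisi", 96, 76, 88), ("wangwu", 98, 99, 89), ("zhangsan", 98, 78, 80))
for ((name, chinese, english, math) <- rows) {
  val put = new Put(Bytes.toBytes(name))
  // scores are stored as string bytes, matching the new String(...).toInt reads above
  put.add(Bytes.toBytes("course"), Bytes.toBytes("chinese"), Bytes.toBytes(chinese.toString))
  put.add(Bytes.toBytes("course"), Bytes.toBytes("english"), Bytes.toBytes(english.toString))
  put.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes(math.toString))
  table.put(put)
}
table.close()
admin.close()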

Final run results:

Score(lisi,96,76,88)
Score(wangwu,98,99,89)
Score(zhangsan,98,78,80)
[3]
[292]
[97.33333333333333]
[257]
[85.66666666666667]
[253]
[84.33333333333333]


