It all started with wanting to put learning into practice! I have finally begun digging into Spark SQL. Until now I had been following the examples on the official site and various expert blogs, but after working through them I found they were all based either on local data sources or on HDFS. What I wanted was to fetch data from HBase first and then run Spark SQL over it, and after several days I still could not google an example of that. So I am writing it down here for my own future reference, and perhaps it will also help other beginners who, like me, are just starting out with Spark SQL. Enough talk, let's get started!
Deployment OS: CentOS 6.6
Deployed software: Hadoop-2.7.0 + HBase-0.98.12.1 + Spark-1.3.1
Dependencies: JDK 1.7 + Scala 2.10.4
Development environment: IntelliJ IDEA (ideaIU-14.1.4)
Runtime environment: CentOS 6.6 / Win7
Example code:
import java.io.Serializable
import java.util.logging.Logger

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object MySparkSql extends Serializable {

  // One row of the HBase 'score' table: the row key is the student name,
  // the three columns hold the subject scores
  case class Score(name: String, chinese: Int, english: Int, math: Int)

  val logger = Logger.getLogger(MySparkSql.getClass.getName)

  def main(args: Array[String]) {
    val jars: Array[String] = Array("D:\\workspace\\mysparksql_2.10-1.0.jar")
    System.setProperty("hadoop.home.dir", "E:\\Program Files\\hadoop-2.7.0") // required when running on Win7

    val sconf = new SparkConf()
      .setMaster("local")
      .setAppName("MySparkSql")
      // settings for the Win7 environment
      .set("spark.executor.memory", "1g")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // .setJars(jars) // when testing on the cluster, set the jars the application depends on
    val sc = new SparkContext(sconf)

    // HBase connection settings
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "h230")
    conf.set("hbase.master", "h230:60000")
    // conf.addResource("/home/hadoop/SW/hbase/conf/hbase-site.xml") // can replace the three settings above
    conf.set(TableInputFormat.INPUT_TABLE, "score")

    // Scan the 'score' table as an RDD of (row key, Result) pairs
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // Map each Result's cells to a Score; the cells come back ordered by column
    // qualifier (chinese, english, math), and the row key is the student name
    val score = hBaseRDD.map(m => m._2.listCells()).map(c =>
      Score(new String(c.get(0).getRow()),
        new String(c.get(0).getValue).toInt,
        new String(c.get(1).getValue).toInt,
        new String(c.get(2).getValue).toInt))
    score.foreach(println)

    // Turn the RDD of case classes into a DataFrame and register it as a temp table
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val scoreSchema = sqlContext.createDataFrame(score)
    scoreSchema.registerTempTable("score")

    var result = sqlContext.sql("SELECT count(0) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT sum(chinese) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT avg(chinese) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT sum(math) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT avg(math) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT sum(english) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT avg(english) FROM score")
    result.collect().foreach(println)
  }
}
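Since Spark 1.3 the same aggregations can also be expressed through the DataFrame API instead of SQL strings. Below is a minimal sketch, assuming it runs right after the program above where scoreSchema is the DataFrame created from the score RDD; it is an alternative for comparison, not part of the original program.

// Minimal sketch (assumes scoreSchema from the program above is in scope):
// the same aggregations written with the DataFrame API instead of SQL strings
import org.apache.spark.sql.functions.{avg, sum}

scoreSchema.agg(sum("chinese"), avg("chinese")).show()
scoreSchema.agg(sum("math"), avg("math")).show()
scoreSchema.agg(sum("english"), avg("english")).show()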
My build.sbt configuration:
name := "MySparkSql"
version := "1.0"
scalaVersion := "2.10.4"
autoScalaLibrary := false
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-main" % "2.7.0",
  "org.scala-lang" % "scala-library" % "2.10.4",
  "org.scala-lang" % "scala-reflect" % "2.10.4",
  "org.scala-lang" % "scala-compiler" % "2.10.4",
  "org.scala-lang" % "scalap" % "2.10.4",
  "org.apache.thrift" % "libthrift" % "0.9.2",
  "org.apache.hbase" % "hbase-server" % "0.98.12.1-hadoop2",
  "org.apache.hbase" % "hbase-client" % "0.98.12.1-hadoop2",
  "org.apache.hbase" % "hbase-common" % "0.98.12.1-hadoop2",
  "org.apache.spark" % "spark-sql_2.10" % "1.3.1",
  "org.apache.spark" % "spark-core_2.10" % "1.3.1"
)
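With the name, version and scalaVersion above, running sbt package should produce target/scala-2.10/mysparksql_2.10-1.0.jar, which appears to be the jar referenced by the jars array in the code when the application is submitted to the cluster with setJars.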
The HBase table 'score' used in the code is described in an earlier article: http://mangocool.com/1435117759266.html
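If you want to reproduce the table yourself, a sketch like the one below could populate it with the HBase 0.98 client API that is already among the dependencies. This is only an illustration under assumptions: the original article does not show the table definition, so the column family name "course" and the sample row are hypothetical, and the subject values are stored as string-encoded bytes so that the new String(...).toInt conversion in the program works.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

// Sketch only: insert one sample row into the 'score' table.
// The column family "course" is a hypothetical name, not taken from the article.
val hconf = HBaseConfiguration.create()
val table = new HTable(hconf, "score")
val put = new Put(Bytes.toBytes("zhangsan")) // row key = student name
put.add(Bytes.toBytes("course"), Bytes.toBytes("chinese"), Bytes.toBytes("98"))
put.add(Bytes.toBytes("course"), Bytes.toBytes("english"), Bytes.toBytes("78"))
put.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes("80"))
table.put(put)
table.close()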
Final run output:
Score(lisi,96,76,88)
Score(wangwu,98,99,89)
Score(zhangsan,98,78,80)
[3]
[292]
[97.33333333333333]
[257]
[85.66666666666667]
[253]
[84.33333333333333]