First Look at Hive in a Production Environment
Query the row count of one partition, and test Hive's date functions against the partition:
select count(*) from activity where year=2015 and month=7 and day=1;

select distinct from_unixtime(cast(occur_time/1000 as BIGINT),'yyyyMMdd') occur_time
from activity where year=2015 and month=7 and day=1;
-- Table design: occurrence time (day), partner, event type (login, register, ...), risk status (accept, reject)
While testing, the table schema keeps changing, and because this is an external table, dropping it does not delete the data, so the HDFS directory has to be removed as well.
/usr/install/hadoop/bin/hadoop fs -rmr /user/tongdun/activity_yunyin
/usr/install/apache-hive-0.13.1-bin/bin/hive
drop table activity_yunyin;
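Before dropping, the table type can be verified (a quick aside; describe formatted is standard Hive, and the EXTERNAL_TABLE line is what to look for):

hive> describe formatted activity_yunyin;
-- the output includes a line such as: Table Type: EXTERNAL_TABLE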
Create the external table, pointing it at an external HDFS path.
CREATE EXTERNAL TABLE `activity_yunyin`(
  `occur_time` string,
  `appName` string,
  `eventType` string,
  `riskStatus` string,
  `cnt` int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://tdhdfs/user/tongdun/activity_yunyin';
-- Insert data: the plan is to process yesterday's data each day, generating the table incrementally
insert into table activity_yunyin
select occur_time,appName,eventType,riskStatus,count(*) from(
  select from_unixtime(cast(occur_time/1000 as BIGINT),'yyyy-MM-dd') occur_time,
         event_result_map['appName'] appName,
         event_result_map['eventType'] eventType,
         event_result_map['riskStatus'] riskStatus
  from activity where year=2015 and month=4
) t2 group by occur_time,appName,eventType,riskStatus;
-- Check the data
select * from activity_yunyin limit 50;
-- Check one partner
select * from activity_yunyin where appName='ppdai' order by occur_time,appName,eventType,riskStatus desc
-- Aggregate one partner's data over a given time range
select appName,eventType,riskStatus,sum(cnt) from activity_yunyin
where appName='ppdai' and occur_time>'2015-06-10' and occur_time<'2015-06-15'
group by appName,eventType,riskStatus
order by appName,eventType,riskStatus desc;
-- Aggregate the past week (excluding today). E.g. if today is the 18th, the past week is [11,17], i.e. occur>=18-7 and occur<18
select appName,eventType,riskStatus,sum(cnt) from activity_yunyin
where appName='ppdai'
  and occur_time<from_unixtime(unix_timestamp(),'yyyy-MM-dd')
  and occur_time>=date_sub(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),7)
group by appName,eventType,riskStatus
order by appName,eventType,riskStatus desc;
-- Aggregate this month so far (again excluding today)
select appName,eventType,riskStatus,sum(cnt) from activity_yunyin
where appName='ppdai' and year(occur_time)='2015' and month(occur_time)='6'
group by appName,eventType,riskStatus
order by appName,eventType,riskStatus desc;
Partitioning is better
-- After partitioning, should the occur_time column be kept? Only if a daily partition could contain occur_time values that do not belong to that partition; since that never happens, the column is dropped.
CREATE EXTERNAL TABLE `activity_yunyin`(
  `appName` string,
  `eventType` string,
  `riskStatus` string,
  `cnt` int
)
PARTITIONED BY (`year` int, `month` int, `day` int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://tdhdfs/user/tongdun/activity_yunyin';
-- Specify the partition manually
insert into table activity_yunyin partition(year=2015,month=6,day=1)
select appName,eventType,riskStatus,count(*) from(
  select
    event_result_map['appName'] appName,
    event_result_map['eventType'] eventType,
    event_result_map['riskStatus'] riskStatus
  from activity where year=2015 and month=6 and day=1
) t2 group by appName,eventType,riskStatus;
-- Can partitions be derived automatically from the date value, i.e. dynamic partitioning?
-- Note: the day column must carry the same name as the partition column declared in the DDL
insert into table activity_yunyin partition(year=2014,month=7,day)
select appName,eventType,riskStatus,count(*),day from(
  select
    event_result_map['appName'] appName,
    event_result_map['eventType'] eventType,
    event_result_map['riskStatus'] riskStatus,
    day(from_unixtime(cast(occur_time/1000 as BIGINT),'yyyy-MM-dd')) day
  from activity where year=2014 and month=7
) t2 group by appName,eventType,riskStatus,day;
Log output:
Hadoop job information for Stage-1: number of mappers: 7; number of reducers: 2
2015-07-16 17:06:21,002 Stage-1 map = 0%, reduce = 0%
2015-07-16 17:07:27,646 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 206.25 sec
Hadoop job information for Stage-2: number of mappers: 2; number of reducers: 1
2015-07-16 17:07:36,608 Stage-2 map = 0%, reduce = 0%
2015-07-16 17:07:54,497 Stage-2 map = 100%, reduce = 100%, Cumulative CPU 11.12 sec
Loading data to table default.activity_yunyin partition (year=2014, month=7, day=null)
Loading partition {year=2014, month=7, day=11}
Loading partition {year=2014, month=7, day=23}
Loading partition {year=2014, month=7, day=29}
Loading partition {year=2014, month=7, day=5}
Partition default.activity_yunyin{year=2014, month=7, day=1} stats: [numFiles=1, numRows=13, totalSize=630, rawDataSize=52]
Partition default.activity_yunyin{year=2014, month=7, day=10} stats: [numFiles=1, numRows=14, totalSize=609, rawDataSize=56]
Partition default.activity_yunyin{year=2014, month=7, day=11} stats: [numFiles=1, numRows=12, totalSize=590, rawDataSize=48]
Partition default.activity_yunyin{year=2014, month=7, day=12} stats: [numFiles=1, numRows=11, totalSize=586, rawDataSize=44]
MapReduce Jobs Launched:
Job 0: Map: 7 Reduce: 2 Cumulative CPU: 206.25 sec HDFS Read: 200980034 HDFS Write: 23570 SUCCESS
Job 1: Map: 2 Reduce: 1 Cumulative CPU: 11.12 sec HDFS Read: 24298 HDFS Write: 22399 SUCCESS
Total MapReduce CPU Time Spent: 3 minutes 37 seconds 370 msec
OK
Time taken: 117.499 seconds
-- To backfill all historical data in one shot, the same approach works in principle. In production it errored out, though (probably because of the data volume; since reading logs in production is painful, I didn't dig into it for now).
insert into table activity_yunyin partition(year,month,day)
select appName,eventType,riskStatus,count(*),year,month,day from(
  select
    event_result_map['appName'] appName,
    event_result_map['eventType'] eventType,
    event_result_map['riskStatus'] riskStatus,
    year(from_unixtime(cast(occur_time/1000 as BIGINT),'yyyy-MM-dd')) year,
    month(from_unixtime(cast(occur_time/1000 as BIGINT),'yyyy-MM-dd')) month,
    day(from_unixtime(cast(occur_time/1000 as BIGINT),'yyyy-MM-dd')) day
  from activity
) t2 group by appName,eventType,riskStatus,year,month,day
distribute by year,month,day;
So the historical data is imported month by month for now. The statements can be written into a file and executed with hive -f **.sql.
Note: executed via hive -f this way, no variables get passed in, so each block has to be copied and the month edited by hand. If Hive and bash were combined, the months could be processed in a loop; a sketch of that idea follows. (Hive also accepts --hivevar variables, which a script can reference as ${hivevar:name}.)
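A minimal sketch of the bash + Hive loop, assuming the backfill targets every month of 2014 (the script name and month range are illustrative, not from the original setup; the SQL is the dynamic-partition insert shown earlier):

#!/bin/sh
# backfill_2014.sh (hypothetical name): import 2014 one month at a time
cd /usr/install/apache-hive-0.13.1-bin
for m in 1 2 3 4 5 6 7 8 9 10 11 12; do
  bin/hive -e "
  insert into table activity_yunyin partition(year=2014,month=$m,day)
  select appName,eventType,riskStatus,count(*),day from(
    select
      event_result_map['appName'] appName,
      event_result_map['eventType'] eventType,
      event_result_map['riskStatus'] riskStatus,
      day(from_unixtime(cast(occur_time/1000 as BIGINT),'yyyy-MM-dd')) day
    from activity where year=2014 and month=$m
  ) t2 group by appName,eventType,riskStatus,day;"
done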
Verify the daily metric table against the raw data
Metric table: partner, event type, risk status, count
select appName,eventType,riskStatus,sum(cnt)
from activity_yunyin where year=2014 and month=12 and day between 1 and 15
  and appName='ppdai'
group by appName,eventType,riskStatus;
Raw table: query the raw data's map column directly
select eventType,riskStatus,count(*) from(
  select
    event_result_map['appName'] appName,
    event_result_map['eventType'] eventType,
    event_result_map['riskStatus'] riskStatus
  from activity where year=2015 and month=6 and day between 8 and 21
    and event_result_map['appName']='ppdai'
) t2 group by eventType,riskStatus
order by eventType,riskStatus desc;
In fact the raw table alone can deliver the numbers the operations team needs, so why go to the trouble of creating a table, loading data, and maintaining a scheduled job? (Because the pre-aggregated table is far smaller, so the recurring queries are much cheaper.)
Loan Review 532
Loan Reject 69
Loan Accept 5518
Login Review 5168
Login Reject 7830
Login Accept 86676
Modify Review 307
Modify Accept 18600
Payment Review 544
Payment Accept 13814
Register Review 31
Register Reject 1
Register Accept 6658
Withdraw Review 68
Withdraw Reject 5
Withdraw Accept 3314
How do we convert this into the format below, i.e. pivot the one-dimensional table into a two-dimensional one?
EVENTTYPE|Accept|Review|Reject|
Login    |111111|222222|333333|
Register |222222|333333|444444|
Some requirements from the operations report:
Event types may differ across partners, but the risk statuses are fixed at exactly three.
This is simply a rows-to-columns pivot. The familiar exam-scores example makes it obvious:
Dabai Chinese 100        Name    Chinese Math English
Dabai Math     99   ==>  Dabai   100     99   88
Dabai English  88        Xiaobai 100     88   99
SELECT eventType,
MAX(CASE riskStatus WHEN 'Accept' THEN cnt ELSE 0 END) AS Accept,
MAX(CASE riskStatus WHEN 'Review' THEN cnt ELSE 0 END) AS Review,
MAX(CASE riskStatus WHEN 'Reject' THEN cnt ELSE 0 END) AS Reject
FROM (
select eventType,riskStatus,count(*) cnt
from(
select
event_result_map['appName'] appName,
event_result_map['eventType'] eventType,
event_result_map['riskStatus'] riskStatus
from activity where year=2015 and month=6 and day between 8 and 21
and event_result_map['appName']='ppdai'
) t2 group by eventType,riskStatus
) t3
GROUP BY eventType
EVENTTYPE Accept Review Reject
Loan     5518  532  69
Register 6658  31   1
Withdraw 3314  68   5
Payment  13814 544  0
Login    86676 5168 7830
Modify   18600 307  0
Running the job daily on a schedule
With the dynamic partitioning above, the incremental-generation script falls out naturally. Since the raw data lands in one partition per day, the incremental job only needs to query the single most recent partition each day.
Dynamic partitioning examples for reference: http://blog.csdn.net/jiedushi/article/details/7356015
http://f.dataguru.cn/thread-346419-1-1.html
insert into table activity_yunyin partition(year,month,day)
select appName,eventType,riskStatus,count(*),year,month,day from(
  select
    event_result_map['appName'] appName,
    event_result_map['eventType'] eventType,
    event_result_map['riskStatus'] riskStatus,
    year(from_unixtime(cast(occur_time/1000 as BIGINT),'yyyy-MM-dd')) year,
    month(from_unixtime(cast(occur_time/1000 as BIGINT),'yyyy-MM-dd')) month,
    day(from_unixtime(cast(occur_time/1000 as BIGINT),'yyyy-MM-dd')) day
  from activity where
    year=year(date_sub(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),1))
    and month=month(date_sub(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),1))
    and day=day(date_sub(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),1))
) t2 group by appName,eventType,riskStatus,year,month,day
distribute by year,month,day;
For scheduling, put the statement above into a script file, wrap it in a small bash script, and run that from crontab, as sketched below.
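A minimal sketch of the wrapper and crontab entry (the file names, log path, and 02:00 schedule are assumptions; activity_incr.sql would hold the incremental insert above):

#!/bin/sh
# activity_daily.sh (hypothetical name): load yesterday's partition
cd /usr/install/apache-hive-0.13.1-bin
bin/hive -f /home/qihuang.zheng/activity_incr.sql >> ~/activity_incr.log 2>&1

# crontab -e, run at 02:00 every day:
# 0 2 * * * /home/qihuang.zheng/activity_daily.sh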
Exporting Hive results, and ways to execute Hive
Besides interactive queries in the hive shell, Hive supports hive -e "SQL" and hive -f scriptFile.
hive -e
sudo /usr/install/apache-hive-1.2.0-bin/bin/hive -e "select * from tbl_activity_eventresult_hive limit 10"
baihe Accept Message
baihe Accept Message
baihe Reject Message
nonobank Reject Other
Multi-line statements also work, using the terminal's \ continuation character:
sudo /usr/install/apache-hive-1.2.0-bin/bin/hive -e "select * from tbl_activity_eventresult_hive \
> limit 10"
hive -f
> cat hive_test.sql
select * from tbl_activity_eventresult_hive
limit 10
> sudo /usr/install/apache-hive-1.2.0-bin/bin/hive -f hive_test.sql
hive+bash
https://github.com/dropbox/PyHive/blob/master/scripts/make_many_rows.s...
> vi hive_test.sh
#!/bin/sh
cd /usr/install/apache-hive-1.2.0-bin
lmt=100
## hive -e "SQL" > file
sudo bin/hive -e "
select * from tbl_activity_eventresult_hive limit $lmt" > ~/hive_result
> chmod 755 hive_test.sh
> ./hive_test.sh
A field delimiter can be specified, but only a target directory can be named; the files inside follow the 000000_0 naming pattern.
sudo bin/hive -e "insert overwrite local directory '/home/qihuang.zheng/result'
row format delimited fields terminated by ','
select * from tbl_activity_eventresult_hive limit 10;
"
To take hive -e output and replace the tab delimiter with commas: http://stackoverflow.com/questions/18129581/how-do-i-output-the-result...
sudo /usr/install/apache-hive-1.2.0-bin/bin/hive -e 'select * from tbl_activity_eventresult_hive limit 10' | sed 's/[[:space:]]\+/,/g'

sudo bin/hive -e "
SET hive.cli.print.header=true;
select * from tbl_activity_eventresult_hive limit $lmt;
" | sed 's/[[:space:]]\+/,/g' > ~/hive_result
Check the result, now comma-separated. Note that hive -e also accepts multiple statements (here, the setting that prints the column header), separated by semicolons just as in the hive shell.
$ head hive_result
tbl_activity_eventresult_hive.appname,tbl_activity_eventresult_hive.riskstatus,tbl_activity_eventresult_hive.eventtype
baihe,Accept,Message
baihe,Accept,Message
baihe,Reject,Message
nonobank,Reject,Other
Producing the operations report with a script in production (Version 1)
1. Write the script:
Function: metric aggregation for one partner over a time range (the metrics are fixed for now)
Parameters: partner (ppdai), year (2015), month (6), start day (8), end day (21)
#!/bin/sh
cd /usr/install/apache-hive-0.13.1-bin
# ./hive_query.sh ppdai 2015 6 8 21
lmt=10
appName=$1   # partner; positional parameters start at 1, $0 is the script file name
year=$2      # year
month=$3     # month
startDay=$4  # start day
endDay=$5    # end day
currentDate=`date "+%Y%m%d%H%M%S"`
resFileName="$appName.$currentDate.csv"
sql="
set mapreduce.map.memory.mb=4096;
set mapreduce.reduce.memory.mb=5120;
SELECT eventType,
MAX(CASE riskStatus WHEN 'Accept' THEN cnt ELSE 0 END) AS Accept,
MAX(CASE riskStatus WHEN 'Review' THEN cnt ELSE 0 END) AS Review,
MAX(CASE riskStatus WHEN 'Reject' THEN cnt ELSE 0 END) AS Reject
FROM (
select eventType,riskStatus,count(*) cnt
from(
select
event_result_map['appName'] appName,
event_result_map['eventType'] eventType,
event_result_map['riskStatus'] riskStatus
from activity where year=$year and month=$month and day between $startDay and $endDay
and event_result_map['appName']='$appName'
) t2 group by eventType,riskStatus
) t3
GROUP BY eventType;
"echo "HiveQL: $sql"# 执行sql语句bin/hive -e "$sql" | sed 's/[[:space:]]\+/,/g' > ~/$resFileName
2. Run the script
[qihuang.zheng@spark047213 ~]$ sh hive_query.sh ppdai 2015 6 8 21
HiveQL:
set mapreduce.map.memory.mb=4096;
set mapreduce.reduce.memory.mb=5120;
SELECT eventType,
MAX(CASE riskStatus WHEN 'Accept' THEN cnt ELSE 0 END) AS Accept,
MAX(CASE riskStatus WHEN 'Review' THEN cnt ELSE 0 END) AS Review,
MAX(CASE riskStatus WHEN 'Reject' THEN cnt ELSE 0 END) AS Reject
FROM ( select eventType,riskStatus,count(*) cnt from( select
event_result_map['appName'] appName,
event_result_map['eventType'] eventType,
event_result_map['riskStatus'] riskStatus from activity where year=2015 and month=6 and day between 8 and 21
and event_result_map['appName']='ppdai'
) t2 group by eventType,riskStatus
) t3
GROUP BY eventType;
3. Check the result
[qihuang.zheng@spark047213 ~]$ ll -rt
-rwxr-xr-x. 1 qihuang.zheng users 1175 7月 17 09:38 hive_query.sh
-rw-r--r--. 1 qihuang.zheng users 116 7月 17 09:40 ppdai.20150717093904.csv
[qihuang.zheng@spark047213 ~]$ cat ppdai.20150717093904.csv
Loan,5518,532,69
Register,6658,31,1
Withdraw,3314,68,5
Payment,13814,544,0
Login,86676,5168,7830
Modify,18600,307,0
Options required for dynamic partitioning
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
When running the Hive script, the container exceeded its virtual memory limit:
hive> select count(*) from activity where year=2015 and month=7 and day=1;
Container [pid=27329,containerID=container_1437013300118_0012_01_000036] is running beyond virtual memory limits. Current usage: 379.9 MB of 2 GB physical memory used; 6.9 GB of 4.2 GB virtual memory used. Killing container.
https://altiscale.zendesk.com/hc/en-us/articles/200801519-Configuring-...
set mapreduce.map.memory.mb=4096;
set mapreduce.reduce.memory.mb=5120;
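Raising the container sizes works around the error from the Hive side. If you administer the cluster, an alternative sketch (values are illustrative; these are NodeManager properties set in yarn-site.xml, not per-job Hive settings) is to relax YARN's virtual-memory check:

<!-- yarn-site.xml: raise the vmem/pmem ratio (default 2.1), or disable the check -->
<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>5</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>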
Recovering the production activity table after accidentally dropping it