项目描述
一 项目简介
在远程服务器上的数据库中有两张表,user 和order,现需要对表中的数据做分析,将分析后的结果再存到mysql中。两张表的结构如下图所示
现需要分析每一天user和,oder的新增数量。
在远程的日志服务器上存放有用户浏览网站所留下的apache日志数据,现在需要对日志文件进行ETL处理,并分析用户的行为。
日志文件的格式如下
221.204.14.33 - - [11/Jul/2014:01:23:22 +0800] "GET /static/image/common/pic-next.png HTTP/1.0" 200 1496 "http://www.aboutyun.com/thread-7977-1-1.html" "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko"
二 项目分析
由于这是一个学习项目,没有用户的真实数据,所以我们在本机windows上安装mysql数据,在虚拟机中用Python脚本模拟用户注册信息,向数据库中插入数据。然后在写一个python脚本,利用sqoop工具向虚拟机中的HIVE中导入数据,第一导入为全量导入,以后为增量导入。然后利用HIVE做大数据分析,分析完成后再利用sqoop工具把分析结果存入mysql中。
在日志文件处理方面,我们先在网上下载好离线的apache日志数据,利用python脚本每天定时向HDFS中导入日志数据,然后利用mapreduce对日志数据进行清洗,清洗完以后的数据再导入到HIVE中,然后在HIVE中对日志文件进行分析。分析完成后利用sqoop工具将分析结果导入到Mysql中。
这就是这个项目的的主要工作类容及工作流程。
三 项目流程图
四 项目开发
1 创建数据库db_etl,新建两张表user 和oder。表结构如第一部分图所示。
2 编写python脚本,实现自动向mysql中插入数据。
新建python 项目,目录结构如下图
编写代码如下:
# _*_ coding:UTF-8 _*_'''Created on 2016年12月1日
@author: duking'''import MySQLdb
import random,stringimport time
import threading'''数据库连接'''def ConnMysql():
#连接数据库
conn = MySQLdb.connect(host = "192.168.0.154", user = 'root', passwd = '123456', db = 'db_etl', charset = 'utf8')
cursor = conn.cursor() return conn,cursor'''插入user数据'''def AddUserInfo(username,passwd):
conn,cursor = ConnMysql()
sql = "insert into userinfo(username,passwd) values(%s,%s)"
param = (username,passwd)
cursor.execute(sql,param)
conn.commit()
cursor.close()
conn.close()
'''插入order数据'''def AddOderInfo(warename,price):
conn,cursor = ConnMysql()
sql = "insert into oderinfo(warename,price) values(%s,%s)"
param = (warename,price)
cursor.execute(sql,param)
conn.commit()
cursor.close()
conn.close()'''随机产生字符串'''def Random_Str(randomlength):
a = list(string.ascii_letters)
random.shuffle(a) return ''.join(a[:randomlength])
#随机生成订单信息
def MakeOderInfo(threadname): while(True):
#随机10~100秒生成一条Oder信息
time.sleep(random.randint(10,100))
AddOderInfo(Random_Str(random.randint(6,10)),float(round(random.uniform(10,100),2)))
print threadname + ':a new OderInfo is Maked ' + time.ctime(time.time())
#随机生成用户信息
def MakeUserInfo(threadname): while(True):
time.sleep(random.randint(20,100))
AddUserInfo(Random_Str(random.randint(6,10)),Random_Str(random.randint(6,10)))
print threadname + ':a new UserInfo is Maked ' +time.ctime(time.time())
#python 模块的入口:main函数if __name__ == '__main__':
#多线程
thread_1 = threading.Thread(target=MakeOderInfo,args=('thread_1', ))
thread_2 = threading.Thread(target=MakeUserInfo,args=('thread_2', ))
#启动线程
thread_1.start()
thread_2.start()
注意:python调用mysql需要引入MySQLdb模块,改模块的安装请看另外的教程
最后,将写好的python在linux中运行。
运行后查看数据库就可以看见数据在不断的增长了。
一 在HIVE中创建ETL数据库
->create database etl;
二 在工程目录下新建MysqlToHive.py 和conf文件夹
在conf文件夹下新建如下文件,最后的工程目录如下图
三 源码
Import.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | <?xml version= "1.0" encoding= "UTF-8" ?>
<root>
<importtype>
<value>add</value> <!-- 增量导入或者全导入 -->
</importtype>
<task type= "all" >
<table>user_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
<table>oder_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
</task>
<task type= "add" >
<table>user_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
<table>oder_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
</task>
</root>
|
oder_add.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | <?xml version= "1.0" encoding= "UTF-8" ?>
<root>
<sqoop-shell type= "import" >
<param key= "connect" >jdbc:mysql: //192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
<param key= "username" >root</param> <!-- 数据库用户名 -->
<param key= "password" > 123456 </param> <!-- 数据库密码 -->
<param key= "table" >oderinfo</param><!-- 数据库中待导出的表名 -->
<param key= "hive-database" >etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
<param key= "hive-partition-key" >dt</param> <!-- 通过时间分区 -->
<param key= "hive-partition-value" >$dt</param>
<param key= "hive-import" ></param>
<param key= "check-column" >crt_time</param> <!-- 增量导入检查的列 -->
<param key= "incremental" >lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
<param key= "last-value" > 23 : 59 : 59 </param> <!-- 增量导入时间划分点 -->
<param key= "num-mappers" > 1 </param> <!-- 使用map任务个数 -->
<param key= "split-by" >id</param> <!-- 将表按照id水平切分交给map处理 -->
</sqoop-shell>
</root>
|
oder_all.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | <?xml version= "1.0" encoding= "UTF-8" ?>
<root>
<sqoop-shell type= "import" >
<param key= "connect" >jdbc:mysql: //192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
<param key= "username" >root</param><!-- 数据库用户名 -->
<param key= "password" > 123456 </param><!-- 数据库密码 -->
<param key= "table" >oderinfo</param><!-- 数据库中待导出的表名 -->
<param key= "hive-database" >etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
<param key= "hive-partition-key" >dt</param> <!-- 通过时间分区 -->
<param key= "hive-partition-value" >$dt</param>
<param key= "hive-import" ></param>
<param key= "create-hive-table" ></param> <!-- 在hive中新建一张同名同结构的表 -->
<param key= "hive-overwrite" ></param> <!-- 覆盖原来以存在的表 -->
<param key= "num-mappers" > 1 </param> <!-- 使用map任务个数 -->
<param key= "split-by" >id</param> <!-- 将表按照id水平切分交给map处理 -->
</sqoop-shell>
</root>
|
user_add.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | <?xml version= "1.0" encoding= "UTF-8" ?>
<root>
<sqoop-shell type= "import" >
<param key= "connect" >jdbc:mysql: //192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
<param key= "username" >root</param> <!-- 数据库用户名 -->
<param key= "password" > 123456 </param> <!-- 数据库密码 -->
<param key= "table" >userinfo</param><!-- 数据库中待导出的表名 -->
<param key= "hive-database" >etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
<param key= "hive-partition-key" >dt</param> <!-- 通过时间分区 -->
<param key= "hive-partition-value" >$dt</param>
<param key= "hive-import" ></param>
<param key= "check-column" >crt_time</param> <!-- 增量导入检查的列 -->
<param key= "incremental" >lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
<param key= "last-value" > 23 : 59 : 59 </param> <!-- 增量导入时间划分点 -->
<param key= "num-mappers" > 1 </param> <!-- 使用map任务个数 -->
<param key= "split-by" >id</param> <!-- 将表按照id水平切分交给map处理 -->
</sqoop-shell>
</root>
|
user_all.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | <?xml version= "1.0" encoding= "UTF-8" ?>
<root>
<sqoop-shell type= "import" >
<param key= "connect" >jdbc:mysql: //192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
<param key= "username" >root</param><!-- 数据库用户名 -->
<param key= "password" > 123456 </param><!-- 数据库密码 -->
<param key= "table" >userinfo</param><!-- 数据库中待导出的表名 -->
<param key= "hive-database" >etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
<param key= "hive-partition-key" >dt</param> <!-- 通过时间分区 -->
<param key= "hive-partition-value" >$dt</param>
<param key= "hive-import" ></param>
<param key= "create-hive-table" ></param> <!-- 在hive中新建一张同名同结构的表 -->
<param key= "hive-overwrite" ></param> <!-- 覆盖原来以存在的表 -->
<param key= "num-mappers" > 1 </param> <!-- 使用map任务个数 -->
<param key= "split-by" >id</param> <!-- 将表按照id水平切分交给map处理 -->
</sqoop-shell>
</root>
|
MysqlToHive.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | # _*_ coding:UTF-8 _*_
'''
Created on 2016??12??1??
@author: duking
'''
import datetime
import os
import xml.etree.ElementTree as ET
import collections
#获取昨天时间
def getYesterday():
today = datetime.date.today()
oneday = datetime.timedelta(days = 1 )
yesterday = today - oneday
return yesterday
def Resolve_Conf(dt):
#获取当前工程目录
PROJECT_DIR = os.getcwd()
#获得配置文件名
conf_file = PROJECT_DIR + "\conf\Import.xml"
#解析配置文件
xml_tree = ET.parse(conf_file)
#提取出本次导入的类型 全导入或者增量导入 通过配置import.xml中的plan标签的value值设定
import_types = xml_tree.findall( './importtype' )
for import_type in import_types:
aim_types = import_type.findall( './value' )
for i in range ( len (aim_types)):
aim_type = aim_types[i].text
#获得task元素
tasks = xml_tree.findall( './task' )
#用来保存待执行的sqoop命令的集合
cmds = []
for task in tasks:
#获得导入类型,增量导入或者全量导入
import_type = task.attrib[ "type" ]
#如果task的标签导入类型与设定类型不同,结束本次循环
if (import_type ! = aim_type):
continue
#获得表名集合
tables = task.findall( './table' )
#迭代表名集合,解析表配置文件
for i in range ( len (tables)):
#表名
table_name = tables[i].text
#表配置文件名
table_conf_file = PROJECT_DIR + "\conf\\" + table_name + " .xml"
#解析表配置文件
xmlTree = ET.parse(table_conf_file)
#获取sqoop-shell 节点
sqoopNodes = xmlTree.findall( "./sqoop-shell" )
#获取sqoop 命令类型
sqoop_cmd_type = sqoopNodes[ 0 ].attrib[ "type" ]
#首先组装成sqoop命令头
command = "sqoop " + sqoop_cmd_type
#获取
praNodes = sqoopNodes[ 0 ].findall( "./param" )
#用来保存param的信息的有序字典
cmap = collections.OrderedDict()
#将所有param中的key-value存入字典中
for i in range ( len (praNodes)):
#获取key的属性值
key = praNodes[i].attrib[ "key" ]
#获取param标签中的值
value = praNodes[i].text
#保存到字典中
cmap[key] = value
#迭代字典将param的信息拼装成字符串
for key in cmap:
value = cmap[key]
#如果不是键值对形式的命令 或者值为空,跳出此次循环
if (value = = None or value = = " " or value == " "):
value = ""
if (key = = "hive-partition-value" ):
value = value.replace( '$dt' , str (dt))
#合成前一天的时间
if (key = = "last-value" ):
value = '"' + str(dt) + " " + value + '"'
#拼装为命令
command + = " --" + key + " " + value + " "
#将命令加入至待执行的命令集合
cmds.append(command)
return cmds
#python 模块的入口:main函数
if __name__ = = '__main__' :
dt = getYesterday();
#解析配置文件,生成相应的HQL语句
cmds = Resolve_Conf(dt)
#迭代集合,执行命令
for i in range ( len (cmds)):
cmd = cmds[i]
print cmd
#执行导入过秤
os.system(cmd)
|
CIO之家 www.ciozj.com 公众号:imciow