hadoop项目实战
网友 博客园

项目描述

 项目简介

在远程服务器上的数据库中有两张表,user order,现需要对表中的数据做分析,将分析后的结果再存到mysql中。两张表的结构如下图所示

 

现需要分析每一天user,oder的新增数量。

在远程的日志服务器上存放有用户浏览网站所留下的apache日志数据,现在需要对日志文件进行ETL处理,并分析用户的行为。

日志文件的格式如下

221.204.14.33 - - [11/Jul/2014:01:23:22 +0800] "GET /static/image/common/pic-next.png HTTP/1.0" 200 1496 "http://www.aboutyun.com/thread-7977-1-1.html" "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko"

 项目分析

由于这是一个学习项目,没有用户的真实数据,所以我们在本机windows上安装mysql数据,在虚拟机中用Python脚本模拟用户注册信息,向数据库中插入数据。然后在写一个python脚本,利用sqoop工具向虚拟机中的HIVE中导入数据,第一导入为全量导入,以后为增量导入。然后利用HIVE做大数据分析,分析完成后再利用sqoop工具把分析结果存入mysql中。

在日志文件处理方面,我们先在网上下载好离线的apache日志数据,利用python脚本每天定时向HDFS中导入日志数据,然后利用mapreduce对日志数据进行清洗,清洗完以后的数据再导入到HIVE中,然后在HIVE中对日志文件进行分析。分析完成后利用sqoop工具将分析结果导入到Mysql中。

这就是这个项目的的主要工作类容及工作流程。

 项目流程图

 

 项目开发

创建数据库db_etl,新建两张表user oder。表结构如第一部分图所示。

编写python脚本,实现自动向mysql中插入数据。

新建python 项目,目录结构如下图

 

编写代码如下:

# _*_ coding:UTF-8 _*_'''Created on 2016年12月1日
@author: duking'''import MySQLdb
import random,stringimport time
import threading'''数据库连接'''def ConnMysql():
    #连接数据库
    conn = MySQLdb.connect(host = "192.168.0.154", user = 'root', passwd = '123456', db = 'db_etl', charset = 'utf8')
    cursor = conn.cursor()    return conn,cursor'''插入user数据'''def AddUserInfo(username,passwd):
    
    conn,cursor = ConnMysql()
    
    sql = "insert into userinfo(username,passwd) values(%s,%s)"
    param = (username,passwd)
     
    cursor.execute(sql,param)
    
    conn.commit()
    cursor.close()
    conn.close()    
'''插入order数据'''def AddOderInfo(warename,price):
    
    conn,cursor = ConnMysql()
    
    sql = "insert into oderinfo(warename,price) values(%s,%s)"
    param = (warename,price)
     
    cursor.execute(sql,param)
    
    conn.commit()
    cursor.close()
    conn.close()'''随机产生字符串'''def Random_Str(randomlength):
    a = list(string.ascii_letters)
    random.shuffle(a)    return ''.join(a[:randomlength])
    
#随机生成订单信息
def MakeOderInfo(threadname):    while(True):
        #随机10~100秒生成一条Oder信息
        time.sleep(random.randint(10,100))   
        AddOderInfo(Random_Str(random.randint(6,10)),float(round(random.uniform(10,100),2)))
        print threadname + ':a new OderInfo is Maked    ' + time.ctime(time.time())
#随机生成用户信息
def MakeUserInfo(threadname):    while(True):
        time.sleep(random.randint(20,100))
        AddUserInfo(Random_Str(random.randint(6,10)),Random_Str(random.randint(6,10)))
        print threadname + ':a new UserInfo is Maked    ' +time.ctime(time.time())
    
    
#python 模块的入口:main函数if __name__ == '__main__':
    
    #多线程
    thread_1 = threading.Thread(target=MakeOderInfo,args=('thread_1', ))
    thread_2 = threading.Thread(target=MakeUserInfo,args=('thread_2', ))
    
    #启动线程
    thread_1.start()
    thread_2.start()

注意:python调用mysql需要引入MySQLdb模块,改模块的安装请看另外的教程

最后,将写好的pythonlinux中运行。

运行后查看数据库就可以看见数据在不断的增长了。

一 在HIVE中创建ETL数据库

 ->create database etl;

二 在工程目录下新建MysqlToHive.py 和conf文件夹

 在conf文件夹下新建如下文件,最后的工程目录如下图

 

三 源码

 Import.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<?xml version="1.0" encoding="UTF-8"?>
<root>
    <importtype>
        <value>add</value>    <!-- 增量导入或者全导入 -->
    </importtype>
    <task type="all">
        <table>user_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
        <table>oder_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
    </task>
    <task type="add">
        <table>user_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
        <table>oder_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
    </task>
</root>

 oder_add.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
<root>
    <sqoop-shell type="import">
        <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
        <param key="username">root</param> <!-- 数据库用户名 -->
        <param key="password">123456</param> <!-- 数据库密码 -->
        <param key="table">oderinfo</param><!-- 数据库中待导出的表名 -->
        <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
        <param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
        <param key="hive-partition-value">$dt</param>
        <param key="hive-import"></param>
        <param key="check-column">crt_time</param> <!-- 增量导入检查的列 -->
        <param key="incremental">lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
        <param key="last-value">23:59:59</param> <!-- 增量导入时间划分点 -->
        <param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
        <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
    </sqoop-shell>
</root>

 oder_all.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?xml version="1.0" encoding="UTF-8"?>
<root>
    <sqoop-shell type="import">
        <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param>   <!-- 数据库连接地址 -->
        <param key="username">root</param><!-- 数据库用户名 -->
        <param key="password">123456</param><!-- 数据库密码 -->
        <param key="table">oderinfo</param><!-- 数据库中待导出的表名 -->
        <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
        <param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
        <param key="hive-partition-value">$dt</param>
        <param key="hive-import"></param>                 
        <param key="create-hive-table"></param>   <!-- 在hive中新建一张同名同结构的表 -->
        <param key="hive-overwrite"></param> <!-- 覆盖原来以存在的表 -->
        <param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
        <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
    </sqoop-shell>
</root>

 user_add.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
<root>
    <sqoop-shell type="import">
        <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
        <param key="username">root</param> <!-- 数据库用户名 -->
        <param key="password">123456</param> <!-- 数据库密码 -->
        <param key="table">userinfo</param><!-- 数据库中待导出的表名 -->
        <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
        <param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
        <param key="hive-partition-value">$dt</param>
        <param key="hive-import"></param>
        <param key="check-column">crt_time</param> <!-- 增量导入检查的列 -->
        <param key="incremental">lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
        <param key="last-value">23:59:59</param> <!-- 增量导入时间划分点 -->
        <param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
        <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
    </sqoop-shell>
</root>

 user_all.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?xml version="1.0" encoding="UTF-8"?>
<root>
    <sqoop-shell type="import">
        <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param>   <!-- 数据库连接地址 -->
        <param key="username">root</param><!-- 数据库用户名 -->
        <param key="password">123456</param><!-- 数据库密码 -->
        <param key="table">userinfo</param><!-- 数据库中待导出的表名 -->
        <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
        <param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
        <param key="hive-partition-value">$dt</param>
        <param key="hive-import"></param>                 
        <param key="create-hive-table"></param>   <!-- 在hive中新建一张同名同结构的表 -->
        <param key="hive-overwrite"></param> <!-- 覆盖原来以存在的表 -->
        <param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
        <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
    </sqoop-shell>
</root>

 MysqlToHive.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# _*_ coding:UTF-8 _*_
'''
Created on 2016??12??1??
@author: duking
'''
import datetime
import os
import xml.etree.ElementTree as ET
import collections
#获取昨天时间
def getYesterday():
    today=datetime.date.today()
    oneday=datetime.timedelta(days=1)
    yesterday=today-oneday 
    return yesterday
def Resolve_Conf(dt):
    #获取当前工程目录
    PROJECT_DIR = os.getcwd()
    #获得配置文件名
    conf_file = PROJECT_DIR + "\conf\Import.xml"
    #解析配置文件
    xml_tree = ET.parse(conf_file)
    #提取出本次导入的类型  全导入或者增量导入  通过配置import.xml中的plan标签的value值设定
    import_types = xml_tree.findall('./importtype')
    for import_type in import_types:
        aim_types = import_type.findall('./value')
        for in range(len(aim_types)):
            aim_type = aim_types[i].text
    #获得task元素
    tasks = xml_tree.findall('./task')
    #用来保存待执行的sqoop命令的集合
    cmds = []
    for task in tasks:
        #获得导入类型,增量导入或者全量导入
        import_type = task.attrib["type"]
        #如果task的标签导入类型与设定类型不同,结束本次循环
        if(import_type != aim_type):
            continue
        #获得表名集合
        tables = task.findall('./table')
        #迭代表名集合,解析表配置文件
        for in range(len(tables)):
            #表名
            table_name = tables[i].text
            #表配置文件名
            table_conf_file = PROJECT_DIR + "\conf\\" + table_name + ".xml"
            #解析表配置文件
            xmlTree = ET.parse(table_conf_file)
            #获取sqoop-shell 节点
            sqoopNodes = xmlTree.findall("./sqoop-shell")
            #获取sqoop 命令类型
            sqoop_cmd_type = sqoopNodes[0].attrib["type"]
            #首先组装成sqoop命令头
            command = "sqoop " + sqoop_cmd_type
            #获取
            praNodes = sqoopNodes[0].findall("./param")
            #用来保存param的信息的有序字典
            cmap = collections.OrderedDict()           
            #将所有param中的key-value存入字典中
            for in range(len(praNodes)):
                #获取key的属性值
                key = praNodes[i].attrib["key"]
                #获取param标签中的值
                value = praNodes[i].text
                #保存到字典中
                cmap[key] = value
            #迭代字典将param的信息拼装成字符串
            for key in cmap:
                value = cmap[key]
                #如果不是键值对形式的命令 或者值为空,跳出此次循环
                if(value == None or value == "" or value == " "):
                    value = ""
                if(key == "hive-partition-value"):
                    value = value.replace('$dt',str(dt)) 
                #合成前一天的时间
                if(key == "last-value"):
                    value = '"' + str(dt) + " " + value + '"'
                #拼装为命令
                command += " --" + key + " " + value + " "
            #将命令加入至待执行的命令集合
            cmds.append(command)
    return cmds  
#python 模块的入口:main函数
if __name__ == '__main__':
    dt = getYesterday();
    #解析配置文件,生成相应的HQL语句
    cmds = Resolve_Conf(dt)
    #迭代集合,执行命令
    for in range(len(cmds)):
        cmd = cmds[i]
        print cmd
        #执行导入过秤
        os.system(cmd)


CIO之家 www.ciozj.com 公众号:imciow
关联的文档
也许您喜欢