大数据我们也许听得很多了,一般都知道hadoop,但并不都是hadoop。那么我们该如何构建自己的大数据项目呢?对于离线处理,hadoop还是比较适合的,但对于实时性比较强的,数据量比较大的,可以采用spark,那spark又跟什么技术搭配才能做一个适合自己的项目呢?各技术之间又是如何整合的呢?这篇文章将给大家介绍下大数据项目中用到的各技术框架知识,并通过一个实际项目的分布式集群部署和实际业务应用来详细讲述大数据架构是如何构建的,供大家参考。
1.1 数据采集
负责从各节点上实时采集数据,选用flume来实现
Flume介绍
Flume是Cloudera(Hadoop数据管理软件与服务提供商)提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume的一些核心概念
Flume的数据流模型
Flume以agent为最小的独立运行单位。一个agent就是一个JVM(JavaVirtual Machine)。单agent由Source、Sink和Channel三大组件构成,如下图:
Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Client,比如图中的WebServer生成。当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
Flume Sources
我们系统中使用的是Syslog Sources
Flume Sinks
我们系统中使用的是HDFS Sink和Kafka Sink
1.2、数据接入
由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka,对于离线数据,选用hdfs
HDFS介绍
HDFS(Hadoop Distribute File System Hadoop分布式文件系统)是一个运行在通用硬件上的分布式文件系统,是ApacheHadoop Core项目的一部分。HDFS有着高容错性(fault-tolerant)的特点,并且设计用来部署在低廉的(low-cost)硬件上。而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求(requirements)这样可以实现流的形式访问(streaming access)文件系统中的数据。
HDFS的一些核心概念
HDFS的高可用性
在一个典型的HA(High Availability)集群中,每个NameNode是一台独立的服务器。在任一时刻,只有一个NameNode处于active状态,另一个处于standby状态。其中,active状态的NameNode负责所有的客户端操作,standby状态的NameNode处于从属地位,维护着数据状态,随时准备切换。
两个NameNode为了数据同步,会通过一组称作JournalNodes的独立进程进行相互通信。当active状态的NameNode的命名空间有任何修改时,会告知大部分的JournalNodes进程。standby状态的NameNode有能力读取JNs中的变更信息,并且一直监控edit log的变化,把变化应用于自己的命名空间。standby可以确保在集群出错时,命名空间状态已经完全同步了,如图所示
HDFS的体系结构
HDFS是一个主/从(Master/Slave)体系结构,从最终用户的角度来看,它就像传统的文件系统一样,可以通过目录路径对文件执行CRUD(Create、Read、Update和Delete)操作。但由于分布式存储的性质,HDFS集群拥有一个NameNode和一些DataNode。NameNode管理文件系统的元数据,DataNode存储实际的数据。客户端通过同NameNode和DataNodes的交互访问文件系统。客户端联系NameNode以获取文件的元数据,而真正的文件I/O操作是直接和DataNode进行交互的。如图所示。
Kafka介绍
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。
Kafka的一些核心概念
Kafka拓扑结构
一个典型的Kafka集群中包含若干Producer(可以是web前端产生的PageView,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。如图所示
Zookeeper介绍
ZooKeeper是一个为分布式应用所设计的分布的、开源的协调服务。它提供了一些简单的操作,使得分布式应用可以基于这些接口实现诸如配置维护、域名服务、分布式同步、组服务等。
Zookeeper很容易编程接入,它使用了一个和文件树结构相似的数据模型。可以使用Java或者C来进行编程接入。
Zookeeper的一些基本概念
Zookeeper的系统模型
Zookeeper的设计目的
(1)最终一致性
client不论连接到哪个Server,展示给它都是同一个视图,这是zookeeper最重要的性能。
(2)可靠性
具有简单、健壮、良好的性能,如果消息m被一台服务器接受,那么它将被所有的服务器接受。
(3)实时性
Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。但由于网络延时等原因,Zookeeper不能保证两个客户端能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用sync()接口。
(4)等待无关(wait-free)
慢的或者失效的client不得干预快速的client的请求,使得每个client都能有效的等待
(5)原子性
更新只能成功或者失败,没有中间状态。
(6)顺序性
包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息a在消息b前发布,则在所有Server上消息a都将在消息b前被发布;偏序是指如果一个消息b在消息a后被同一个发送者发布,a必将排在b前面。
1.3、流式计算
对采集到的数据进行实时分析,选用apache的Spark
Spark介绍
Spark是UCBerkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,而后又加入到Apache孵化器项目。Spark,拥有HadoopMapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写磁盘,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
Spark不仅实现了MapReduce的算子map 函数和reduce函数及计算模型,还提供更为丰富的算子,如filter、join、groupByKey等。是一个用来实现快速而通用的集群计算的平台。
Spark支持多处理模式以及支持库,Spark Streaming、Spark SQL、Spark MLlib、Spark GraphX等极大程度方便开发者在大数据处理方面的不同需求
Spark的生态体系
MapReduce属于Hadoop生态体系之一,Spark则属于BDAS(BerkeleyData Analytics Stack,中文:伯克利数据分析栈)生态体系之一
Hadoop包含了MapReduce、HDFS、HBase、Hive、Zookeeper、Pig、Sqoop等
BDAS包含了Spark、Shark(相当于Hive)、BlinkDB、Spark Streaming(消息实时处理框架,类似Storm)等等
Spark的运行模式
l Local (用于测试、开发)
l Standlone (独立集群模式)
l Spark on Yarn (Spark在Yarn上)
l Spark on Mesos (Spark在Mesos)
我们系统中使用的是Spark on Yarn
1.4、数据输出
对分析后的结果持久化,可以使用HDFS、Hbase、MongoDB
Hbase介绍
HBase是ApacheHadoop的数据库,能够对大型数据提供随机、实时的读写访问。HBase的目标是存储并处理大型的数据。HBase是一个开源的,分布式的,多版本的,面向列的存储模型。它存储的是松散型数据。
HBase是一个构建在HDFS上的分布式列存储系统;
HBase是基于GoogleBigTable模型开发的,典型的key/value系统;
HBase是ApacheHadoop生态系统中的重要一员,主要用于海量结构化数据存储;
从逻辑上讲,HBase将数据按照表、行和列进行存储。
与hadoop一样,Hbase目标主要依靠横向扩展,通过不断增加廉价的商用服务器,来增加计算和存储能力。
Hbase的系统架构
关于HBase与ZooKeeper,可以分三点来描述:
A、Zookeeper集群的职责
A.1、负责监控整个hbase集群中节点的状态和通信。
A.2、管理hbase 集群的-ROOT-表,即所有Region Server的地址和HTable信息。
A.3、避免HMsater的单点故障问题(重启故障的HMaster;如果zkLeader挂掉,重新选举出leader)。
B、HMaster Server的职责
B.1、为HRegionserver分配HRegion,并持续均衡负载;
B.2、当有HRegionserver失效时,由HMaster负责重新分配其上的HRegion。
C、HRegion Server的职责
C.1、维护HMaster分配的HRegin,响应客户端的请求(增删改查)。
C.2、管理.META.表数据,该表中包含当前HRegion Server上HRegion的相关信息。
C.3、负责region的切分,并将相关region切分信息更新到.META.表中。
MongoDB介绍
MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。
MongoDB是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。他支持的数据结构非常松散,是类似json的bson格式,因此可以存储比较复杂的数据类型。MongoDB最大的特点是他支持的查询语言非常强大,其语法有点类似于面向对象的查询语言,几乎可以实现类似关系数据库单表查询的绝大部分功能,而且还支持对数据建立索引。
MongoDB的一些基本概念
MongoDB的数据模型
一个MongoDB 实例可以包含一组数据库,一个DataBase 可以包含一组Collection(集合),一个集合可以包含一组Document(文档)。一个Document包含一组field(字段),每一个字段都是一个key/value pair。
key: 必须为字符串类型。
value:可以包含如下类型:
1、基本类型,例如,string,int,float,timestamp,binary 等类型。
2、一个document。
3、数组类型。
文档结构如下:
2.1、机器规划
2.2、Hadoop+Hbase+Zookeeper集群部署
(1)修改各机器上的/etc/hosts文件,删除文件中本机ip与 机器名对应的一行
作用:有时候/etc/hosts文件里对应的主机名包含一些特殊字符或其他原因导致不能正确解析主机名,最好是去掉这一行
(2)ssh免密码登录
作用:Hadoop中主节点管理从节点是通过SSH协议登录到从节点实现的,而一般的SSH登录,都是需要输入密码验证的,为了Hadoop主节点方便管理成千上百的从节点,这里将主节点公钥拷贝到从节点,实现SSH协议免秘钥登录,我这里做的是所有主从节点之间机器免秘钥登录
在三台机器上分别执行:
ssh-keygen -t rsa
chmod 755 用户目录
chmod 700 用户目录的~/.ssh
在机器上执行了ssh-keygen -t rsa命令后会在用户目录的~/.ssh文件夹中生成这两个文件:id_rsa和id_rsa.pub,其中id_rsa中存的算法rsa生成的私钥,id_rsa.pub存放算法rsa生成的公钥。
下一步,将所有机器上的id_rsa.pub中的内容汇总到某一台机器上,并写入文件:用户目录的~/.ssh/authorized_keys中。
注意:每台机器的id_rsa.pub中的内容在文件authorized_keys中是一行
下一步:将用户目录的~/.ssh/authorized_keys文件复制到其他两台机器上
在每台机器上执行:
chmod 600 用户目录的~/.ssh/authorized_keys
(3)Zookeeper部署
A、下载zookeeper的安装包, 解压到合适目录
B、修改配置文件:zoo.cfg
集群中部署了几台zookeeper机器,文件中就添加几行,并且集群中的zookeeper机器上的zoo.cfg文件必须一致
(4)Hadoop部署
A、下载hadoop的安装包, 解压到合适目录
B、配置hadoop
修改hadoopconf目录下的文件,特别是core-site.xml,hadoop-env.sh,hdfs-site.xml,mapred-site.xml,yarn-site.xml,注意根据实际部署机器,修改ip和端口
复制hadoopconf目录到集群中其他两台机器
(5)Hbase部署
A、下载hbase的安装包, 解压到合适目录
B、配置Hbase
修改hbaseconf目录下的文件,特别是backup-masters,hbase-env.sh,hbase-site.xml,regionservers,注意根据实际部署机器,修改ip和端口
复制hbaseconf目录到集群中其他两台机器
(6)启动集群
A、首次启动集群
B、非首次启动集群
(7)关闭集群
注意:一定要按顺序停止,如果先停 ZooKeeper 再停 HBase 的话,基本停不下来
(8)HDFS实用工具
HDFS状态查询:bin/hdfsdfsadmin -report
离开安全模式:bin/hdfs dfsadmin -safemodeleave
文件目录查询:bin/hadoop fs -ls
文件内容查看:bin/hadoop fs -cat
文件上传:bin/hadoop fs -put
文件下载:bin/hadoop fs -get
文件(夹)删除:bin/hadoop fs -rm (-rmr)
(9)Hbase实用工具
连接hbase客户端:bin/hbaseshell
shell命令:
表清单:list
表创建:
create 'UserProfile', {NAME => 'realtime',VERSIONS => 1, TTL => 2147483647},{NAME => 'profile', VERSIONS =>1, TTL => 2147483647}
表查询:
scan 'UserProfile', LIMIT => 10
表数据构造:(put '表名','openid','列族:列','值')
put 'UserProfile','1222234','realtime:sex','1'
put 'UserProfile','4555554','realtime:age','25'
表删除,分两步:
disable 'UserProfile'
drop 'UserProfile'
查看表结构:
describe 'UserProfile'
(10)Kafka部署
A、下载kafka的安装包,解压到合适目录
B、配置Kafka
修改kafkaconf目录下的文件,特别是server.properties、zookeeper.properties、producer.properties、consumer.properties
,注意根据实际部署机器,修改ip和端口
C、启动Kafka
kafka/bin/kafka-server-start.sh../config/server.properties
启动成功后可通过jps命令查看进程
D、停止Kafka
kafka/bin/kafka-server-stop.sh
E、Kafka实用命令
数据查询:bin/kafka-console-consumer.sh
数据构造:bin/kafka-console-producer.sh
Topic查询修改:bin/kafka-topics.sh
(11)Flume部署
A、下载flume的安装包,解压到合适目录
B、配置Flume
修改flume/conf/目录下的文件,特别是core-site-lf.xml、hdfs-site-lf.xml、flume-wq.conf,注意根据实际部署机器,修改ip和端口
C、启动Flume
flume/sbin/flume-ng agent --conf conf --conf-fileconf/flume-wq.conf --name a1
启动成功后可通过jps命令查看进程
(12)Mongodb部署
A、下载mongodb的安装包,解压到合适目录
B、配置Mongodb
修改/export/shadows/mongodb/conf目录下的文件:configsvr.conf,mongos.conf,shard1.conf
C、启动Mongodb
启动配置服务:
mongodb/bin/mongod -f conf/configsvr.conf
启动路由服务:
mongos -f conf/mongos.conf
启动物理存储:
mongod -f conf/shard1.conf
启动成功后查看进程:ps -ef |grep mongo
D、Mongodb实用工具
连接客户端:mongo --port 30000
客户端命令:
数据库清单:show databases;
数据库切换:use tablename1;
集合(表)清单:show collections;
查询表所有记录:db.tablename1.find(); //相当于select * from tablename1;
条件查询:db.tablename1.find({'groupId':'8037'}); //相当于select * from tablename1where groupId = 8037
3.1、业务曝光
(1)配置中心修改如下手工服务为flume机器的ip和端口
mcoss_ds_mmart_show-------全量数据曝光
mcoss_ds_simpshow-----------简要数据曝光
mcoss_ds_keystat--------------关键数据曝光
(2)访问曝光tws服务
http://wq.jd.com/mcoss/mmart/show?actid=xxxx&pc=xxxx
(3)数据验证
A hdfs数据验证
hadoop fs -ls -h /data/flume 类似于ls命令
hadoop fs -cat /data/flume/xxx/xxx/xxx/xxx 类似于cat查看文件内容
下载文件到本地:
hadoop fs-get /data/flume/xxx/xxx/xxx/xxx /tmp/xxx
B Kafka数据验证
查看有哪些topic
kafka/bin/kafka-topics.sh --list --zookeeper zookeeper机器ip:端口,zookeeper机器ip:端口,zookeeper机器ip:端口
数据查询:
./kafka-console-consumer.sh --zookeeper zookeeper机器ip:端口,zookeeper机器ip:端口,zookeeper机器ip:端口 --from-beginning --topic mmart_show|more
CIO之家 www.ciozj.com 公众号:imciow