初识Hadoop
鸣宇淳 博客园

第一部分:              初识Hadoop

一、             谁说大象不能跳舞

业务数据越来越多,用关系型数据库来存储和处理数据越来越感觉吃力,一个查询或者一个导出,要执行很长时间,这是因为数据的吞吐量太大了,导致整个程序看上去像一只体型庞大、行动笨拙的大象。

Hadoop天生就是来解决数据吞吐量太大的,它可以使大数据的存储和处理变的快速、使得应用程序运行的更加的轻盈。像《Hadoop权威指南》封皮上那句话:谁说大象不能跳舞?!

二、             Hadoop解决的问题

Hadoop解决的问题就是大数据存储和运算问题。

这里要说一个宏观的问题,就是大数据产业链,来说明Hadoop在产业链中所处的位置,以便于更好的理解Hadoop是做什么的。

所谓大数据的产业链,就像于普通商品的产业链,普通商品的产业链先是原材料的搜集、原材料的预加工、深加工、 制造成各种各样的商品、最终销售变现获得利润。

大数据产业链非常类似,也可以分为三个阶段:

1、  大数据的搜集、整理阶段

2、  大数据的存储、处理阶段

3、  大数据的应用、变现阶段

Hadoop就是来解决第二步,数据存储和处理的。

拿一般公司的系统而言,数据的搜集渠道很多,也已经有了,比如订单数据、PV、UV数据等等。搜集到数据这是第一步。

第二步是如何存储和运算这些大数据,这个就是Hadoop擅长的事情了。

第三步是和具体业务相关的,是从需求的角度为出发点利用第二步中存储的大数据,来做一个应用程序。比如想做一个基于PV、UV、网友在页面上的操作路径数据,来进行分析网友行为的应用程序,来指导如何优化页面和流程,以便于更好的引导客户下单,从而实现我们的利润。

三、             Hadoop的核心和概念

像前面所说的,Hadoop是一个存储和处理大数据的解决方案,相对应地Hadoop的核心有两个:

HDFS:HDFS是分布式文件系统,提供了数据的存储方案。

MapReduce:MapReduce是一个平行运算架构,提供了数据的处理解决方案。

咱们平常说Hadoop,怎么理解所说的Hadoop,它的概念是什么?

其实Hadoop概念分狭义的和广义的:

1、  狭义的Hadoop只是Hadoop核心部分 (HDFSMapReduce)

2、  广义的Hadoop其实是泛指在HDFS+MapReduce核心上衍生出来的一个大的Hadoop生态系统。衍生出来的有:HbasePigHive等等

四、             Hadoop生态系统


Hadoop的生态系统中,除了HDFSMapReduce核心部分外,其他的这些灰色的,是衍生出来的项目,他们一起组成了Hadoop的生态系统。

(1)  HDFS和MapReduceHadoop的核心,一个负责数据存储,一个负责数据处理

(2)  HDFS特点是分布式存储、顺序读、只能追加

l  分布式:会将一个大文件分割后存储在不同的节点,每一个节点叫一个DataNode

l  顺序读:在分割后的各个DataNode块上,只能从前往后读取。而且在一个具体应用中,几乎每次读取都要读取全部数据。

l  只能追加:HDFS中,数据写入只能追加到文件的末尾,不支持随机读写。

l  这样的设计看上去挺傻的、简单粗暴,但正是分布式、顺序读、只追加这些简单的规则组合起来,才是解决数据吞吐量大这个问题的方法。

l  因为HDFS不支持随机读写,所以对数据访问的实时性没法保证的。它不适合做对响应时间要求比较高的应用,Hbase可以提供实时响应的功能。

(3)  MapReduce是线性的、可伸缩的编程模型,核心是两个函数:map()reduce()

l  线性表现在:所有的数据处理都是顺序执行的,各个执行节点顺序执行map()reduce(),这个思想和原来的结构化编程思想很像。

l  可伸缩表现在:可以在很多运算节点并行执行,而节点数量理论上是可以无限增加的。

(4)  其他都是衍生出来的项目或者叫做工具,作用是利于人们使用Hadoop,他们各有各的分工,各有各的专长

(5)  Sqoop:Hadoop的数据来源往往是原有的关系型数据库,经过Hadoop的处理后,往往是把结果数据再存入关系型数据库中。Sqoop就是用来做数据导入导出的,Sqoop是关系型数据库和HDFS之间数据的传输工具。

(6)  HBase是在HDFS基础上开发出来的面向列的分布式数据库。它用巧妙的方式解决了在不违反HDFS存取规则的前提下,提供随机读写、实时响应的功能。

(7)  ChukWa是一个对整个Hadoop集群运行情况进行分析和反馈的工具,叫他监控不太合适,因为他有一定的延迟。他能够为Hadoop集群的使用者、Hadoop程序开发者、Hadoop集群的运维人员等人提供他们各自关心的内容。

l  使用者就是在集群上执行自己任务的人员,他们可以看到自己的任务运行状态、作业日志和作业输出结果。

l  程序开发者可以看到自己所写的程序运行情况、运行瓶颈、错误日志。

l  运维人员可以看到集群中硬件故障情况、异常状态、资源消耗情况

(8)  ZooKeeper:Hadoop是分布式大的集群,在Job调度上必然变的非常复杂,ZooKeeper就是为分布式应用程序,开发的协调服务。

(9)  Pig:Hadoop中数据的处理,就是通过执行很多很多map()reduce()函数来操作,所以程序员应该写大量的map函数和reduce函数,并且要了解数据存储和数据转换的细节,是一个非常有挑战的工作。

然后雅虎开发了Pig Lation 语言,使得写程序更简单。Pig Lation程序最终还是会转换为一堆map、reduce函数来执行。

Pig项目分为两个部分:Pig Latin语言和Pig Latin的执行环境。

(10)雅虎有PigFacebook也有自己的解决方案,当时为了给熟悉SQL的人员提供一个好用的工具来对Hadoop中的数据来做数据分析,所以Facebook就开发了HiveHive的语法更像SQL。和Pig一样,Hive写的语句,最终还是要翻译为mapreduce函数来执行。

整个Hadoop生态系统中,HDFS和MapReduce是比较底层的东西,实际应用开发时,不太可能会用到,而是利用基于它们这两个核心之上的各种工具来开发。

这里就体现了开源的好处,大师级的人物设计了核心的东西,各路大牛就在核心上开发出简单好用的工具。

第二部分:              MapReduce执行机制

五、             MapReduce的组成

在MapReduce中,有两类节点控制整个程序的执行:一个jobtracker和多个tasktracker。

1、  Jobtracker是管理者,tasktracker是执行者。

2、  Jobtracker调度tasktracker上运行的任务,tasktracker会将运行状态向jobtracker报告,如果一个tasktracker失败了,jobtracker就会调度另外一个tasktracker重新运行。

3、  TaskTracker分为Map TaskReduce Task

六、             基本执行过程

MapReduce过程分为两个阶段:map函数阶段和reduce函数阶段

1、  map函数是数据准备阶段,并筛选掉非需要的数据,以键值对的形式输出,map函数核心目的是形成对数据的索引,以供reduce函数方便对数据进行分析。

2、  reduce函数以Map函数的输出数据为数据源,对数据进行相应的分析,输出结果为最终的目标数据。

3、  由于map任务的输出结果传递给reduce任务过程中,是在节点间的传输,是占用带宽的,这样带宽就制约了程序执行过程的最大吞吐量,为了减少mapreduce间的数据传输,在map后面添加了combiner函数来就map结果进行预处理,combiner函数是运行在map所在节点的。

七、             图解MapReduce执行过程

多个map节点和多个reduce节点

Hadoop将MapReduce输入的数据划分为等长的小分片,一般每个分片是64M,因为HDFS的每个块是64MHadoop 2.X中将这个数改为128M

1、  每个分片数据分配一个map任务,任务内容是用户写的map函数,map函数是尽量运行在数据分片的机器上,这样保证了“数据本地优化”。

2、  map任务的结果是各自排好序的,各个map结果进行再次排序合并后,作为reduce任务的输入。

3、  reduce任务执行reduce函数来处理数据,得到最终结果后,存入HDFS

4、  会有多个reduce任务,每个reduce任务的输入都来自于许多map任务,map任务和reduce任务之间是需要传输数据的,占用网络资源,影响效率,为了减少数据传输,可以在map()函数后,添加一个combiner函数来对结果做预处理。

八、             一个Java版本的MapReduce例子

1、  需求

我们有很多订单,订单有是下给经销商的,所以有dealerid信息,也有没有指定经销商的无主订单,dealerid为0。数据存储在一个文本文件中orderdata.txt,假设这个文件中大约有3亿条数据,存储在HDFS上。

文件格式如下,各个字段之间用\t分割。

需求是:统计各个经销商下的订单数量,无主订单不统计。

输出应该类似于:

2、  代码

/**
 * Created by 
鸣宇淳 on 2016/3/23.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

/*
功能:OrderDemo用来统计订单中下到各个经销商的订单数量,无主订单不统计
文件:订单信息在orderdata.txt文件中,文件中信息存储各个是:
格式:手机号      经销商ID   省份ID   城市ID   品牌ID   车型ID
例如:13146198476 3012   440000   440100   25     3467
分隔符:\t
 */
public class OrderDemo {

    public static void main(String[] args) throws Exception {

        //
新建Hadoop配置类
        Configuration conf = new Configuration();

        //
新建一个MapReduce JobJob名字为OrderDemoJob,这个名字会显示在监控中
        Job job = new Job(conf, "OrderDemoJob");

        //
设置MapReducer任务所用的jar,根据类找Jar
        job.setJarByClass(OrderDemo.class);

        //
设置Map方法所在类
        job.setMapperClass(Map.class);

 

//设置Combiner

job.setCombinerClass(Combiner.class);


        //
设置Reduce方法所在类
        job.setReducerClass(Reduce.class);

        //
设置MapReduce Job输入数据格式
        job.setInputFormatClass(TextInputFormat.class);

        //
设置输出数据格式
        job.setOutputFormatClass(TextOutputFormat.class);

        //
设置MapReduce Job最终输出键的格式
        job.setOutputKeyClass(Text.class);

        //
设置MapReduce Job最终输出值的格式
        job.setOutputValueClass(IntWritable.class);

        //
设置输入文件路径
        FileInputFormat.addInputPath(job, new Path(args[0]));

        //
设置输出文件路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //
提交任务给JobTracker,等待返回值
        job.waitForCompletion(true);
    }
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String dealerid= line.split("\t")[1];
            if(!dealerid.equals("0"))//
不计算无主订单
            {
                context.write(new Text(dealerid), new IntWritable(1));
            }
        }
    }
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

   public static class Combiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

}

3、  代码执行过程推演

 

4、   在Hadoop上运行

(1)  拷贝数据源文件orderdata.txt、程序OrderDemo.jar包文件到Linux

(2)  HDFS上新建输入目录input

[centos@hadoop1 ~]$ hadoop fs -mkdir /chybindemo/input

(3)  orderdata.txt放入input目录中

[centos@hadoop1~]$ hadoop fs -put /chybindemo/orderdata.txt /chybindemo/input

(4)  执行Hadoop,运行OrderDemo.jar程序

[centos@hadoop1~]$ hadoop  jar  /chybindemo/input/hadoopdemo.jar OrderDemo /chybindemo/input  /chybindemo/output

格式:

hadoop jar  [jar包路径]  [类名]  [输入路径]  [输出路径]

(5)  查看输出目录

[centos@hadoop1 ~]$ hadoop fs -ls -R /chybindemo/output

(6)  查看结果文件

[centos@hadoop1 ~]$ hadoop fs -cat /chybindemo/output/part-r-00000

九、             MapReduce执行流程细节(Hadoop 1.x版本下)

1、  客户端提交MapReduce作业。

客户端调用Jobclient.runJob()方法,新建JobClient实例并调用submitJob()方法,将作业提交给JobTracker,submitJob过程如下:

(1)  jobtracker请求一个新的Job ID,是调用的getNewJob()方法。

(2)  检查作业的相关路径配置,如输入目录、输出目录是否正确。

(3)  计算作业的输入分片个数。

(4)  将运行作业所需要的资源复制到HDFS中。资源如:Jar包、配置文件等

(5)  告知jobtracker作业准备执行,调用submitJob方法提交作业。

2、  作业初始化。

jobtracker根据每个分片信息,创建一系列的map任务和一定数量的reduce任务。

3、  分配任务。

并不是Jobtracker主动给tasktracker分配任务,而是tasktrakcer来主动领任务。tasktracker通过心跳来告知jobtracker自己是否可以接受任务,jobtracker给tasktracker分配任务后,tasktracker来执行任务。

分配map任务时,jobtracker要考虑数据本地化。

分配reduce任务时,不需要考虑数据本地化。

4、  执行任务

执行任务过程包括:

(1)  复制运行用的资源文件复制到tasktracker所在的文件系统,实现作业的Jar包本地化。

(2)  tasktracker为任务新建一个本地工作目录,并把jar包中内容解压到这个目录下。

(3)  tasktracker新建一个taskRunner实例来运行该任务

5、  进度、状态报告

Tasktracker通过心跳来告诉jobtracker作业执行状态,jobtracker汇中整个作业的状态,客户端JobClient通过每秒查询JobTracker来查询作业进度

6、  任务完成

Jobtracker接受到最后一个任务已完成的通知后,就把作业状态设置为“成功”,jobclient查询状态时发现已经成功,就退出runJob方法。

十、             MapReduce和关系型数据库管理系统的比较

MapReduce适合一次写入,多次读取的应用场景;关系型数据库则适用于持续更新的应用场景;

MapReduce数据存储是非结构化或者半结构化得;关系型数据库则严格符合结构化的存储要求;

在数据量和数据操作时间的关系上,MapReduce是线性的,而关系型数据库非线性的。

MapReduce一个核心特征是数据本地化;关系型数据库是偏重于数据的集中存储。

第三部分:              HDFS介绍

十一、      HDFS简介

1、  HDFS全称

Hadoop Distributed FileSystem,Hadoop分布式文件系统。

Hadoop有一个抽象文件系统的概念,Hadoop提供了一个抽象类org.apache.hadoop.fs.FilesSystemHDFS是这个抽象类的一个实现。其他还有:

文件系统

URI方案

Java实现(org.apache.hadoop

Local

file

fs.LocalFileSystem

HDFS

hdfs

hdfs.DistrbutedFilesSystem

HFTP

hftp

hdfs.HftpFilesSystem

HSFTP

hsftp

hdfs.HsftpFilesSystem

HAR

har

fs.HarFileSystem

KFS

kfs

fs.kfs.KosmosFilesSystem

FTP

ftp

Fs.ftp.FtpFileSystem

 

2、  HDFS特点:

(1)  超大文件数据集群

(2)  流式数据访问方式读取文件

(3)  对硬件要求并不是特别高,有很好的容错机制。

(4)  数据访问有一定的延迟,这是因为HDFS优化的是数据吞吐量,是要以提高延迟为代价的。

(5)  HDFS无法高效存储大量小文件。因为NameNode限制了文件个数。

(6)  HDFS不支持多个写入者,也不支持随机写。

十二、      HDFS体系结构

1、  体系结构图

2、  体系结构介绍

(1)  HDFS由ClientNameNodeDataNodeSecondaryNameNode组成。

(2)  Client提供了文件系统的调用接口。

(3)  NameNode由fsimage(HDFS元数据镜像文件)editlog(HDFS文件改动日志)组成,NameNode在内存中保存着每个文件和数据块的引用关系。NameNode中的引用关系不存在硬盘中,每次都是HDFS启动时重新构造出来的。

(4)  SecondaryNameNode的任务有两个:

(5)  定期合并fsimageeditlog,并传输给NameNode

(6)  NameNode提供热备份。

(7)  一般是一个机器上安装一个DataNode,一个DataNode上又分为很多很多数据块(block)。数据块是HDFS中最小的寻址单位,一般一个块的大小为64M,不像单机的文件系统,少于一个块大小的文件不会占用一整块的空间。

(8)  设置块比较大的原因是减少寻址开销,但是块设置的也不能过大,因为一个Map任务处理一个块的数据,如果块设置的太大,Map任务处理的数据量就会过大,会导致效率并不高。

(9)  DataNode会通过心跳定时向NameNode发送所存储的文件块信息。

(10)HDFS的副本存放规则

默认的副本系数是3,一个副本存在本地机架的本机器上,第二个副本存储在本地机架的其他机器上,第三个副本存在其他机架的一个节点上。

这样减少了写操作的网络数据传输,提高了写操作的效率;另一方面,机架的错误率远比节点的错误率低,所以不影响数据的可靠性。

十三、       HDFS读写过程

1、  数据读取流程图

2、  读取过程说明

(1)  HDFS客户端调用DistributedFileSystem类的open()方法,通过RPC协议请求NameNode来确定说请求的文件所在位置,找出最近的DataNode节点的地址。

(2)  DistributedFileSystem会返回一个FSDataInputStream输入流对象给客户端。

(3)  客户端会在FSDatatInputStream上调用read()函数,按照每个DataNode的距离从近到远依次读取。

(4)  读取完每个DataNode后,在FSDataInputStream上调用close()函数。

(5)  如果读取出现故障,就会读取数据块的副本,同时向NameNode报告这个消息。

3、  文件的写入流程图

4、  写入流程说明

(1)  客户端调用DistributedFileSystem对象的create()方法,通过RPC协议调用NameNode,在命名空间创建一个新文件,此时还没有关联的DataNode与之关联。

(2)  create()方法会返回一个FSDataOutputStream对象给客户端用来写入数据。

(3)  写入数据前,会将文件分割成包,放入一个“数据队列”中。

(4)  NameNode为文件包分配合适的DateNode存放副本,返回一个DataNode的管道。

(5)  根据管道依次保存文件包在各个DataNode上。

(6)  各个DataNode保存好文件包后,会返回确认信息,确认消息保存在确认队列里,当管道中所有的DataNode都返回成功的的确认信息后,就会从确认队列里删除。

(7)  管道中所有的DataNode都保存完成后,调用FileSystem对象的close()关闭数据流。

十四、      HDFSJava API

1、  使用URL读取数据

//URL接口读取HDFS中文件
static  {
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory() );
}
public  String  GetHDFSByURL(String url) throws MalformedURLException,IOException
{
    String str="";
    InputStream in =null;
    OutputStream out=null;
    try {
        in=new URL(url).openStream();
        //IOUtils.copyBytes(in,out,4096,false);
        str=out.toString();
    }
    finally {
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
    }
    return  str;
}

 

2、  FileSystem API读取数据

//ReadFile
//url:"/user/hadoop/data/write.txt"
public  String  ReadFile(String url)throws IOException
{
    String fileContent="";
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(url);

    if(fs.exists(path)){
        FSDataInputStream is = fs.open(path);
        FileStatus status = fs.getFileStatus(path);
        byte[] buffer = new byte[Integer.parseInt(String.valueOf(status.getLen()))];
        is.readFully(0, buffer);
        is.close();
        fs.close();
        fileContent=buffer.toString();
    }
    return fileContent;
}

 

3、  FileSystem API创建目录

//HDFS
//dirpath
 "/user/hadoop/data/20130709"
public  void  MakeDir(String dirpath) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(dirpath);
    fs.create(path);
    fs.close();
}

 

4、  FileSystem API写数据

//HDFS写文件
//fileurl:"/user/hadoop/data/write.txt"
public  void  WriteFile(String fileurl,String fileContent)throws IOException
{
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(fileurl);
    FSDataOutputStream out = fs.create(path);
    out.writeUTF(fileContent);
    fs.close();
}

 

5、  FileSystem API删除文件

//删除文件
//fileurl :"/user/hadoop/data/word.txt"
public void  DeleteFile(String fileurl)throws IOException
{
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    Path path = new Path(fileurl);
    fs.delete(path,true);
    fs.close();
}

 

6、  查询元数据

//查询文件的元数据
public  void  ShowFileStatus(String fileUrl) throws  IOException
{
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file=new Path(fileUrl);
    FileStatus stat=fs.getFileStatus(file);
    
    System.out.println("
文件路径:"+stat.getPath());
    System.out.println("
是否是目录:"+stat.isDirectory());
    System.out.println("
是否是文件:"+stat.isFile());
    System.out.println("
块的大小:"+stat.getBlockSize());
    System.out.println("
文件所有者:"+stat.getOwner()+":"+stat.getGroup());
    System.out.println("
文件权限:"+stat.getPermission());
    System.out.println("
文件长度:"+stat.getLen());
    System.out.println("
备份数:"+stat.getReplication());
    System.out.println("
修改时间:"+stat.getModificationTime());
}

 

十五、      HDFS常用命令

-ls      -ls <路径>      查看指定路径的当前目录结构

-lsr      -lsr <路径>      递归查看指定路径的目录结构

-du      -du <路径>      统计目录下个文件大小

-dus      -dus <路径>      汇总统计目录下文件()大小

-count      -count [-q] <路径>      统计文件()数量

-mv      -mv <源路径> <目的路径>      移动

-cp      -cp <源路径> <目的路径>      复制

-rm      -rm [-skipTrash] <路径>      删除文件/空白文件夹

-rmr      -rmr [-skipTrash] <路径>      递归删除

-put      -put <多个linux上的文件> <hdfs路径>      上传文件

-copyFromLocal      -copyFromLocal <多个linux上的文件> <hdfs路径>      从本地复制

-moveFromLocal      -moveFromLocal <多个linux上的文件> <hdfs路径>      从本地移动

-getmerge      -getmerge <源路径> <linux路径>      合并到本地

-cat      -cat <hdfs路径>      查看文件内容

-text      -text <hdfs路径>      查看文件内容

-copyToLocal      -copyToLocal [-ignoreCrc] [-crc] [hdfs源路径] [linux目的路径]      从本地复制

-moveToLocal      -moveToLocal [-crc] <hdfs源路径> <linux目的路径>      从本地移动

-mkdir      -mkdir <hdfs路径>      创建空白文件夹

-setrep      -setrep [-R] [-w] <副本数> <路径>      修改副本数量

-touchz      -touchz <文件路径>      创建空白文件

-stat      -stat [format] <路径>      显示文件统计信息

-tail      -tail [-f] <文件>      查看文件尾部信息

-chmod      -chmod [-R] <权限模式> [路径]      修改权限

-chown      -chown [-R] [属主][:[属组]] 路径      修改属主

-chgrp      -chgrp [-R] 属组名称 路径      修改属组

-help      -help [命令选项]      帮助

 


CIO之家 www.ciozj.com 公众号:imciow
关联的文档
也许您喜欢