package com.lin.keyword;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* Summary: data deduplication (removes duplicate records from the input)
*
* @author linbingwen
* @since 2016-07-31
*/
public class CleanSameData {
// The map copies the input value onto the output key and emits it directly
public static class Map extends Mapper<Object, Text, Text, Text> {
// Implement the map function
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// Convert the plain-text input value to a String
String line = value.toString();
// First split the input into lines
StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
// Process each line separately
while (tokenizerArticle.hasMoreElements()) {
// Split each line on whitespace
StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
String c1 = tokenizerLine.nextToken(); // first field
String c2 = tokenizerLine.nextToken(); // keyword
c2 = c2.substring(1, c2.length() - 1);
Text newline = new Text(c1 + " " + c2);
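// Emit the combined fields as the key with an empty value; the shuffle groups identical keys, which is what removes the duplicates.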
context.write(newline, new Text(""));
}
}
}
// The reduce copies the input key onto the output key and emits it directly
public static class Reduce extends Reducer<Text, Text, Text, Text> {
// Implement the reduce function
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
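// Each distinct key reaches reduce() exactly once, so writing it straight out produces the deduplicated result.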
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// Set the Hadoop cluster address and port
conf.set("mapred.job.tracker", "10.75.201.125:9000");
// Set the input and output directories
String[] ioArgs = new String[] { "hdfs://hmaster:9000/data_in", "hdfs://hmaster:9000/clean_same_out" };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
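// Note: GenericOptionsParser is fed the hard-coded ioArgs above; pass the real command-line args instead if the paths should come from the command line.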
if (otherArgs.length != 2) {
System.err.println("Usage: <in> <out>");
System.exit(2);
}
// Set up the job
Job job = Job.getInstance(conf, "clean same data");
job.setJarByClass(CleanSameData.class);
// Set the Mapper, Combiner and Reducer classes
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
// Set the output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Split the input data set into splits and provide a RecordReader implementation
job.setInputFormatClass(TextInputFormat.class);
// Provide a RecordWriter implementation responsible for writing the output
job.setOutputFormatClass(TextOutputFormat.class);
// Set the input and output paths
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
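For reference, here is a minimal standalone sketch of the Mapper's parsing step. The sample line, the bracket-wrapped keyword and the class name CleanSameDataParseDemo are illustrative assumptions only; adjust them to the actual input format.

import java.util.StringTokenizer;

// Standalone sketch of the Mapper's parsing logic (hypothetical input format).
public class CleanSameDataParseDemo {
    public static void main(String[] args) {
        String line = "20160731 [hadoop]";              // hypothetical record: first field + bracketed keyword
        StringTokenizer tokenizerLine = new StringTokenizer(line);
        String c1 = tokenizerLine.nextToken();          // first field -> "20160731"
        String c2 = tokenizerLine.nextToken();          // keyword field -> "[hadoop]"
        c2 = c2.substring(1, c2.length() - 1);          // strip the surrounding characters -> "hadoop"
        System.out.println(c1 + " " + c2);              // the key the Mapper would emit
    }
}

The project is built with the following Maven pom.xml: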
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lin</groupId>
<artifactId>zdy-hadoop</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<!-- Spring version -->
<spring.version>4.3.0.RELEASE</spring.version>
<!-- Logging (slf4j / log4j) versions -->
<slf4j.version>1.6.6</slf4j.version>
<log4j.version>1.2.12</log4j.version>
<!-- JUnit version -->
<junit.version>4.12</junit.version>
<hadoop.version>2.6.0</hadoop.version>
<!-- MyBatis version -->
<mybatis.version>3.2.1</mybatis.version>
<mysql.connect.version>6.0.3</mysql.connect.version>
</properties>
<dependencies>
<!-- Unit-testing dependency -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- Logging: log4j, slf4j and logback -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.logback-extensions</groupId>
<artifactId>logback-ext-spring</artifactId>
<version>0.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.connect.version}</version>
</dependency>
</dependencies>
<build>
<finalName>zdy-hadoop</finalName>
<filters>
<!-- Selects which environment's properties file is used for resource filtering -->
<filter>src/main/resources/env/${env}.properties</filter>
</filters>
<!-- Filter the resources so that placeholders in properties/XML files are replaced with values from the POM -->
<resources>
<resource>
<directory>src/main/resources</directory>
<targetPath>${basedir}/target/classes</targetPath>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<targetPath>${basedir}/target/resources</targetPath>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<!-- At the prepare-package phase, package the classes under com/ into a jar -->
<execution>
<phase>prepare-package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<includes>
<include>com/**</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<!-- Do not append the assembly id to the release file name -->
<appendAssemblyId>false</appendAssemblyId>
<finalName>${project.artifactId}</finalName>
<descriptors>
<descriptor>src/main/assemble/assembly.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<!-- Development environment; its configuration files live under src/main/resources/env -->
<profile>
<id>dev</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<env>dev</env>
</properties>
</profile>
</profiles>
</project>
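With the dev profile active by default, running mvn clean package filters src/main/resources/env/dev.properties into the resources and, assuming src/main/assemble/assembly.xml describes a jar, produces target/zdy-hadoop.jar. The job can then be submitted with hadoop jar, for example: hadoop jar zdy-hadoop.jar com.lin.keyword.CleanSameData. Note that the input and output paths are currently hard-coded in ioArgs; change main to pass args to GenericOptionsParser if they should be supplied on the command line instead.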