Operating HDFS with the Java API

Source: mangocool  Author: mangocool

Dependencies: JDK 1.7, hadoop-2.7.2

Development environment: ideaIU-14.1.4

Test environment: Windows 7

Create a Maven project named Upload2HiveThrift and add the necessary dependencies to its pom.xml:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xbdp.upload2hive</groupId>
    <artifactId>upload2hive</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.2.2</version>
        </dependency>
    </dependencies>
</project>

The Oper2Hdfs.java class:

package com.xbdp.hdfs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
 * Created by MANGOCOOL on 2016/8/18.
 */
public class Oper2Hdfs {
    static Configuration conf = new Configuration();
    static FileSystem fs;
    static String path = "/home/hadoop/SW/hadoop/etc/hadoop/";
    static String hdfsUrl = "hdfs://h8:9000/";
    static
    {
        // If these config files are placed under the project's resources
        // directory, no path is needed; they are read automatically.
        conf.addResource(new Path(path + "core-site.xml"));
        conf.addResource(new Path(path + "hdfs-site.xml"));
        conf.addResource(new Path(path + "mapred-site.xml"));
        // Set fs.defaultFS; without it you may hit java.lang.IllegalArgumentException:
        // Wrong FS: hdfs://master:9000/xxx, expected: file:///
        // Alternatively, copy the cluster's core-site.xml into the project so the
        // HDFS file system is recognized when the configuration is read. When the
        // configs are loaded from files this way, this line can be omitted; even a
        // cluster with a standby NameNode is resolved automatically.
        conf.set("fs.defaultFS", hdfsUrl);
        // Set fs.hdfs.impl and fs.file.impl; otherwise you may get
        // java.io.IOException: No FileSystem for scheme: hdfs
        // This can also be configured in core-default.xml:
        //<property>
        //<name>fs.hdfs.impl</name>
        //<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
        //<description>The FileSystem for hdfs: uris.</description>
        //</property>
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
        try {
            //fs = FileSystem.get(new URI(hdfsUrl), conf, "hadoop"); // get the HDFS instance as user "hadoop"
            fs = FileSystem.get(conf); // when configs are loaded from files, this simpler call is enough
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * Upload a local file to HDFS
     * @param localPath path of the local source file
     * @param file destination path on HDFS, relative to hdfsUrl
     * @throws IOException
     */
    private static void upload2Hdfs(String localPath, String file) throws IOException
    {
        String dst = hdfsUrl + file;
        InputStream in = new BufferedInputStream(new FileInputStream(localPath));
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
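                // invoked periodically as bytes are written; left as a no-op here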
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);
    }
    /**
     * Read a file from HDFS and write it to a local file
     * @param hdfsPath source path on HDFS
     * @param localPath local destination file (must include the file name)
     * @throws IOException
     */
    private static void readFromHdfs(String hdfsPath, String localPath) throws IOException
    {
        FSDataInputStream hdfsInStream = fs.open(new Path(hdfsPath));
        OutputStream out = new FileOutputStream(localPath);
        byte[] ioBuffer = new byte[1024];
        int readLen = hdfsInStream.read(ioBuffer);
        while(-1 != readLen){
            out.write(ioBuffer, 0, readLen);
            readLen = hdfsInStream.read(ioBuffer);
        }
        out.close();
        hdfsInStream.close();
    }
    /**
     * Delete a file or directory on HDFS
     * @param hdfsPath path to delete
     * @return true if the path existed and was deleted
     * @throws IOException
     */
    private static boolean deleteFromHdfs(String hdfsPath) throws IOException
    {
        boolean flag = true;
        Path path = new Path(hdfsPath);
        if(fs.exists(path))
        {
            // delete immediately and recursively; deleteOnExit() would only
            // remove the path when the FileSystem is closed
            fs.delete(path, true);
        } else
        {
            flag = false;
            System.out.println("Path does not exist!");
        }
        return flag;
    }
    /**
     * Create a directory on HDFS
     * @param hdfsDir directory path to create
     * @throws IOException
     */
    public static void createDir(String hdfsDir) throws IOException
    {
        Path path = new Path(hdfsDir);
        fs.mkdirs(path);
        System.out.println("new dir \t" + conf.get("fs.default.name") + " | " + hdfsDir);
    }
    /**
     * List the files and directories under an HDFS directory
     * @param hdfsDir directory to list
     * @throws IOException
     */
    private static void getDirFromHdfs(String hdfsDir) throws IOException
    {
        FileStatus fileList[] = fs.listStatus(new Path(hdfsDir));
        int size = fileList.length;
        for(int i = 0; i < size; i++){
            System.out.println("name:" + fileList[i].getPath().getName() + "\tsize:" + fileList[i].getLen());
        }
    }
    /**
     * main method: exercises the operations above
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
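        // Point hadoop.home.dir at a Windows Hadoop install whose bin directory
        // contains winutils.exe and hadoop.dll (see problem 2 below).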
        System.setProperty("hadoop.home.dir", "E:\\Program Files\\hadoop-2.7.0");
        try {
            createDir("/test");
            String localPath = "E:\\Program Files\\XX-Net-2.9.2/LICENSE.txt";
            String file = "test/LICENSE.txt";
            upload2Hdfs(localPath, file);
            String hdfsPath = hdfsUrl + "test/LICENSE.txt";
            localPath = "/home/LICENSE.txt";
            readFromHdfs(hdfsPath, localPath);
            String hdfsDir = hdfsUrl + "/test";
            getDirFromHdfs(hdfsDir);
            hdfsPath = hdfsUrl + "test/";
            deleteFromHdfs(hdfsPath);
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if(fs != null)
                fs.close();
        }
    }
}
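
The stream-based upload2Hdfs and readFromHdfs above spell out the copy mechanics. For everyday use, FileSystem also offers one-call helpers that do the same work; a minimal sketch of lines you could drop into main() above, reusing the fs and hdfsUrl fields and the same sample paths:

// Upload and download via FileSystem's built-in helpers, assuming the
// fs instance and hdfsUrl from the Oper2Hdfs class above.
Path localSrc = new Path("E:\\Program Files\\XX-Net-2.9.2/LICENSE.txt");
Path hdfsDst = new Path(hdfsUrl + "test/LICENSE.txt");
fs.copyFromLocalFile(localSrc, hdfsDst);                    // same effect as upload2Hdfs()
fs.copyToLocalFile(hdfsDst, new Path("/home/LICENSE.txt")); // same effect as readFromHdfs()

Note that copyToLocalFile writes a .crc checksum file next to the local destination and, on Windows, depends on the native binaries discussed under problem 2 below.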





Problems encountered:

1. java.io.IOException: No FileSystem for scheme: hdfs

java.io.IOException: No FileSystem for scheme: hdfs
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
        at FileCopyToHdfs.readFromHdfs(FileCopyToHdfs.java:65)
        at FileCopyToHdfs.main(FileCopyToHdfs.java:26)

Adding the following code fixes it:

conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

2. java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(II[BI[BIILjava/lang/String;JZ)V

Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(II[BI[BIILjava/lang/String;JZ)V
    at org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(Native Method)
    at org.apache.hadoop.util.NativeCrc32.calculateChunkedSumsByteArray(NativeCrc32.java:86)
    at org.apache.hadoop.util.DataChecksum.calculateChunkedSums(DataChecksum.java:430)
    at org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:202)
    at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:163)
    at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:144)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2254)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:61)
    at com.xbdp.hdfs.Oper2hdfs.uploadToHdfs(Oper2hdfs.java:68)
    at com.xbdp.hdfs.Oper2hdfs.main(Oper2hdfs.java:143)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

This is caused by a hadoop.dll version mismatch: Hadoop versions before 2.4 and from 2.4 onward need different DLLs. Download the matching one and put it into the hadoop/bin directory on your Windows machine.

Download: https://github.com/steveloughran/winutils

Don't forget to add this line of code:

System.setProperty("hadoop.home.dir", "E:\\Program Files\\hadoop-2.7.0");

It is best to also put the downloaded winutils.exe into hadoop/bin.
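
A small pre-flight check can save debugging time here; a sketch, assuming the same install path as in main() above (the class name WinutilsCheck is mine, for illustration):

import java.io.File;

public class WinutilsCheck {
    public static void main(String[] args) {
        // assumption: the Hadoop install path used elsewhere in this article
        String hadoopHome = "E:\\Program Files\\hadoop-2.7.0";
        System.setProperty("hadoop.home.dir", hadoopHome);
        // both native files must match your Hadoop version (pre-2.4 vs 2.4+)
        for (String name : new String[]{"winutils.exe", "hadoop.dll"}) {
            File f = new File(hadoopHome + "\\bin", name);
            System.out.println(name + (f.exists() ? " found at " : " MISSING at ") + f);
        }
    }
}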

3. java.io.FileNotFoundException: \home (Access denied.)

java.io.FileNotFoundException: \home (Access denied.)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:110)
    at com.xbdp.hdfs.Oper2hdfs.readFromHdfs(Oper2hdfs.java:79)
    at com.xbdp.hdfs.Oper2hdfs.main(Oper2hdfs.java:149)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

This one is simple: the target of the write should be a file, but only its directory was given, so the OS refuses access. Add the file name to the path and it works.
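
In code, the fix means building the local destination with the file name included and making sure its parent directory exists. A minimal sketch of the corrected call, reusing readFromHdfs and hdfsUrl from the class above (drop it into main()):

// Wrong: readFromHdfs(hdfsUrl + "test/LICENSE.txt", "/home"); // directory only -> access denied
// Right: include the file name, and create parent directories if needed.
java.io.File dest = new java.io.File("/home/LICENSE.txt");
if (dest.getParentFile() != null) {
    dest.getParentFile().mkdirs(); // ensure the parent directory exists locally
}
readFromHdfs(hdfsUrl + "test/LICENSE.txt", dest.getPath());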


