分享

在eclipse操作数据到HDFS

LoveJW 发表于 2014-11-21 16:21:54 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 13 47301
本帖最后由 LoveJW 于 2014-11-21 16:23 编辑

1111.png 3333.png
我在eclipse里往hdfs的表里追加数据,代码是追加操作,为什么一直报该文件已存在的错误呢??

错误:
org.apache.hadoop.ipc.RemoteException: failed to create file /user/root/uair11/air_info_package_cache_/part-m-00000 for DFSClient_NONMAPREDUCE_1323857387_57 on client 192.168.22.129 because current leaseholder is trying to recreate file.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2342)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2220)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2453)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2414)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:508)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:320)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:48063)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

    at org.apache.hadoop.ipc.Client.call(Client.java:1347) ~[hadoop-common-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.ipc.Client.call(Client.java:1300) ~[hadoop-common-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) ~[hadoop-common-2.2.0-transwarp.jar:na]
    at com.sun.proxy.$Proxy65.append(Unknown Source) ~[na:na]
    at sun.reflect.GeneratedMethodAccessor97.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) ~[na:1.6.0_41]
    at java.lang.reflect.Method.invoke(Method.java:597) ~[na:1.6.0_41]
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) ~[hadoop-common-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) ~[hadoop-common-2.2.0-transwarp.jar:na]
    at com.sun.proxy.$Proxy65.append(Unknown Source) ~[na:na]
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:245) ~[hadoop-hdfs-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1480) ~[hadoop-hdfs-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1520) ~[hadoop-hdfs-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1508) ~[hadoop-hdfs-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:310) ~[hadoop-hdfs-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:306) ~[hadoop-hdfs-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:306) ~[hadoop-hdfs-2.2.0-transwarp.jar:na]
    at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1160) ~[hadoop-common-2.2.0-transwarp.jar:na]
    at cn.lds.uAir.transwarp.Impl.HDFSFileImpl.appendDataToFile(HDFSFileImpl.java:49) ~[bizfuse-uAir.jar:na]
    at cn.lds.uAir.job.MSAQIJob$DataThread.run(MSAQIJob.java:99) [bizfuse-uAir.jar:na]

已有(14)人评论

跳转到指定楼层
sstutu 发表于 2014-11-21 17:19:40
回复

使用道具 举报

sstutu 发表于 2014-11-21 16:35:11
本帖最后由 sstutu 于 2014-11-21 16:36 编辑
楼主两个参数有误,导致系统认为是重新创建文件

String hdfs_path= "hdfs://ip:xx/file/fileuploadFileName";//文件路径
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hdfs_path), conf);
InputStream in = new BufferedInputStream(new FileInputStream(file));//要追加的文件流,file为文件
OutputStream out = fs.append(new Path(hdfs_path));
IOUtils.copyBytes(in, out, 4096, true);


回复

使用道具 举报

LoveJW 发表于 2014-11-21 16:37:56
sstutu 发表于 2014-11-21 16:35
本帖最后由 sstutu 于 2014-11-21 16:36 编辑
楼主两个参数有误,导致系统认为是重新创建文件

能详细的说下怎么改吗??
回复

使用道具 举报

LoveJW 发表于 2014-11-21 16:55:04
回复

使用道具 举报

LoveJW 发表于 2014-11-21 18:33:47
本帖最后由 LoveJW 于 2014-11-21 18:36 编辑
sstutu 发表于 2014-11-21 17:19
可以尝试下,不过你先贴出代码来,让我一个个敲出来,唉

public boolean appendDataToFile(String table, String data) {
        boolean result = false;
        try {
            if(StringUtils.isBlank(table)||StringUtils.isBlank(data)){
                throw new NullPointerException("appendDataToFile:table or data is null!");
            }
            Path filePath = new Path(host + path + table + "/part-m-00000");
            Configuration conf = new Configuration();
            FileSystem fs = filePath.getFileSystem(conf);
            fs.setReplication(filePath, (short) 3);
            OutputStream out = fs.append(filePath);
            InputStream in = new BufferedInputStream(new ByteArrayInputStream(data.getBytes()));
            IOUtils.copyBytes(in, out, conf);
//            fs.close();
//            IOUtils.closeStream(in);
            result = true;
            

        } catch (Exception e) {
            log.warn("appendDataToFile:data append error",e);
        }   
        return result;
    }

我就是要直接把字符串存过去的啊,Hdfs上有建好的文件,就是要实现追加功能,那两个错误参数是哪两个??



回复

使用道具 举报

LoveJW 发表于 2014-11-25 15:02:47
sstutu 发表于 2014-11-21 17:19
可以尝试下,不过你先贴出代码来,让我一个个敲出来,唉

上面那个问题能再帮忙看看吗??
回复

使用道具 举报

desehawk 发表于 2014-11-25 18:58:20
LoveJW 发表于 2014-11-25 15:02
上面那个问题能再帮忙看看吗??




建议改成这种格式
  1. public class PutMerge {
  2. /**
  3. * @param args
  4. *            void
  5. * @throws IOException
  6. */
  7. public static void main(String[] args) throws IOException {
  8. // TODO Auto-generated method stub
  9.    Configuration conf = new Configuration();
  10.         FileSystem hdfs = FileSystem.get(conf);
  11.         FileSystem local = FileSystem.getLocal(conf);
  12.         
  13.         Path inputDir = new Path("/home/du/inout");
  14.         Path hdfsFile = new Path("/myhdfs");
  15.         
  16.         try
  17.         {
  18.             FileStatus[] inputFiles = local.listStatus(inputDir);
  19.             FSDataOutputStream out = hdfs.append(hdfsFile);
  20.             
  21.             for(int i = 0; i < inputFiles.length; i++ )
  22.             {
  23.                 System.out.println(inputFiles[i].getPath().getName());
  24.                 FSDataInputStream in = local.open(inputFiles[i].getPath());
  25.                 byte buffer[] = new byte[1024];
  26.                 int bytesRead = 0;
  27.                 while( (bytesRead = in.read(buffer)) > 0)
  28.                 {
  29.                     out.write(buffer, 0, bytesRead);
  30.                 }
  31.                 in.close();
  32.             }
  33.             out.close();
  34.         }
  35.         catch (IOException e)
  36.         {
  37.             e.printStackTrace();
  38.         }
  39. }
  40. }
复制代码



回复

使用道具 举报

desehawk 发表于 2014-11-25 19:00:03
LoveJW 发表于 2014-11-21 18:33
public boolean appendDataToFile(String table, String data) {
        boolean result = false;
    ...




楼主你调试了,确定文件名称已经存在。建议使用简单的已经存在的名字。
回复

使用道具 举报

LoveJW 发表于 2014-11-25 19:54:48
desehawk 发表于 2014-11-25 19:00
楼主你调试了,确定文件名称已经存在。建议使用简单的已经存在的名字。

确定啊,我写的绝对路径 调试看的路径也对,但是就是报新建错误,我没新建的操作啊
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条