可以通过[mw_shl_code=bash,true]hadoop fsck -openforwrite [/mw_shl_code] 命令查看发现有文件没有关闭。 |
本帖最后由 arsenduan 于 2017-3-29 19:47 编辑 org.apache.flume.sink.hdfs.bucketwriter.close failed to close() hdfswrite for file org.apache.flume.sink.hdfs.bucketwriter.close 390 failed to rename() file 我们来看,close方法: [mw_shl_code=java,true] public synchronized void close(boolean callCloseCallback) throws IOException, InterruptedException { checkAndThrowInterruptedException(); try { // close的时候先执行flush方法,清空batchCount,并调用HDFSWriter的sync方法 flush(); } catch (IOException e) { LOG.warn("pre-close flush failed", e); } boolean failedToClose = false; LOG.info("Closing {}", bucketPath); // 创建一个关闭线程,这个线程会调用HDFSWriter的close方法 CallRunner<Void> closeCallRunner = createCloseCallRunner(); if (isOpen) { // 如果文件还开着 try { // 执行HDFSWriter的close方法 callWithTimeout(closeCallRunner); sinkCounter.incrementConnectionClosedCount(); } catch (IOException e) { LOG.warn( "failed to close() HDFSWriter for file (" + bucketPath + "). Exception follows.", e); sinkCounter.incrementConnectionFailedCount(); failedToClose = true; // 关闭文件失败的话起个线程,retryInterval秒后继续执行 final Callable<Void> scheduledClose = createScheduledCloseCallable(closeCallRunner); timedRollerPool.schedule(scheduledClose, retryInterval, TimeUnit.SECONDS); } isOpen = false; } else { LOG.info("HDFSWriter is already closed: {}", bucketPath); } // timedRollFuture就是根据hdfs.rollInterval配置生成的一个属性。如果hdfs.rollInterval配置为0,那么不会执行以下代码 // 因为要close文件,所以如果开启了hdfs.rollInterval等待时间到了flush文件,由于文件已经关闭,再次关闭会有问题 // 所以这里取消timedRollFuture线程的执行 if (timedRollFuture != null && !timedRollFuture.isDone()) { timedRollFuture.cancel(false); // do not cancel myself if running! timedRollFuture = null; } // 没有配置hdfs.idleTimeout, 不会执行 if (idleFuture != null && !idleFuture.isDone()) { idleFuture.cancel(false); // do not cancel myself if running! idleFuture = null; } // 重命名文件,如果报错了,不会重命名文件 if (bucketPath != null && fileSystem != null && !failedToClose) { // 将 /data/2015/07/20/15/flume.1437375933234.txt.tmp 重命名为 /data/2015/07/20/15/flume.1437375933234.txt renameBucket(bucketPath, targetPath, fileSystem); } if (callCloseCallback) { // callCloseCallback是close方法的参数 // 调用关闭文件的回调函数,也就是BucketWriter的onCloseCallback属性 // 这个onCloseCallback属性就是在HDFSEventSink里的回调函数closeCallback。 用来处理sfWriters.remove(bucketPath); // 如果onCloseCallback属性为true,那么说明这个BucketWriter已经不会再次open新的文件了。生命周期已经到了。 // onCloseCallback只有在append方法中调用shouldRotate方法的时候需要close文件的时候才会传入false,其他情况都是true runCloseAction(); closed = true; } }[/mw_shl_code] 上面是关闭方法,及重命名还有超时 // 没有配置hdfs.idleTimeout, 不会执行 if (idleFuture != null && !idleFuture.isDone()) { idleFuture.cancel(false); // do not cancel myself if running! idleFuture = null; } // 重命名文件,如果报错了,不会重命名文件 if (bucketPath != null && fileSystem != null && !failedToClose) { // 将 /data/2015/07/20/15/flume.1437375933234.txt.tmp 重命名为 /data/2015/07/20/15/flume.1437375933234.txt renameBucket(bucketPath, targetPath, fileSystem); } 要么是超时的问题,要么是close的问题。其实楼主已经设置的超时时间不短了。 可是滚动的size太大了。 [mw_shl_code=bash,true]device_online_event.sinks.sinkHdfs.hdfs.rollSize = 125000000 [/mw_shl_code] 建议减小。当然也可以修改源码,失败的可以重试。 |