本地运行Flink退出时java.nio.file.DirectoryNotEmptyException解决方法

1. 背景

在开发大数据平台XSailboat中的查看Flink任务的状态数据工具时,用State Process API解析保存点数据,将其从HDFS上读取出来再将其解析过后下沉到HDFS以CSV格式保存,然后由其它接口提供对这个文件的分页加载功能。

解析保存点和检查点中状态数据的功能代码是集成在SailWorks生产环境中台服务和DataStudio中台服务中的,状态数据解析出来以后Flink Job相关的资源销毁,服务进程并不会退出。销毁Flink Job资源的过程中会在日志中记录一个DirectoryNotEmptyException异常。这个异常虽然对结果无影响,但如果不解决,每次解析状态数据,都会出这样的异常,缓存目录下的文件夹也会日积月累,越来越多,删都删不掉(提示被占用着,停了服务进程才能删),感觉很不爽,所以决定找找原因。

环境信息:

flink. 1.17.1
java 1.8

2. 问题

ava.nio.file.DirectoryNotEmptyException: D:\product\sailboat\temp\MicroService\SailMSPivot\flink\minicluster_679c1f062891e53639b71e5e782f0d8a\tm_0
    at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
    at sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:108)
    at java.nio.file.Files.deleteIfExists(Files.java:1165)
    at org.apache.flink.util.FileUtils.deleteDirectoryInternal(FileUtils.java:332)
    at org.apache.flink.util.FileUtils.deleteFileOrDirectoryInternal(FileUtils.java:308)
    at org.apache.flink.util.FileUtils.guardIfWindows(FileUtils.java:396)
    at org.apache.flink.util.FileUtils.guardIfNotThreadSafe(FileUtils.java:375)
    at org.apache.flink.util.FileUtils.deleteFileOrDirectory(FileUtils.java:247)
    at org.apache.flink.util.FileUtils.cleanDirectoryInternal(FileUtils.java:361)
    at org.apache.flink.util.FileUtils.deleteDirectoryInternal(FileUtils.java:322)
    at org.apache.flink.util.FileUtils.guardIfWindows(FileUtils.java:396)
    at org.apache.flink.util.FileUtils.guardIfNotThreadSafe(FileUtils.java:375)
    at org.apache.flink.util.FileUtils.deleteDirectory(FileUtils.java:265)
    at org.apache.flink.runtime.entrypoint.WorkingDirectory.delete(WorkingDirectory.java:63)
    at org.apache.flink.runtime.minicluster.MiniCluster.deleteDirectories(MiniCluster.java:1327)
    at org.apache.flink.util.concurrent.FutureUtils.lambda$runAfterwardsAsync$15(FutureUtils.java:595)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at org.apache.flink.util.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
    at java.util.concurrent.CompletableFuture$UniCompletion.claim(CompletableFuture.java:529)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:751)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.util.concurrent.FutureUtils.lambda$null$16(FutureUtils.java:638)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

3. 分析过程及解决办法

Flink在本地化运行的话,会在临时缓存目录下创建如下的目录结构


Flink本地化运行缓存目录结构.png

通常情况下,这个缓存目录是在系统的临时文件目录下的,这里笔者把它移动到了产品的缓存目录下。
缓存目录修改方法:

System.setProperty("java.io.tmpdir", new File(MSApp.instance().getAppPaths().getTempDir() , "flink").getAbsolutePath()) ;

在tm0下还有一层和图中几乎一样的目录结构,这并不是哪里配置错误引起的,而是最外面一层是总体环境env的目录结构,tm_0是第0个TaskManager的目录结构

回到最初的问题:文件夹被占用,删不掉!

笔者知道java代码里面什么操作能造成文件占用,无法删除,可文件夹被占用,是什么代码操作引起的,这个真不清楚!一番搜索,猜测验证之后,仍然没有答案。所以只好跟踪代码,一行一行看与localState目录相关的代码(幸好不是超级多),可仍然没有找到可疑点。

在经历一番寻寻觅觅冷冷清清凄凄惨惨戚戚(此处略过1万字)之后,终于想到一个定位问题的办法。


重点开始


在windows系统里,文件/文件夹被占用,会在资源监视器中的“CPU”一栏的关联具柄哪里,出现。如下图:


资源监视器.png

所以可以搜索它来一点点排查localState相关的代码。目标代码肯定在搜不到localState文件夹被占用的代码点位和能搜到localState文件夹被占用的代码点位之间。通过收紧两端,就能找到目标代码。

最终发现org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager中的下面代码会引起文件夹被锁:

@VisibleForTesting
@Nonnull
static Collection<Path> listAllocationDirectoriesIn(File localStateRootDirectory)
        throws IOException {
    return Files.list(localStateRootDirectory.toPath())
               .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
               .collect(Collectors.toList());
}

这一点先前真是想不到,所以自己另外写一个简单测试类,使用相同逻辑结构的一段代码测试,结果发现的确会造成文件夹被占用。

public static void main(String[] args) throws Exception
{       
    File localStateRootDirectory = new File("D:\\product\\test") ;
    localStateRootDirectory.mkdirs() ;
//  File[] files = localStateRootDirectory.listFiles() ;
        
    Files.list(localStateRootDirectory.toPath())
        .filter(path -> path.getFileName().toString().startsWith("a"))
        .collect(Collectors.toList());
        
    System.out.println(localStateRootDirectory.delete()); 
    // 模拟程序还在干活,没有退出
    JCommon.sleepInSeconds(30) ;
}

它的表现是这样的,localStateRootDirectory的delete操作返回的是true,也就是说文件夹删除成功。但打开windows文件资源浏览器,能看到这个文件夹依然存在,如果点击进入文件夹,会弹窗报错。


弹窗报错.png

如果用java的File类的exists()方法判断文件是否存在,返回false,表示文件不存在。
如果在这个文件夹的父文件夹上listFiles(),其中有被删除的那个文件夹对象,exists()为false。它就像残影一样,看得见、不存在、不能用。

笔者又试验了一下File对象上的listFiles()方法,发现它是不会出现文件夹被占用的情况的。
笔者又仔细看了一下这代码,发现Stream上有close方法,那是不是调用close方法关闭掉stream就可以呢?!
测试之后,果然!!!………………_
所以要解决Flink的这个文件,只能把这段代码改掉,用新class文件替换掉jar包里面的这个class。

@VisibleForTesting
@Nonnull
static Collection<Path> listAllocationDirectoriesIn(File localStateRootDirectory)
            throws IOException {
    // 解决Flink Job退出,文件夹无法被清理干净的问题
    try(Stream<Path> stream = Files.list(localStateRootDirectory.toPath())
            .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX)))
    {
        return stream.collect(Collectors.toList()) ;
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,045评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,114评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,120评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,902评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,828评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,132评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,590评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,258评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,408评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,335评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,385评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,068评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,660评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,747评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,967评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,406评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,970评论 2 341

推荐阅读更多精彩内容