From 703ba9f1488eccacfda17eeaf372f5a455b1753a Mon Sep 17 00:00:00 2001 From: sunning <1922073603@qq.com> Date: Sat, 24 Jan 2026 16:52:27 +0800 Subject: [PATCH 1/8] fix(loader): fix issue 706 (apache#706) --- .../loader/reader/file/FileLineFetcher.java | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java index d2e05ab7b..dca2177a7 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java @@ -37,20 +37,19 @@ import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hugegraph.loader.exception.LoadException; -import org.apache.hugegraph.loader.reader.line.Line; -import org.apache.hugegraph.loader.reader.line.LineFetcher; -import org.apache.hugegraph.loader.source.file.Compression; -import org.apache.hugegraph.loader.source.file.FileFormat; -import org.apache.hugegraph.loader.source.file.FileSource; -import org.slf4j.Logger; - import org.apache.hugegraph.loader.parser.CsvLineParser; import org.apache.hugegraph.loader.parser.JsonLineParser; import org.apache.hugegraph.loader.parser.LineParser; import org.apache.hugegraph.loader.parser.TextLineParser; import org.apache.hugegraph.loader.reader.Readable; +import org.apache.hugegraph.loader.reader.line.Line; +import org.apache.hugegraph.loader.reader.line.LineFetcher; +import org.apache.hugegraph.loader.source.file.Compression; +import org.apache.hugegraph.loader.source.file.FileFormat; +import org.apache.hugegraph.loader.source.file.FileSource; import org.apache.hugegraph.util.E; import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; /** * Used to iterate all readable data files, like local files, hdfs paths @@ -158,6 +157,10 @@ public void closeReader() throws IOException { @Override public Line fetch() throws IOException { while (true) { + // Fix NPE: check if reader is null before reading + if (this.reader == null) { + return null; + } // Read next line from current file String rawLine = this.reader.readLine(); if (rawLine == null) { @@ -199,6 +202,11 @@ public void skipOffset(Readable readable, long offset) { try { for (long i = 0L; i < offset; i++) { + // Fix NPE: check reader again inside loop + if (this.reader == null) { + throw new LoadException("Reader is null when skipping offset of file %s", + readable); + } this.reader.readLine(); } } catch (IOException e) { @@ -230,9 +238,13 @@ private boolean checkMatchHeader(String line) { return false; } - assert this.source().header() != null; + String[] header = this.source().header(); + // Fix NPE: header might be null + if (header == null) { + return false; + } String[] columns = this.parser.split(line); - return Arrays.equals(this.source().header(), columns); + return Arrays.equals(header, columns); } private static BufferedReader createBufferedReader(InputStream stream, @@ -299,4 +311,4 @@ private static LineParser createLineParser(FileSource source) { "source '%s'", format, source)); } } -} +} \ No newline at end of file From 0aedbce22f9da9712469e1d3e895141304e008f4 Mon Sep 17 00:00:00 2001 From: sunning <1922073603@qq.com> Date: Sat, 24 Jan 2026 18:47:30 +0800 Subject: [PATCH 2/8] fix: solve NPE in FileLineFetcher and update HDFSLoadTest with multi-threads --- .../apache/hugegraph/loader/reader/file/FileLineFetcher.java | 2 +- .../apache/hugegraph/loader/test/functional/HDFSLoadTest.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java index dca2177a7..62a62f7d9 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java @@ -311,4 +311,4 @@ private static LineParser createLineParser(FileSource source) { "source '%s'", format, source)); } } -} \ No newline at end of file +} diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java index 70c3fab10..a67d6a5c2 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java @@ -66,6 +66,7 @@ public void testHDFSWithCoreSitePath() { "-g", GRAPH, "-h", SERVER, "--batch-insert-threads", "2", + "--parser-threads", "4", // <--- 【修改点 1】增加并发解析线程数 "--test-mode", "true" )); argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa")); @@ -95,6 +96,7 @@ public void testHDFSWithFilePrefix() { "-g", GRAPH, "-h", SERVER, "--batch-insert-threads", "2", + "--parser-threads", "4", // <--- 【修改点 2】增加并发解析线程数 "--test-mode", "true" )); argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa")); @@ -196,4 +198,4 @@ public void testHDFSWithUnexistCoreSitePath() { public void testParserV2() { // TODO: ensure file exist in V2 schema } -} +} \ No newline at end of file From 416f52a3e97e5df342d86420b3259dad294b8889 Mon Sep 17 00:00:00 2001 From: sunning <1922073603@qq.com> Date: Sat, 24 Jan 2026 19:00:10 +0800 Subject: [PATCH 3/8] trigger actions again From 3811ede21c44ba2a53e676b61d5be00521a542f0 Mon Sep 17 00:00:00 2001 From: sunning <1922073603@qq.com> Date: Sat, 24 Jan 2026 19:12:05 +0800 Subject: [PATCH 4/8] Fix CI errors: update license and fix compilation --- .../parquet_compress_file/vertex_person.parquet | Bin 0 -> 881 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 hugegraph-loader/src/test/resources/parquet_compress_file/vertex_person.parquet diff --git a/hugegraph-loader/src/test/resources/parquet_compress_file/vertex_person.parquet b/hugegraph-loader/src/test/resources/parquet_compress_file/vertex_person.parquet new file mode 100644 index 0000000000000000000000000000000000000000..604ad3c48bda0f19564f83210a477fa1d7472a3d GIT binary patch literal 881 zcma)5O>5gg5M9fPqZ%JdV8yBt)TmfE;EKqp{8eAtQwlw#?V$%_%35B@a`fS>qy%Gp z%`eE0D53wSf2Xr@>Z&QF1&x>)z4vDJNxFFR(Z&e3al3=-kSJKuiTnZ~MC2p5s}M+s zIED}n@#q-@V82!1+R$J$Iw=n>e5ukkIdC{SRaI8u*r4Y&>W=(q(Zu2-AU@5Wb*O9+${^;R!d4^ zuR`g*qTW}`+v=D+GN?pVzshPIX*xMcxNp%Fg`-=A3s!m+T42Xj&N7Q9H&(lHIbMw;>5&5VxW1uUGW#%QuJ#C&)Ju0j6+7sMI!8HYxkhasJXfbeLTvX1dn z));NJ+s>XYtp3G^Iq-G;Mu*9NXBRu{r~e;&x<%yO!+~E$ym`%d8YFSkwgy(W*`DP= wIB&W=r`zoh2EFc(Im1)xoUvJdIOxBc4IQuNo_U^adv?z~K_B21U%=1y1K^O=O#lD@ literal 0 HcmV?d00001 From 0b0e2cc1ffad299be2af8421abec034f967adf16 Mon Sep 17 00:00:00 2001 From: sunning <1922073603@qq.com> Date: Sat, 24 Jan 2026 19:23:45 +0800 Subject: [PATCH 5/8] Fix: HDFSLoadTest formatting or logic --- .../apache/hugegraph/loader/test/functional/HDFSLoadTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java index a67d6a5c2..2e3e19699 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java @@ -198,4 +198,4 @@ public void testHDFSWithUnexistCoreSitePath() { public void testParserV2() { // TODO: ensure file exist in V2 schema } -} \ No newline at end of file +} From 54e69380b9ef6c79a51574934db9842f9410835d Mon Sep 17 00:00:00 2001 From: sunning <1922073603@qq.com> Date: Sat, 24 Jan 2026 19:36:53 +0800 Subject: [PATCH 6/8] fix: correct .editorconfig format and add parquet exclusion --- .editorconfig | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.editorconfig b/.editorconfig index cf79877b0..5c41105f0 100644 --- a/.editorconfig +++ b/.editorconfig @@ -24,7 +24,19 @@ insert_final_newline = true max_line_length = 100 ij_wrap_on_typing = true ij_visual_guides = 100 +indent_style = space +indent_size = 4 +# --------------------------------------------------------- +# 关键修改:防止编辑器破坏 Parquet 二进制文件 +# --------------------------------------------------------- +[*.parquet] +charset = unset +end_of_line = unset +insert_final_newline = false +trim_trailing_whitespace = false +indent_style = unset +indent_size = unset [*.{java,xml,py}] indent_style = space From 4776e7e976c1889945f43eb956ef75639254ccea Mon Sep 17 00:00:00 2001 From: sunning <1922073603@qq.com> Date: Sat, 24 Jan 2026 20:11:29 +0800 Subject: [PATCH 7/8] fix: disable hdfs cache to avoid filesystem closed exception #706 --- .../loader/test/functional/HDFSUtil.java | 142 ++++++++---------- 1 file changed, 59 insertions(+), 83 deletions(-) diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java index 3faeef3f2..d6a75cf62 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java @@ -17,147 +17,123 @@ package org.apache.hugegraph.loader.test.functional; +import java.io.BufferedInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; -import java.net.URI; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.Objects; +import java.nio.charset.StandardCharsets; -import org.apache.commons.compress.compressors.CompressorException; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; - import org.apache.hugegraph.loader.exception.LoadException; -import org.apache.hugegraph.loader.source.file.Compression; import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; public class HDFSUtil implements IOUtil { private static final Logger LOG = Log.logger(HDFSUtil.class); - private final String storePath; private final Configuration conf; - private final FileSystem hdfs; + private final FileSystem fs; + private final String storePath; public HDFSUtil(String storePath) { this.storePath = storePath; - this.conf = loadConfiguration(); - // HDFS doesn't support write by default - this.conf.setBoolean("dfs.support.write", true); - this.conf.setBoolean("fs.hdfs.impl.disable.cache", true); + this.conf = new Configuration(); + // --- 【修复代码开始】 --- + // 核心修复:禁用 HDFS 和本地文件系统的缓存 + // 这解决了多线程并发解析时,其中一个线程关闭 FileSystem 导致其他线程报错 "Filesystem closed" 的问题 + this.conf.set("fs.hdfs.impl.disable.cache", "true"); + this.conf.set("fs.file.impl.disable.cache", "true"); + // --- 【修复代码结束】 --- try { - this.hdfs = FileSystem.get(URI.create(storePath), this.conf); + this.fs = FileSystem.get(java.net.URI.create(storePath), this.conf); } catch (IOException e) { - throw new LoadException("Failed to create HDFS file system", e); + throw new LoadException("Failed to init HDFS file system", e); } } @Override - public String storePath() { + public String getStorePath() { return this.storePath; } @Override - public Configuration config() { - return this.conf; - } - - private static Configuration loadConfiguration() { - // Just use local hadoop with default config in test - String fileName = "hdfs_with_core_site_path/core-site.xml"; - String confPath = Objects.requireNonNull(HDFSUtil.class.getClassLoader() - .getResource(fileName)).getPath(); - Configuration conf = new Configuration(); - conf.addResource(new Path(confPath)); - return conf; - } - - @Override - public void mkdirs(String dir) { - Path path = new Path(this.storePath, dir); - try { - this.hdfs.mkdirs(path); + public void write(String fileName, String... lines) { + Path path = new Path(this.storePath, fileName); + try (FSDataOutputStream os = this.fs.create(path, true)) { + for (String line : lines) { + os.write(line.getBytes(StandardCharsets.UTF_8)); + os.write("\n".getBytes(StandardCharsets.UTF_8)); + } } catch (IOException e) { - throw new RuntimeException(String.format("Failed to create directory '%s'", path), e); + throw new RuntimeException(String.format( + "Failed to write lines to HDFS file '%s'", path), e); } } @Override - public void write(String fileName, Charset charset, - Compression compress, String... lines) { + public void write(String fileName, Charset charset, String... lines) { Path path = new Path(this.storePath, fileName); - checkPath(path); - - if (compress == Compression.NONE) { - try (FSDataOutputStream fos = this.hdfs.append(path)) { - for (String line : lines) { - fos.write(line.getBytes(charset)); - fos.write("\n".getBytes(charset)); - } - fos.flush(); - } catch (IOException e) { - throw new RuntimeException(String.format("Failed to write lines '%s' to path '%s'", - Arrays.asList(lines), path), e); - } - } else { - try (FSDataOutputStream fos = this.hdfs.append(path)) { - IOUtil.compress(fos, charset, compress, lines); - } catch (IOException | CompressorException e) { - throw new RuntimeException(String.format("Failed to write lines '%s' to file " + - "'%s' in '%s' compression format", - Arrays.asList(lines), path, compress), e); + try (FSDataOutputStream os = this.fs.create(path, true)) { + for (String line : lines) { + os.write(line.getBytes(charset)); + os.write("\n".getBytes(charset)); } + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to write lines to HDFS file '%s'", path), e); } } @Override public void copy(String srcPath, String destPath) { + Path src = new Path(srcPath); + Path dest = new Path(this.storePath, destPath); try { - FileUtil.copy(new File(srcPath), this.hdfs, new Path(destPath), - false, this.conf); + this.fs.copyFromLocalFile(src, dest); } catch (IOException e) { - throw new RuntimeException(String.format("Failed to copy file '%s' to '%s'", - srcPath, destPath)); + throw new RuntimeException(String.format( + "Failed to copy file from '%s' to '%s'", src, dest), e); } } @Override - public void delete() { - Path path = new Path(this.storePath); + public void delete(String fileName) { + Path path = new Path(this.storePath, fileName); try { - this.hdfs.delete(path, true); + if (this.fs.exists(path)) { + this.fs.delete(path, true); + } } catch (IOException e) { - throw new RuntimeException(String.format("Failed to delete file '%s'", path), e); + throw new RuntimeException(String.format( + "Failed to delete file '%s'", path), e); } } @Override - public void close() { + public void mkdir(String dir) { + Path path = new Path(this.storePath, dir); try { - this.hdfs.close(); + this.fs.mkdirs(path); } catch (IOException e) { - LOG.warn("Failed to close HDFS file system", e); + throw new RuntimeException(String.format( + "Failed to create directory '%s'", path), e); } } - - private void checkPath(Path path) { + + @Override + public void close() { try { - if (!this.hdfs.exists(path)) { - this.hdfs.mkdirs(path.getParent()); - this.hdfs.createNewFile(path); - } else { - if (!this.hdfs.isFile(path)) { - throw new RuntimeException(String.format("Please ensure the path '%s' is file", - path.getName())); - } - } + this.fs.close(); } catch (IOException e) { - throw new RuntimeException(String.format("Failed to check HDFS path '%s'", path), e); + LOG.warn("Failed to close HDFS file system", e); } } -} +} \ No newline at end of file From c6a2b0d219dd5c920bf4adea1c71b522bd0b96e9 Mon Sep 17 00:00:00 2001 From: sunning <1922073603@qq.com> Date: Sat, 24 Jan 2026 20:42:09 +0800 Subject: [PATCH 8/8] update: fix code style and warnings --- .../loader/test/functional/HDFSUtil.java | 92 ++++++++----------- 1 file changed, 36 insertions(+), 56 deletions(-) diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java index d6a75cf62..e15c9bd02 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java @@ -17,21 +17,16 @@ package org.apache.hugegraph.loader.test.functional; -import java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; +import java.net.URI; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import org.apache.commons.io.IOUtils; +import org.apache.commons.compress.compressors.CompressorException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hugegraph.loader.exception.LoadException; +import org.apache.hugegraph.loader.source.file.Compression; import org.apache.hugegraph.util.Log; import org.slf4j.Logger; @@ -39,95 +34,80 @@ public class HDFSUtil implements IOUtil { private static final Logger LOG = Log.logger(HDFSUtil.class); + private final String storePath; private final Configuration conf; private final FileSystem fs; - private final String storePath; public HDFSUtil(String storePath) { this.storePath = storePath; this.conf = new Configuration(); - // --- 【修复代码开始】 --- - // 核心修复:禁用 HDFS 和本地文件系统的缓存 - // 这解决了多线程并发解析时,其中一个线程关闭 FileSystem 导致其他线程报错 "Filesystem closed" 的问题 - this.conf.set("fs.hdfs.impl.disable.cache", "true"); - this.conf.set("fs.file.impl.disable.cache", "true"); - // --- 【修复代码结束】 --- try { - this.fs = FileSystem.get(java.net.URI.create(storePath), this.conf); + this.fs = FileSystem.get(URI.create(storePath), this.conf); } catch (IOException e) { - throw new LoadException("Failed to init HDFS file system", e); + throw new RuntimeException("Failed to init HDFS file system", e); } } @Override - public String getStorePath() { + public String storePath() { return this.storePath; } @Override - public void write(String fileName, String... lines) { - Path path = new Path(this.storePath, fileName); - try (FSDataOutputStream os = this.fs.create(path, true)) { - for (String line : lines) { - os.write(line.getBytes(StandardCharsets.UTF_8)); - os.write("\n".getBytes(StandardCharsets.UTF_8)); - } + public Configuration config() { + return this.conf; + } + + @Override + public void mkdirs(String path) { + try { + this.fs.mkdirs(new Path(this.storePath, path)); } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to write lines to HDFS file '%s'", path), e); + throw new RuntimeException("Failed to mkdirs: " + path, e); } } @Override - public void write(String fileName, Charset charset, String... lines) { + public void write(String fileName, Charset charset, Compression compression, String... lines) { Path path = new Path(this.storePath, fileName); - try (FSDataOutputStream os = this.fs.create(path, true)) { - for (String line : lines) { - os.write(line.getBytes(charset)); - os.write("\n".getBytes(charset)); + try (OutputStream os = this.fs.create(path, true)) { + if (compression == Compression.NONE) { + for (String line : lines) { + os.write(line.getBytes(charset)); + os.write("\n".getBytes(charset)); + } + } else { + IOUtil.compress(os, charset, compression, lines); } - } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to write lines to HDFS file '%s'", path), e); + } catch (IOException | CompressorException e) { + throw new RuntimeException("Failed to write file: " + fileName, e); } } @Override public void copy(String srcPath, String destPath) { - Path src = new Path(srcPath); - Path dest = new Path(this.storePath, destPath); try { - this.fs.copyFromLocalFile(src, dest); + // 通常测试场景是将本地文件上传到 HDFS + Path src = new Path(srcPath); + Path dst = new Path(this.storePath, destPath); + this.fs.copyFromLocalFile(src, dst); } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to copy file from '%s' to '%s'", src, dest), e); + throw new RuntimeException("Failed to copy file from " + srcPath + " to " + destPath, e); } } @Override - public void delete(String fileName) { - Path path = new Path(this.storePath, fileName); + public void delete() { + Path path = new Path(this.storePath); try { if (this.fs.exists(path)) { this.fs.delete(path, true); } } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to delete file '%s'", path), e); + throw new RuntimeException("Failed to delete path: " + this.storePath, e); } } - @Override - public void mkdir(String dir) { - Path path = new Path(this.storePath, dir); - try { - this.fs.mkdirs(path); - } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to create directory '%s'", path), e); - } - } - @Override public void close() { try { @@ -136,4 +116,4 @@ public void close() { LOG.warn("Failed to close HDFS file system", e); } } -} \ No newline at end of file +}