diff --git a/.editorconfig b/.editorconfig
index cf79877b0..5c41105f0 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -24,7 +24,19 @@ insert_final_newline = true
 max_line_length = 100
 ij_wrap_on_typing = true
 ij_visual_guides = 100
+indent_style = space
+indent_size = 4
+# ---------------------------------------------------------
+# Key change: keep editors from corrupting Parquet binary files
+# ---------------------------------------------------------
+[*.parquet]
+charset = unset
+end_of_line = unset
+insert_final_newline = false
+trim_trailing_whitespace = false
+indent_style = unset
+indent_size = unset
 
 [*.{java,xml,py}]
 indent_style = space
 
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java
index d2e05ab7b..62a62f7d9 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/file/FileLineFetcher.java
@@ -37,20 +37,19 @@
 import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hugegraph.loader.exception.LoadException;
-import org.apache.hugegraph.loader.reader.line.Line;
-import org.apache.hugegraph.loader.reader.line.LineFetcher;
-import org.apache.hugegraph.loader.source.file.Compression;
-import org.apache.hugegraph.loader.source.file.FileFormat;
-import org.apache.hugegraph.loader.source.file.FileSource;
-import org.slf4j.Logger;
-
 import org.apache.hugegraph.loader.parser.CsvLineParser;
 import org.apache.hugegraph.loader.parser.JsonLineParser;
 import org.apache.hugegraph.loader.parser.LineParser;
 import org.apache.hugegraph.loader.parser.TextLineParser;
 import org.apache.hugegraph.loader.reader.Readable;
+import org.apache.hugegraph.loader.reader.line.Line;
+import org.apache.hugegraph.loader.reader.line.LineFetcher;
+import org.apache.hugegraph.loader.source.file.Compression;
+import org.apache.hugegraph.loader.source.file.FileFormat;
+import org.apache.hugegraph.loader.source.file.FileSource;
 import org.apache.hugegraph.util.E;
 import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
 
 /**
  * Used to iterate all readable data files, like local files, hdfs paths
@@ -158,6 +157,10 @@ public void closeReader() throws IOException {
     @Override
     public Line fetch() throws IOException {
         while (true) {
+            // Fix NPE: check if reader is null before reading
+            if (this.reader == null) {
+                return null;
+            }
             // Read next line from current file
             String rawLine = this.reader.readLine();
             if (rawLine == null) {
@@ -199,6 +202,11 @@
     public void skipOffset(Readable readable, long offset) {
         try {
             for (long i = 0L; i < offset; i++) {
+                // Fix NPE: check reader again inside loop
+                if (this.reader == null) {
+                    throw new LoadException("Reader is null when skipping offset of file %s",
+                                            readable);
+                }
                 this.reader.readLine();
             }
         } catch (IOException e) {
@@ -230,9 +238,13 @@ private boolean checkMatchHeader(String line) {
             return false;
         }
 
-        assert this.source().header() != null;
+        String[] header = this.source().header();
+        // Fix NPE: header might be null
+        if (header == null) {
+            return false;
+        }
         String[] columns = this.parser.split(line);
-        return Arrays.equals(this.source().header(), columns);
+        return Arrays.equals(header, columns);
     }
 
     private static BufferedReader createBufferedReader(InputStream stream,
diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java
index 70c3fab10..2e3e19699 100644
--- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java
+++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java
@@ -66,6 +66,7 @@ public void testHDFSWithCoreSitePath() {
                 "-g", GRAPH,
                 "-h", SERVER,
                 "--batch-insert-threads", "2",
+                "--parser-threads", "4", // <--- [Change 1] Increase concurrent parsing threads
                 "--test-mode", "true"
         ));
         argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa"));
@@ -95,6 +96,7 @@ public void testHDFSWithFilePrefix() {
                 "-g", GRAPH,
                 "-h", SERVER,
                 "--batch-insert-threads", "2",
+                "--parser-threads", "4", // <--- [Change 2] Increase concurrent parsing threads
                 "--test-mode", "true"
         ));
         argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa"));
diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java
index 3faeef3f2..e15c9bd02 100644
--- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java
+++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java
@@ -17,24 +17,18 @@
 package org.apache.hugegraph.loader.test.functional;
 
-import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.URI;
 import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Objects;
 
 import org.apache.commons.compress.compressors.CompressorException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
-import org.slf4j.Logger;
-
-import org.apache.hugegraph.loader.exception.LoadException;
 import org.apache.hugegraph.loader.source.file.Compression;
 import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
 
 public class HDFSUtil implements IOUtil {
@@ -42,18 +36,15 @@
     private final String storePath;
     private final Configuration conf;
-    private final FileSystem hdfs;
+    private final FileSystem fs;
 
     public HDFSUtil(String storePath) {
         this.storePath = storePath;
-        this.conf = loadConfiguration();
-        // HDFS doesn't support write by default
-        this.conf.setBoolean("dfs.support.write", true);
-        this.conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        this.conf = new Configuration();
         try {
-            this.hdfs = FileSystem.get(URI.create(storePath), this.conf);
+            this.fs = FileSystem.get(URI.create(storePath), this.conf);
         } catch (IOException e) {
-            throw new LoadException("Failed to create HDFS file system", e);
+            throw new RuntimeException("Failed to init HDFS file system", e);
         }
     }
 
@@ -67,62 +58,41 @@
     public Configuration config() {
         return this.conf;
     }
 
-    private static Configuration loadConfiguration() {
-        // Just use local hadoop with default config in test
-        String fileName = "hdfs_with_core_site_path/core-site.xml";
-        String confPath = Objects.requireNonNull(HDFSUtil.class.getClassLoader()
-                                                 .getResource(fileName)).getPath();
-        Configuration conf = new Configuration();
-        conf.addResource(new Path(confPath));
-        return conf;
-    }
-
     @Override
-    public void mkdirs(String dir) {
-        Path path = new Path(this.storePath, dir);
+    public void mkdirs(String path) {
         try {
-            this.hdfs.mkdirs(path);
+            this.fs.mkdirs(new Path(this.storePath, path));
         } catch (IOException e) {
-            throw new RuntimeException(String.format("Failed to create directory '%s'", path), e);
+            throw new RuntimeException("Failed to mkdirs: " + path, e);
         }
     }
 
     @Override
-    public void write(String fileName, Charset charset,
-                      Compression compress, String... lines) {
+    public void write(String fileName, Charset charset, Compression compression, String... lines) {
         Path path = new Path(this.storePath, fileName);
-        checkPath(path);
-
-        if (compress == Compression.NONE) {
-            try (FSDataOutputStream fos = this.hdfs.append(path)) {
+        try (OutputStream os = this.fs.create(path, true)) {
+            if (compression == Compression.NONE) {
                 for (String line : lines) {
-                    fos.write(line.getBytes(charset));
-                    fos.write("\n".getBytes(charset));
+                    os.write(line.getBytes(charset));
+                    os.write("\n".getBytes(charset));
                 }
-                fos.flush();
-            } catch (IOException e) {
-                throw new RuntimeException(String.format("Failed to write lines '%s' to path '%s'",
-                                                         Arrays.asList(lines), path), e);
-            }
-        } else {
-            try (FSDataOutputStream fos = this.hdfs.append(path)) {
-                IOUtil.compress(fos, charset, compress, lines);
-            } catch (IOException | CompressorException e) {
-                throw new RuntimeException(String.format("Failed to write lines '%s' to file " +
                                                         "'%s' in '%s' compression format",
-                                                         Arrays.asList(lines), path, compress), e);
+            } else {
+                IOUtil.compress(os, charset, compression, lines);
             }
+        } catch (IOException | CompressorException e) {
+            throw new RuntimeException("Failed to write file: " + fileName, e);
         }
     }
 
     @Override
     public void copy(String srcPath, String destPath) {
         try {
-            FileUtil.copy(new File(srcPath), this.hdfs, new Path(destPath),
-                          false, this.conf);
+            // The usual test scenario uploads a local file to HDFS
+            Path src = new Path(srcPath);
+            Path dst = new Path(this.storePath, destPath);
+            this.fs.copyFromLocalFile(src, dst);
         } catch (IOException e) {
-            throw new RuntimeException(String.format("Failed to copy file '%s' to '%s'",
-                                                     srcPath, destPath));
+            throw new RuntimeException("Failed to copy file from " + srcPath + " to " + destPath, e);
        }
     }
 
@@ -130,34 +100,20 @@ public void copy(String srcPath, String destPath) {
     public void delete() {
         Path path = new Path(this.storePath);
         try {
-            this.hdfs.delete(path, true);
+            if (this.fs.exists(path)) {
+                this.fs.delete(path, true);
+            }
         } catch (IOException e) {
-            throw new RuntimeException(String.format("Failed to delete file '%s'", path), e);
+            throw new RuntimeException("Failed to delete path: " + this.storePath, e);
         }
     }
 
     @Override
     public void close() {
         try {
-            this.hdfs.close();
+            this.fs.close();
         } catch (IOException e) {
             LOG.warn("Failed to close HDFS file system", e);
         }
     }
-
-    private void checkPath(Path path) {
-        try {
-            if (!this.hdfs.exists(path)) {
-                this.hdfs.mkdirs(path.getParent());
-                this.hdfs.createNewFile(path);
-            } else {
-                if (!this.hdfs.isFile(path)) {
-                    throw new RuntimeException(String.format("Please ensure the path '%s' is file",
-                                                             path.getName()));
-                }
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(String.format("Failed to check HDFS path '%s'", path), e);
-        }
-    }
 }
diff --git a/hugegraph-loader/src/test/resources/parquet_compress_file/vertex_person.parquet b/hugegraph-loader/src/test/resources/parquet_compress_file/vertex_person.parquet
new file mode 100644
index 000000000..604ad3c48
Binary files /dev/null and b/hugegraph-loader/src/test/resources/parquet_compress_file/vertex_person.parquet differ