12 changes: 12 additions & 0 deletions .editorconfig
@@ -24,7 +24,19 @@ insert_final_newline = true
max_line_length = 100
ij_wrap_on_typing = true
ij_visual_guides = 100
indent_style = space
indent_size = 4

# ---------------------------------------------------------
# 关键修改:防止编辑器破坏 Parquet 二进制文件
Copilot AI Jan 24, 2026

Comment contains Chinese text. For consistency with the rest of the file and international collaboration, consider using English, e.g., "Key change: Prevent editor from corrupting Parquet binary files".

Suggested change
# 关键修改:防止编辑器破坏 Parquet 二进制文件
# Key change: Prevent editor from corrupting Parquet binary files
# ---------------------------------------------------------
[*.parquet]
charset = unset
end_of_line = unset
insert_final_newline = false
trim_trailing_whitespace = false
indent_style = unset
indent_size = unset

[*.{java,xml,py}]
indent_style = space
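The `[*.parquet]` rules above exist because charset or end-of-line normalization silently corrupts Parquet's binary layout. One way to probe for that kind of corruption is to check the 4-byte `PAR1` magic that valid Parquet files carry at both ends. A minimal sketch, not part of this PR (the class name is hypothetical):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch: verify a file still carries Parquet's "PAR1" magic bytes at both
// the head and the tail; newline or charset normalization by an editor
// would destroy at least one of them.
class ParquetMagicCheck {

    static boolean hasParquetMagic(String path) throws IOException {
        byte[] magic = "PAR1".getBytes(StandardCharsets.US_ASCII);
        try (RandomAccessFile f = new RandomAccessFile(path, "r")) {
            if (f.length() < 8) {
                return false;            // too small to hold header + footer magic
            }
            byte[] head = new byte[4];
            byte[] tail = new byte[4];
            f.readFully(head);           // first 4 bytes
            f.seek(f.length() - 4);
            f.readFully(tail);           // last 4 bytes
            return Arrays.equals(head, magic) && Arrays.equals(tail, magic);
        }
    }
}
```

A check like this could run in tests after copying fixture files, to fail fast if tooling has rewritten them.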
Member

Seems the judgement logic in the method is not precise and is duplicated; maybe we have a better solution for it.

@@ -37,20 +37,19 @@
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hugegraph.loader.exception.LoadException;
import org.apache.hugegraph.loader.reader.line.Line;
import org.apache.hugegraph.loader.reader.line.LineFetcher;
import org.apache.hugegraph.loader.source.file.Compression;
import org.apache.hugegraph.loader.source.file.FileFormat;
import org.apache.hugegraph.loader.source.file.FileSource;
import org.slf4j.Logger;

import org.apache.hugegraph.loader.parser.CsvLineParser;
import org.apache.hugegraph.loader.parser.JsonLineParser;
import org.apache.hugegraph.loader.parser.LineParser;
import org.apache.hugegraph.loader.parser.TextLineParser;
import org.apache.hugegraph.loader.reader.Readable;
import org.apache.hugegraph.loader.reader.line.Line;
import org.apache.hugegraph.loader.reader.line.LineFetcher;
import org.apache.hugegraph.loader.source.file.Compression;
import org.apache.hugegraph.loader.source.file.FileFormat;
import org.apache.hugegraph.loader.source.file.FileSource;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
Comment on lines -40 to +52
Member

use style file to avoid code format diff
use style file to avoid code format diff

https://github.com/apache/incubator-hugegraph-toolchain/blob/master/.editorconfig (use it to override original IDE format)


/**
* Used to iterate all readable data files, like local files, hdfs paths
@@ -158,6 +157,10 @@ public void closeReader() throws IOException {
@Override
public Line fetch() throws IOException {
while (true) {
// Fix NPE: check if reader is null before reading
if (this.reader == null) {
return null;
}
Comment on lines +160 to +163
Copilot AI Jan 24, 2026

PR title mentions "avoid NPE in FileLineFetcher close method" but no changes were made to the closeReader() method (lines 151-155), which already has a null check for this.reader. The actual changes are in fetch(), skipOffset(), and checkMatchHeader() methods. Consider updating the PR title to accurately reflect what was changed, e.g., "fix: add null checks in FileLineFetcher fetch/skipOffset/checkMatchHeader methods".
// Read next line from current file
String rawLine = this.reader.readLine();
if (rawLine == null) {
@@ -199,6 +202,11 @@ public void skipOffset(Readable readable, long offset) {

try {
for (long i = 0L; i < offset; i++) {
// Fix NPE: check reader again inside loop
if (this.reader == null) {
throw new LoadException("Reader is null when skipping offset of file %s",
readable);
Comment on lines +207 to +208
Member

hit 100 chars in one line here?
}
Comment on lines +205 to +209
Copilot AI Jan 24, 2026

Contradictory null checking logic: Line 201 enforces that reader must not be null with E.checkState(this.reader != null, ...), but then lines 206-209 add a null check inside the loop. If the reader can become null during iteration (e.g., in a multi-threaded environment), the initial check at line 201 is insufficient. If it cannot, then the null check inside the loop is unnecessary. Consider either: 1) removing the check at line 201 and relying on the null check in the loop, or 2) using synchronized access if reader can be modified by other threads.

Suggested change
// Fix NPE: check reader again inside loop
if (this.reader == null) {
throw new LoadException("Reader is null when skipping offset of file %s",
readable);
}
this.reader.readLine();
}
} catch (IOException e) {
@@ -230,9 +238,13 @@ private boolean checkMatchHeader(String line) {
return false;
}

assert this.source().header() != null;
String[] header = this.source().header();
// Fix NPE: header might be null
if (header == null) {
return false;
}
String[] columns = this.parser.split(line);
return Arrays.equals(this.source().header(), columns);
return Arrays.equals(header, columns);
}

private static BufferedReader createBufferedReader(InputStream stream,
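On the "contradictory null checking" point raised in review: if `this.reader` can be nulled by a concurrent close, a second in-loop check still leaves a window between the check and the next read. Copying the field to a local variable once closes that window. A minimal sketch with a simplified fetcher shape (class and method names are hypothetical, not the loader's actual API):

```java
import java.io.BufferedReader;
import java.io.IOException;

// Sketch: read the shared field once into a local; the null check and all
// subsequent readLine() calls then act on the same reference, so a concurrent
// close() that nulls the field cannot cause an NPE mid-loop.
class SafeLineFetcher {

    private volatile BufferedReader reader;

    SafeLineFetcher(BufferedReader reader) {
        this.reader = reader;
    }

    // Emulates closeReader(): may run concurrently with skipAndFetch()
    void close() throws IOException {
        BufferedReader local = this.reader;
        this.reader = null;
        if (local != null) {
            local.close();
        }
    }

    // Skip `offset` lines, then return the next one (null if already closed)
    String skipAndFetch(long offset) throws IOException {
        BufferedReader local = this.reader;  // single read of the shared field
        if (local == null) {
            return null;                     // closed; nothing to fetch
        }
        for (long i = 0; i < offset; i++) {
            local.readLine();                // safe: `local` never becomes null
        }
        return local.readLine();
    }
}
```

A concurrent close can still surface as an IOException from the closed stream, but that is an explicit, checked failure rather than an NPE, which matches what this PR is trying to avoid.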
@@ -66,6 +66,7 @@ public void testHDFSWithCoreSitePath() {
"-g", GRAPH,
"-h", SERVER,
"--batch-insert-threads", "2",
"--parser-threads", "4", // <--- 【修改点 1】增加并发解析线程数
Copilot AI Jan 24, 2026

Comment contains Chinese text in a codebase that appears to use English for comments. For consistency, consider translating to English, e.g., "Modification point 1: Increase concurrent parser thread count".
"--test-mode", "true"
));
argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa"));
@@ -95,6 +96,7 @@ public void testHDFSWithFilePrefix() {
"-g", GRAPH,
"-h", SERVER,
"--batch-insert-threads", "2",
"--parser-threads", "4", // <--- 【修改点 2】增加并发解析线程数
Copilot AI Jan 24, 2026

Comment contains Chinese text in a codebase that appears to use English for comments. For consistency, consider translating to English, e.g., "Modification point 2: Increase concurrent parser thread count".
"--test-mode", "true"
));
argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa"));
@@ -17,43 +17,34 @@

package org.apache.hugegraph.loader.test.functional;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Objects;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;

import org.apache.hugegraph.loader.exception.LoadException;
import org.apache.hugegraph.loader.source.file.Compression;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

public class HDFSUtil implements IOUtil {

private static final Logger LOG = Log.logger(HDFSUtil.class);

private final String storePath;
private final Configuration conf;
private final FileSystem hdfs;
private final FileSystem fs;

public HDFSUtil(String storePath) {
this.storePath = storePath;
this.conf = loadConfiguration();
// HDFS doesn't support write by default
this.conf.setBoolean("dfs.support.write", true);
this.conf.setBoolean("fs.hdfs.impl.disable.cache", true);
this.conf = new Configuration();
try {
this.hdfs = FileSystem.get(URI.create(storePath), this.conf);
this.fs = FileSystem.get(URI.create(storePath), this.conf);
} catch (IOException e) {
throw new LoadException("Failed to create HDFS file system", e);
throw new RuntimeException("Failed to init HDFS file system", e);
}
}

@@ -67,97 +58,62 @@ public Configuration config() {
return this.conf;
}

private static Configuration loadConfiguration() {
// Just use local hadoop with default config in test
String fileName = "hdfs_with_core_site_path/core-site.xml";
String confPath = Objects.requireNonNull(HDFSUtil.class.getClassLoader()
.getResource(fileName)).getPath();
Configuration conf = new Configuration();
conf.addResource(new Path(confPath));
return conf;
}

@Override
public void mkdirs(String dir) {
Path path = new Path(this.storePath, dir);
public void mkdirs(String path) {
try {
this.hdfs.mkdirs(path);
this.fs.mkdirs(new Path(this.storePath, path));
} catch (IOException e) {
throw new RuntimeException(String.format("Failed to create directory '%s'", path), e);
throw new RuntimeException("Failed to mkdirs: " + path, e);
}
}

@Override
public void write(String fileName, Charset charset,
Compression compress, String... lines) {
public void write(String fileName, Charset charset, Compression compression, String... lines) {
Path path = new Path(this.storePath, fileName);
checkPath(path);

if (compress == Compression.NONE) {
try (FSDataOutputStream fos = this.hdfs.append(path)) {
try (OutputStream os = this.fs.create(path, true)) {
if (compression == Compression.NONE) {
for (String line : lines) {
fos.write(line.getBytes(charset));
fos.write("\n".getBytes(charset));
os.write(line.getBytes(charset));
os.write("\n".getBytes(charset));
}
fos.flush();
} catch (IOException e) {
throw new RuntimeException(String.format("Failed to write lines '%s' to path '%s'",
Arrays.asList(lines), path), e);
}
} else {
try (FSDataOutputStream fos = this.hdfs.append(path)) {
IOUtil.compress(fos, charset, compress, lines);
} catch (IOException | CompressorException e) {
throw new RuntimeException(String.format("Failed to write lines '%s' to file " +
"'%s' in '%s' compression format",
Arrays.asList(lines), path, compress), e);
} else {
IOUtil.compress(os, charset, compression, lines);
}
} catch (IOException | CompressorException e) {
throw new RuntimeException("Failed to write file: " + fileName, e);
}
}

@Override
public void copy(String srcPath, String destPath) {
try {
FileUtil.copy(new File(srcPath), this.hdfs, new Path(destPath),
false, this.conf);
// 通常测试场景是将本地文件上传到 HDFS
Copilot AI Jan 24, 2026

Comment contains Chinese text in a codebase that appears to use English for comments. For consistency, consider translating to English, e.g., "Usually in test scenarios, upload local files to HDFS".

Suggested change
// 通常测试场景是将本地文件上传到 HDFS
// Usually in test scenarios, upload local files to HDFS
Path src = new Path(srcPath);
Path dst = new Path(this.storePath, destPath);
this.fs.copyFromLocalFile(src, dst);
} catch (IOException e) {
throw new RuntimeException(String.format("Failed to copy file '%s' to '%s'",
srcPath, destPath));
throw new RuntimeException("Failed to copy file from " + srcPath + " to " + destPath, e);
}
}

@Override
public void delete() {
Path path = new Path(this.storePath);
try {
this.hdfs.delete(path, true);
if (this.fs.exists(path)) {
this.fs.delete(path, true);
}
} catch (IOException e) {
throw new RuntimeException(String.format("Failed to delete file '%s'", path), e);
throw new RuntimeException("Failed to delete path: " + this.storePath, e);
}
}

@Override
public void close() {
try {
this.hdfs.close();
this.fs.close();
} catch (IOException e) {
LOG.warn("Failed to close HDFS file system", e);
}
}

private void checkPath(Path path) {
try {
if (!this.hdfs.exists(path)) {
this.hdfs.mkdirs(path.getParent());
this.hdfs.createNewFile(path);
} else {
if (!this.hdfs.isFile(path)) {
throw new RuntimeException(String.format("Please ensure the path '%s' is file",
path.getName()));
}
}
} catch (IOException e) {
throw new RuntimeException(String.format("Failed to check HDFS path '%s'", path), e);
}
}
}
Binary file not shown.