-
Notifications
You must be signed in to change notification settings - Fork 109
fix(loader): avoid NPE in FileLineFetcher close method & update HDFS test #710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
703ba9f
0aedbce
416f52a
3811ede
0b0e2cc
54e6938
4776e7e
c6a2b0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Seems the judgement logic in the method is not precise and is duplicated; maybe we have a better solution for it. |
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -37,20 +37,19 @@ | |||||||||||
| import org.apache.hadoop.io.compress.SnappyCodec; | ||||||||||||
| import org.apache.hadoop.util.ReflectionUtils; | ||||||||||||
| import org.apache.hugegraph.loader.exception.LoadException; | ||||||||||||
| import org.apache.hugegraph.loader.reader.line.Line; | ||||||||||||
| import org.apache.hugegraph.loader.reader.line.LineFetcher; | ||||||||||||
| import org.apache.hugegraph.loader.source.file.Compression; | ||||||||||||
| import org.apache.hugegraph.loader.source.file.FileFormat; | ||||||||||||
| import org.apache.hugegraph.loader.source.file.FileSource; | ||||||||||||
| import org.slf4j.Logger; | ||||||||||||
|
|
||||||||||||
| import org.apache.hugegraph.loader.parser.CsvLineParser; | ||||||||||||
| import org.apache.hugegraph.loader.parser.JsonLineParser; | ||||||||||||
| import org.apache.hugegraph.loader.parser.LineParser; | ||||||||||||
| import org.apache.hugegraph.loader.parser.TextLineParser; | ||||||||||||
| import org.apache.hugegraph.loader.reader.Readable; | ||||||||||||
| import org.apache.hugegraph.loader.reader.line.Line; | ||||||||||||
| import org.apache.hugegraph.loader.reader.line.LineFetcher; | ||||||||||||
| import org.apache.hugegraph.loader.source.file.Compression; | ||||||||||||
| import org.apache.hugegraph.loader.source.file.FileFormat; | ||||||||||||
| import org.apache.hugegraph.loader.source.file.FileSource; | ||||||||||||
| import org.apache.hugegraph.util.E; | ||||||||||||
| import org.apache.hugegraph.util.Log; | ||||||||||||
| import org.slf4j.Logger; | ||||||||||||
|
Comment on lines
-40
to
+52
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use the style file to avoid code-format diffs: https://github.com/apache/incubator-hugegraph-toolchain/blob/master/.editorconfig (use it to override the original IDE format) |
||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * Used to iterate all readable data files, like local files, hdfs paths | ||||||||||||
|
|
@@ -158,6 +157,10 @@ public void closeReader() throws IOException { | |||||||||||
| @Override | ||||||||||||
| public Line fetch() throws IOException { | ||||||||||||
| while (true) { | ||||||||||||
| // Fix NPE: check if reader is null before reading | ||||||||||||
| if (this.reader == null) { | ||||||||||||
| return null; | ||||||||||||
| } | ||||||||||||
|
Comment on lines
+160
to
+163
|
||||||||||||
| // Read next line from current file | ||||||||||||
| String rawLine = this.reader.readLine(); | ||||||||||||
| if (rawLine == null) { | ||||||||||||
|
|
@@ -199,6 +202,11 @@ public void skipOffset(Readable readable, long offset) { | |||||||||||
|
|
||||||||||||
| try { | ||||||||||||
| for (long i = 0L; i < offset; i++) { | ||||||||||||
| // Fix NPE: check reader again inside loop | ||||||||||||
| if (this.reader == null) { | ||||||||||||
| throw new LoadException("Reader is null when skipping offset of file %s", | ||||||||||||
| readable); | ||||||||||||
|
Comment on lines
+207
to
+208
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this hit 100 chars in one line here? |
||||||||||||
| } | ||||||||||||
|
Comment on lines
+205
to
+209
|
||||||||||||
| // Fix NPE: check reader again inside loop | |
| if (this.reader == null) { | |
| throw new LoadException("Reader is null when skipping offset of file %s", | |
| readable); | |
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,6 +66,7 @@ public void testHDFSWithCoreSitePath() { | |
| "-g", GRAPH, | ||
| "-h", SERVER, | ||
| "--batch-insert-threads", "2", | ||
| "--parser-threads", "4", // <--- 【修改点 1】增加并发解析线程数 | ||
|
||
| "--test-mode", "true" | ||
| )); | ||
| argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa")); | ||
|
|
@@ -95,6 +96,7 @@ public void testHDFSWithFilePrefix() { | |
| "-g", GRAPH, | ||
| "-h", SERVER, | ||
| "--batch-insert-threads", "2", | ||
| "--parser-threads", "4", // <--- 【修改点 2】增加并发解析线程数 | ||
|
||
| "--test-mode", "true" | ||
| )); | ||
| argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa")); | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -17,43 +17,34 @@ | |||||
|
|
||||||
| package org.apache.hugegraph.loader.test.functional; | ||||||
|
|
||||||
| import java.io.File; | ||||||
| import java.io.IOException; | ||||||
| import java.io.OutputStream; | ||||||
| import java.net.URI; | ||||||
| import java.nio.charset.Charset; | ||||||
| import java.util.Arrays; | ||||||
| import java.util.Objects; | ||||||
|
|
||||||
| import org.apache.commons.compress.compressors.CompressorException; | ||||||
| import org.apache.hadoop.conf.Configuration; | ||||||
| import org.apache.hadoop.fs.FSDataOutputStream; | ||||||
| import org.apache.hadoop.fs.FileSystem; | ||||||
| import org.apache.hadoop.fs.FileUtil; | ||||||
| import org.apache.hadoop.fs.Path; | ||||||
| import org.slf4j.Logger; | ||||||
|
|
||||||
| import org.apache.hugegraph.loader.exception.LoadException; | ||||||
| import org.apache.hugegraph.loader.source.file.Compression; | ||||||
| import org.apache.hugegraph.util.Log; | ||||||
| import org.slf4j.Logger; | ||||||
|
|
||||||
| public class HDFSUtil implements IOUtil { | ||||||
|
|
||||||
| private static final Logger LOG = Log.logger(HDFSUtil.class); | ||||||
|
|
||||||
| private final String storePath; | ||||||
| private final Configuration conf; | ||||||
| private final FileSystem hdfs; | ||||||
| private final FileSystem fs; | ||||||
|
|
||||||
| public HDFSUtil(String storePath) { | ||||||
| this.storePath = storePath; | ||||||
| this.conf = loadConfiguration(); | ||||||
| // HDFS doesn't support write by default | ||||||
| this.conf.setBoolean("dfs.support.write", true); | ||||||
| this.conf.setBoolean("fs.hdfs.impl.disable.cache", true); | ||||||
| this.conf = new Configuration(); | ||||||
| try { | ||||||
| this.hdfs = FileSystem.get(URI.create(storePath), this.conf); | ||||||
| this.fs = FileSystem.get(URI.create(storePath), this.conf); | ||||||
| } catch (IOException e) { | ||||||
| throw new LoadException("Failed to create HDFS file system", e); | ||||||
| throw new RuntimeException("Failed to init HDFS file system", e); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -67,97 +58,62 @@ public Configuration config() { | |||||
| return this.conf; | ||||||
| } | ||||||
|
|
||||||
| private static Configuration loadConfiguration() { | ||||||
| // Just use local hadoop with default config in test | ||||||
| String fileName = "hdfs_with_core_site_path/core-site.xml"; | ||||||
| String confPath = Objects.requireNonNull(HDFSUtil.class.getClassLoader() | ||||||
| .getResource(fileName)).getPath(); | ||||||
| Configuration conf = new Configuration(); | ||||||
| conf.addResource(new Path(confPath)); | ||||||
| return conf; | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public void mkdirs(String dir) { | ||||||
| Path path = new Path(this.storePath, dir); | ||||||
| public void mkdirs(String path) { | ||||||
| try { | ||||||
| this.hdfs.mkdirs(path); | ||||||
| this.fs.mkdirs(new Path(this.storePath, path)); | ||||||
| } catch (IOException e) { | ||||||
| throw new RuntimeException(String.format("Failed to create directory '%s'", path), e); | ||||||
| throw new RuntimeException("Failed to mkdirs: " + path, e); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public void write(String fileName, Charset charset, | ||||||
| Compression compress, String... lines) { | ||||||
| public void write(String fileName, Charset charset, Compression compression, String... lines) { | ||||||
| Path path = new Path(this.storePath, fileName); | ||||||
| checkPath(path); | ||||||
|
|
||||||
| if (compress == Compression.NONE) { | ||||||
| try (FSDataOutputStream fos = this.hdfs.append(path)) { | ||||||
| try (OutputStream os = this.fs.create(path, true)) { | ||||||
| if (compression == Compression.NONE) { | ||||||
| for (String line : lines) { | ||||||
| fos.write(line.getBytes(charset)); | ||||||
| fos.write("\n".getBytes(charset)); | ||||||
| os.write(line.getBytes(charset)); | ||||||
| os.write("\n".getBytes(charset)); | ||||||
| } | ||||||
| fos.flush(); | ||||||
| } catch (IOException e) { | ||||||
| throw new RuntimeException(String.format("Failed to write lines '%s' to path '%s'", | ||||||
| Arrays.asList(lines), path), e); | ||||||
| } | ||||||
| } else { | ||||||
| try (FSDataOutputStream fos = this.hdfs.append(path)) { | ||||||
| IOUtil.compress(fos, charset, compress, lines); | ||||||
| } catch (IOException | CompressorException e) { | ||||||
| throw new RuntimeException(String.format("Failed to write lines '%s' to file " + | ||||||
| "'%s' in '%s' compression format", | ||||||
| Arrays.asList(lines), path, compress), e); | ||||||
| } else { | ||||||
| IOUtil.compress(os, charset, compression, lines); | ||||||
| } | ||||||
| } catch (IOException | CompressorException e) { | ||||||
| throw new RuntimeException("Failed to write file: " + fileName, e); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public void copy(String srcPath, String destPath) { | ||||||
| try { | ||||||
| FileUtil.copy(new File(srcPath), this.hdfs, new Path(destPath), | ||||||
| false, this.conf); | ||||||
| // 通常测试场景是将本地文件上传到 HDFS | ||||||
|
||||||
| // 通常测试场景是将本地文件上传到 HDFS | |
| // Usually in test scenarios, upload local files to HDFS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment contains Chinese text. For consistency with the rest of the file and international collaboration, consider using English, e.g., "Usually in test scenarios, upload local files to HDFS".