Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
1463d64
Create auto-pr-review.yml (#1)
imbajin Jun 12, 2025
528f12e
Merge branch 'apache:master' into master
imbajin Jul 16, 2025
3503b08
Merge pull request #2 from apache/master
imbajin Nov 26, 2025
9683b47
Merge pull request #6 from apache/master
imbajin Jan 16, 2026
d3fa860
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 16, 2026
cd92e05
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 16, 2026
80d1544
[feat] Adjusted descriptions in the Loader
kenssa4eedfd Jan 17, 2026
d37cb4e
[feat] Add validator for parallelThreads
kenssa4eedfd Jan 17, 2026
ca92b2a
[feat] Update the default value of parallelThreads
kenssa4eedfd Jan 17, 2026
8a487f5
[feat] Update the default value of parallelThreads
kenssa4eedfd Jan 17, 2026
0b965c6
[feat] Add validator for maxConnections and maxConnectionsPerRoute
kenssa4eedfd Jan 17, 2026
a432785
[feat] Update the default value of parallelThreads
kenssa4eedfd Jan 17, 2026
171a988
[feat] Update the default value of parallelThreads
kenssa4eedfd Jan 17, 2026
f062624
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 19, 2026
866a4a0
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 19, 2026
ceaed28
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 19, 2026
e39f503
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 19, 2026
eda716e
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
71644b6
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
78fa442
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
ee496dd
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
0379118
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
572cc94
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
a54eb64
Update loader-ci.yml
kenssa4eedfd Jan 20, 2026
537445c
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
6f663c7
Merge remote-tracking branch 'origin/loader-update' into loader-update
kenssa4eedfd Jan 20, 2026
18e83cf
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
dddfa40
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
ed816e3
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
1831893
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
588adad
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
f60d5a1
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
3706a3b
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
c328a01
[feat] Standardize log levels and streamline parameter descriptions.
kenssa4eedfd Jan 21, 2026
146bdf4
Update required review count and collaborators
imbajin Jan 21, 2026
1af9a32
Merge pull request #9 from hugegraph/imbajin-patch-1
imbajin Jan 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ github:
required_pull_request_reviews:
dismiss_stale_reviews: true
require_code_owner_reviews: false
required_approving_review_count: 2
required_approving_review_count: 1
# (for non-committer): assign/edit/close issues & PR, without write access to the code
collaborators:
- Pengzna
- kenssa4eedfd
- haohao0103
- Thespica
- FrostyHec
- MuLeiSY2021

notifications:
# use https://selfserve.apache.org to manage it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,24 +662,21 @@ private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
}

private void loadStructs(List<InputStruct> structs) {
int parallelCount = this.context.options().parallelCount;
int parseThreads = this.context.options().parseThreads;
if (structs.size() == 0) {
return;
}
if (parallelCount <= 0) {
parallelCount = Math.min(structs.size(), Runtime.getRuntime().availableProcessors() * 2);
}

boolean scatter = this.context.options().scatterSources;

LOG.info("{} threads for loading {} structs, from {} to {} in {} mode",
parallelCount, structs.size(), this.context.options().startFile,
LOG.info("{} parser threads for loading {} structs, from {} to {} in {} mode",
parseThreads, structs.size(), this.context.options().startFile,
this.context.options().endFile,
scatter ? "scatter" : "sequential");

ExecutorService loadService = null;
try {
loadService = ExecutorUtil.newFixedThreadPool(parallelCount, "loader");
loadService = ExecutorUtil.newFixedThreadPool(parseThreads, "loader");
List<InputTaskItem> taskItems = prepareTaskItems(structs, scatter);
List<CompletableFuture<Void>> loadTasks = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

Expand All @@ -45,6 +46,8 @@ public final class LoadOptions implements Cloneable {
public static final String HTTPS_SCHEMA = "https";
public static final String HTTP_SCHEMA = "http";
private static final int CPUS = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_MAX_CONNECTIONS = CPUS * 4;
private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = CPUS * 2;
private static final int MINIMUM_REQUIRED_ARGS = 3;

@Parameter(names = {"-f", "--file"}, required = true, arity = 1,
Expand Down Expand Up @@ -156,7 +159,7 @@ public final class LoadOptions implements Cloneable {

@Parameter(names = {"--batch-insert-threads"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "The number of threads to execute batch insert")
description = "The number of threads to execute batch insert (default: CPUS)")
public int batchInsertThreads = CPUS;

@Parameter(names = {"--single-insert-threads"}, arity = 1,
Expand All @@ -165,21 +168,27 @@ public final class LoadOptions implements Cloneable {
public int singleInsertThreads = 8;

@Parameter(names = {"--max-conn"}, arity = 1,
description = "Max number of HTTP connections to server")
public int maxConnections = CPUS * 4;
validateWith = {PositiveValidator.class},
description = "Max HTTP connections (default: CPUS*4; auto-adjusted by " +
"--batch-insert-threads)")
public int maxConnections = DEFAULT_MAX_CONNECTIONS;

@Parameter(names = {"--max-conn-per-route"}, arity = 1,
description = "Max number of HTTP connections to each route")
public int maxConnectionsPerRoute = CPUS * 2;
validateWith = {PositiveValidator.class},
description = "Max HTTP connections per route (default: CPUS*2; " +
"auto-adjusted by --batch-insert-threads)")
public int maxConnectionsPerRoute = DEFAULT_MAX_CONNECTIONS_PER_ROUTE;

@Parameter(names = {"--batch-size"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "The number of lines in each submit")
public int batchSize = 500;

@Parameter(names = {"--parallel-count"}, arity = 1,
description = "The number of parallel read pipelines")
public int parallelCount = 1;
@Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "Parallel read pipelines (default: max(2, CPUS/2); " +
"--parallel-count is deprecated)")
public int parseThreads = Math.max(2, CPUS / 2);

@Parameter(names = {"--start-file"}, arity = 1,
description = "start file index for partial loading")
Expand Down Expand Up @@ -329,6 +338,11 @@ public final class LoadOptions implements Cloneable {
description = "The task scheduler type (when creating graph if not exists")
public String schedulerType = "distributed";

@Parameter(names = {"--batch-failure-fallback"}, arity = 1,
description = "Whether to fallback to single insert when batch insert fails. " +
"Default: true")
public boolean batchFailureFallback = true;

public String workModeString() {
if (this.incrementalMode) {
return "INCREMENTAL MODE";
Expand Down Expand Up @@ -406,9 +420,32 @@ public static LoadOptions parseOptions(String[] args) {
options.maxParseErrors = Constants.NO_LIMIT;
options.maxInsertErrors = Constants.NO_LIMIT;
}
if (Arrays.asList(args).contains("--parallel-count")) {
LOG.warn("Parameter --parallel-count is deprecated, " +
"please use --parser-threads instead");
}
adjustConnectionPoolIfDefault(options);
return options;
}

private static void adjustConnectionPoolIfDefault(LoadOptions options) {
int batchThreads = options.batchInsertThreads;
int maxConn = options.maxConnections;
int maxConnPerRoute = options.maxConnectionsPerRoute;

if (maxConn == DEFAULT_MAX_CONNECTIONS && maxConn < batchThreads * 4) {
options.maxConnections = batchThreads * 4;
LOG.info("Auto adjusted max-conn to {} based on batch-insert-threads({})",
options.maxConnections, batchThreads);
}

if (maxConnPerRoute == DEFAULT_MAX_CONNECTIONS_PER_ROUTE && maxConnPerRoute < batchThreads * 2) {
options.maxConnectionsPerRoute = batchThreads * 2;
LOG.info("Auto adjusted max-conn-per-route to {} based on batch-insert-threads({})",
options.maxConnectionsPerRoute, batchThreads);
}
}

public ShortIdConfig getShortIdConfig(String vertexLabel) {
for (ShortIdConfig config: shorterIDConfigs) {
if (config.getVertexLabel().equals(vertexLabel)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.hugegraph.loader.util.Printer;
import org.slf4j.Logger;

import org.apache.hugegraph.loader.builder.Record;
Expand Down Expand Up @@ -140,6 +141,10 @@ public void submitBatch(InputStruct struct, ElementMapping mapping,
long start = System.currentTimeMillis();
try {
this.batchSemaphore.acquire();
if (this.context.stopped()) {
this.batchSemaphore.release();
return;
}
} catch (InterruptedException e) {
throw new LoadException("Interrupted while waiting to submit %s " +
"batch in batch mode", e, mapping.type());
Expand All @@ -152,10 +157,18 @@ public void submitBatch(InputStruct struct, ElementMapping mapping,
CompletableFuture.runAsync(task, this.batchService).whenComplete(
(r, e) -> {
if (e != null) {
LOG.warn("Batch insert {} error, try single insert",
mapping.type(), e);
// The time of single insert is counted separately
this.submitInSingle(struct, mapping, batch);
if (this.options.batchFailureFallback) {
LOG.warn("Batch insert {} error, try single insert",
mapping.type(), e);
this.submitInSingle(struct, mapping, batch);
} else {
summary.metrics(struct).minusFlighting(batch.size());
this.context.occurredError();
this.context.stopLoading();
LOG.error("Batch insert {} error, interrupting import", mapping.type(), e);
Printer.printError("Batch insert %s failed, stop loading. Please check the logs",
mapping.type().string());
}
} else {
summary.metrics(struct).minusFlighting(batch.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,9 @@ public void testMultiFilesHaveHeader() {
"-s", configPath("multi_files_have_header/schema.groovy"),
"-g", GRAPH,
"-h", SERVER,
"--test-mode", "true"
"--test-mode", "true",
// FIXME: Set parser-threads to 1 because values > 1 currently trigger a NullPointerException (NPE).
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FIXME comment indicates that setting parser-threads to values > 1 triggers a NullPointerException (NPE). This suggests that either: (1) the new default value of parser-threads (max(2, CPUS/2)) may cause NPEs in these test scenarios, or (2) there's an existing bug that's being worked around. If this is a new bug introduced by changing the default value, it should be fixed before merging. If it's an existing known issue, the FIXME should reference an issue tracker number and explain why it's acceptable to work around it in tests.

Suggested change
// FIXME: Set parser-threads to 1 because values > 1 currently trigger a NullPointerException (NPE).
// FIXME [HUGEGRAPH-XXXX]: Using parser-threads > 1 currently triggers a NullPointerException (NPE)
// when loading multiple CSV files with headers in parallel. Limit to 1 thread here as a temporary
// workaround to keep this functional test stable until the underlying concurrency bug is fixed.

Copilot uses AI. Check for mistakes.
"--parser-threads", "1"
};
loadWithAuth(args);

Expand Down Expand Up @@ -1332,7 +1334,9 @@ public void testDirHasMultiFiles() {
"-s", configPath("dir_has_multi_files/schema.groovy"),
"-g", GRAPH,
"-h", SERVER,
"--test-mode", "true"
"--test-mode", "true",
// FIXME: Set parser-threads to 1 because values > 1 currently trigger a NullPointerException (NPE).
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FIXME comment indicates that setting parser-threads to values > 1 triggers a NullPointerException (NPE). This suggests that either: (1) the new default value of parser-threads (max(2, CPUS/2)) may cause NPEs in these test scenarios, or (2) there's an existing bug that's being worked around. If this is a new bug introduced by changing the default value, it should be fixed before merging. If it's an existing known issue, the FIXME should reference an issue tracker number and explain why it's acceptable to work around it in tests.

Suggested change
// FIXME: Set parser-threads to 1 because values > 1 currently trigger a NullPointerException (NPE).
// Set parser-threads to 1 to avoid a known NullPointerException (NPE) when using multiple parser threads in this scenario.

Copilot uses AI. Check for mistakes.
"--parser-threads", "1"
};
loadWithAuth(args);

Expand Down Expand Up @@ -1628,7 +1632,9 @@ public void testFilterPathBySuffix() {
"-s", configPath("filter_path_by_suffix/schema.groovy"),
"-g", GRAPH,
"-h", SERVER,
"--test-mode", "true"
"--test-mode", "true",
// FIXME: Set parser-threads to 1 because values > 1 currently trigger a NullPointerException (NPE).
"--parser-threads", "1"
};
loadWithAuth(args);

Expand Down Expand Up @@ -2058,7 +2064,8 @@ public void testLoadIncrementalModeAndLoadFailure()
"-h", SERVER,
"--batch-insert-threads", "2",
"--max-parse-errors", "1",
"--test-mode", "false"
"--test-mode", "false",
"--parser-threads", "1"
));
argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa"));

Expand Down Expand Up @@ -2259,7 +2266,8 @@ public void testReloadJsonFailureFiles() throws IOException,
"-h", SERVER,
"--check-vertex", "true",
"--batch-insert-threads", "2",
"--test-mode", "false"
"--test-mode", "false",
"--parser-threads", "1"
));
argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa"));
HugeGraphLoader loader = new HugeGraphLoader(argsList.toArray(new String[0]));
Expand Down Expand Up @@ -2564,7 +2572,8 @@ public void testSourceOrTargetPrimaryValueNull() {
"-g", GRAPH,
"-h", SERVER,
"--batch-insert-threads", "2",
"--test-mode", "true"
"--test-mode", "true",
"--parser-threads", "1"
));

argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa"));
Expand Down Expand Up @@ -3047,7 +3056,8 @@ public void testReadReachedMaxLines() {
"-h", SERVER,
"--max-read-lines", "4",
"--batch-insert-threads", "2",
"--test-mode", "true"
"--test-mode", "true",
"--parser-threads", "1"
};
loadWithAuth(args);

Expand All @@ -3061,7 +3071,8 @@ public void testReadReachedMaxLines() {
"-h", SERVER,
"--max-read-lines", "6",
"--batch-insert-threads", "2",
"--test-mode", "true"
"--test-mode", "true",
"--parser-threads", "1"
};
loadWithAuth(args);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void testCustomizedSchema() {
"-h", SERVER,
"-p", String.valueOf(PORT),
"--batch-insert-threads", "2",
"--test-mode", "true"
"--test-mode", "true",
"--parser-threads", "1"
};

loadWithAuth(args);
Expand Down
Loading
Loading