From 1463d64c3dd6e948425665726577900043bec558 Mon Sep 17 00:00:00 2001 From: imbajin Date: Thu, 12 Jun 2025 14:09:40 +0800 Subject: [PATCH 01/31] Create auto-pr-review.yml (#1) --- .github/workflows/auto-pr-review.yml | 35 ++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 .github/workflows/auto-pr-review.yml diff --git a/.github/workflows/auto-pr-review.yml b/.github/workflows/auto-pr-review.yml new file mode 100644 index 000000000..6a585355f --- /dev/null +++ b/.github/workflows/auto-pr-review.yml @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "Auto PR Commenter" + +on: + pull_request_target: + types: [opened] + +jobs: + add-review-comment: + runs-on: ubuntu-latest + permissions: + pull-requests: write + steps: + - name: Add review comment + uses: peter-evans/create-or-update-comment@v4 + with: + issue-number: ${{ github.event.pull_request.number }} + body: | + @codecov-ai-reviewer review From d3fa86010cdfcd112f91b42866ee2d398e3528c7 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Fri, 16 Jan 2026 18:57:46 +0800 Subject: [PATCH 02/31] [feat] Adjusted several default parameters and descriptions in the Loader, and refactored the failure-handling logic for batch inserts. --- .../hugegraph/loader/HugeGraphLoader.java | 13 ++++++---- .../loader/executor/LoadOptions.java | 26 +++++++++++++------ .../hugegraph/loader/task/TaskManager.java | 15 ++++++++--- 3 files changed, 37 insertions(+), 17 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java index 2fb9eb4aa..fe2b74672 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java @@ -662,24 +662,27 @@ private List prepareTaskItems(List structs, } private void loadStructs(List structs) { - int parallelCount = this.context.options().parallelCount; + Integer parallelThreads = this.context.options().parallelThreads; if (structs.size() == 0) { return; } - if (parallelCount <= 0) { - parallelCount = Math.min(structs.size(), Runtime.getRuntime().availableProcessors() * 2); + if (parallelThreads == null) { + parallelThreads = Math.min(structs.size(), Runtime.getRuntime().availableProcessors()); + } + if (parallelThreads < 1) { + throw new LoadException("The parallel-threads must be >= 1"); } boolean scatter = this.context.options().scatterSources; LOG.info("{} threads for loading {} structs, from {} to {} in {} mode", - parallelCount, structs.size(), this.context.options().startFile, + parallelThreads, structs.size(), this.context.options().startFile, this.context.options().endFile, scatter ? "scatter" : "sequential"); ExecutorService loadService = null; try { - loadService = ExecutorUtil.newFixedThreadPool(parallelCount, "loader"); + loadService = ExecutorUtil.newFixedThreadPool(parallelThreads, "loader"); List taskItems = prepareTaskItems(structs, scatter); List> loadTasks = new ArrayList<>(); diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index 95babb557..eb6c45722 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -45,6 +45,8 @@ public final class LoadOptions implements Cloneable { public static final String HTTPS_SCHEMA = "https"; public static final String HTTP_SCHEMA = "http"; private static final int CPUS = Runtime.getRuntime().availableProcessors(); + private static final int DEFAULT_MAX_CONNECTIONS = CPUS * 4; + private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = CPUS * 2; private static final int MINIMUM_REQUIRED_ARGS = 3; @Parameter(names = {"-f", "--file"}, required = true, arity = 1, @@ -156,7 +158,9 @@ public final class LoadOptions implements Cloneable { @Parameter(names = {"--batch-insert-threads"}, arity = 1, validateWith = {PositiveValidator.class}, - description = "The number of threads to execute batch insert") + description = "The number of threads to execute batch insert. " + + "If max-conn/max-conn-per-route keep defaults, " + + "they may be auto-adjusted based on this value") public int batchInsertThreads = CPUS; @Parameter(names = {"--single-insert-threads"}, arity = 1, @@ -165,21 +169,27 @@ public final class LoadOptions implements Cloneable { public int singleInsertThreads = 8; @Parameter(names = {"--max-conn"}, arity = 1, - description = "Max number of HTTP connections to server") - public int maxConnections = CPUS * 4; + description = "Max number of HTTP connections to server. " + + "If left as default and batch-insert-threads is " + + "set, this may be auto-adjusted") + public int maxConnections = DEFAULT_MAX_CONNECTIONS; @Parameter(names = {"--max-conn-per-route"}, arity = 1, - description = "Max number of HTTP connections to each route") - public int maxConnectionsPerRoute = CPUS * 2; + description = "Max number of HTTP connections to each route. " + + "If left as default and batch-insert-threads is " + + "set, this may be auto-adjusted") + public int maxConnectionsPerRoute = DEFAULT_MAX_CONNECTIONS_PER_ROUTE; @Parameter(names = {"--batch-size"}, arity = 1, validateWith = {PositiveValidator.class}, description = "The number of lines in each submit") public int batchSize = 500; - @Parameter(names = {"--parallel-count"}, arity = 1, - description = "The number of parallel read pipelines") - public int parallelCount = 1; + @Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1, + description = "The number of parallel read pipelines. " + + "Default: auto (min(struct count, cpu)). " + + "Must be >= 1") + public Integer parallelThreads = null; @Parameter(names = {"--start-file"}, arity = 1, description = "start file index for partial loading") diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java index ce4d77a92..08897afa7 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import org.apache.hugegraph.loader.util.Printer; import org.slf4j.Logger; import org.apache.hugegraph.loader.builder.Record; @@ -137,6 +138,9 @@ public void shutdown() { public void submitBatch(InputStruct struct, ElementMapping mapping, List batch) { + if (this.context.stopped()) { + return; + } long start = System.currentTimeMillis(); try { this.batchSemaphore.acquire(); @@ -152,10 +156,13 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, CompletableFuture.runAsync(task, this.batchService).whenComplete( (r, e) -> { if (e != null) { - LOG.warn("Batch insert {} error, try single insert", - mapping.type(), e); - // The time of single insert is counted separately - this.submitInSingle(struct, mapping, batch); + LOG.error("Batch insert {} error, interrupting import", mapping.type(), e); + this.context.occurredError(); + this.context.stopLoading(); + this.batchService.shutdown(); + this.singleService.shutdown(); + Printer.printError("Batch insert %s failed, stop loading, Please check the logs", + mapping.type().string()); } else { summary.metrics(struct).minusFlighting(batch.size()); } From cd92e051181ed07db08732b2dbb7efff511a7e49 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Sat, 17 Jan 2026 01:39:01 +0800 Subject: [PATCH 03/31] [feat] Adjusted several default parameters and descriptions in the Loader, and refactored the failure-handling logic for batch inserts. --- .../java/org/apache/hugegraph/loader/HugeGraphLoader.java | 3 ++- .../java/org/apache/hugegraph/loader/task/TaskManager.java | 6 +----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java index fe2b74672..c6d7d76a9 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java @@ -667,7 +667,8 @@ private void loadStructs(List structs) { return; } if (parallelThreads == null) { - parallelThreads = Math.min(structs.size(), Runtime.getRuntime().availableProcessors()); + parallelThreads = Math.max(2, Math.min(structs.size(), + Runtime.getRuntime().availableProcessors())); } if (parallelThreads < 1) { throw new LoadException("The parallel-threads must be >= 1"); diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java index 08897afa7..88d094297 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java @@ -157,15 +157,11 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, (r, e) -> { if (e != null) { LOG.error("Batch insert {} error, interrupting import", mapping.type(), e); - this.context.occurredError(); this.context.stopLoading(); - this.batchService.shutdown(); - this.singleService.shutdown(); Printer.printError("Batch insert %s failed, stop loading, Please check the logs", mapping.type().string()); - } else { - summary.metrics(struct).minusFlighting(batch.size()); } + summary.metrics(struct).minusFlighting(batch.size()); this.batchSemaphore.release(); long end = System.currentTimeMillis(); From 80d15441e48994b9e55d2ee389c58cb1e368bde4 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Sat, 17 Jan 2026 08:37:42 +0800 Subject: [PATCH 04/31] [feat] Adjusted descriptions in the Loader --- .../java/org/apache/hugegraph/loader/executor/LoadOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index eb6c45722..f655830d4 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -187,7 +187,7 @@ public final class LoadOptions implements Cloneable { @Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1, description = "The number of parallel read pipelines. " + - "Default: auto (min(struct count, cpu)). " + + "Default: auto max(2, (min(struct count, cpu))). " + "Must be >= 1") public Integer parallelThreads = null; From d37cb4e74c66fdcda728949106e3604df32e7aec Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Sat, 17 Jan 2026 08:48:02 +0800 Subject: [PATCH 05/31] [feat] Add validator for parallelThreads --- .../main/java/org/apache/hugegraph/loader/HugeGraphLoader.java | 3 --- .../java/org/apache/hugegraph/loader/executor/LoadOptions.java | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java index c6d7d76a9..e3804acc3 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java @@ -670,9 +670,6 @@ private void loadStructs(List structs) { parallelThreads = Math.max(2, Math.min(structs.size(), Runtime.getRuntime().availableProcessors())); } - if (parallelThreads < 1) { - throw new LoadException("The parallel-threads must be >= 1"); - } boolean scatter = this.context.options().scatterSources; diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index f655830d4..c399ac526 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -186,6 +186,7 @@ public final class LoadOptions implements Cloneable { public int batchSize = 500; @Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1, + validateWith = {PositiveValidator.class}, description = "The number of parallel read pipelines. " + "Default: auto max(2, (min(struct count, cpu))). " + "Must be >= 1") From ca92b2a81777c9692dcd5d6f93981185f61caa76 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Sat, 17 Jan 2026 09:13:11 +0800 Subject: [PATCH 06/31] [feat] Update the default value of parallelThreads --- .../java/org/apache/hugegraph/loader/HugeGraphLoader.java | 4 ---- .../org/apache/hugegraph/loader/executor/LoadOptions.java | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java index e3804acc3..d7fa1c48c 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java @@ -666,10 +666,6 @@ private void loadStructs(List structs) { if (structs.size() == 0) { return; } - if (parallelThreads == null) { - parallelThreads = Math.max(2, Math.min(structs.size(), - Runtime.getRuntime().availableProcessors())); - } boolean scatter = this.context.options().scatterSources; diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index c399ac526..2e5a5e0ae 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -188,9 +188,9 @@ public final class LoadOptions implements Cloneable { @Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1, validateWith = {PositiveValidator.class}, description = "The number of parallel read pipelines. " + - "Default: auto max(2, (min(struct count, cpu))). " + + "Default: auto max(2, cpu). " + "Must be >= 1") - public Integer parallelThreads = null; + public Integer parallelThreads = Math.max(2, CPUS); @Parameter(names = {"--start-file"}, arity = 1, description = "start file index for partial loading") From 8a487f54ec5e48a40fe14ff427163073fb28d1bf Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Sat, 17 Jan 2026 09:21:17 +0800 Subject: [PATCH 07/31] [feat] Update the default value of parallelThreads --- .../main/java/org/apache/hugegraph/loader/HugeGraphLoader.java | 2 +- .../java/org/apache/hugegraph/loader/executor/LoadOptions.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java index d7fa1c48c..cca2ceae2 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java @@ -662,7 +662,7 @@ private List prepareTaskItems(List structs, } private void loadStructs(List structs) { - Integer parallelThreads = this.context.options().parallelThreads; + int parallelThreads = this.context.options().parallelThreads; if (structs.size() == 0) { return; } diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index 2e5a5e0ae..04f172f8e 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -190,7 +190,7 @@ public final class LoadOptions implements Cloneable { description = "The number of parallel read pipelines. " + "Default: auto max(2, cpu). " + "Must be >= 1") - public Integer parallelThreads = Math.max(2, CPUS); + public int parallelThreads = Math.max(2, CPUS); @Parameter(names = {"--start-file"}, arity = 1, description = "start file index for partial loading") From 0b965c6a82a20c2f756d3d8476d828af8f3562e6 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Sat, 17 Jan 2026 09:23:45 +0800 Subject: [PATCH 08/31] [feat] Add validator for maxConnections and maxConnectionsPerRoute --- .../java/org/apache/hugegraph/loader/executor/LoadOptions.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index 04f172f8e..aa2a35f4d 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -169,12 +169,14 @@ public final class LoadOptions implements Cloneable { public int singleInsertThreads = 8; @Parameter(names = {"--max-conn"}, arity = 1, + validateWith = {PositiveValidator.class}, description = "Max number of HTTP connections to server. " + "If left as default and batch-insert-threads is " + "set, this may be auto-adjusted") public int maxConnections = DEFAULT_MAX_CONNECTIONS; @Parameter(names = {"--max-conn-per-route"}, arity = 1, + validateWith = {PositiveValidator.class}, description = "Max number of HTTP connections to each route. " + "If left as default and batch-insert-threads is " + "set, this may be auto-adjusted") From a4327854500a2f7376a0968f41f5aaeb1cad0632 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Sat, 17 Jan 2026 10:39:27 +0800 Subject: [PATCH 09/31] [feat] Update the default value of parallelThreads --- .../java/org/apache/hugegraph/loader/executor/LoadOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index aa2a35f4d..fa4dbc545 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -192,7 +192,7 @@ public final class LoadOptions implements Cloneable { description = "The number of parallel read pipelines. " + "Default: auto max(2, cpu). " + "Must be >= 1") - public int parallelThreads = Math.max(2, CPUS); + public int parallelThreads = Math.max(2, CPUS/2); @Parameter(names = {"--start-file"}, arity = 1, description = "start file index for partial loading") From 171a9887ea44c6ab12c53b1650a2cc34146a1100 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Sat, 17 Jan 2026 10:43:37 +0800 Subject: [PATCH 10/31] [feat] Update the default value of parallelThreads --- .../java/org/apache/hugegraph/loader/executor/LoadOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index fa4dbc545..b1739ab97 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -190,7 +190,7 @@ public final class LoadOptions implements Cloneable { @Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1, validateWith = {PositiveValidator.class}, description = "The number of parallel read pipelines. " + - "Default: auto max(2, cpu). " + + "Default: auto max(2, cpu/2). " + "Must be >= 1") public int parallelThreads = Math.max(2, CPUS/2); From f062624564428d74b15dbff2fe73869797c6fa62 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Mon, 19 Jan 2026 09:26:50 +0800 Subject: [PATCH 11/31] [feat] Adjusted several default parameters and descriptions in the Loader, and refactored the failure-handling logic for batch inserts. --- .../hugegraph/loader/HugeGraphLoader.java | 6 ++-- .../loader/executor/LoadOptions.java | 36 ++++++++++++++++--- .../hugegraph/loader/task/TaskManager.java | 12 +++++-- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java index cca2ceae2..888b33c29 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java @@ -662,7 +662,7 @@ private List prepareTaskItems(List structs, } private void loadStructs(List structs) { - int parallelThreads = this.context.options().parallelThreads; + int parseThreads = this.context.options().parseThreads; if (structs.size() == 0) { return; } @@ -670,13 +670,13 @@ private void loadStructs(List structs) { boolean scatter = this.context.options().scatterSources; LOG.info("{} threads for loading {} structs, from {} to {} in {} mode", - parallelThreads, structs.size(), this.context.options().startFile, + parseThreads, structs.size(), this.context.options().startFile, this.context.options().endFile, scatter ? "scatter" : "sequential"); ExecutorService loadService = null; try { - loadService = ExecutorUtil.newFixedThreadPool(parallelThreads, "loader"); + loadService = ExecutorUtil.newFixedThreadPool(parseThreads, "loader"); List taskItems = prepareTaskItems(structs, scatter); List> loadTasks = new ArrayList<>(); diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index b1739ab97..ad2c79ca3 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -189,10 +189,11 @@ public final class LoadOptions implements Cloneable { @Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1, validateWith = {PositiveValidator.class}, - description = "The number of parallel read pipelines. " + - "Default: auto max(2, cpu/2). " + - "Must be >= 1") - public int parallelThreads = Math.max(2, CPUS/2); + description = "(--parallel-count is deprecated, use --parser-threads instead) " + + "The number of parallel read pipelines. " + + "Default: max(2, CPUS/2) where CPUS is the number " + + "of available processors. Must be >= 1") + public int parseThreads = Math.max(2, CPUS / 2); @Parameter(names = {"--start-file"}, arity = 1, description = "start file index for partial loading") @@ -342,6 +343,11 @@ public final class LoadOptions implements Cloneable { description = "The task scheduler type (when creating graph if not exists") public String schedulerType = "distributed"; + @Parameter(names = {"--batch-failure-fallback"}, arity = 1, + description = "Whether to fallback to single insert when batch insert fails. " + + "Default: false") + public boolean batchFailureFallback = false; + public String workModeString() { if (this.incrementalMode) { return "INCREMENTAL MODE"; @@ -419,9 +425,31 @@ public static LoadOptions parseOptions(String[] args) { options.maxParseErrors = Constants.NO_LIMIT; options.maxInsertErrors = Constants.NO_LIMIT; } + if (options.batchInsertThreads != CPUS) { + adjustConnectionPoolIfDefault(options); + } return options; } + private static void adjustConnectionPoolIfDefault(LoadOptions options) { + int batchThreads = options.batchInsertThreads; + int maxConn = options.maxConnections; + int maxConnPerRoute = options.maxConnectionsPerRoute; + if (maxConn == DEFAULT_MAX_CONNECTIONS && maxConn < batchThreads * 4) { + options.maxConnections = batchThreads * 4; + LOG.info("Auto adjusted max-conn to {} based on " + + "batch-insert-threads({})", + options.maxConnections, batchThreads); + } + + if (maxConnPerRoute == DEFAULT_MAX_CONNECTIONS_PER_ROUTE && maxConnPerRoute < batchThreads * 2) { + options.maxConnectionsPerRoute = batchThreads * 2; + LOG.info("Auto adjusted max-conn-per-route to {} based on " + + "batch-insert-threads({})", + options.maxConnectionsPerRoute, batchThreads); + } + } + public ShortIdConfig getShortIdConfig(String vertexLabel) { for (ShortIdConfig config: shorterIDConfigs) { if (config.getVertexLabel().equals(vertexLabel)) { diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java index 88d094297..1afab733b 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java @@ -157,9 +157,15 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, (r, e) -> { if (e != null) { LOG.error("Batch insert {} error, interrupting import", mapping.type(), e); - this.context.stopLoading(); - Printer.printError("Batch insert %s failed, stop loading, Please check the logs", - mapping.type().string()); + if (this.options.batchFailureFallback) { + LOG.warn("Batch insert {} error, try single insert", + mapping.type(), e); + this.submitInSingle(struct, mapping, batch); + } else { + this.context.stopLoading(); + Printer.printError("Batch insert %s failed, stop loading. Please check the logs", + mapping.type().string()); + } } summary.metrics(struct).minusFlighting(batch.size()); From 866a4a0145a1becbfff9eafc0144cc7eac8e8e28 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Mon, 19 Jan 2026 09:57:47 +0800 Subject: [PATCH 12/31] [feat] Adjusted several default parameters and descriptions in the Loader, and refactored the failure-handling logic for batch inserts. --- .../main/java/org/apache/hugegraph/loader/task/TaskManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java index 1afab733b..227edc0fb 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java @@ -162,12 +162,12 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, mapping.type(), e); this.submitInSingle(struct, mapping, batch); } else { + summary.metrics(struct).minusFlighting(batch.size()); this.context.stopLoading(); Printer.printError("Batch insert %s failed, stop loading. Please check the logs", mapping.type().string()); } } - summary.metrics(struct).minusFlighting(batch.size()); this.batchSemaphore.release(); long end = System.currentTimeMillis(); From ceaed28674c387703d9c20bacfd72dbb303d60ca Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Mon, 19 Jan 2026 09:58:45 +0800 Subject: [PATCH 13/31] [feat] Adjusted several default parameters and descriptions in the Loader, and refactored the failure-handling logic for batch inserts. --- .../main/java/org/apache/hugegraph/loader/task/TaskManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java index 227edc0fb..7f1f6acfe 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java @@ -163,6 +163,7 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, this.submitInSingle(struct, mapping, batch); } else { summary.metrics(struct).minusFlighting(batch.size()); + this.context.occurredError(); this.context.stopLoading(); Printer.printError("Batch insert %s failed, stop loading. Please check the logs", mapping.type().string()); From e39f5036b75b4051f0cb81804f2fec79b5a4c52f Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Mon, 19 Jan 2026 10:10:14 +0800 Subject: [PATCH 14/31] [feat] Adjusted several default parameters and descriptions in the Loader, and refactored the failure-handling logic for batch inserts. --- .../java/org/apache/hugegraph/loader/task/TaskManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java index 7f1f6acfe..97dde9cb9 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java @@ -162,12 +162,13 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, mapping.type(), e); this.submitInSingle(struct, mapping, batch); } else { - summary.metrics(struct).minusFlighting(batch.size()); this.context.occurredError(); this.context.stopLoading(); Printer.printError("Batch insert %s failed, stop loading. Please check the logs", mapping.type().string()); } + } else { + summary.metrics(struct).minusFlighting(batch.size()); } this.batchSemaphore.release(); From eda716ee5362ecc643b6b4ef625a2cbdb41c2cff Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 08:49:44 +0800 Subject: [PATCH 15/31] [feat] Add unit tests --- .../loader/executor/LoadOptions.java | 18 +- .../hugegraph/loader/task/TaskManager.java | 8 +- .../loader/test/functional/FileLoadTest.java | 12 +- .../loader/test/unit/LoadOptionsTest.java | 480 ++++++++++++++++++ .../test/unit/TaskManagerFailureTest.java | 348 +++++++++++++ 5 files changed, 851 insertions(+), 15 deletions(-) create mode 100644 hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java create mode 100644 hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerFailureTest.java diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index ad2c79ca3..4cf7b4c32 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -20,6 +20,7 @@ import java.io.File; import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; @@ -345,8 +346,8 @@ public final class LoadOptions implements Cloneable { @Parameter(names = {"--batch-failure-fallback"}, arity = 1, description = "Whether to fallback to single insert when batch insert fails. " + - "Default: false") - public boolean batchFailureFallback = false; + "Default: true") + public boolean batchFailureFallback = true; public String workModeString() { if (this.incrementalMode) { @@ -425,9 +426,11 @@ public static LoadOptions parseOptions(String[] args) { options.maxParseErrors = Constants.NO_LIMIT; options.maxInsertErrors = Constants.NO_LIMIT; } - if (options.batchInsertThreads != CPUS) { - adjustConnectionPoolIfDefault(options); + if (Arrays.asList(args).contains("--parallel-count")) { + LOG.warn("Parameter --parallel-count is deprecated, " + + "please use --parser-threads instead"); } + adjustConnectionPoolIfDefault(options); return options; } @@ -435,17 +438,16 @@ private static void adjustConnectionPoolIfDefault(LoadOptions options) { int batchThreads = options.batchInsertThreads; int maxConn = options.maxConnections; int maxConnPerRoute = options.maxConnectionsPerRoute; + if (maxConn == DEFAULT_MAX_CONNECTIONS && maxConn < batchThreads * 4) { options.maxConnections = batchThreads * 4; - LOG.info("Auto adjusted max-conn to {} based on " + - "batch-insert-threads({})", + LOG.info("Auto adjusted max-conn to {} based on batch-insert-threads({})", options.maxConnections, batchThreads); } if (maxConnPerRoute == DEFAULT_MAX_CONNECTIONS_PER_ROUTE && maxConnPerRoute < batchThreads * 2) { options.maxConnectionsPerRoute = batchThreads * 2; - LOG.info("Auto adjusted max-conn-per-route to {} based on " + - "batch-insert-threads({})", + LOG.info("Auto adjusted max-conn-per-route to {} based on batch-insert-threads({})", options.maxConnectionsPerRoute, batchThreads); } } diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java index 97dde9cb9..63192b12d 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java @@ -138,12 +138,13 @@ public void shutdown() { public void submitBatch(InputStruct struct, ElementMapping mapping, List batch) { - if (this.context.stopped()) { - return; - } long start = System.currentTimeMillis(); try { this.batchSemaphore.acquire(); + if (this.context.stopped()) { + this.batchSemaphore.release(); + return; + } } catch (InterruptedException e) { throw new LoadException("Interrupted while waiting to submit %s " + "batch in batch mode", e, mapping.type()); @@ -162,6 +163,7 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, mapping.type(), e); this.submitInSingle(struct, mapping, batch); } else { + summary.metrics(struct).minusFlighting(batch.size()); this.context.occurredError(); this.context.stopLoading(); Printer.printError("Batch insert %s failed, stop loading. Please check the logs", diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java index d069aaecf..80e5976d5 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java @@ -2058,7 +2058,8 @@ public void testLoadIncrementalModeAndLoadFailure() "-h", SERVER, "--batch-insert-threads", "2", "--max-parse-errors", "1", - "--test-mode", "false" + "--test-mode", "false", + "--parser-threads", "1" )); argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa")); @@ -2259,7 +2260,8 @@ public void testReloadJsonFailureFiles() throws IOException, "-h", SERVER, "--check-vertex", "true", "--batch-insert-threads", "2", - "--test-mode", "false" + "--test-mode", "false", + "--parser-threads", "1" )); argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa")); HugeGraphLoader loader = new HugeGraphLoader(argsList.toArray(new String[0])); @@ -2564,7 +2566,8 @@ public void testSourceOrTargetPrimaryValueNull() { "-g", GRAPH, "-h", SERVER, "--batch-insert-threads", "2", - "--test-mode", "true" + "--test-mode", "true", + "--parser-threads", "1" )); argsList.addAll(Arrays.asList("--username", "admin", "--password", "pa")); @@ -3047,7 +3050,8 @@ public void testReadReachedMaxLines() { "-h", SERVER, "--max-read-lines", "4", "--batch-insert-threads", "2", - "--test-mode", "true" + "--test-mode", "true", + "--parser-threads", "1" }; loadWithAuth(args); diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java new file mode 100644 index 000000000..01de0e2b4 --- /dev/null +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.loader.test.unit; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileWriter; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; + +import org.apache.hugegraph.driver.GraphManager; +import org.apache.hugegraph.driver.HugeClient; +import org.apache.hugegraph.loader.builder.Record; +import org.apache.hugegraph.loader.executor.LoadContext; +import org.apache.hugegraph.loader.executor.LoadOptions; +import org.apache.hugegraph.loader.mapping.EdgeMapping; +import org.apache.hugegraph.loader.mapping.InputStruct; +import org.apache.hugegraph.loader.metrics.LoadMetrics; +import org.apache.hugegraph.loader.metrics.LoadSummary; +import org.apache.hugegraph.loader.progress.LoadProgress; +import org.apache.hugegraph.loader.task.TaskManager; +import org.apache.hugegraph.structure.graph.Edge; +import org.junit.Test; + +import org.apache.hugegraph.testutil.Assert; + +public class LoadOptionsTest { + + @Test + public void testBatchInsertFailureWithFallbackDisabled() throws Exception { + LoadOptions options = new LoadOptions(); + options.batchFailureFallback = false; + Assert.assertFalse(options.batchFailureFallback); + + LoadContext context = newTestContext(options); + TaskManager taskManager = new TaskManager(context); + + EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, + Arrays.asList("t"), false); + mapping.label("knows"); + + InputStruct struct = new InputStruct(new ArrayList<>(), + new ArrayList<>()); + struct.id("1"); + struct.add(mapping); + + LoadSummary summary = context.summary(); + summary.inputMetricsMap() + .put(struct.id(), new LoadMetrics(struct)); + LoadMetrics metrics = summary.metrics(struct); + + setField(context.client(), "graph", newFailingBatchGraphManager()); + + List batch = new ArrayList<>(); + batch.add(new Record("line1", new Edge("knows"))); + batch.add(new Record("line2", new Edge("knows"))); + + ByteArrayOutputStream errOutput = new ByteArrayOutputStream(); + PrintStream originalErr = System.err; + System.setErr(new PrintStream(errOutput, true, + StandardCharsets.UTF_8.name())); + try { + taskManager.submitBatch(struct, mapping, batch); + taskManager.waitFinished(); + + Assert.assertEquals(0L, flightingCount(metrics)); + Assert.assertTrue(context.stopped()); + Assert.assertFalse(context.noError()); + + String errText = errOutput.toString(StandardCharsets.UTF_8.name()); + Assert.assertTrue(errText.contains( + "Batch insert edges failed, stop loading.")); + + long before = flightingCount(metrics); + taskManager.submitBatch(struct, mapping, batch); + taskManager.waitFinished(); + Assert.assertEquals(before, flightingCount(metrics)); + } finally { + System.setErr(originalErr); + taskManager.shutdown(); + } + } + + @Test + public void testBatchInsertFailureWithFallbackEnabled() throws Exception { + LoadOptions options = new LoadOptions(); + + LoadContext context = newTestContext(options); + TaskManager taskManager = new TaskManager(context); + + EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, + Arrays.asList("t"), false); + mapping.label("knows"); + + InputStruct struct = new InputStruct(new ArrayList<>(), + new ArrayList<>()); + struct.id("1"); + struct.add(mapping); + + LoadSummary summary = context.summary(); + summary.inputMetricsMap() + .put(struct.id(), new LoadMetrics(struct)); + LoadMetrics metrics = summary.metrics(struct); + + FailingBatchGraphManager.BATCH_CALLS.set(0); + FailingBatchGraphManager.SINGLE_CALLS.set(0); + setField(context.client(), "graph", newFailingBatchGraphManager()); + + List batch = new ArrayList<>(); + batch.add(new Record("line1", new Edge("knows"))); + batch.add(new Record("line2", new Edge("knows"))); + + try { + taskManager.submitBatch(struct, mapping, batch); + taskManager.waitFinished(); + + Assert.assertEquals(1, FailingBatchGraphManager.BATCH_CALLS.get()); + Assert.assertEquals(2, FailingBatchGraphManager.SINGLE_CALLS.get()); + Assert.assertEquals(0L, flightingCount(metrics)); + Assert.assertFalse(context.stopped()); + Assert.assertTrue(context.noError()); + } finally { + taskManager.shutdown(); + } + } + + @Test + public void testMultipleBatchFailuresCounterConsistency() throws Exception { + LoadOptions options = new LoadOptions(); + options.batchFailureFallback = true; + + LoadContext context = newTestContext(options); + TaskManager taskManager = new TaskManager(context); + + EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, + Arrays.asList("t"), false); + mapping.label("knows"); + + InputStruct struct = new InputStruct(new ArrayList<>(), + new ArrayList<>()); + struct.id("1"); + struct.add(mapping); + + LoadSummary summary = context.summary(); + summary.inputMetricsMap() + .put(struct.id(), new LoadMetrics(struct)); + LoadMetrics metrics = summary.metrics(struct); + + FailingBatchGraphManager.BATCH_CALLS.set(0); + FailingBatchGraphManager.SINGLE_CALLS.set(0); + setField(context.client(), "graph", newFailingBatchGraphManager()); + + List batch1 = new ArrayList<>(); + batch1.add(new Record("line1", new Edge("knows"))); + batch1.add(new Record("line2", new Edge("knows"))); + + List batch2 = new ArrayList<>(); + batch2.add(new Record("line3", new Edge("knows"))); + batch2.add(new Record("line4", new Edge("knows"))); + + try { + taskManager.submitBatch(struct, mapping, batch1); + taskManager.submitBatch(struct, mapping, batch2); + taskManager.waitFinished(); + + Assert.assertEquals(2, FailingBatchGraphManager.BATCH_CALLS.get()); + Assert.assertEquals(4, FailingBatchGraphManager.SINGLE_CALLS.get()); + Assert.assertEquals(0L, flightingCount(metrics)); + Assert.assertFalse(context.stopped()); + Assert.assertTrue(context.noError()); + + int expectedBatchPermits = 1 + options.batchInsertThreads; + int expectedSinglePermits = 2 * options.singleInsertThreads; + Assert.assertEquals(expectedBatchPermits, + getSemaphorePermits(taskManager, "batchSemaphore")); + Assert.assertEquals(expectedSinglePermits, + getSemaphorePermits(taskManager, "singleSemaphore")); + } finally { + taskManager.shutdown(); + } + } + + @Test + public void testConnectionPoolAutoAdjustWithDefaultBatchThreads() throws Exception { + int cpus = readStaticInt(LoadOptions.class, "CPUS"); + LoadOptions options = new LoadOptions(); + + Assert.assertEquals(cpus * 4, options.maxConnections); + Assert.assertEquals(cpus * 2, options.maxConnectionsPerRoute); + } + + @Test + public void testConnectionPoolAutoAdjustWithCustomBatchThreads() throws Exception { + int cpus = readStaticInt(LoadOptions.class, "CPUS"); + int defaultMaxConn = readStaticInt(LoadOptions.class, "DEFAULT_MAX_CONNECTIONS"); + int defaultMaxConnPerRoute = readStaticInt(LoadOptions.class, + "DEFAULT_MAX_CONNECTIONS_PER_ROUTE"); + LoadOptions options = new LoadOptions(); + options.batchInsertThreads = 20; + + CapturingAppender appender = attachAppender(); + try { + invokeAdjustConnectionPool(options); + } finally { + detachAppender(appender); + } + + int expectedMaxConn = defaultMaxConn; + int expectedMaxConnPerRoute = defaultMaxConnPerRoute; + if (defaultMaxConn == cpus * 4 && defaultMaxConn < 80) { + expectedMaxConn = 80; + } + if (defaultMaxConnPerRoute == cpus * 2 && defaultMaxConnPerRoute < 40) { + expectedMaxConnPerRoute = 40; + } + + Assert.assertEquals(expectedMaxConn, options.maxConnections); + Assert.assertEquals(expectedMaxConnPerRoute, options.maxConnectionsPerRoute); + if (expectedMaxConn == 80 || expectedMaxConnPerRoute == 40) { + Assert.assertTrue(appender.contains("Auto adjusted max-conn")); + } + } + + @Test + public void testConnectionPoolNoAdjustWithCustomMaxConn() throws Exception { + LoadOptions options = new LoadOptions(); + options.batchInsertThreads = 20; + options.maxConnections = 100; + options.maxConnectionsPerRoute = 50; + + CapturingAppender appender = attachAppender(); + try { + invokeAdjustConnectionPool(options); + } finally { + detachAppender(appender); + } + + Assert.assertEquals(100, options.maxConnections); + Assert.assertEquals(50, options.maxConnectionsPerRoute); + Assert.assertFalse(appender.contains("Auto adjusted max-conn")); + } + + @Test + public void testParseThreadsMinValue() { + LoadOptions.PositiveValidator validator = + new LoadOptions.PositiveValidator(); + + validator.validate("--parser-threads", "1"); + + Assert.assertTrue(validateFails(validator, "--parser-threads", "0")); + Assert.assertTrue(validateFails(validator, "--parser-threads", "-1")); + } + + @Test + public void testParseThreadsDefaultValue() throws Exception { + int cpus = readStaticInt(LoadOptions.class, "CPUS"); + LoadOptions options = new LoadOptions(); + Assert.assertEquals(Math.max(2, cpus / 2), options.parseThreads); + } + + @Test + public void testDeprecatedParallelCountParameter() throws Exception { + File mapping = createTempMapping(); + String[] args = new String[]{ + "-f", mapping.getPath(), + "-g", "g", + "-h", "localhost", + "--parallel-count", "4" + }; + + CapturingAppender appender = attachAppender(); + try { + LoadOptions options = LoadOptions.parseOptions(args); + Assert.assertEquals(4, options.parseThreads); + Assert.assertTrue(appender.contains("deprecated")); + } finally { + detachAppender(appender); + mapping.delete(); + } + } + + private static long flightingCount(LoadMetrics metrics) + throws Exception { + Field field = LoadMetrics.class.getDeclaredField("flightingNums"); + field.setAccessible(true); + LongAdder adder = (LongAdder) field.get(metrics); + return adder.longValue(); + } + + private static LoadContext newTestContext(LoadOptions options) + throws Exception { + LoadContext context = (LoadContext) allocateInstance(LoadContext.class); + setField(context, "timestamp", "test"); + setField(context, "closed", false); + setField(context, "stopped", false); + setField(context, "noError", true); + setField(context, "options", options); + setField(context, "summary", new LoadSummary()); + setField(context, "oldProgress", new LoadProgress()); + setField(context, "newProgress", new LoadProgress()); + setField(context, "loggers", new ConcurrentHashMap<>()); + + HugeClient client = (HugeClient) allocateInstance(HugeClient.class); + setField(context, "client", client); + setField(context, "indirectClient", client); + setField(context, "schemaCache", null); + setField(context, "parseGroup", null); + return context; + } + + private static Object allocateInstance(Class type) throws Exception { + Object unsafe = unsafe(); + Method method = unsafe.getClass() + .getMethod("allocateInstance", Class.class); + return method.invoke(unsafe, type); + } + + private static Object unsafe() throws Exception { + Class unsafeClass; + try { + unsafeClass = Class.forName("sun.misc.Unsafe"); + } catch (ClassNotFoundException e) { + unsafeClass = Class.forName("jdk.internal.misc.Unsafe"); + } + Field field = unsafeClass.getDeclaredField("theUnsafe"); + field.setAccessible(true); + return field.get(null); + } + + private static void setField(Object target, String name, Object value) + throws Exception { + Field field = target.getClass().getDeclaredField(name); + field.setAccessible(true); + field.set(target, value); + } + + private static int getSemaphorePermits(Object target, String name) + throws Exception { + Field field = target.getClass().getDeclaredField(name); + field.setAccessible(true); + Semaphore semaphore = (Semaphore) field.get(target); + return semaphore.availablePermits(); + } + + private static int readStaticInt(Class type, String name) + throws Exception { + Field field = type.getDeclaredField(name); + field.setAccessible(true); + return field.getInt(null); + } + + private static void invokeAdjustConnectionPool(LoadOptions options) + throws Exception { + Method method = LoadOptions.class + .getDeclaredMethod("adjustConnectionPoolIfDefault", + LoadOptions.class); + method.setAccessible(true); + method.invoke(null, options); + } + + private static boolean validateFails(LoadOptions.PositiveValidator validator, + String name, String value) { + try { + validator.validate(name, value); + return false; + } catch (Exception ignored) { + return true; + } + } + + private static File createTempMapping() throws Exception { + File file = File.createTempFile("load-options-", ".json", new File(".")); + try (FileWriter writer = new FileWriter(file)) { + writer.write("{\"version\":\"2.0\",\"structs\":[]}"); + } + return file; + } + + private static CapturingAppender attachAppender() { + Logger logger = Logger.getLogger(LoadOptions.class.getName()); + CapturingAppender appender = new CapturingAppender(); + appender.setThreshold(Level.INFO); + logger.addAppender(appender); + return appender; + } + + private static void detachAppender(CapturingAppender appender) { + if (appender == null) { + return; + } + Logger logger = Logger.getLogger(LoadOptions.class.getName()); + logger.removeAppender(appender); + } + + private static final class CapturingAppender extends AppenderSkeleton { + + private final StringBuilder buffer = new StringBuilder(); + + @Override + protected void append(LoggingEvent event) { + if (event == null || event.getRenderedMessage() == null) { + return; + } + buffer.append(event.getRenderedMessage()).append('\n'); + } + + boolean contains(String text) { + return this.buffer.toString().contains(text); + } + + @Override + public void close() { + // No-op. + } + + @Override + public boolean requiresLayout() { + return false; + } + } + + private static GraphManager newFailingBatchGraphManager() throws Exception { + return (GraphManager) allocateInstance(FailingBatchGraphManager.class); + } + + private static final class FailingBatchGraphManager extends GraphManager { + + private static final AtomicInteger BATCH_CALLS = new AtomicInteger(); + private static final AtomicInteger SINGLE_CALLS = new AtomicInteger(); + + private FailingBatchGraphManager() { + super(null, null, null); + } + + @Override + public List addEdges(List edges, boolean checkVertex) { + return this.addEdges(edges); + } + + @Override + public List addEdges(List edges) { + if (edges.size() > 1) { + BATCH_CALLS.incrementAndGet(); + throw new RuntimeException("batch insert failure"); + } + SINGLE_CALLS.addAndGet(edges.size()); + return edges; + } + } + +} diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerFailureTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerFailureTest.java new file mode 100644 index 000000000..66428d29f --- /dev/null +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerFailureTest.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.loader.test.unit; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.hugegraph.driver.GraphManager; +import org.apache.hugegraph.driver.HugeClient; +import org.apache.hugegraph.loader.builder.Record; +import org.apache.hugegraph.loader.executor.LoadContext; +import org.apache.hugegraph.loader.executor.LoadOptions; +import org.apache.hugegraph.loader.mapping.EdgeMapping; +import org.apache.hugegraph.loader.mapping.InputStruct; +import org.apache.hugegraph.loader.metrics.LoadMetrics; +import org.apache.hugegraph.loader.metrics.LoadSummary; +import org.apache.hugegraph.loader.progress.LoadProgress; +import org.apache.hugegraph.loader.task.TaskManager; +import org.apache.hugegraph.structure.graph.Edge; +import org.junit.Test; + +import org.apache.hugegraph.testutil.Assert; + +public class TaskManagerFailureTest { + + @Test + public void testConcurrentSubmitWhenStopping() throws Exception { + LoadOptions options = new LoadOptions(); + options.batchFailureFallback = false; + options.batchInsertThreads = 2; + options.singleInsertThreads = 1; + + LoadContext context = newTestContext(options); + TaskManager taskManager = new TaskManager(context); + + EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, + Arrays.asList("t"), false); + mapping.label("knows"); + + InputStruct struct = new InputStruct(new ArrayList<>(), + new ArrayList<>()); + struct.id("1"); + struct.add(mapping); + + LoadSummary summary = context.summary(); + summary.inputMetricsMap() + .put(struct.id(), new LoadMetrics(struct)); + LoadMetrics metrics = summary.metrics(struct); + + CountDownLatch firstStarted = new CountDownLatch(1); + CountDownLatch allowFirstFinish = new CountDownLatch(1); + CountDownLatch failureCalled = new CountDownLatch(1); + FailingConcurrentGraphManager.BATCH_CALLS.set(0); + FailingConcurrentGraphManager.FIRST_STARTED = firstStarted; + FailingConcurrentGraphManager.ALLOW_FIRST_FINISH = allowFirstFinish; + FailingConcurrentGraphManager.FAILURE_CALLED = failureCalled; + setField(context.client(), "graph", newFailingConcurrentGraphManager()); + + List batch = new ArrayList<>(); + batch.add(new Record("line1", new Edge("knows"))); + batch.add(new Record("line2", new Edge("knows"))); + + ExecutorService executor = Executors.newFixedThreadPool(10); + List> futures = new ArrayList<>(); + try { + for (int i = 0; i < 10; i++) { + futures.add(executor.submit(() -> { + taskManager.submitBatch(struct, mapping, batch); + })); + } + + Assert.assertTrue(firstStarted.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(failureCalled.await(5, TimeUnit.SECONDS)); + waitStopped(context, 5, TimeUnit.SECONDS); + allowFirstFinish.countDown(); + + for (Future future : futures) { + future.get(5, TimeUnit.SECONDS); + } + + taskManager.waitFinished(); + + int batchCalls = FailingConcurrentGraphManager.BATCH_CALLS.get(); + Assert.assertTrue(batchCalls >= 2 && batchCalls <= 3); + Assert.assertEquals(0L, flightingCount(metrics)); + Assert.assertTrue(context.stopped()); + Assert.assertFalse(context.noError()); + + long before = FailingConcurrentGraphManager.BATCH_CALLS.get(); + taskManager.submitBatch(struct, mapping, batch); + taskManager.waitFinished(); + Assert.assertEquals(before, FailingConcurrentGraphManager.BATCH_CALLS.get()); + + int expectedBatchPermits = 1 + options.batchInsertThreads; + int expectedSinglePermits = 2 * options.singleInsertThreads; + Assert.assertEquals(expectedBatchPermits, + getSemaphorePermits(taskManager, "batchSemaphore")); + Assert.assertEquals(expectedSinglePermits, + getSemaphorePermits(taskManager, "singleSemaphore")); + } finally { + allowFirstFinish.countDown(); + executor.shutdownNow(); + taskManager.shutdown(); + } + } + + @Test + public void testStopCheckTimingInSubmitBatch() throws Exception { + LoadOptions options = new LoadOptions(); + options.batchFailureFallback = false; + options.batchInsertThreads = 1; + options.singleInsertThreads = 1; + + LoadContext context = newTestContext(options); + TaskManager taskManager = new TaskManager(context); + + EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, + Arrays.asList("t"), false); + mapping.label("knows"); + + InputStruct struct = new InputStruct(new ArrayList<>(), + new ArrayList<>()); + struct.id("1"); + struct.add(mapping); + + LoadSummary summary = context.summary(); + summary.inputMetricsMap() + .put(struct.id(), new LoadMetrics(struct)); + LoadMetrics metrics = summary.metrics(struct); + + setField(context.client(), "graph", newSimpleGraphManager()); + + List batch = new ArrayList<>(); + batch.add(new Record("line1", new Edge("knows"))); + batch.add(new Record("line2", new Edge("knows"))); + + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + taskManager.submitBatch(struct, mapping, batch); + taskManager.waitFinished(); + + Semaphore semaphore = getSemaphore(taskManager, "batchSemaphore"); + semaphore.acquire(); + + Future blocked = executor.submit(() -> { + taskManager.submitBatch(struct, mapping, batch); + }); + + Thread.sleep(50); + context.stopLoading(); + semaphore.release(); + + blocked.get(5, TimeUnit.SECONDS); + + taskManager.waitFinished(); + + Assert.assertTrue(context.stopped()); + Assert.assertEquals(0L, flightingCount(metrics)); + int expectedPermits = 1 + options.batchInsertThreads; + Assert.assertEquals(expectedPermits, + getSemaphorePermits(taskManager, "batchSemaphore")); + } finally { + executor.shutdownNow(); + taskManager.shutdown(); + } + } + + private static void waitStopped(LoadContext context, long timeout, + TimeUnit unit) throws Exception { + long deadline = System.nanoTime() + unit.toNanos(timeout); + while (!context.stopped() && System.nanoTime() < deadline) { + Thread.sleep(10); + } + Assert.assertTrue(context.stopped()); + } + + private static long flightingCount(LoadMetrics metrics) + throws Exception { + Field field = LoadMetrics.class.getDeclaredField("flightingNums"); + field.setAccessible(true); + LongAdder adder = (LongAdder) field.get(metrics); + return adder.longValue(); + } + + private static LoadContext newTestContext(LoadOptions options) + throws Exception { + LoadContext context = (LoadContext) allocateInstance(LoadContext.class); + setField(context, "timestamp", "test"); + setField(context, "closed", false); + setField(context, "stopped", false); + setField(context, "noError", true); + setField(context, "options", options); + setField(context, "summary", new LoadSummary()); + setField(context, "oldProgress", new LoadProgress()); + setField(context, "newProgress", new LoadProgress()); + setField(context, "loggers", new ConcurrentHashMap<>()); + + HugeClient client = (HugeClient) allocateInstance(HugeClient.class); + setField(context, "client", client); + setField(context, "indirectClient", client); + setField(context, "schemaCache", null); + setField(context, "parseGroup", null); + return context; + } + + private static Object allocateInstance(Class type) throws Exception { + Object unsafe = unsafe(); + Method method = unsafe.getClass() + .getMethod("allocateInstance", Class.class); + return method.invoke(unsafe, type); + } + + private static Object unsafe() throws Exception { + Class unsafeClass; + try { + unsafeClass = Class.forName("sun.misc.Unsafe"); + } catch (ClassNotFoundException e) { + unsafeClass = Class.forName("jdk.internal.misc.Unsafe"); + } + Field field = unsafeClass.getDeclaredField("theUnsafe"); + field.setAccessible(true); + return field.get(null); + } + + private static void setField(Object target, String name, Object value) + throws Exception { + Field field = target.getClass().getDeclaredField(name); + field.setAccessible(true); + field.set(target, value); + } + + private static int getSemaphorePermits(Object target, String name) + throws Exception { + Field field = target.getClass().getDeclaredField(name); + field.setAccessible(true); + Semaphore semaphore = (Semaphore) field.get(target); + return semaphore.availablePermits(); + } + + private static Semaphore getSemaphore(Object target, String name) + throws Exception { + Field field = target.getClass().getDeclaredField(name); + field.setAccessible(true); + return (Semaphore) field.get(target); + } + + private static GraphManager newFailingConcurrentGraphManager() + throws Exception { + return (GraphManager) allocateInstance(FailingConcurrentGraphManager.class); + } + + private static GraphManager newSimpleGraphManager() throws Exception { + return (GraphManager) allocateInstance(SimpleGraphManager.class); + } + + private static final class SimpleGraphManager extends GraphManager { + + private SimpleGraphManager() { + super(null, null, null); + } + + @Override + public List addEdges(List edges, boolean checkVertex) { + return this.addEdges(edges); + } + + @Override + public List addEdges(List edges) { + return edges; + } + } + + private static final class FailingConcurrentGraphManager extends GraphManager { + + private static final AtomicInteger BATCH_CALLS = new AtomicInteger(); + private static volatile CountDownLatch FIRST_STARTED; + private static volatile CountDownLatch ALLOW_FIRST_FINISH; + private static volatile CountDownLatch FAILURE_CALLED; + + private FailingConcurrentGraphManager() { + super(null, null, null); + } + + @Override + public List addEdges(List edges, boolean checkVertex) { + return this.addEdges(edges); + } + + @Override + public List addEdges(List edges) { + int call = BATCH_CALLS.incrementAndGet(); + if (call == 1) { + CountDownLatch started = FIRST_STARTED; + if (started != null) { + started.countDown(); + } + await(ALLOW_FIRST_FINISH); + return edges; + } + if (call == 2) { + CountDownLatch failed = FAILURE_CALLED; + if (failed != null) { + failed.countDown(); + } + throw new RuntimeException("batch insert failure"); + } + return edges; + } + + private void await(CountDownLatch latch) { + if (latch == null) { + return; + } + try { + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + // Let the task finish on interruption. + } + } + } +} From 71644b6729cd98c9d49fb4d2c6e02eeaa4f5d5a8 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 09:17:07 +0800 Subject: [PATCH 16/31] [feat] Add unit tests --- .../loader/test/unit/LoadOptionsTest.java | 267 ------------------ ...rFailureTest.java => TaskManagerTest.java} | 189 ++++++++++++- 2 files changed, 188 insertions(+), 268 deletions(-) rename hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/{TaskManagerFailureTest.java => TaskManagerTest.java} (64%) diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java index 01de0e2b4..b327f59ef 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java @@ -17,198 +17,23 @@ package org.apache.hugegraph.loader.test.unit; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileWriter; -import java.io.PrintStream; import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.LongAdder; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; -import org.apache.hugegraph.driver.GraphManager; -import org.apache.hugegraph.driver.HugeClient; -import org.apache.hugegraph.loader.builder.Record; -import org.apache.hugegraph.loader.executor.LoadContext; import org.apache.hugegraph.loader.executor.LoadOptions; -import org.apache.hugegraph.loader.mapping.EdgeMapping; -import org.apache.hugegraph.loader.mapping.InputStruct; -import org.apache.hugegraph.loader.metrics.LoadMetrics; -import org.apache.hugegraph.loader.metrics.LoadSummary; -import org.apache.hugegraph.loader.progress.LoadProgress; -import org.apache.hugegraph.loader.task.TaskManager; -import org.apache.hugegraph.structure.graph.Edge; import org.junit.Test; import org.apache.hugegraph.testutil.Assert; public class LoadOptionsTest { - @Test - public void testBatchInsertFailureWithFallbackDisabled() throws Exception { - LoadOptions options = new LoadOptions(); - options.batchFailureFallback = false; - Assert.assertFalse(options.batchFailureFallback); - - LoadContext context = newTestContext(options); - TaskManager taskManager = new TaskManager(context); - - EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, - Arrays.asList("t"), false); - mapping.label("knows"); - - InputStruct struct = new InputStruct(new ArrayList<>(), - new ArrayList<>()); - struct.id("1"); - struct.add(mapping); - - LoadSummary summary = context.summary(); - summary.inputMetricsMap() - .put(struct.id(), new LoadMetrics(struct)); - LoadMetrics metrics = summary.metrics(struct); - - setField(context.client(), "graph", newFailingBatchGraphManager()); - - List batch = new ArrayList<>(); - batch.add(new Record("line1", new Edge("knows"))); - batch.add(new Record("line2", new Edge("knows"))); - - ByteArrayOutputStream errOutput = new ByteArrayOutputStream(); - PrintStream originalErr = System.err; - System.setErr(new PrintStream(errOutput, true, - StandardCharsets.UTF_8.name())); - try { - taskManager.submitBatch(struct, mapping, batch); - taskManager.waitFinished(); - - Assert.assertEquals(0L, flightingCount(metrics)); - Assert.assertTrue(context.stopped()); - Assert.assertFalse(context.noError()); - - String errText = errOutput.toString(StandardCharsets.UTF_8.name()); - Assert.assertTrue(errText.contains( - "Batch insert edges failed, stop loading.")); - - long before = flightingCount(metrics); - taskManager.submitBatch(struct, mapping, batch); - taskManager.waitFinished(); - Assert.assertEquals(before, flightingCount(metrics)); - } finally { - System.setErr(originalErr); - taskManager.shutdown(); - } - } - - @Test - public void testBatchInsertFailureWithFallbackEnabled() throws Exception { - LoadOptions options = new LoadOptions(); - - LoadContext context = newTestContext(options); - TaskManager taskManager = new TaskManager(context); - - EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, - Arrays.asList("t"), false); - mapping.label("knows"); - - InputStruct struct = new InputStruct(new ArrayList<>(), - new ArrayList<>()); - struct.id("1"); - struct.add(mapping); - - LoadSummary summary = context.summary(); - summary.inputMetricsMap() - .put(struct.id(), new LoadMetrics(struct)); - LoadMetrics metrics = summary.metrics(struct); - - FailingBatchGraphManager.BATCH_CALLS.set(0); - FailingBatchGraphManager.SINGLE_CALLS.set(0); - setField(context.client(), "graph", newFailingBatchGraphManager()); - - List batch = new ArrayList<>(); - batch.add(new Record("line1", new Edge("knows"))); - batch.add(new Record("line2", new Edge("knows"))); - - try { - taskManager.submitBatch(struct, mapping, batch); - taskManager.waitFinished(); - - Assert.assertEquals(1, FailingBatchGraphManager.BATCH_CALLS.get()); - Assert.assertEquals(2, FailingBatchGraphManager.SINGLE_CALLS.get()); - Assert.assertEquals(0L, flightingCount(metrics)); - Assert.assertFalse(context.stopped()); - Assert.assertTrue(context.noError()); - } finally { - taskManager.shutdown(); - } - } - - @Test - public void testMultipleBatchFailuresCounterConsistency() throws Exception { - LoadOptions options = new LoadOptions(); - options.batchFailureFallback = true; - - LoadContext context = newTestContext(options); - TaskManager taskManager = new TaskManager(context); - - EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, - Arrays.asList("t"), false); - mapping.label("knows"); - - InputStruct struct = new InputStruct(new ArrayList<>(), - new ArrayList<>()); - struct.id("1"); - struct.add(mapping); - - LoadSummary summary = context.summary(); - summary.inputMetricsMap() - .put(struct.id(), new LoadMetrics(struct)); - LoadMetrics metrics = summary.metrics(struct); - - FailingBatchGraphManager.BATCH_CALLS.set(0); - FailingBatchGraphManager.SINGLE_CALLS.set(0); - setField(context.client(), "graph", newFailingBatchGraphManager()); - - List batch1 = new ArrayList<>(); - batch1.add(new Record("line1", new Edge("knows"))); - batch1.add(new Record("line2", new Edge("knows"))); - - List batch2 = new ArrayList<>(); - batch2.add(new Record("line3", new Edge("knows"))); - batch2.add(new Record("line4", new Edge("knows"))); - - try { - taskManager.submitBatch(struct, mapping, batch1); - taskManager.submitBatch(struct, mapping, batch2); - taskManager.waitFinished(); - - Assert.assertEquals(2, FailingBatchGraphManager.BATCH_CALLS.get()); - Assert.assertEquals(4, FailingBatchGraphManager.SINGLE_CALLS.get()); - Assert.assertEquals(0L, flightingCount(metrics)); - Assert.assertFalse(context.stopped()); - Assert.assertTrue(context.noError()); - - int expectedBatchPermits = 1 + options.batchInsertThreads; - int expectedSinglePermits = 2 * options.singleInsertThreads; - Assert.assertEquals(expectedBatchPermits, - getSemaphorePermits(taskManager, "batchSemaphore")); - Assert.assertEquals(expectedSinglePermits, - getSemaphorePermits(taskManager, "singleSemaphore")); - } finally { - taskManager.shutdown(); - } - } - @Test public void testConnectionPoolAutoAdjustWithDefaultBatchThreads() throws Exception { int cpus = readStaticInt(LoadOptions.class, "CPUS"); @@ -308,69 +133,6 @@ public void testDeprecatedParallelCountParameter() throws Exception { } } - private static long flightingCount(LoadMetrics metrics) - throws Exception { - Field field = LoadMetrics.class.getDeclaredField("flightingNums"); - field.setAccessible(true); - LongAdder adder = (LongAdder) field.get(metrics); - return adder.longValue(); - } - - private static LoadContext newTestContext(LoadOptions options) - throws Exception { - LoadContext context = (LoadContext) allocateInstance(LoadContext.class); - setField(context, "timestamp", "test"); - setField(context, "closed", false); - setField(context, "stopped", false); - setField(context, "noError", true); - setField(context, "options", options); - setField(context, "summary", new LoadSummary()); - setField(context, "oldProgress", new LoadProgress()); - setField(context, "newProgress", new LoadProgress()); - setField(context, "loggers", new ConcurrentHashMap<>()); - - HugeClient client = (HugeClient) allocateInstance(HugeClient.class); - setField(context, "client", client); - setField(context, "indirectClient", client); - setField(context, "schemaCache", null); - setField(context, "parseGroup", null); - return context; - } - - private static Object allocateInstance(Class type) throws Exception { - Object unsafe = unsafe(); - Method method = unsafe.getClass() - .getMethod("allocateInstance", Class.class); - return method.invoke(unsafe, type); - } - - private static Object unsafe() throws Exception { - Class unsafeClass; - try { - unsafeClass = Class.forName("sun.misc.Unsafe"); - } catch (ClassNotFoundException e) { - unsafeClass = Class.forName("jdk.internal.misc.Unsafe"); - } - Field field = unsafeClass.getDeclaredField("theUnsafe"); - field.setAccessible(true); - return field.get(null); - } - - private static void setField(Object target, String name, Object value) - throws Exception { - Field field = target.getClass().getDeclaredField(name); - field.setAccessible(true); - field.set(target, value); - } - - private static int getSemaphorePermits(Object target, String name) - throws Exception { - Field field = target.getClass().getDeclaredField(name); - field.setAccessible(true); - Semaphore semaphore = (Semaphore) field.get(target); - return semaphore.availablePermits(); - } - private static int readStaticInt(Class type, String name) throws Exception { Field field = type.getDeclaredField(name); @@ -448,33 +210,4 @@ public boolean requiresLayout() { } } - private static GraphManager newFailingBatchGraphManager() throws Exception { - return (GraphManager) allocateInstance(FailingBatchGraphManager.class); - } - - private static final class FailingBatchGraphManager extends GraphManager { - - private static final AtomicInteger BATCH_CALLS = new AtomicInteger(); - private static final AtomicInteger SINGLE_CALLS = new AtomicInteger(); - - private FailingBatchGraphManager() { - super(null, null, null); - } - - @Override - public List addEdges(List edges, boolean checkVertex) { - return this.addEdges(edges); - } - - @Override - public List addEdges(List edges) { - if (edges.size() > 1) { - BATCH_CALLS.incrementAndGet(); - throw new RuntimeException("batch insert failure"); - } - SINGLE_CALLS.addAndGet(edges.size()); - return edges; - } - } - } diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerFailureTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerTest.java similarity index 64% rename from hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerFailureTest.java rename to hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerTest.java index 66428d29f..db317d6ea 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerFailureTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerTest.java @@ -17,8 +17,11 @@ package org.apache.hugegraph.loader.test.unit; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -48,7 +51,162 @@ import org.apache.hugegraph.testutil.Assert; -public class TaskManagerFailureTest { +public class TaskManagerTest { + + @Test + public void testBatchInsertFailureWithFallbackDisabled() throws Exception { + LoadOptions options = new LoadOptions(); + options.batchFailureFallback = false; + Assert.assertFalse(options.batchFailureFallback); + + LoadContext context = newTestContext(options); + TaskManager taskManager = new TaskManager(context); + + EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, + Arrays.asList("t"), false); + mapping.label("knows"); + + InputStruct struct = new InputStruct(new ArrayList<>(), + new ArrayList<>()); + struct.id("1"); + struct.add(mapping); + + LoadSummary summary = context.summary(); + summary.inputMetricsMap() + .put(struct.id(), new LoadMetrics(struct)); + LoadMetrics metrics = summary.metrics(struct); + + setField(context.client(), "graph", newFailingBatchGraphManager()); + + List batch = new ArrayList<>(); + batch.add(new Record("line1", new Edge("knows"))); + batch.add(new Record("line2", new Edge("knows"))); + + ByteArrayOutputStream errOutput = new ByteArrayOutputStream(); + PrintStream originalErr = System.err; + System.setErr(new PrintStream(errOutput, true, + StandardCharsets.UTF_8.name())); + try { + taskManager.submitBatch(struct, mapping, batch); + taskManager.waitFinished(); + + Assert.assertEquals(0L, flightingCount(metrics)); + Assert.assertTrue(context.stopped()); + Assert.assertFalse(context.noError()); + + String errText = errOutput.toString(StandardCharsets.UTF_8.name()); + Assert.assertTrue(errText.contains( + "Batch insert edges failed, stop loading.")); + + long before = flightingCount(metrics); + taskManager.submitBatch(struct, mapping, batch); + taskManager.waitFinished(); + Assert.assertEquals(before, flightingCount(metrics)); + } finally { + System.setErr(originalErr); + taskManager.shutdown(); + } + } + + @Test + public void testBatchInsertFailureWithFallbackEnabled() throws Exception { + LoadOptions options = new LoadOptions(); + options.batchFailureFallback = true; + + LoadContext context = newTestContext(options); + TaskManager taskManager = new TaskManager(context); + + EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, + Arrays.asList("t"), false); + mapping.label("knows"); + + InputStruct struct = new InputStruct(new ArrayList<>(), + new ArrayList<>()); + struct.id("1"); + struct.add(mapping); + + LoadSummary summary = context.summary(); + summary.inputMetricsMap() + .put(struct.id(), new LoadMetrics(struct)); + LoadMetrics metrics = summary.metrics(struct); + + FailingBatchGraphManager.BATCH_CALLS.set(0); + FailingBatchGraphManager.SINGLE_CALLS.set(0); + setField(context.client(), "graph", newFailingBatchGraphManager()); + + List batch = new ArrayList<>(); + batch.add(new Record("line1", new Edge("knows"))); + batch.add(new Record("line2", new Edge("knows"))); + + try { + taskManager.submitBatch(struct, mapping, batch); + taskManager.waitFinished(); + + Assert.assertEquals(1, FailingBatchGraphManager.BATCH_CALLS.get()); + Assert.assertEquals(2, FailingBatchGraphManager.SINGLE_CALLS.get()); + Assert.assertEquals(0L, flightingCount(metrics)); + Assert.assertFalse(context.stopped()); + Assert.assertTrue(context.noError()); + } finally { + taskManager.shutdown(); + } + } + + @Test + public void testMultipleBatchFailuresCounterConsistency() throws Exception { + LoadOptions options = new LoadOptions(); + options.batchFailureFallback = true; + + LoadContext context = newTestContext(options); + TaskManager taskManager = new TaskManager(context); + + EdgeMapping mapping = new EdgeMapping(Arrays.asList("s"), false, + Arrays.asList("t"), false); + mapping.label("knows"); + + InputStruct struct = new InputStruct(new ArrayList<>(), + new ArrayList<>()); + struct.id("1"); + struct.add(mapping); + + LoadSummary summary = context.summary(); + summary.inputMetricsMap() + .put(struct.id(), new LoadMetrics(struct)); + LoadMetrics metrics = summary.metrics(struct); + + FailingBatchGraphManager.BATCH_CALLS.set(0); + FailingBatchGraphManager.SINGLE_CALLS.set(0); + setField(context.client(), "graph", newFailingBatchGraphManager()); + + List batch1 = new ArrayList<>(); + batch1.add(new Record("line1", new Edge("knows"))); + batch1.add(new Record("line2", new Edge("knows"))); + + List batch2 = new ArrayList<>(); + batch2.add(new Record("line3", new Edge("knows"))); + batch2.add(new Record("line4", new Edge("knows"))); + + try { + taskManager.submitBatch(struct, mapping, batch1); + taskManager.submitBatch(struct, mapping, batch2); + taskManager.waitFinished(); + + Assert.assertEquals(2, FailingBatchGraphManager.BATCH_CALLS.get()); + Assert.assertEquals(4, FailingBatchGraphManager.SINGLE_CALLS.get()); + Assert.assertEquals(0L, flightingCount(metrics)); + Assert.assertFalse(context.stopped()); + Assert.assertTrue(context.noError()); + + int expectedBatchPermits = 1 + options.batchInsertThreads; + int expectedSinglePermits = 2 * options.singleInsertThreads; + Assert.assertEquals(expectedBatchPermits, + getSemaphorePermits(taskManager, "batchSemaphore")); + Assert.assertEquals(expectedSinglePermits, + getSemaphorePermits(taskManager, "singleSemaphore")); + } finally { + taskManager.shutdown(); + } + } @Test public void testConcurrentSubmitWhenStopping() throws Exception { @@ -276,6 +434,10 @@ private static GraphManager newFailingConcurrentGraphManager() return (GraphManager) allocateInstance(FailingConcurrentGraphManager.class); } + private static GraphManager newFailingBatchGraphManager() throws Exception { + return (GraphManager) allocateInstance(FailingBatchGraphManager.class); + } + private static GraphManager newSimpleGraphManager() throws Exception { return (GraphManager) allocateInstance(SimpleGraphManager.class); } @@ -345,4 +507,29 @@ private void await(CountDownLatch latch) { } } } + + private static final class FailingBatchGraphManager extends GraphManager { + + private static final AtomicInteger BATCH_CALLS = new AtomicInteger(); + private static final AtomicInteger SINGLE_CALLS = new AtomicInteger(); + + private FailingBatchGraphManager() { + super(null, null, null); + } + + @Override + public List addEdges(List edges, boolean checkVertex) { + return this.addEdges(edges); + } + + @Override + public List addEdges(List edges) { + if (edges.size() > 1) { + BATCH_CALLS.incrementAndGet(); + throw new RuntimeException("batch insert failure"); + } + SINGLE_CALLS.addAndGet(edges.size()); + return edges; + } + } } From 78fa442088f00c1e56a11b3b73fa70f1355d9504 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 11:03:09 +0800 Subject: [PATCH 17/31] [feat] Add unit tests --- .../apache/hugegraph/loader/test/functional/FileLoadTest.java | 3 ++- .../org/apache/hugegraph/loader/test/unit/TaskManagerTest.java | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java index 80e5976d5..ea1460ef3 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java @@ -3065,7 +3065,8 @@ public void testReadReachedMaxLines() { "-h", SERVER, "--max-read-lines", "6", "--batch-insert-threads", "2", - "--test-mode", "true" + "--test-mode", "true", + "--parser-threads", "1" }; loadWithAuth(args); diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerTest.java index db317d6ea..08b6b9043 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/TaskManagerTest.java @@ -504,6 +504,7 @@ private void await(CountDownLatch latch) { latch.await(5, TimeUnit.SECONDS); } catch (InterruptedException ignored) { // Let the task finish on interruption. + Thread.currentThread().interrupt(); } } } From ee496ddcaf9975ecf0f82ed3717f5f21d54c3fc3 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 13:55:33 +0800 Subject: [PATCH 18/31] [feat] Add unit tests --- .../org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java index b327f59ef..76168ddc3 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java @@ -82,6 +82,7 @@ public void testConnectionPoolNoAdjustWithCustomMaxConn() throws Exception { options.maxConnections = 100; options.maxConnectionsPerRoute = 50; + CapturingAppender appender = attachAppender(); try { invokeAdjustConnectionPool(options); From 037911897de8031d43861d2b7687ebc1f8cb6b27 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 14:30:21 +0800 Subject: [PATCH 19/31] [feat] Add unit tests --- .../hugegraph/loader/task/TaskManager.java | 26 +++++++++++-------- .../loader/test/unit/LoadOptionsTest.java | 1 - 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java index 63192b12d..27c943c7c 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java @@ -158,17 +158,21 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, (r, e) -> { if (e != null) { LOG.error("Batch insert {} error, interrupting import", mapping.type(), e); - if (this.options.batchFailureFallback) { - LOG.warn("Batch insert {} error, try single insert", - mapping.type(), e); - this.submitInSingle(struct, mapping, batch); - } else { - summary.metrics(struct).minusFlighting(batch.size()); - this.context.occurredError(); - this.context.stopLoading(); - Printer.printError("Batch insert %s failed, stop loading. Please check the logs", - mapping.type().string()); - } + //if (this.options.batchFailureFallback) { + // LOG.warn("Batch insert {} error, try single insert", + // mapping.type(), e); + // this.submitInSingle(struct, mapping, batch); + //} else { + // summary.metrics(struct).minusFlighting(batch.size()); + // this.context.occurredError(); + // this.context.stopLoading(); + // Printer.printError("Batch insert %s failed, stop loading. Please check the logs", + // mapping.type().string()); + //} + LOG.warn("Batch insert {} error, try single insert", + mapping.type(), e); + // The time of single insert is counted separately + this.submitInSingle(struct, mapping, batch); } else { summary.metrics(struct).minusFlighting(batch.size()); } diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java index 76168ddc3..b327f59ef 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/LoadOptionsTest.java @@ -82,7 +82,6 @@ public void testConnectionPoolNoAdjustWithCustomMaxConn() throws Exception { options.maxConnections = 100; options.maxConnectionsPerRoute = 50; - CapturingAppender appender = attachAppender(); try { invokeAdjustConnectionPool(options); From 572cc94a6fb64f9608d520c37cbf3ca73f763d91 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 16:24:02 +0800 Subject: [PATCH 20/31] [feat] Add unit tests --- .../hugegraph/loader/task/TaskManager.java | 26 ++++++++----------- .../loader/test/functional/HDFSUtil.java | 10 ++++--- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java index 27c943c7c..63192b12d 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java @@ -158,21 +158,17 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, (r, e) -> { if (e != null) { LOG.error("Batch insert {} error, interrupting import", mapping.type(), e); - //if (this.options.batchFailureFallback) { - // LOG.warn("Batch insert {} error, try single insert", - // mapping.type(), e); - // this.submitInSingle(struct, mapping, batch); - //} else { - // summary.metrics(struct).minusFlighting(batch.size()); - // this.context.occurredError(); - // this.context.stopLoading(); - // Printer.printError("Batch insert %s failed, stop loading. Please check the logs", - // mapping.type().string()); - //} - LOG.warn("Batch insert {} error, try single insert", - mapping.type(), e); - // The time of single insert is counted separately - this.submitInSingle(struct, mapping, batch); + if (this.options.batchFailureFallback) { + LOG.warn("Batch insert {} error, try single insert", + mapping.type(), e); + this.submitInSingle(struct, mapping, batch); + } else { + summary.metrics(struct).minusFlighting(batch.size()); + this.context.occurredError(); + this.context.stopLoading(); + Printer.printError("Batch insert %s failed, stop loading. Please check the logs", + mapping.type().string()); + } } else { summary.metrics(struct).minusFlighting(batch.size()); } diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java index 3faeef3f2..c9b22276c 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.net.URI; +import java.net.URL; import java.nio.charset.Charset; import java.util.Arrays; import java.util.Objects; @@ -68,10 +69,13 @@ public Configuration config() { } private static Configuration loadConfiguration() { - // Just use local hadoop with default config in test String fileName = "hdfs_with_core_site_path/core-site.xml"; - String confPath = Objects.requireNonNull(HDFSUtil.class.getClassLoader() - .getResource(fileName)).getPath(); + URL resource = HDFSUtil.class.getClassLoader().getResource(fileName); + if (resource == null) { + throw new LoadException("core-site.xml not found in test classpath: " + + fileName); + } + String confPath = resource.getPath(); Configuration conf = new Configuration(); conf.addResource(new Path(confPath)); return conf; From a54eb645e6b80faf4a9bc12e60b440e9e34e8e51 Mon Sep 17 00:00:00 2001 From: Ken <157260097+kenssa4eedfd@users.noreply.github.com> Date: Tue, 20 Jan 2026 16:43:53 +0800 Subject: [PATCH 21/31] Update loader-ci.yml --- .github/workflows/loader-ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/loader-ci.yml b/.github/workflows/loader-ci.yml index b3e62b2f1..bf25a5a3e 100644 --- a/.github/workflows/loader-ci.yml +++ b/.github/workflows/loader-ci.yml @@ -75,10 +75,10 @@ jobs: run: | cd hugegraph-loader && ls mvn test -P unit -ntp - mvn test -P file - mvn test -P hdfs - mvn test -P jdbc - mvn test -P kafka + mvn test -P file -e + mvn test -P hdfs -e + mvn test -P jdbc -e + mvn test -P kafka -e - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 From 537445cc7e9681936f50a99629cd56b6bd9a160c Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 17:12:07 +0800 Subject: [PATCH 22/31] [feat] Add unit tests --- .../hugegraph/loader/test/functional/HDFSUtil.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java index c9b22276c..3faeef3f2 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSUtil.java @@ -20,7 +20,6 @@ import java.io.File; import java.io.IOException; import java.net.URI; -import java.net.URL; import java.nio.charset.Charset; import java.util.Arrays; import java.util.Objects; @@ -69,13 +68,10 @@ public Configuration config() { } private static Configuration loadConfiguration() { + // Just use local hadoop with default config in test String fileName = "hdfs_with_core_site_path/core-site.xml"; - URL resource = HDFSUtil.class.getClassLoader().getResource(fileName); - if (resource == null) { - throw new LoadException("core-site.xml not found in test classpath: " - + fileName); - } - String confPath = resource.getPath(); + String confPath = Objects.requireNonNull(HDFSUtil.class.getClassLoader() + .getResource(fileName)).getPath(); Configuration conf = new Configuration(); conf.addResource(new Path(confPath)); return conf; From 18e83cfc7a0b57e8fcaf55613634eb77b7d84a88 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 17:16:04 +0800 Subject: [PATCH 23/31] [feat] Add unit tests --- .github/workflows/loader-ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/loader-ci.yml b/.github/workflows/loader-ci.yml index bf25a5a3e..b3e62b2f1 100644 --- a/.github/workflows/loader-ci.yml +++ b/.github/workflows/loader-ci.yml @@ -75,10 +75,10 @@ jobs: run: | cd hugegraph-loader && ls mvn test -P unit -ntp - mvn test -P file -e - mvn test -P hdfs -e - mvn test -P jdbc -e - mvn test -P kafka -e + mvn test -P file + mvn test -P hdfs + mvn test -P jdbc + mvn test -P kafka - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 From dddfa40ba0c4ed8575f4f15506f193742717ee39 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 18:53:31 +0800 Subject: [PATCH 24/31] [feat] Add unit tests --- .../java/org/apache/hugegraph/loader/executor/LoadOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index 4cf7b4c32..c4f9cc6d8 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -194,7 +194,7 @@ public final class LoadOptions implements Cloneable { "The number of parallel read pipelines. " + "Default: max(2, CPUS/2) where CPUS is the number " + "of available processors. Must be >= 1") - public int parseThreads = Math.max(2, CPUS / 2); + public int parseThreads = 1; @Parameter(names = {"--start-file"}, arity = 1, description = "start file index for partial loading") From ed816e3bf914d12edb70dd8221343a842e144158 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 19:35:31 +0800 Subject: [PATCH 25/31] [feat] Add unit tests --- .../org/apache/hugegraph/loader/executor/LoadOptions.java | 2 +- .../hugegraph/loader/test/functional/FileLoadTest.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index c4f9cc6d8..4cf7b4c32 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -194,7 +194,7 @@ public final class LoadOptions implements Cloneable { "The number of parallel read pipelines. " + "Default: max(2, CPUS/2) where CPUS is the number " + "of available processors. Must be >= 1") - public int parseThreads = 1; + public int parseThreads = Math.max(2, CPUS / 2); @Parameter(names = {"--start-file"}, arity = 1, description = "start file index for partial loading") diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java index ea1460ef3..2e4803c68 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java @@ -1197,7 +1197,8 @@ public void testMultiFilesHaveHeader() { "-s", configPath("multi_files_have_header/schema.groovy"), "-g", GRAPH, "-h", SERVER, - "--test-mode", "true" + "--test-mode", "true", + "--parser-threads", "1" }; loadWithAuth(args); @@ -1628,7 +1629,8 @@ public void testFilterPathBySuffix() { "-s", configPath("filter_path_by_suffix/schema.groovy"), "-g", GRAPH, "-h", SERVER, - "--test-mode", "true" + "--test-mode", "true", + "--parser-threads", "1" }; loadWithAuth(args); From 18318932ea0527ea82732e4fda7f0010fc07c085 Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 19:41:07 +0800 Subject: [PATCH 26/31] [feat] Add unit tests --- .../apache/hugegraph/loader/test/functional/FileLoadTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java index 2e4803c68..4a32dc583 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java @@ -1198,6 +1198,7 @@ public void testMultiFilesHaveHeader() { "-g", GRAPH, "-h", SERVER, "--test-mode", "true", + // FIXME: Set parser-threads to 1 because values > 1 currently trigger a NullPointerException (NPE). "--parser-threads", "1" }; loadWithAuth(args); @@ -1630,6 +1631,7 @@ public void testFilterPathBySuffix() { "-g", GRAPH, "-h", SERVER, "--test-mode", "true", + // FIXME: Set parser-threads to 1 because values > 1 currently trigger a NullPointerException (NPE). "--parser-threads", "1" }; loadWithAuth(args); From 588adadcecda0357fb156107e16a5f32fb353a1e Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 22:12:36 +0800 Subject: [PATCH 27/31] [feat] Add unit tests --- .../apache/hugegraph/loader/test/functional/KafkaLoadTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaLoadTest.java index c6c31520a..b44ffbd8f 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaLoadTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaLoadTest.java @@ -85,7 +85,8 @@ public void testCustomizedSchema() { "-h", SERVER, "-p", String.valueOf(PORT), "--batch-insert-threads", "2", - "--test-mode", "true" + "--test-mode", "true", + "--parser-threads", "1" }; loadWithAuth(args); From f60d5a1ed9ef40b99a671fb4523b9370830ada9b Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 22:52:40 +0800 Subject: [PATCH 28/31] [feat] Add unit tests --- .github/workflows/auto-pr-review.yml | 35 ---------------------------- 1 file changed, 35 deletions(-) delete mode 100644 .github/workflows/auto-pr-review.yml diff --git a/.github/workflows/auto-pr-review.yml b/.github/workflows/auto-pr-review.yml deleted file mode 100644 index 6a585355f..000000000 --- a/.github/workflows/auto-pr-review.yml +++ /dev/null @@ -1,35 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name: "Auto PR Commenter" - -on: - pull_request_target: - types: [opened] - -jobs: - add-review-comment: - runs-on: ubuntu-latest - permissions: - pull-requests: write - steps: - - name: Add review comment - uses: peter-evans/create-or-update-comment@v4 - with: - issue-number: ${{ github.event.pull_request.number }} - body: | - @codecov-ai-reviewer review From 3706a3b492f24a4592d7ab64a4226cc8330fa90b Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Tue, 20 Jan 2026 23:43:17 +0800 Subject: [PATCH 29/31] [feat] Add unit tests --- .../apache/hugegraph/loader/test/functional/FileLoadTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java index 4a32dc583..5be6a61ea 100644 --- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java +++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java @@ -1334,7 +1334,9 @@ public void testDirHasMultiFiles() { "-s", configPath("dir_has_multi_files/schema.groovy"), "-g", GRAPH, "-h", SERVER, - "--test-mode", "true" + "--test-mode", "true", + // FIXME: Set parser-threads to 1 because values > 1 currently trigger a NullPointerException (NPE). + "--parser-threads", "1" }; loadWithAuth(args); From c328a01ee78aa27462fea0cafb974522b38e89bc Mon Sep 17 00:00:00 2001 From: ken <2979602290@qq.com> Date: Wed, 21 Jan 2026 15:50:31 +0800 Subject: [PATCH 30/31] [feat] Standardize log levels and streamline parameter descriptions. --- .../hugegraph/loader/HugeGraphLoader.java | 2 +- .../loader/executor/LoadOptions.java | 20 +++++++------------ .../hugegraph/loader/task/TaskManager.java | 2 +- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java index 888b33c29..dab3fb579 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java @@ -669,7 +669,7 @@ private void loadStructs(List structs) { boolean scatter = this.context.options().scatterSources; - LOG.info("{} threads for loading {} structs, from {} to {} in {} mode", + LOG.info("{} parser threads for loading {} structs, from {} to {} in {} mode", parseThreads, structs.size(), this.context.options().startFile, this.context.options().endFile, scatter ? "scatter" : "sequential"); diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java index 4cf7b4c32..f0ea30b7b 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java @@ -159,9 +159,7 @@ public final class LoadOptions implements Cloneable { @Parameter(names = {"--batch-insert-threads"}, arity = 1, validateWith = {PositiveValidator.class}, - description = "The number of threads to execute batch insert. " + - "If max-conn/max-conn-per-route keep defaults, " + - "they may be auto-adjusted based on this value") + description = "The number of threads to execute batch insert (default: CPUS)") public int batchInsertThreads = CPUS; @Parameter(names = {"--single-insert-threads"}, arity = 1, @@ -171,16 +169,14 @@ public final class LoadOptions implements Cloneable { @Parameter(names = {"--max-conn"}, arity = 1, validateWith = {PositiveValidator.class}, - description = "Max number of HTTP connections to server. " + - "If left as default and batch-insert-threads is " + - "set, this may be auto-adjusted") + description = "Max HTTP connections (default: CPUS*4; auto-adjusted by " + + "--batch-insert-threads)") public int maxConnections = DEFAULT_MAX_CONNECTIONS; @Parameter(names = {"--max-conn-per-route"}, arity = 1, validateWith = {PositiveValidator.class}, - description = "Max number of HTTP connections to each route. " + - "If left as default and batch-insert-threads is " + - "set, this may be auto-adjusted") + description = "Max HTTP connections per route (default: CPUS*2; " + + "auto-adjusted by --batch-insert-threads)") public int maxConnectionsPerRoute = DEFAULT_MAX_CONNECTIONS_PER_ROUTE; @Parameter(names = {"--batch-size"}, arity = 1, @@ -190,10 +186,8 @@ public final class LoadOptions implements Cloneable { @Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1, validateWith = {PositiveValidator.class}, - description = "(--parallel-count is deprecated, use --parser-threads instead) " + - "The number of parallel read pipelines. " + - "Default: max(2, CPUS/2) where CPUS is the number " + - "of available processors. Must be >= 1") + description = "Parallel read pipelines (default: max(2, CPUS/2); " + + "--parallel-count is deprecated)") public int parseThreads = Math.max(2, CPUS / 2); @Parameter(names = {"--start-file"}, arity = 1, diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java index 63192b12d..7d0793955 100644 --- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java +++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java @@ -157,7 +157,6 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, CompletableFuture.runAsync(task, this.batchService).whenComplete( (r, e) -> { if (e != null) { - LOG.error("Batch insert {} error, interrupting import", mapping.type(), e); if (this.options.batchFailureFallback) { LOG.warn("Batch insert {} error, try single insert", mapping.type(), e); @@ -166,6 +165,7 @@ public void submitBatch(InputStruct struct, ElementMapping mapping, summary.metrics(struct).minusFlighting(batch.size()); this.context.occurredError(); this.context.stopLoading(); + LOG.error("Batch insert {} error, interrupting import", mapping.type(), e); Printer.printError("Batch insert %s failed, stop loading. Please check the logs", mapping.type().string()); } From 146bdf4d84426b369f041cd8cfceec85b88c7e67 Mon Sep 17 00:00:00 2001 From: imbajin Date: Wed, 21 Jan 2026 16:33:36 +0800 Subject: [PATCH 31/31] Update required review count and collaborators --- .asf.yaml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.asf.yaml b/.asf.yaml index 83de7215a..1d9d56e20 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -49,14 +49,12 @@ github: required_pull_request_reviews: dismiss_stale_reviews: true require_code_owner_reviews: false - required_approving_review_count: 2 + required_approving_review_count: 1 # (for non-committer): assign/edit/close issues & PR, without write access to the code collaborators: - - Pengzna + - kenssa4eedfd - haohao0103 - - Thespica - FrostyHec - - MuLeiSY2021 notifications: # use https://selfserve.apache.org to manage it