From 346a5d247e6cd13d68c745febc10680b0f0382e9 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Fri, 25 Mar 2022 10:22:56 -0700 Subject: [PATCH 1/9] address comments --- .../source/extractor/extract/restapi/RestApiConnector.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java index 6cdc8b7741a..73fcd05a119 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java @@ -149,6 +149,8 @@ protected HttpClient getHttpClient() { .createClient(); if (httpClient instanceof Closeable) { this.closer.register((Closeable)httpClient); + } else { + log.warn("httpClient is not closable, we will not be able to handle the resources close, please make sure the implementation handle it correctly"); } } return this.httpClient; From 65558f818f3304f44ce7b5f7916a39e2c122f35a Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Fri, 25 Mar 2022 10:53:41 -0700 Subject: [PATCH 2/9] use connectionmanager when httpclient is not closable --- .../source/extractor/extract/restapi/RestApiConnector.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java index 73fcd05a119..6cdc8b7741a 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java @@ -149,8 +149,6 @@ protected HttpClient getHttpClient() { .createClient(); if (httpClient instanceof Closeable) { 
this.closer.register((Closeable)httpClient); - } else { - log.warn("httpClient is not closable, we will not be able to handle the resources close, please make sure the implementation handle it correctly"); } } return this.httpClient; From 254f79dbbc79366c91421992c58aca048eefe3ae Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Tue, 24 Oct 2023 10:58:11 -0700 Subject: [PATCH 3/9] add unit test --- .../kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java index 764eeae0988..319cccc8c42 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java @@ -76,7 +76,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker { public static final String DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY = GOBBLIN_KAFKA_PREFIX + "default.num.topic.partitions.per.container"; private static final int DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10; - //A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a + //A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that agit //single JVM can consume from Kafka for a single topic and controls the number of partitions of a topic that will be // packed into a single workunit. 
For example, if the container capacity is set to 10, and each topic partition has a // weight of 1, then 10 partitions of the topic will be packed into a single workunit. This configuration is topic-independent From 27ada6c22fd6a16c18dcb48befa4a6c31997207b Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Fri, 27 Oct 2023 17:17:08 -0700 Subject: [PATCH 4/9] fix typo --- .../kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java index 319cccc8c42..764eeae0988 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java @@ -76,7 +76,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker { public static final String DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY = GOBBLIN_KAFKA_PREFIX + "default.num.topic.partitions.per.container"; private static final int DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10; - //A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that agit + //A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a //single JVM can consume from Kafka for a single topic and controls the number of partitions of a topic that will be // packed into a single workunit. 
For example, if the container capacity is set to 10, and each topic partition has a // weight of 1, then 10 partitions of the topic will be packed into a single workunit. This configuration is topic-independent From 67365e2a919d597ff0ca55cda4664f5e8c0c3faf Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Tue, 14 Nov 2023 16:38:45 -0800 Subject: [PATCH 5/9] [GOBBLIN-1956] Make Kafka streaming pipeline be able to config the max poll records during runtime --- .../extractor/extract/kafka/KafkaStreamingExtractor.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java index 8320a82ec06..aa70b0be4ab 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java @@ -231,7 +231,11 @@ public KafkaStreamingExtractor(WorkUnitState state) { } else { // As there is no avg record size available, using lower number to make sure we don't hit OOM issue state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS); +<<<<<<< HEAD log.info("set max.poll.records to be {}", DEFAULT_MAX_POLL_RECORDS); +======= + log.info("set max.poll.records to be 100"); +>>>>>>> 76696ca95 ([GOBBLIN-1956] Make Kafka streaming pipeline be able to config the max poll records during runtime) } this.kafkaConsumerClientResolver = new ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class); From 870bef565d195c5d12598903344b5d50d90e9279 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Tue, 14 Nov 2023 16:51:54 -0800 Subject: [PATCH 6/9] small refactor --- 
.../extractor/extract/kafka/KafkaStreamingExtractor.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java index aa70b0be4ab..8320a82ec06 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java @@ -231,11 +231,7 @@ public KafkaStreamingExtractor(WorkUnitState state) { } else { // As there is no avg record size available, using lower number to make sure we don't hit OOM issue state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS); -<<<<<<< HEAD log.info("set max.poll.records to be {}", DEFAULT_MAX_POLL_RECORDS); -======= - log.info("set max.poll.records to be 100"); ->>>>>>> 76696ca95 ([GOBBLIN-1956] Make Kafka streaming pipeline be able to config the max poll records during runtime) } this.kafkaConsumerClientResolver = new ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class); From af84d10cf6f3920067382f8ee31b37ec24e1d135 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Thu, 18 Jan 2024 16:00:04 -0800 Subject: [PATCH 7/9] add unit test --- .../test/java/org/apache/gobblin/runtime/TestRecordStream.java | 1 + 1 file changed, 1 insertion(+) diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java index 4f3fd6ce41d..1433e50040e 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java @@ -30,6 +30,7 @@ import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; From 208175dc85a00f7fbb66d5972e83c1e9d94d157c Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Thu, 18 Jan 2024 17:03:42 -0800 Subject: [PATCH 8/9] fix code style --- .../test/java/org/apache/gobblin/runtime/TestRecordStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java index 1433e50040e..4f3fd6ce41d 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java @@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; From 95cd59d67f05a20509944467accf692685b2abec Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Mon, 29 Jan 2024 16:25:31 -0800 Subject: [PATCH 9/9] [GOBBLIN-1995] Kill the creating writer thread when timeout happens to unblock other thread talking with HDFS --- .../org/apache/gobblin/writer/PartitionedDataWriter.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java index cc0e1d4e7d5..386efb708c6 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java @@ -172,10 +172,11 @@ public DataWriter load(final GenericRecord key) new CloseOnFlushWriterWrapper(new Supplier>() { @Override public DataWriter get() { + Future> future = null; try { log.info(String.format("Adding one more writer 
to loading cache of existing writer " + "with size = %d", partitionWriters.size())); - Future> future = createWriterPool.submit(() -> createPartitionWriter(key)); + future = createWriterPool.submit(() -> createPartitionWriter(key)); state.setProp(CURRENT_PARTITIONED_WRITERS_COUNTER, partitionWriters.size() + 1); return future.get(writeTimeoutInterval, TimeUnit.SECONDS); } catch (ExecutionException | InterruptedException e) { @@ -183,6 +184,11 @@ public DataWriter get() { } catch (TimeoutException e) { throw new RuntimeException(String.format("Failed to create writer due to timeout. The operation timed out after %s seconds.", writeTimeoutInterval), e); } + finally { + if (future != null && !future.isDone()) { + future.cancel(true); + } + } } }, state), state, key); }