From 255e7b78fe615b88494862c128aa121d2773ae0f Mon Sep 17 00:00:00 2001 From: Mrinmoy Das Date: Mon, 22 Apr 2024 17:32:36 +0400 Subject: [PATCH 1/2] fix kafka connector value --- .../airbyte/integrations/source/kafka/format/JsonFormat.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java index 6e1707bd2104..168d36933963 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java @@ -6,9 +6,11 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.AbstractIterator; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteMessage; @@ -151,7 +153,7 @@ protected AirbyteMessage computeNext() { .withRecord(new AirbyteRecordMessage() .withStream(record.topic()) .withEmittedAt(Instant.now().toEpochMilli()) - .withData(record.value())); + .withData(Jsons.jsonNode(ImmutableMap.builder().put("value", output).build()))); } return endOfData(); From f436b257fac906f8ff8f5deca2e21c6f33c1833e Mon Sep 17 00:00:00 2001 From: Mrinmoy Das Date: Mon, 22 Apr 2024 17:37:34 +0400 Subject: [PATCH 2/2] custom docker tag --- airbyte-integrations/connectors/source-kafka/metadata.yaml | 2 +- .../io/airbyte/integrations/source/kafka/format/JsonFormat.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-kafka/metadata.yaml b/airbyte-integrations/connectors/source-kafka/metadata.yaml index b6a1d0494c7b..be0227961994 100644 --- a/airbyte-integrations/connectors/source-kafka/metadata.yaml +++ b/airbyte-integrations/connectors/source-kafka/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: source definitionId: d917a47b-8537-4d0d-8c10-36a9928d4265 - dockerImageTag: 0.2.4 + dockerImageTag: circle-0.2.4 dockerRepository: airbyte/source-kafka githubIssueLabel: source-kafka icon: kafka.svg diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java index 168d36933963..8216b9775cdb 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java @@ -153,7 +153,7 @@ protected AirbyteMessage computeNext() { .withRecord(new AirbyteRecordMessage() .withStream(record.topic()) .withEmittedAt(Instant.now().toEpochMilli()) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("value", output).build()))); + .withData(Jsons.jsonNode(ImmutableMap.builder().put("value", record.value()).build()))); } return endOfData();