From 75208417d3ef26fdeb59eab941071c5736340b10 Mon Sep 17 00:00:00 2001 From: d-hrs Date: Tue, 17 Sep 2024 19:57:55 +0900 Subject: [PATCH] impl --- .../parser/jsonpath/JsonpathParserPlugin.java | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java b/src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java index cd24716..80abc03 100644 --- a/src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java +++ b/src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java @@ -167,8 +167,10 @@ public int read() } }; final JsonNode json; + final JsonNode rootNode; try { - json = JsonPath.using(JSON_PATH_CONFIG).parse(toParse).read(jsonRoot, JsonNode.class); + rootNode = JsonPath.using(JSON_PATH_CONFIG).parse(toParse).read("$", JsonNode.class); + json = JsonPath.using(JSON_PATH_CONFIG).parse(rootNode).read(jsonRoot, JsonNode.class); } catch (PathNotFoundException e) { skipOrThrow(new DataException(format(Locale.ENGLISH, @@ -179,11 +181,11 @@ public int read() skipOrThrow(new DataException(e), stopOnInvalidRecord); continue; } - + Map additionalValues = createAdditionalColumns(jsonPathMap, rootNode); if (json.isArray()) { for (JsonNode recordValue : json) { try { - createRecordFromJson(recordValue, schema, jsonPathMap, visitor, pageBuilder); + createRecordFromJson(recordValue, schema, jsonPathMap, visitor, pageBuilder, additionalValues); } catch (DataException e) { skipOrThrow(e, stopOnInvalidRecord); @@ -193,7 +195,7 @@ public int read() } else { try { - createRecordFromJson(json, schema, jsonPathMap, visitor, pageBuilder); + createRecordFromJson(json, schema, jsonPathMap, visitor, pageBuilder, additionalValues); } catch (DataException e) { skipOrThrow(e, stopOnInvalidRecord); @@ -207,6 +209,23 @@ public int read() } } + private Map createAdditionalColumns(Map jsonPathMap, JsonNode rootNode) + { + Map additionalColumns = new HashMap<>(); + jsonPathMap.forEach((column, path) -> { + if (path.startsWith("$")) { + try { + additionalColumns.put( + column, + JsonPath.using(JSON_PATH_CONFIG).parse(rootNode).read(path, JsonNode.class) + ); + } catch (PathNotFoundException e) { + logger.warn("Failed to get %s", path); + } + } + }); + return Collections.unmodifiableMap(additionalColumns); + } private Map createJsonPathMap(PluginTask task, Schema schema) { Map columnMap = new HashMap<>(); @@ -220,7 +239,7 @@ private Map createJsonPathMap(PluginTask task, Schema schema) return Collections.unmodifiableMap(columnMap); } - private void createRecordFromJson(JsonNode json, Schema schema, Map jsonPathMap, ColumnVisitorImpl visitor, PageBuilder pageBuilder) + private void createRecordFromJson(JsonNode json, Schema schema, Map jsonPathMap, ColumnVisitorImpl visitor, PageBuilder pageBuilder, Map additionalValues) { if (json.getNodeType() != JsonNodeType.OBJECT) { throw new JsonRecordValidateException(format(Locale.ENGLISH, @@ -229,7 +248,7 @@ private void createRecordFromJson(JsonNode json, Schema schema, Map { + visitor.setValue(v); + k.visit(visitor); + } + ); pageBuilder.addRecord(); }