Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,10 @@ public int read()
}
};
final JsonNode json;
final JsonNode rootNode;
try {
json = JsonPath.using(JSON_PATH_CONFIG).parse(toParse).read(jsonRoot, JsonNode.class);
rootNode = JsonPath.using(JSON_PATH_CONFIG).parse(toParse).read("$", JsonNode.class);
json = JsonPath.using(JSON_PATH_CONFIG).parse(rootNode).read(jsonRoot, JsonNode.class);
}
catch (PathNotFoundException e) {
skipOrThrow(new DataException(format(Locale.ENGLISH,
Expand All @@ -179,11 +181,11 @@ public int read()
skipOrThrow(new DataException(e), stopOnInvalidRecord);
continue;
}

Map<Column, JsonNode> additionalValues = createAdditionalColumns(jsonPathMap, rootNode);
if (json.isArray()) {
for (JsonNode recordValue : json) {
try {
createRecordFromJson(recordValue, schema, jsonPathMap, visitor, pageBuilder);
createRecordFromJson(recordValue, schema, jsonPathMap, visitor, pageBuilder, additionalValues);
}
catch (DataException e) {
skipOrThrow(e, stopOnInvalidRecord);
Expand All @@ -193,7 +195,7 @@ public int read()
}
else {
try {
createRecordFromJson(json, schema, jsonPathMap, visitor, pageBuilder);
createRecordFromJson(json, schema, jsonPathMap, visitor, pageBuilder, additionalValues);
}
catch (DataException e) {
skipOrThrow(e, stopOnInvalidRecord);
Expand All @@ -207,6 +209,23 @@ public int read()
}
}

private Map<Column, JsonNode> createAdditionalColumns(Map<Column, String> jsonPathMap, JsonNode rootNode)
{
Map<Column, JsonNode> additionalColumns = new HashMap<>();
jsonPathMap.forEach((column, path) -> {
if (path.startsWith("$")) {
try {
additionalColumns.put(
column,
JsonPath.using(JSON_PATH_CONFIG).parse(rootNode).read(path, JsonNode.class)
);
} catch (PathNotFoundException e) {
logger.warn("Failed to get %s", path);
}
}
});
return Collections.unmodifiableMap(additionalColumns);
}
private Map<Column, String> createJsonPathMap(PluginTask task, Schema schema)
{
Map<Column, String> columnMap = new HashMap<>();
Expand All @@ -220,7 +239,7 @@ private Map<Column, String> createJsonPathMap(PluginTask task, Schema schema)
return Collections.unmodifiableMap(columnMap);
}

private void createRecordFromJson(JsonNode json, Schema schema, Map<Column, String> jsonPathMap, ColumnVisitorImpl visitor, PageBuilder pageBuilder)
private void createRecordFromJson(JsonNode json, Schema schema, Map<Column, String> jsonPathMap, ColumnVisitorImpl visitor, PageBuilder pageBuilder, Map<Column, JsonNode> additionalValues)
{
if (json.getNodeType() != JsonNodeType.OBJECT) {
throw new JsonRecordValidateException(format(Locale.ENGLISH,
Expand All @@ -229,7 +248,7 @@ private void createRecordFromJson(JsonNode json, Schema schema, Map<Column, Stri

for (Column column : schema.getColumns()) {
JsonNode value = null;
if (jsonPathMap.containsKey(column)) {
if (jsonPathMap.containsKey(column) && !jsonPathMap.get(column).startsWith("$")) {
try {
value = JsonPath.using(JSON_PATH_CONFIG).parse(json).read(jsonPathMap.get(column));
}
Expand All @@ -239,10 +258,16 @@ private void createRecordFromJson(JsonNode json, Schema schema, Map<Column, Stri
}
else {
value = json.get(column.getName());

}
visitor.setValue(value);
column.visit(visitor);
}
additionalValues.forEach( (k, v) -> {
visitor.setValue(v);
k.visit(visitor);
}
);

pageBuilder.addRecord();
}
Expand Down