From 77c6fb0c2b5c6173af1a7ec2dde54667b683521d Mon Sep 17 00:00:00 2001 From: Preston Carman Date: Wed, 16 Aug 2017 20:34:07 -0700 Subject: [PATCH 1/6] Snapshot of progress. New parameter to JSONParser. --- .../PushKeysOrMembersIntoDatascanRule.java | 7 +- .../rules/PushValueIntoDatascanRule.java | 9 +- .../rules/util/ExpressionToolbox.java | 26 +- .../apache/vxquery/jsonparser/JSONParser.java | 302 ++++++++++-------- .../metadata/AbstractVXQueryDataSource.java | 41 ++- .../metadata/VXQueryCollectionDataSource.java | 4 +- .../VXQueryCollectionOperatorDescriptor.java | 274 ++++++++-------- .../metadata/VXQueryIndexingDataSource.java | 2 - .../org/apache/vxquery/xtest/TestRunner.java | 2 +- 9 files changed, 366 insertions(+), 301 deletions(-) diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushKeysOrMembersIntoDatascanRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushKeysOrMembersIntoDatascanRule.java index 41b64013b..a05699270 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushKeysOrMembersIntoDatascanRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushKeysOrMembersIntoDatascanRule.java @@ -19,7 +19,6 @@ import java.util.ArrayList; import java.util.List; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; @@ -47,7 +46,7 @@ * After * * plan__parent - * ASSIGN( $v2 : $v1 ) + * ASSIGN( $v2 : $v1 ) * DATASCAN( $source : $v1 ) * plan__child * @@ -61,12 +60,12 @@ boolean updateDataSource(IVXQueryDataSource datasource, Mutable> findkeys = new ArrayList>(); + List> findkeys = new ArrayList<>(); ExpressionToolbox.findAllFunctionExpressions(expression, BuiltinOperators.KEYS_OR_MEMBERS.getFunctionIdentifier(), findkeys); for (int i = findkeys.size(); i > 0; --i) { XDMConstants.setTrue(bp); - ds.addValueSeq(ArrayUtils.toObject(bp.getByteArray())); + ds.appendValueSequence(bp); added = true; } return added; diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java index b9014690d..a43a37f3b 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java @@ -25,6 +25,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; import org.apache.vxquery.compiler.rewriter.rules.util.ExpressionToolbox; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; import org.apache.vxquery.functions.BuiltinFunctions; import org.apache.vxquery.functions.BuiltinOperators; import org.apache.vxquery.metadata.IVXQueryDataSource; @@ -56,6 +57,7 @@ */ public class PushValueIntoDatascanRule extends AbstractPushExpressionIntoDatascanRule { + TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); @Override boolean updateDataSource(IVXQueryDataSource datasource, Mutable expression) { @@ -64,7 +66,7 @@ boolean updateDataSource(IVXQueryDataSource datasource, Mutable> finds = new ArrayList>(); + List> finds = new ArrayList<>(); ILogicalExpression le = expression.getValue(); if (le.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) le; @@ -75,11 +77,10 @@ boolean updateDataSource(IVXQueryDataSource datasource, Mutable 0; --i) { - Byte[] value = null; List values = ExpressionToolbox.getFullArguments(finds.get(i - 1)); if (values.size() > 1) { - value = ExpressionToolbox.getConstantArgument(finds.get(i - 1), 1); - ds.addValueSeq(value); + ExpressionToolbox.getConstantArgument(finds.get(i - 1), 1, tvp); + ds.appendValueSequence(tvp); added = true; } } diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java index 50bc07eec..1150179de 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java @@ -19,18 +19,7 @@ import java.util.ArrayList; import java.util.List; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.mutable.Mutable; -import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue; -import org.apache.vxquery.context.StaticContext; -import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; -import org.apache.vxquery.functions.BuiltinFunctions; -import org.apache.vxquery.functions.BuiltinOperators; -import org.apache.vxquery.functions.Function; -import org.apache.vxquery.types.AnyNodeType; -import org.apache.vxquery.types.Quantifier; -import org.apache.vxquery.types.SequenceType; - import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; @@ -43,6 +32,15 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; import org.apache.hyracks.data.std.primitive.IntegerPointable; +import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue; +import org.apache.vxquery.context.StaticContext; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; +import org.apache.vxquery.functions.BuiltinFunctions; +import org.apache.vxquery.functions.BuiltinOperators; +import org.apache.vxquery.functions.Function; +import org.apache.vxquery.types.AnyNodeType; +import org.apache.vxquery.types.Quantifier; +import org.apache.vxquery.types.SequenceType; public class ExpressionToolbox { public static Mutable findVariableExpression(Mutable mutableLe, @@ -213,16 +211,14 @@ public static int getTypeExpressionTypeArgument(Mutable sear return pTypeCode.getInteger(); } - public static Byte[] getConstantArgument(Mutable searchM, int arg) { + public static void getConstantArgument(Mutable searchM, int arg, TaggedValuePointable tvp) { AbstractFunctionCallExpression searchFunction = (AbstractFunctionCallExpression) searchM.getValue(); ILogicalExpression argType = searchFunction.getArguments().get(arg).getValue(); searchFunction.getArguments().size(); if (argType.getExpressionTag() != LogicalExpressionTag.CONSTANT) { - return null; + return; } - TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); ExpressionToolbox.getConstantAsPointable((ConstantExpression) argType, tvp); - return ArrayUtils.toObject(tvp.getByteArray()); } public static List getFullArguments(Mutable searchM) { diff --git a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java index edf8dbc68..a238e7820 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java @@ -30,6 +30,7 @@ import org.apache.hyracks.api.comm.IFrameFieldAppender; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.primitive.BooleanPointable; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; @@ -44,8 +45,8 @@ import org.apache.vxquery.xmlparser.IParser; public class JSONParser implements IParser { - final JsonFactory factory; - final List valueSeq; + private final JsonFactory factory; + private final List valuePointables; protected final ArrayBackedValueStorage atomic; private TaggedValuePointable tvp; private BooleanPointable bp; @@ -58,12 +59,16 @@ public class JSONParser implements IParser { protected final SequenceBuilder sb; protected final DataOutput out; protected itemType checkItem; - protected int levelArray, levelObject; + protected int levelArray; + protected int levelObject; protected final List allKeys; - protected ByteArrayOutputStream outputStream, prefixStream, pathStream; + protected ByteArrayOutputStream outputStream; + protected ByteArrayOutputStream prefixStream; + protected ByteArrayOutputStream pathStream; protected int objectMatchLevel; protected int arrayMatchLevel; - protected boolean matched, literal; + protected boolean matched; + protected boolean literal; protected ArrayBackedValueStorage tempABVS; protected List arrayCounters; protected List keysOrMembers; @@ -78,56 +83,55 @@ enum itemType { protected final List itemStack; public JSONParser() { - this(null); + this(new ArrayList<>()); } - public JSONParser(List valueSeq) { + public JSONParser(List valuePointables) { factory = new JsonFactory(); - this.valueSeq = valueSeq; + this.valuePointables = valuePointables; atomic = new ArrayBackedValueStorage(); tvp = new TaggedValuePointable(); - abStack = new ArrayList(); - obStack = new ArrayList(); - abvsStack = new ArrayList(); - keyStack = new ArrayList(); - spStack = new ArrayList(); - itemStack = new ArrayList(); + abStack = new ArrayList<>(); + obStack = new ArrayList<>(); + abvsStack = new ArrayList<>(); + keyStack = new ArrayList<>(); + spStack = new ArrayList<>(); + itemStack = new ArrayList<>(); svb = new StringValueBuilder(); sb = new SequenceBuilder(); bp = new BooleanPointable(); - allKeys = new ArrayList(); + allKeys = new ArrayList<>(); abvsStack.add(atomic); out = abvsStack.get(abvsStack.size() - 1).getDataOutput(); tempABVS = new ArrayBackedValueStorage(); - this.objectMatchLevel = 1; - this.arrayMatchLevel = 0; + objectMatchLevel = 1; + arrayMatchLevel = 0; matched = false; literal = false; - arrayCounters = new ArrayList(); + arrayCounters = new ArrayList<>(); outputStream = new ByteArrayOutputStream(); prefixStream = new ByteArrayOutputStream(); pathStream = new ByteArrayOutputStream(); - this.keysOrMembers = new ArrayList(); + keysOrMembers = new ArrayList<>(); outputStream.reset(); pathStream.reset(); - if (valueSeq != null) { - for (int i = 0; i < this.valueSeq.size(); i++) { - tvp.set(ArrayUtils.toPrimitive(valueSeq.get(i)), 0, ArrayUtils.toPrimitive(valueSeq.get(i)).length); - //access an item of an array - if (tvp.getTag() == ValueTag.XS_INTEGER_TAG) { - pathStream.write(tvp.getByteArray(), 0, tvp.getLength()); - this.arrayMatchLevel++; - this.keysOrMembers.add(Boolean.valueOf(true)); - //access all the items of an array or - //all the keys of an object - } else if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) { - pathStream.write(tvp.getByteArray(), 0, tvp.getLength()); - this.arrayMatchLevel++; - this.keysOrMembers.add(Boolean.valueOf(false)); - //access an object - } else { - pathStream.write(tvp.getByteArray(), 1, tvp.getLength() - 1); - } + for (int i = 0; i < valuePointables.size(); i++) { + if (((TaggedValuePointable) valuePointables.get(i)).getTag() == ValueTag.XS_INTEGER_TAG) { + // access an item of an array + pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset(), + valuePointables.get(i).getLength()); + this.arrayMatchLevel++; + this.keysOrMembers.add(Boolean.valueOf(true)); + } else if (((TaggedValuePointable) valuePointables.get(i)).getTag() == ValueTag.XS_BOOLEAN_TAG) { + // access all the items of an array or all the keys of an object + pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset(), + valuePointables.get(i).getLength()); + this.arrayMatchLevel++; + this.keysOrMembers.add(Boolean.valueOf(false)); + } else { + //access an object + pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset() + 1, + valuePointables.get(i).getLength() - 1); } } } @@ -142,7 +146,7 @@ public int parse(Reader input, ArrayBackedValueStorage result, IFrameWriter writ throws HyracksDataException { this.writer = writer; this.appender = appender; - if (this.valueSeq != null) { + if (!valuePointables.isEmpty()) { return parseElements(input, result); } else { return parse(input, result); @@ -183,38 +187,10 @@ public int parse(Reader input, ArrayBackedValueStorage result) throws HyracksDat startAtomicValues(ValueTag.XS_DOUBLE_TAG, parser); break; case END_ARRAY: - abStack.get(levelArray - 1).finish(); - if (itemStack.size() > 1) { - if (checkItem == itemType.ARRAY) { - abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject)); - } else if (checkItem == itemType.OBJECT) { - obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), - abvsStack.get(levelArray + levelObject)); - } - } - itemStack.remove(itemStack.size() - 1); - levelArray--; - if (levelArray + levelObject == 0) { - sb.addItem(abvsStack.get(1)); - items++; - } + items = endArray(items); break; case END_OBJECT: - obStack.get(levelObject - 1).finish(); - if (itemStack.size() > 1) { - if (checkItem == itemType.OBJECT) { - obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2), - abvsStack.get(levelArray + levelObject)); - } else if (checkItem == itemType.ARRAY) { - abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject)); - } - } - itemStack.remove(itemStack.size() - 1); - levelObject--; - if (levelObject + levelArray == 0) { - sb.addItem(abvsStack.get(1)); - items++; - } + items = endObject(items); break; default: break; @@ -229,6 +205,44 @@ public int parse(Reader input, ArrayBackedValueStorage result) throws HyracksDat return items; } + private int endObject(int items) throws IOException { + obStack.get(levelObject - 1).finish(); + if (itemStack.size() > 1) { + if (checkItem == itemType.OBJECT) { + obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2), + abvsStack.get(levelArray + levelObject)); + } else if (checkItem == itemType.ARRAY) { + abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject)); + } + } + itemStack.remove(itemStack.size() - 1); + levelObject--; + if (levelObject + levelArray == 0) { + sb.addItem(abvsStack.get(1)); + items++; + } + return items; + } + + private int endArray(int items) throws IOException { + abStack.get(levelArray - 1).finish(); + if (itemStack.size() > 1) { + if (checkItem == itemType.ARRAY) { + abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject)); + } else if (checkItem == itemType.OBJECT) { + obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), + abvsStack.get(levelArray + levelObject)); + } + } + itemStack.remove(itemStack.size() - 1); + levelArray--; + if (levelArray + levelObject == 0) { + sb.addItem(abvsStack.get(1)); + items++; + } + return items; + } + public int parseElements(Reader input, ArrayBackedValueStorage result) throws HyracksDataException { int items = 0; try { @@ -266,67 +280,10 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy startAtomicValues(ValueTag.XS_DOUBLE_TAG, parser); break; case END_ARRAY: - //if the query doesn't ask for an atomic value - if (!this.literal && this.pathMatch()) { - //check if the path asked from the query includes the current path - abStack.get(levelArray - 1).finish(); - if (itemStack.size() > 1) { - if (checkItem == itemType.ARRAY) { - if (levelArray > this.arrayMatchLevel + 1) { - abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject)); - } else if (this.matched) { - this.matched = false; - items++; - writeElement(abvsStack.get(levelArray + levelObject)); - } - } else if (checkItem == itemType.OBJECT) { - if (levelArray > this.arrayMatchLevel && !this.matched) { - obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), - abvsStack.get(levelArray + levelObject)); - } else if (this.matched) { - writeElement(abvsStack.get(levelArray + levelObject)); - this.matched = false; - items++; - } - } - } - } - if (allKeys.size() - 1 >= 0) { - allKeys.remove(allKeys.size() - 1); - } - this.arrayCounters.remove(levelArray - 1); - itemStack.remove(itemStack.size() - 1); - levelArray--; + items = endArrayElements(items); break; case END_OBJECT: - //if the query doesn't ask for an atomic value - if (!this.literal && this.pathMatch()) { - //check if the path asked from the query includes the current path - obStack.get(levelObject - 1).finish(); - if (itemStack.size() > 1) { - if (checkItem == itemType.OBJECT) { - if (levelObject > this.objectMatchLevel) { - obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2), - abvsStack.get(levelArray + levelObject)); - } else if (this.matched) { - this.matched = false; - items++; - writeElement(abvsStack.get(levelArray + levelObject)); - } - } else if (checkItem == itemType.ARRAY) { - abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject)); - if (this.matched) { - writeElement(abvsStack.get(levelArray + levelObject)); - this.matched = false; - } - } - } - } - if (allKeys.size() - 1 >= 0) { - allKeys.remove(allKeys.size() - 1); - } - itemStack.remove(itemStack.size() - 1); - levelObject--; + items = endObjectElements(items); break; default: break; @@ -340,12 +297,79 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy return items; } + private int endObjectElements(int items) throws IOException { + //if the query doesn't ask for an atomic value + if (!this.literal && this.pathMatch()) { + //check if the path asked from the query includes the current path + obStack.get(levelObject - 1).finish(); + if (itemStack.size() > 1) { + if (checkItem == itemType.OBJECT) { + if (levelObject > this.objectMatchLevel) { + obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2), + abvsStack.get(levelArray + levelObject)); + } else if (this.matched) { + this.matched = false; + items++; + writeElement(abvsStack.get(levelArray + levelObject)); + } + } else if (checkItem == itemType.ARRAY) { + abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject)); + if (this.matched) { + writeElement(abvsStack.get(levelArray + levelObject)); + this.matched = false; + } + } + } + } + if (allKeys.size() - 1 >= 0) { + allKeys.remove(allKeys.size() - 1); + } + itemStack.remove(itemStack.size() - 1); + levelObject--; + return items; + } + + private int endArrayElements(int items) throws IOException { + //if the query doesn't ask for an atomic value + if (!this.literal && this.pathMatch()) { + //check if the path asked from the query includes the current path + abStack.get(levelArray - 1).finish(); + if (itemStack.size() > 1) { + if (checkItem == itemType.ARRAY) { + if (levelArray > this.arrayMatchLevel + 1) { + abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject)); + } else if (this.matched) { + this.matched = false; + items++; + writeElement(abvsStack.get(levelArray + levelObject)); + } + } else if (checkItem == itemType.OBJECT) { + if (levelArray > this.arrayMatchLevel && !this.matched) { + obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), + abvsStack.get(levelArray + levelObject)); + } else if (this.matched) { + writeElement(abvsStack.get(levelArray + levelObject)); + this.matched = false; + items++; + } + } + } + } + if (allKeys.size() - 1 >= 0) { + allKeys.remove(allKeys.size() - 1); + } + this.arrayCounters.remove(levelArray - 1); + itemStack.remove(itemStack.size() - 1); + levelArray--; + return items; + } + private boolean pathMatch() { outputStream.reset(); for (Byte[] bb : allKeys) { outputStream.write(ArrayUtils.toPrimitive(bb), 0, ArrayUtils.toPrimitive(bb).length); } - //the path of values created by parsing the file + //the path of values created by parsing the file boolean contains = false; this.matched = false; prefixStream.reset(); @@ -392,14 +416,14 @@ public void atomicValues(int tag, JsonParser parser, DataOutput out, StringValue if (!itemStack.isEmpty()) { if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) { abStack.get(levelArray - 1).addItem(abvsStack.get(0)); - if (valueSeq != null && this.matched && levelArray == this.arrayMatchLevel) { + if (!valuePointables.isEmpty() && this.matched && levelArray == this.arrayMatchLevel) { this.literal = true; this.matched = false; writeElement(abvsStack.get(0)); } } else if (itemStack.get(itemStack.size() - 1) == itemType.OBJECT) { obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), abvsStack.get(0)); - if (valueSeq != null && this.matched && levelObject == this.objectMatchLevel) { + if (!valuePointables.isEmpty() && this.matched && levelObject == this.objectMatchLevel) { this.literal = true; this.matched = false; writeElement(abvsStack.get(0)); @@ -410,14 +434,14 @@ public void atomicValues(int tag, JsonParser parser, DataOutput out, StringValue public void writeElement(ArrayBackedValueStorage abvs) throws IOException { tempABVS.reset(); - DataOutput out = tempABVS.getDataOutput(); - out.write(abvs.getByteArray(), abvs.getStartOffset(), abvs.getLength()); + DataOutput dOut = tempABVS.getDataOutput(); + dOut.write(abvs.getByteArray(), abvs.getStartOffset(), abvs.getLength()); FrameUtils.appendFieldToWriter(writer, appender, tempABVS.getByteArray(), tempABVS.getStartOffset(), tempABVS.getLength()); } public void startArrayOrObjects(int count) { - if (valueSeq != null && !this.arrayCounters.isEmpty()) { + if (!valuePointables.isEmpty() && !this.arrayCounters.isEmpty()) { boolean addCounter = levelArray - count < this.keysOrMembers.size() ? this.keysOrMembers.get(levelArray - count) : true; if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) { @@ -431,7 +455,7 @@ public void startArrayOrObjects(int count) { } } - if (count == 2 && valueSeq != null) { + if (count == 2 && !valuePointables.isEmpty()) { this.arrayCounters.add(Integer.valueOf(0)); } } @@ -446,7 +470,7 @@ public void startArray() throws HyracksDataException { } startArrayOrObjects(2); itemStack.add(itemType.ARRAY); - if (this.pathMatch() || this.valueSeq == null) { + if (this.pathMatch() || valuePointables.isEmpty()) { abvsStack.get(levelArray + levelObject).reset(); try { abStack.get(levelArray - 1).reset(abvsStack.get(levelArray + levelObject)); @@ -466,7 +490,7 @@ public void startObject() throws HyracksDataException { } startArrayOrObjects(1); itemStack.add(itemType.OBJECT); - if (this.pathMatch() || this.valueSeq == null) { + if (this.pathMatch() || valuePointables.isEmpty()) { abvsStack.get(levelArray + levelObject).reset(); try { obStack.get(levelObject - 1).reset(abvsStack.get(levelArray + levelObject)); @@ -486,7 +510,7 @@ public void startFieldName(JsonParser parser) throws HyracksDataException { try { svb.write(parser.getText(), outk); spStack.get(levelObject - 1).set(keyStack.get(levelObject - 1)); - if (this.valueSeq != null) { + if (!valuePointables.isEmpty()) { int length = 0; byte[] barr = spStack.get(levelObject - 1).getByteArray(); outputStream.reset(); @@ -496,7 +520,7 @@ public void startFieldName(JsonParser parser) throws HyracksDataException { tvp.set(ArrayUtils.toPrimitive(allKeys.get(i)), 0, ArrayUtils.toPrimitive(allKeys.get(i)).length); length += ArrayUtils.toPrimitive(allKeys.get(i)).length; } - //if the next two bytes represent a boolean (boolean has only two bytes), + //if the next two bytes represent a boolean (boolean has only two bytes), //it means that query asks for all the keys of the object if (length <= pathStream.size() && (length + 2) <= pathStream.size()) { tvp.set(pathStream.toByteArray(), length, length + 2); @@ -515,7 +539,7 @@ public void startFieldName(JsonParser parser) throws HyracksDataException { public void startAtomicValues(int tag, JsonParser parser) throws HyracksDataException { itemsInArray(); - if (this.pathMatch() || this.valueSeq == null) { + if (this.pathMatch() || valuePointables.isEmpty()) { try { atomicValues(tag, parser, out, svb, levelArray, levelObject); } catch (Exception e) { diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java index 2459944d0..9c64c07e5 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java @@ -16,28 +16,35 @@ */ package org.apache.vxquery.metadata; +import java.util.ArrayList; import java.util.List; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider; import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; public abstract class AbstractVXQueryDataSource implements IVXQueryDataSource { + private static final long serialVersionUID = 1L; + protected static final String DELIMITER = "\\|"; protected int dataSourceId; protected String collectionName; protected String[] collectionPartitions; - protected List childSeq; - protected List valueSeq; + protected List childSeq = new ArrayList<>(); + protected List valueOffsets = new ArrayList<>(); + protected ArrayBackedValueStorage valueAbvs = new ArrayBackedValueStorage(); protected int totalDataSources; protected String tag; protected Object[] types; protected IDataSourcePropertiesProvider propProvider; - + @Override public INodeDomain getDomain() { return null; @@ -91,20 +98,39 @@ public IDataSourcePropertiesProvider getPropertiesProvider() { public void computeFDs(List scanVariables, List fdList) { } + @Override public void addChildSeq(int integer) { childSeq.add(integer); } + @Override public List getChildSeq() { return childSeq; } - public void addValueSeq(Byte[] value) { - valueSeq.add(value); + public void appendValueSequence(IValueReference value) { + valueAbvs.append(value); + valueOffsets.add(valueAbvs.getLength()); + } + + public void getValueSequence(int index, IPointable value) { + if (index == 0) { + value.set(valueAbvs.getByteArray(), 0, valueOffsets.get(index)); + } else { + value.set(valueAbvs.getByteArray(), valueOffsets.get(index - 1), valueOffsets.get(index)); + } } - public List getValueSeq() { - return valueSeq; + public byte[] getValueBytes() { + return valueAbvs.getByteArray(); + } + + public List getValueOffsets() { + return valueOffsets; + } + + public int getValueCount() { + return valueOffsets.size(); } public String[] getPartitions() { @@ -116,4 +142,5 @@ public void setPartitions(String[] collectionPartitions) { } abstract public boolean usingIndex(); + } diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java index c5761c5d6..7f5a48111 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java @@ -44,8 +44,6 @@ public IPhysicalPropertiesVector computePropertiesVector(List s } }; this.tag = null; - this.childSeq = new ArrayList<>(); - this.valueSeq = new ArrayList<>(); } public static VXQueryCollectionDataSource create(int id, String collection, Object type) { @@ -55,7 +53,7 @@ public static VXQueryCollectionDataSource create(int id, String collection, Obje @Override public String toString() { return "VXQueryCollectionDataSource [collectionName=" + collectionName + ", childSeq=" + childSeq - + ", valueSeq=" + valueSeq + "]"; + + ", valuePointableCount=" + valueOffsets.size() + "]"; } public boolean usingIndex() { diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index a3756d5b4..ffd1939fe 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -56,6 +56,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; @@ -64,6 +65,7 @@ import org.apache.hyracks.hdfs.ContextFactory; import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory; import org.apache.vxquery.context.DynamicContext; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; import org.apache.vxquery.hdfs2.HDFSFunctions; import org.apache.vxquery.jsonparser.JSONParser; import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; @@ -76,7 +78,8 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO private short totalDataSources; private String[] collectionPartitions; private List childSeq; - private List valueSeq; + private List valueOffsets; + private byte[] valueBytes; protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName()); private HDFSFunctions hdfs; private String tag; @@ -91,7 +94,8 @@ public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, Abs dataSourceId = (short) ds.getDataSourceId(); totalDataSources = (short) ds.getTotalDataSources(); childSeq = ds.getChildSeq(); - valueSeq = ds.getValueSeq(); + valueBytes = ds.getValueBytes(); + valueOffsets = ds.getValueOffsets(); recordDescriptors[0] = rDesc; this.tag = ds.getTag(); this.hdfsConf = hdfsConf; @@ -114,7 +118,22 @@ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, final String collectionName = collectionPartitions[partition % collectionPartitions.length]; final XMLParser parser = new XMLParser(false, nodeIdProvider, nodeId, appender, childSeq, dCtx.getStaticContext()); - final JSONParser jparser = new JSONParser(valueSeq); + + List valuePointables = new ArrayList<>(); + { + for (int i = 0; i < valueOffsets.size(); ++i) { + TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + if (i == 0) { + tvp.set(valueBytes, 0, valueOffsets.get(i)); + } else { + tvp.set(valueBytes, valueOffsets.get(i - 1), valueOffsets.get(i)); + } + valuePointables.add(tvp); + } + } + final JSONParser jparser = new JSONParser(valuePointables); + final String collectionModifiedName = collectionName.replace("${nodeId}", nodeId); + final boolean hdfsQuery = collectionModifiedName.contains("hdfs:/"); return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { @Override @@ -127,116 +146,19 @@ public void open() throws HyracksDataException { @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { fta.reset(buffer); - String collectionModifiedName = collectionName.replace("${nodeId}", nodeId); - if (!collectionModifiedName.contains("hdfs:/")) { - File collectionDirectory = new File(collectionModifiedName); - // check if directory is in the local file system - if (collectionDirectory.exists()) { - // Go through each tuple. - if (collectionDirectory.isDirectory()) { - xmlAndJsonCollection(collectionDirectory); - } else { - throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" - + collectionDirectory.getAbsolutePath() + ") passed to collection."); - } - } + if (!hdfsQuery) { + processLocalFiles(collectionModifiedName); } else { // Else check in HDFS file system // Get instance of the HDFS filesystem FileSystem fs = hdfs.getFileSystem(); if (fs != null) { - collectionModifiedName = collectionModifiedName.replaceAll("hdfs:/", ""); - Path directory = new Path(collectionModifiedName); - Path xmlDocument; + Path directory = new Path(collectionModifiedName.replaceAll("hdfs:/", "")); if (tag != null) { - hdfs.setJob(directory.toString(), tag); - tag = "<" + tag + ">"; - Job job = hdfs.getJob(); - InputFormat inputFormat = hdfs.getinputFormat(); - try { - hdfs.scheduleSplits(); - ArrayList schedule = hdfs - .getScheduleForNode(InetAddress.getLocalHost().getHostAddress()); - List splits = hdfs.getSplits(); - List fileSplits = new ArrayList<>(); - for (int i : schedule) { - fileSplits.add((FileSplit) splits.get(i)); - } - FileSplitsFactory splitsFactory = new FileSplitsFactory(fileSplits); - List inputSplits = splitsFactory.getSplits(); - ContextFactory ctxFactory = new ContextFactory(); - int size = inputSplits.size(); - InputStream stream; - String value; - RecordReader reader; - TaskAttemptContext context; - for (int i = 0; i < size; i++) { - // read split - context = ctxFactory.createContext(job.getConfiguration(), i); - reader = inputFormat.createRecordReader(inputSplits.get(i), context); - reader.initialize(inputSplits.get(i), context); - while (reader.nextKeyValue()) { - value = reader.getCurrentValue().toString(); - // Split value if it contains more than - // one item with the tag - if (StringUtils.countMatches(value, tag) > 1) { - String[] items = value.split(tag); - for (String item : items) { - if (item.length() > 0) { - item = START_TAG + tag + item; - stream = new ByteArrayInputStream( - item.getBytes(StandardCharsets.UTF_8)); - parser.parseHDFSElements(stream, writer, fta, i); - stream.close(); - } - } - } else { - value = START_TAG + value; - // create an input stream to the - // file currently reading and send - // it to parser - stream = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8)); - parser.parseHDFSElements(stream, writer, fta, i); - stream.close(); - } - } - reader.close(); - } - - } catch (Exception e) { - throw new HyracksDataException(e); - } + processHdfsTag(fta, parser, directory); } else { - try { - // check if the path exists and is a directory - if (fs.exists(directory) && fs.isDirectory(directory)) { - for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { - // read every file in the directory - RemoteIterator it = fs.listFiles(directory, true); - while (it.hasNext()) { - xmlDocument = it.next().getPath(); - if (fs.isFile(xmlDocument)) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine( - "Starting to read XML document: " + xmlDocument.getName()); - } - // create an input stream to the - // file currently reading and - // send it to parser - InputStream in = fs.open(xmlDocument).getWrappedStream(); - parser.parseHDFSElements(in, writer, fta, tupleIndex); - in.close(); - } - } - } - } else { - throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":" - + directory + ") passed to collection."); - } - } catch (Exception e) { - throw new HyracksDataException(e); - } + processHdfsFiles(fta, nodeId, parser, fs, directory); } try { fs.close(); @@ -247,29 +169,129 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException { } } - public void xmlAndJsonCollection(File directory) throws HyracksDataException { - Reader input; - for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { - Iterator it = FileUtils.iterateFiles(directory, new VXQueryIOFileFilter(), - TrueFileFilter.INSTANCE); - while (it.hasNext()) { - File file = it.next(); - String fileName = file.getName().toLowerCase(); - if (fileName.endsWith(".xml")) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting to read XML document: " + file.getAbsolutePath()); + private void processHdfsTag(final FrameTupleAccessor fta, final XMLParser parser, Path directory) + throws HyracksDataException { + hdfs.setJob(directory.toString(), tag); + tag = "<" + tag + ">"; + Job job = hdfs.getJob(); + InputFormat inputFormat = hdfs.getinputFormat(); + try { + hdfs.scheduleSplits(); + ArrayList schedule = hdfs.getScheduleForNode(InetAddress.getLocalHost().getHostAddress()); + List splits = hdfs.getSplits(); + List fileSplits = new ArrayList<>(); + for (int i : schedule) { + fileSplits.add((FileSplit) splits.get(i)); + } + FileSplitsFactory splitsFactory = new FileSplitsFactory(fileSplits); + List inputSplits = splitsFactory.getSplits(); + ContextFactory ctxFactory = new ContextFactory(); + int size = inputSplits.size(); + InputStream stream; + String value; + RecordReader reader; + TaskAttemptContext context; + for (int i = 0; i < size; i++) { + // read split + context = ctxFactory.createContext(job.getConfiguration(), i); + reader = inputFormat.createRecordReader(inputSplits.get(i), context); + reader.initialize(inputSplits.get(i), context); + while (reader.nextKeyValue()) { + value = reader.getCurrentValue().toString(); + // Split value if it contains more than + // one item with the tag + if (StringUtils.countMatches(value, tag) > 1) { + String[] items = value.split(tag); + for (String item : items) { + if (item.length() > 0) { + item = START_TAG + tag + item; + stream = new ByteArrayInputStream(item.getBytes(StandardCharsets.UTF_8)); + parser.parseHDFSElements(stream, writer, fta, i); + stream.close(); + } + } + } else { + value = START_TAG + value; + // create an input stream to the + // file currently reading and send + // it to parser + stream = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8)); + parser.parseHDFSElements(stream, writer, fta, i); + stream.close(); } - parser.parseElements(file, writer, tupleIndex); - } else if (fileName.endsWith(".json")) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting to read JSON document: " + file.getAbsolutePath()); + } + reader.close(); + } + + } catch (Exception e) { + throw new HyracksDataException(e); + } + } + + private void processHdfsFiles(final FrameTupleAccessor fta, final String nodeId, final XMLParser parser, + FileSystem fs, Path directory) throws HyracksDataException { + Path xmlDocument; + try { + // check if the path exists and is a directory + if (!fs.exists(directory) || !fs.isDirectory(directory)) { + throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":" + directory + + ") passed to collection."); + } + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + // read every file in the directory + RemoteIterator it = fs.listFiles(directory, true); + while (it.hasNext()) { + xmlDocument = it.next().getPath(); + if (fs.isFile(xmlDocument)) { + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read XML document: " + xmlDocument.getName()); + } + // create an input stream to the + // file currently reading and + // send it to parser + InputStream in = fs.open(xmlDocument).getWrappedStream(); + parser.parseHDFSElements(in, writer, fta, tupleIndex); + in.close(); } - try { - jsonAbvs.reset(); - input = new InputStreamReader(new FileInputStream(file)); - jparser.parse(input, jsonAbvs, writer, appender); - } catch (FileNotFoundException e) { - throw new HyracksDataException(e.toString()); + } + } + } catch (Exception e) { + throw new HyracksDataException(e); + } + } + + private void processLocalFiles(String collectionModifiedName) throws HyracksDataException { + Reader input; + File collectionDirectory = new File(collectionModifiedName); + // check if directory is in the local file system + if (collectionDirectory.exists()) { + // Go through each tuple. + if (!collectionDirectory.isDirectory()) { + throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" + + collectionDirectory.getAbsolutePath() + ") passed to collection."); + } + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + Iterator it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(), + TrueFileFilter.INSTANCE); + while (it.hasNext()) { + File file = it.next(); + String fileName = file.getName().toLowerCase(); + if (fileName.endsWith(".xml")) { + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read XML document: " + file.getAbsolutePath()); + } + parser.parseElements(file, writer, tupleIndex); + } else if (fileName.endsWith(".json")) { + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read JSON document: " + file.getAbsolutePath()); + } + try { + jsonAbvs.reset(); + input = new InputStreamReader(new FileInputStream(file)); + jparser.parse(input, jsonAbvs, writer, appender); + } catch (FileNotFoundException e) { + throw new HyracksDataException(e); + } } } } diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java index d55530dcf..8723cad47 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java @@ -49,8 +49,6 @@ public IPhysicalPropertiesVector computePropertiesVector(List s } }; this.tag = null; - this.childSeq = new ArrayList<>(); - this.valueSeq = new ArrayList<>(); } public static VXQueryIndexingDataSource create(int id, String collection, Object type, String function) { diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java index fa0a90034..71f55cfb0 100644 --- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java +++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java @@ -66,7 +66,7 @@ public class TestRunner { public TestRunner(XTestOptions opts) throws UnknownHostException { this.opts = opts; - this.collectionList = new ArrayList(); + this.collectionList = new ArrayList<>(); } public void open() throws Exception { From 662dc85a9fddde687683cadaffd7d53643ce42b2 Mon Sep 17 00:00:00 2001 From: Preston Carman Date: Wed, 16 Aug 2017 20:47:58 -0700 Subject: [PATCH 2/6] Reduce changes. --- .../apache/vxquery/jsonparser/JSONParser.java | 198 ++++++++---------- 1 file changed, 89 insertions(+), 109 deletions(-) diff --git a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java index a238e7820..e52860411 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java @@ -187,10 +187,38 @@ public int parse(Reader input, ArrayBackedValueStorage result) throws HyracksDat startAtomicValues(ValueTag.XS_DOUBLE_TAG, parser); break; case END_ARRAY: - items = endArray(items); + abStack.get(levelArray - 1).finish(); + if (itemStack.size() > 1) { + if (checkItem == itemType.ARRAY) { + abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject)); + } else if (checkItem == itemType.OBJECT) { + obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), + abvsStack.get(levelArray + levelObject)); + } + } + itemStack.remove(itemStack.size() - 1); + levelArray--; + if (levelArray + levelObject == 0) { + sb.addItem(abvsStack.get(1)); + items++; + } break; case END_OBJECT: - items = endObject(items); + obStack.get(levelObject - 1).finish(); + if (itemStack.size() > 1) { + if (checkItem == itemType.OBJECT) { + obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2), + abvsStack.get(levelArray + levelObject)); + } else if (checkItem == itemType.ARRAY) { + abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject)); + } + } + itemStack.remove(itemStack.size() - 1); + levelObject--; + if (levelObject + levelArray == 0) { + sb.addItem(abvsStack.get(1)); + items++; + } break; default: break; @@ -205,44 +233,6 @@ public int parse(Reader input, ArrayBackedValueStorage result) throws HyracksDat return items; } - private int endObject(int items) throws IOException { - obStack.get(levelObject - 1).finish(); - if (itemStack.size() > 1) { - if (checkItem == itemType.OBJECT) { - obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2), - abvsStack.get(levelArray + levelObject)); - } else if (checkItem == itemType.ARRAY) { - abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject)); - } - } - itemStack.remove(itemStack.size() - 1); - levelObject--; - if (levelObject + levelArray == 0) { - sb.addItem(abvsStack.get(1)); - items++; - } - return items; - } - - private int endArray(int items) throws IOException { - abStack.get(levelArray - 1).finish(); - if (itemStack.size() > 1) { - if (checkItem == itemType.ARRAY) { - abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject)); - } else if (checkItem == itemType.OBJECT) { - obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), - abvsStack.get(levelArray + levelObject)); - } - } - itemStack.remove(itemStack.size() - 1); - levelArray--; - if (levelArray + levelObject == 0) { - sb.addItem(abvsStack.get(1)); - items++; - } - return items; - } - public int parseElements(Reader input, ArrayBackedValueStorage result) throws HyracksDataException { int items = 0; try { @@ -280,10 +270,67 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy startAtomicValues(ValueTag.XS_DOUBLE_TAG, parser); break; case END_ARRAY: - items = endArrayElements(items); + //if the query doesn't ask for an atomic value + if (!this.literal && this.pathMatch()) { + //check if the path asked from the query includes the current path + abStack.get(levelArray - 1).finish(); + if (itemStack.size() > 1) { + if (checkItem == itemType.ARRAY) { + if (levelArray > this.arrayMatchLevel + 1) { + abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject)); + } else if (this.matched) { + this.matched = false; + items++; + writeElement(abvsStack.get(levelArray + levelObject)); + } + } else if (checkItem == itemType.OBJECT) { + if (levelArray > this.arrayMatchLevel && !this.matched) { + obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), + abvsStack.get(levelArray + levelObject)); + } else if (this.matched) { + writeElement(abvsStack.get(levelArray + levelObject)); + this.matched = false; + items++; + } + } + } + } + if (allKeys.size() - 1 >= 0) { + allKeys.remove(allKeys.size() - 1); + } + this.arrayCounters.remove(levelArray - 1); + itemStack.remove(itemStack.size() - 1); + levelArray--; break; case END_OBJECT: - items = endObjectElements(items); + //if the query doesn't ask for an atomic value + if (!this.literal && this.pathMatch()) { + //check if the path asked from the query includes the current path + obStack.get(levelObject - 1).finish(); + if (itemStack.size() > 1) { + if (checkItem == itemType.OBJECT) { + if (levelObject > this.objectMatchLevel) { + obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2), + abvsStack.get(levelArray + levelObject)); + } else if (this.matched) { + this.matched = false; + items++; + writeElement(abvsStack.get(levelArray + levelObject)); + } + } else if (checkItem == itemType.ARRAY) { + abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject)); + if (this.matched) { + writeElement(abvsStack.get(levelArray + levelObject)); + this.matched = false; + } + } + } + } + if (allKeys.size() - 1 >= 0) { + allKeys.remove(allKeys.size() - 1); + } + itemStack.remove(itemStack.size() - 1); + levelObject--; break; default: break; @@ -297,73 +344,6 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy return items; } - private int endObjectElements(int items) throws IOException { - //if the query doesn't ask for an atomic value - if (!this.literal && this.pathMatch()) { - //check if the path asked from the query includes the current path - obStack.get(levelObject - 1).finish(); - if (itemStack.size() > 1) { - if (checkItem == itemType.OBJECT) { - if (levelObject > this.objectMatchLevel) { - obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2), - abvsStack.get(levelArray + levelObject)); - } else if (this.matched) { - this.matched = false; - items++; - writeElement(abvsStack.get(levelArray + levelObject)); - } - } else if (checkItem == itemType.ARRAY) { - abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject)); - if (this.matched) { - writeElement(abvsStack.get(levelArray + levelObject)); - this.matched = false; - } - } - } - } - if (allKeys.size() - 1 >= 0) { - allKeys.remove(allKeys.size() - 1); - } - itemStack.remove(itemStack.size() - 1); - levelObject--; - return items; - } - - private int endArrayElements(int items) throws IOException { - //if the query doesn't ask for an atomic value - if (!this.literal && this.pathMatch()) { - //check if the path asked from the query includes the current path - abStack.get(levelArray - 1).finish(); - if (itemStack.size() > 1) { - if (checkItem == itemType.ARRAY) { - if (levelArray > this.arrayMatchLevel + 1) { - abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject)); - } else if (this.matched) { - this.matched = false; - items++; - writeElement(abvsStack.get(levelArray + levelObject)); - } - } else if (checkItem == itemType.OBJECT) { - if (levelArray > this.arrayMatchLevel && !this.matched) { - obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), - abvsStack.get(levelArray + levelObject)); - } else if (this.matched) { - writeElement(abvsStack.get(levelArray + levelObject)); - this.matched = false; - items++; - } - } - } - } - if (allKeys.size() - 1 >= 0) { - allKeys.remove(allKeys.size() - 1); - } - this.arrayCounters.remove(levelArray - 1); - itemStack.remove(itemStack.size() - 1); - levelArray--; - return items; - } - private boolean pathMatch() { outputStream.reset(); for (Byte[] bb : allKeys) { From af688ac97fe0e7a82910cca6fe03aefe12b7efa3 Mon Sep 17 00:00:00 2001 From: Christina Pavlopoulou Date: Thu, 17 Aug 2017 14:48:02 -0700 Subject: [PATCH 3/6] working on cleaning JSONParser --- .../main/java/org/apache/vxquery/jsonparser/JSONParser.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java index e52860411..67e92c70f 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java @@ -119,19 +119,19 @@ public JSONParser(List valuePointables) { if (((TaggedValuePointable) valuePointables.get(i)).getTag() == ValueTag.XS_INTEGER_TAG) { // access an item of an array pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset(), - valuePointables.get(i).getLength()); + valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset()); this.arrayMatchLevel++; this.keysOrMembers.add(Boolean.valueOf(true)); } else if (((TaggedValuePointable) valuePointables.get(i)).getTag() == ValueTag.XS_BOOLEAN_TAG) { // access all the items of an array or all the keys of an object pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset(), - valuePointables.get(i).getLength()); + valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset()); this.arrayMatchLevel++; this.keysOrMembers.add(Boolean.valueOf(false)); } else { //access an object pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset() + 1, - valuePointables.get(i).getLength() - 1); + valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset() - 1); } } } From 92299c83c353ffb0bed4a8dab079a13a47857cd2 Mon Sep 17 00:00:00 2001 From: Preston Carman Date: Thu, 17 Aug 2017 16:24:29 -0700 Subject: [PATCH 4/6] Moved Christina's fix into the operator descriptor. --- .../org/apache/vxquery/jsonparser/JSONParser.java | 14 +++++++++++--- .../VXQueryCollectionOperatorDescriptor.java | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java index 67e92c70f..be4711532 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java @@ -32,6 +32,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.primitive.BooleanPointable; +import org.apache.hyracks.data.std.primitive.LongPointable; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; @@ -72,6 +73,7 @@ public class JSONParser implements IParser { protected ArrayBackedValueStorage tempABVS; protected List arrayCounters; protected List keysOrMembers; + protected List valueSequence; protected IFrameWriter writer; protected IFrameFieldAppender appender; @@ -113,25 +115,31 @@ public JSONParser(List valuePointables) { prefixStream = new ByteArrayOutputStream(); pathStream = new ByteArrayOutputStream(); keysOrMembers = new ArrayList<>(); + valueSequence = new ArrayList<>(); outputStream.reset(); pathStream.reset(); for (int i = 0; i < valuePointables.size(); i++) { + int start = valuePointables.get(i).getStartOffset() + 1; + int length = valuePointables.get(i).getLength() - 1; if (((TaggedValuePointable) valuePointables.get(i)).getTag() == ValueTag.XS_INTEGER_TAG) { // access an item of an array pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset(), - valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset()); + valuePointables.get(i).getLength()); this.arrayMatchLevel++; this.keysOrMembers.add(Boolean.valueOf(true)); + valueSequence.add(new Long(LongPointable.getLong(valuePointables.get(i).getByteArray(), start))); } else if (((TaggedValuePointable) valuePointables.get(i)).getTag() == ValueTag.XS_BOOLEAN_TAG) { // access all the items of an array or all the keys of an object pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset(), - valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset()); + valuePointables.get(i).getLength()); this.arrayMatchLevel++; this.keysOrMembers.add(Boolean.valueOf(false)); + valueSequence + .add(new Boolean(BooleanPointable.getBoolean(valuePointables.get(i).getByteArray(), start))); } else { //access an object pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset() + 1, - valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset() - 1); + valuePointables.get(i).getLength() - 1); } } } diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index ffd1939fe..441458de7 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -126,7 +126,7 @@ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, if (i == 0) { tvp.set(valueBytes, 0, valueOffsets.get(i)); } else { - tvp.set(valueBytes, valueOffsets.get(i - 1), valueOffsets.get(i)); + tvp.set(valueBytes, valueOffsets.get(i - 1), valueOffsets.get(i) - valueOffsets.get(i - 1)); } valuePointables.add(tvp); } From 84efee4c31fe42961c841ad0c72eb452a3734c65 Mon Sep 17 00:00:00 2001 From: Christina Pavlopoulou Date: Fri, 18 Aug 2017 15:16:26 -0700 Subject: [PATCH 5/6] a few changes --- .../apache/vxquery/jsonparser/JSONParser.java | 128 ++++++++++++------ 1 file changed, 90 insertions(+), 38 deletions(-) diff --git a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java index 67e92c70f..3e9bfedb8 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java @@ -32,6 +32,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.primitive.BooleanPointable; +import org.apache.hyracks.data.std.primitive.LongPointable; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; @@ -61,7 +62,8 @@ public class JSONParser implements IParser { protected itemType checkItem; protected int levelArray; protected int levelObject; - protected final List allKeys; + //protected final List allKeys; + protected final List allKeys; protected ByteArrayOutputStream outputStream; protected ByteArrayOutputStream prefixStream; protected ByteArrayOutputStream pathStream; @@ -74,6 +76,9 @@ public class JSONParser implements IParser { protected List keysOrMembers; protected IFrameWriter writer; protected IFrameFieldAppender appender; + protected boolean[] matchedKeys; + protected Object[] subelements; + protected boolean skipping; enum itemType { ARRAY, @@ -115,23 +120,35 @@ public JSONParser(List valuePointables) { keysOrMembers = new ArrayList<>(); outputStream.reset(); pathStream.reset(); + subelements = new Object[valuePointables.size()]; + matchedKeys = new boolean[valuePointables.size()]; + skipping = true; for (int i = 0; i < valuePointables.size(); i++) { if (((TaggedValuePointable) valuePointables.get(i)).getTag() == ValueTag.XS_INTEGER_TAG) { // access an item of an array - pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset(), - valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset()); + // pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset(), + // valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset()); + LongPointable lp = new LongPointable(); + lp.set(valuePointables.get(i).getByteArray(), 1, valuePointables.get(i).getLength()); + subelements[i] = lp.getLong(); this.arrayMatchLevel++; this.keysOrMembers.add(Boolean.valueOf(true)); } else if (((TaggedValuePointable) valuePointables.get(i)).getTag() == ValueTag.XS_BOOLEAN_TAG) { // access all the items of an array or all the keys of an object - pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset(), - valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset()); + // pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset(), + // valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset()); + + subelements[i] = Boolean.FALSE; this.arrayMatchLevel++; this.keysOrMembers.add(Boolean.valueOf(false)); } else { //access an object - pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset() + 1, - valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset() - 1); + // pathStream.write(valuePointables.get(i).getByteArray(), valuePointables.get(i).getStartOffset() + 1, + // valuePointables.get(i).getLength() - valuePointables.get(i).getStartOffset() - 1); + + UTF8StringPointable sp = new UTF8StringPointable(); + sp.set(valuePointables.get(i).getByteArray(), 1, valuePointables.get(i).getLength() - 1); + subelements[i] = sp.toString(); } } } @@ -271,7 +288,8 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy break; case END_ARRAY: //if the query doesn't ask for an atomic value - if (!this.literal && this.pathMatch()) { +// if (!this.literal && this.pathMatch()) { + if (!this.literal && !skipping) { //check if the path asked from the query includes the current path abStack.get(levelArray - 1).finish(); if (itemStack.size() > 1) { @@ -287,7 +305,8 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy if (levelArray > this.arrayMatchLevel && !this.matched) { obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), abvsStack.get(levelArray + levelObject)); - } else if (this.matched) { + // } else if (this.matched) { + } else if (!skipping) { writeElement(abvsStack.get(levelArray + levelObject)); this.matched = false; items++; @@ -304,7 +323,8 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy break; case END_OBJECT: //if the query doesn't ask for an atomic value - if (!this.literal && this.pathMatch()) { +// if (!this.literal && this.pathMatch()) { + if (!this.literal && !skipping) { //check if the path asked from the query includes the current path obStack.get(levelObject - 1).finish(); if (itemStack.size() > 1) { @@ -319,9 +339,12 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy } } else if (checkItem == itemType.ARRAY) { abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject)); - if (this.matched) { + // if (this.matched) { + if (!skipping) { writeElement(abvsStack.get(levelArray + levelObject)); this.matched = false; + skipping = true; + // } } } } @@ -346,24 +369,42 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy private boolean pathMatch() { outputStream.reset(); - for (Byte[] bb : allKeys) { - outputStream.write(ArrayUtils.toPrimitive(bb), 0, ArrayUtils.toPrimitive(bb).length); - } + // for (Byte[] bb : allKeys) { + // outputStream.write(ArrayUtils.toPrimitive(bb), 0, ArrayUtils.toPrimitive(bb).length); + // } //the path of values created by parsing the file boolean contains = false; this.matched = false; prefixStream.reset(); - if (pathStream.size() < outputStream.size()) { - prefixStream.write(outputStream.toByteArray(), 0, pathStream.size()); - contains = Arrays.equals(prefixStream.toByteArray(), pathStream.toByteArray()); - } else { - prefixStream.write(pathStream.toByteArray(), 0, outputStream.size()); - contains = Arrays.equals(prefixStream.toByteArray(), outputStream.toByteArray()); + // if (pathStream.size() < outputStream.size()) { + // prefixStream.write(outputStream.toByteArray(), 0, pathStream.size()); + // contains = Arrays.equals(prefixStream.toByteArray(), pathStream.toByteArray()); + // } else { + // prefixStream.write(pathStream.toByteArray(), 0, outputStream.size()); + // contains = Arrays.equals(prefixStream.toByteArray(), outputStream.toByteArray()); + // } + // if (pathStream.size() == outputStream.size() && contains) { + // this.objectMatchLevel = this.levelObject; + // this.matched = true; + // this.literal = false; + // } + if (!allKeys.isEmpty()) { + if (allKeys.size() <= valuePointables.size()) { + //for(int i=0; i Date: Wed, 23 Aug 2017 12:32:19 -0700 Subject: [PATCH 6/6] faster JSONParser --- .../apache/vxquery/jsonparser/JSONParser.java | 92 ++++++++++++------- 1 file changed, 58 insertions(+), 34 deletions(-) diff --git a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java index 3ae34a660..f89106707 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java @@ -215,7 +215,7 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy checkItem = null; this.matched = false; - + matchedKeys = new boolean[valuePointables.size()]; levelArray = 0; levelObject = 0; sb.reset(result); @@ -268,9 +268,13 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy if (allKeys.size() - 1 >= 0) { allKeys.remove(allKeys.size() - 1); } - this.arrayCounters.remove(levelArray - 1); + if (levelArray > 0) { + this.arrayCounters.remove(levelArray - 1); + } itemStack.remove(itemStack.size() - 1); - levelArray--; + if (levelArray > 0) { + levelArray--; + } break; case END_OBJECT: //if the query doesn't ask for an atomic value @@ -291,6 +295,7 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy if (!matched) { writeElement(abvsStack.get(levelArray + levelObject)); skipping = true; + items++; } } } @@ -299,7 +304,9 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy allKeys.remove(allKeys.size() - 1); } itemStack.remove(itemStack.size() - 1); - levelObject--; + if (levelObject > 0) { + levelObject--; + } break; default: break; @@ -314,7 +321,7 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws Hy } private boolean pathMatch() { - boolean contains = false; + boolean contains = true; if (!allKeys.isEmpty() && allKeys.size() <= valuePointables.size()) { if (allKeys.get(allKeys.size() - 1).equals(subelements[allKeys.size() - 1])) { matchedKeys[allKeys.size() - 1] = true; @@ -325,8 +332,6 @@ private boolean pathMatch() { for (boolean b : matchedKeys) { if (!b) { contains = false; - } else { - contains = true; } } if (contains) { @@ -336,7 +341,8 @@ private boolean pathMatch() { } public void itemsInArray() { - if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY && !this.arrayCounters.isEmpty()) { + if (!itemStack.isEmpty() && itemStack.get(itemStack.size() - 1) == itemType.ARRAY + && !this.arrayCounters.isEmpty()) { boolean addCounter = subelements[allKeys.size()].equals(Boolean.TRUE) ? false : true; if (addCounter) { this.arrayCounters.set(levelArray - 1, this.arrayCounters.get(levelArray - 1) + 1); @@ -360,18 +366,24 @@ public void atomicValues(int tag, JsonParser parser, DataOutput out, StringValue } if (!itemStack.isEmpty()) { if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) { - abStack.get(levelArray - 1).addItem(abvsStack.get(0)); + if (levelArray > 0) { + abStack.get(levelArray - 1).addItem(abvsStack.get(0)); + } if (!valuePointables.isEmpty() && allKeys.get(allKeys.size() - 1).equals(subelements[subelements.length - 1])) { writeElement(abvsStack.get(0)); matched = true; + skipping = true; } } else if (itemStack.get(itemStack.size() - 1) == itemType.OBJECT) { - obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), abvsStack.get(0)); + if (levelObject > 0) { + obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), abvsStack.get(0)); + } if (!valuePointables.isEmpty() && allKeys.get(allKeys.size() - 1).equals(subelements[subelements.length - 1])) { writeElement(abvsStack.get(0)); matched = true; + skipping = true; } } } @@ -386,7 +398,7 @@ public void writeElement(ArrayBackedValueStorage abvs) throws IOException { } public void startArrayOrObjects(int count) { - if (!valuePointables.isEmpty() && !this.arrayCounters.isEmpty()) { + if (!valuePointables.isEmpty() && !this.arrayCounters.isEmpty() && levelArray > 0) { boolean addCounter = subelements[allKeys.size()].equals(Boolean.TRUE) ? false : true; if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) { if (addCounter) { @@ -404,16 +416,17 @@ public void startArrayOrObjects(int count) { } public void startArray() throws HyracksDataException { - levelArray++; - if (levelArray > abStack.size()) { - abStack.add(new ArrayBuilder()); - } - if (levelArray + levelObject > abvsStack.size() - 1) { - abvsStack.add(new ArrayBackedValueStorage()); - } startArrayOrObjects(2); itemStack.add(itemType.ARRAY); - if (this.pathMatch() || valuePointables.isEmpty()) { + if (this.pathMatch() || valuePointables.isEmpty() || subelements[allKeys.size()].equals(Boolean.TRUE) + || subelements[allKeys.size()] instanceof Long) { + levelArray++; + if (levelArray > abStack.size()) { + abStack.add(new ArrayBuilder()); + } + if (levelArray + levelObject > abvsStack.size() - 1) { + abvsStack.add(new ArrayBackedValueStorage()); + } abvsStack.get(levelArray + levelObject).reset(); try { abStack.get(levelArray - 1).reset(abvsStack.get(levelArray + levelObject)); @@ -424,16 +437,16 @@ public void startArray() throws HyracksDataException { } public void startObject() throws HyracksDataException { - levelObject++; - if (levelObject > obStack.size()) { - obStack.add(new ObjectBuilder()); - } - if (levelArray + levelObject > abvsStack.size() - 1) { - abvsStack.add(new ArrayBackedValueStorage()); - } startArrayOrObjects(1); itemStack.add(itemType.OBJECT); if (this.pathMatch() || valuePointables.isEmpty()) { + levelObject++; + if (levelObject > obStack.size()) { + obStack.add(new ObjectBuilder()); + } + if (levelArray + levelObject > abvsStack.size() - 1) { + abvsStack.add(new ArrayBackedValueStorage()); + } abvsStack.get(levelArray + levelObject).reset(); try { obStack.get(levelObject - 1).reset(abvsStack.get(levelArray + levelObject)); @@ -444,27 +457,38 @@ public void startObject() throws HyracksDataException { } public void startFieldName(JsonParser parser) throws HyracksDataException { - if (levelObject > spStack.size()) { + if (levelObject > spStack.size() || spStack.isEmpty()) { keyStack.add(new ArrayBackedValueStorage()); spStack.add(new UTF8StringPointable()); } - keyStack.get(levelObject - 1).reset(); - DataOutput outk = keyStack.get(levelObject - 1).getDataOutput(); + try { + allKeys.add(parser.getText()); + } catch (IOException e1) { + e1.printStackTrace(); + } + int keyaccess; + if (valuePointables.isEmpty()) { + keyaccess = levelObject; + } else { + keyaccess = keyStack.size(); + } + keyStack.get(keyaccess - 1).reset(); + DataOutput outk = keyStack.get(keyaccess - 1).getDataOutput(); try { svb.write(parser.getText(), outk); - spStack.get(levelObject - 1).set(keyStack.get(levelObject - 1)); + spStack.get(keyaccess - 1).set(keyStack.get(keyaccess - 1)); if (!valuePointables.isEmpty()) { - allKeys.add(spStack.get(levelObject - 1).toString()); //if the next two bytes represent a boolean (boolean has only two bytes), //it means that query asks for all the keys of the object - if (allKeys.size() < valuePointables.size()) { - if (allKeys.get(allKeys.size() - 1) == valuePointables.get(allKeys.size() - 1).toString()) { - tvp.set(valuePointables.get(allKeys.size())); + if (allKeys.size() == valuePointables.size()) { + if (allKeys.get(allKeys.size() - 2).equals(subelements[allKeys.size() - 2])) { + tvp.set(valuePointables.get(allKeys.size() - 1)); if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) { abvsStack.get(0).reset(); out.write(ValueTag.XS_STRING_TAG); svb.write(parser.getText(), out); writeElement(abvsStack.get(0)); + matchedKeys[allKeys.size() - 2] = false; } } }