diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushKeysOrMembersIntoDatascanRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushKeysOrMembersIntoDatascanRule.java
index 41b64013b..a05699270 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushKeysOrMembersIntoDatascanRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushKeysOrMembersIntoDatascanRule.java
@@ -19,7 +19,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -47,7 +46,7 @@
  * After
  *
  *   plan__parent
- *   ASSIGN( $v2 : $v1 )
+ *   ASSIGN( $v2 : $v1 )
  *   DATASCAN( $source : $v1 )
  *   plan__child
  *
@@ -61,12 +60,12 @@ boolean updateDataSource(IVXQueryDataSource datasource, Mutable<ILogicalExpression> expression) {
-        List<Mutable<ILogicalExpression>> findkeys = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> findkeys = new ArrayList<>();
         ExpressionToolbox.findAllFunctionExpressions(expression,
                 BuiltinOperators.KEYS_OR_MEMBERS.getFunctionIdentifier(), findkeys);
         for (int i = findkeys.size(); i > 0; --i) {
             XDMConstants.setTrue(bp);
-            ds.addValueSeq(ArrayUtils.toObject(bp.getByteArray()));
+            ds.appendValueSequence(bp);
             added = true;
         }
         return added;
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
index b9014690d..a43a37f3b 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.vxquery.compiler.rewriter.rules.util.ExpressionToolbox;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
 import org.apache.vxquery.functions.BuiltinFunctions;
 import org.apache.vxquery.functions.BuiltinOperators;
 import org.apache.vxquery.metadata.IVXQueryDataSource;
@@ -56,6 +57,7 @@
  */
 public class PushValueIntoDatascanRule extends AbstractPushExpressionIntoDatascanRule {
+    TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
 
     @Override
     boolean updateDataSource(IVXQueryDataSource datasource, Mutable<ILogicalExpression> expression) {
@@ -64,7 +66,7 @@ boolean updateDataSource(IVXQueryDataSource datasource, Mutable<ILogicalExpression> expression) {
-        List<Mutable<ILogicalExpression>> finds = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> finds = new ArrayList<>();
         ILogicalExpression le = expression.getValue();
         if (le.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
             AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) le;
@@ -75,11 +77,10 @@ boolean updateDataSource(IVXQueryDataSource datasource, Mutable<ILogicalExpression> expression) {
         for (int i = finds.size(); i > 0; --i) {
-            Byte[] value = null;
             List<Mutable<ILogicalExpression>> values = ExpressionToolbox.getFullArguments(finds.get(i - 1));
             if (values.size() > 1) {
-                value = ExpressionToolbox.getConstantArgument(finds.get(i - 1), 1);
-                ds.addValueSeq(value);
+                ExpressionToolbox.getConstantArgument(finds.get(i - 1), 1, tvp);
+                ds.appendValueSequence(tvp);
                 added = true;
             }
         }
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
index 50bc07eec..1150179de 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
@@ -19,18 +19,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
-import org.apache.vxquery.context.StaticContext;
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.functions.BuiltinFunctions;
-import org.apache.vxquery.functions.BuiltinOperators;
-import org.apache.vxquery.functions.Function;
-import org.apache.vxquery.types.AnyNodeType;
-import org.apache.vxquery.types.Quantifier;
-import org.apache.vxquery.types.SequenceType;
-
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
@@ -43,6 +32,15 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
+import org.apache.vxquery.context.StaticContext;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.functions.BuiltinFunctions;
+import org.apache.vxquery.functions.BuiltinOperators;
+import org.apache.vxquery.functions.Function;
+import org.apache.vxquery.types.AnyNodeType;
+import org.apache.vxquery.types.Quantifier;
+import org.apache.vxquery.types.SequenceType;
 
 public class ExpressionToolbox {
     public static Mutable<ILogicalExpression> findVariableExpression(Mutable<ILogicalExpression> mutableLe,
@@ -213,16 +211,14 @@ public static int getTypeExpressionTypeArgument(Mutable<ILogicalExpression> searchM) {
         return pTypeCode.getInteger();
     }
 
-    public static Byte[] getConstantArgument(Mutable<ILogicalExpression> searchM, int arg) {
+    public static void getConstantArgument(Mutable<ILogicalExpression> searchM, int arg, TaggedValuePointable tvp) {
         AbstractFunctionCallExpression searchFunction = (AbstractFunctionCallExpression) searchM.getValue();
         ILogicalExpression argType = searchFunction.getArguments().get(arg).getValue();
         searchFunction.getArguments().size();
         if (argType.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
-            return null;
+            return;
         }
-        TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
         ExpressionToolbox.getConstantAsPointable((ConstantExpression) argType, tvp);
-        return ArrayUtils.toObject(tvp.getByteArray());
     }
 
     public static List<Mutable<ILogicalExpression>> getFullArguments(Mutable<ILogicalExpression> searchM) {
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
index edf8dbc68..f89106707 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
@@ -14,23 +14,21 @@
  */
 package org.apache.vxquery.jsonparser;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Reader;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.htrace.fasterxml.jackson.core.JsonFactory;
 import org.apache.htrace.fasterxml.jackson.core.JsonParser;
 import org.apache.htrace.fasterxml.jackson.core.JsonToken;
 import org.apache.hyracks.api.comm.IFrameFieldAppender;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.BooleanPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -40,15 +38,13 @@
 import org.apache.vxquery.datamodel.builders.jsonitem.ObjectBuilder;
 import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
 import org.apache.vxquery.datamodel.values.ValueTag;
-import org.apache.vxquery.datamodel.values.XDMConstants;
 import org.apache.vxquery.xmlparser.IParser;
 
 public class JSONParser implements IParser {
-    final JsonFactory factory;
-    final List<Byte[]> valueSeq;
+    private final JsonFactory factory;
+    private final List<IPointable> valuePointables;
     protected final ArrayBackedValueStorage atomic;
     private TaggedValuePointable tvp;
-    private BooleanPointable bp;
     protected final List<ArrayBuilder> abStack;
     protected final List<ObjectBuilder> obStack;
     protected final List<ArrayBackedValueStorage> abvsStack;
@@ -58,17 +54,17 @@ public class JSONParser implements IParser {
     protected final SequenceBuilder sb;
     protected final DataOutput out;
     protected itemType checkItem;
-    protected int levelArray, levelObject;
-    protected final List<Byte[]> allKeys;
-    protected ByteArrayOutputStream outputStream, prefixStream, pathStream;
-    protected int objectMatchLevel;
-    protected int arrayMatchLevel;
-    protected boolean matched, literal;
+    protected int levelArray;
+    protected int levelObject;
+    protected final List<Object> allKeys;
+    protected boolean matched;
     protected ArrayBackedValueStorage tempABVS;
     protected List<Integer> arrayCounters;
-    protected List<Boolean> keysOrMembers;
     protected IFrameWriter writer;
     protected IFrameFieldAppender appender;
+    protected boolean[] matchedKeys;
+    protected Object[] subelements;
+    protected boolean skipping;
 
     enum itemType {
         ARRAY,
@@ -78,71 +74,53 @@
     protected final List<itemType> itemStack;
 
     public JSONParser() {
-        this(null);
+        this(new ArrayList<>());
     }
 
-    public JSONParser(List<Byte[]> valueSeq) {
+    public JSONParser(List<IPointable> valuePointables) {
         factory = new JsonFactory();
-        this.valueSeq = valueSeq;
+        this.valuePointables = valuePointables;
         atomic = new ArrayBackedValueStorage();
         tvp = new TaggedValuePointable();
-        abStack = new ArrayList<ArrayBuilder>();
-        obStack = new ArrayList<ObjectBuilder>();
-        abvsStack = new ArrayList<ArrayBackedValueStorage>();
-        keyStack = new ArrayList<ArrayBackedValueStorage>();
-        spStack = new ArrayList<UTF8StringPointable>();
-        itemStack = new ArrayList<itemType>();
+        abStack = new ArrayList<>();
+        obStack = new ArrayList<>();
+        abvsStack = new ArrayList<>();
+        keyStack = new ArrayList<>();
+        spStack = new ArrayList<>();
+        itemStack = new ArrayList<>();
         svb = new StringValueBuilder();
         sb = new SequenceBuilder();
-        bp = new BooleanPointable();
-        allKeys = new ArrayList<Byte[]>();
+        allKeys = new ArrayList<>();
         abvsStack.add(atomic);
         out = abvsStack.get(abvsStack.size() - 1).getDataOutput();
         tempABVS = new ArrayBackedValueStorage();
-        this.objectMatchLevel = 1;
-        this.arrayMatchLevel = 0;
         matched = false;
-        literal = false;
-        arrayCounters = new ArrayList<Integer>();
-        outputStream = new ByteArrayOutputStream();
-        prefixStream = new ByteArrayOutputStream();
-        pathStream = new ByteArrayOutputStream();
-        this.keysOrMembers = new ArrayList<Boolean>();
-        outputStream.reset();
-        pathStream.reset();
-        if (valueSeq != null) {
-            for (int i = 0; i < this.valueSeq.size(); i++) {
-                tvp.set(ArrayUtils.toPrimitive(valueSeq.get(i)), 0, ArrayUtils.toPrimitive(valueSeq.get(i)).length);
-                //access an item of an array
-                if (tvp.getTag() == ValueTag.XS_INTEGER_TAG) {
-                    pathStream.write(tvp.getByteArray(), 0, tvp.getLength());
-                    this.arrayMatchLevel++;
-                    this.keysOrMembers.add(Boolean.valueOf(true));
-                    //access all the items of an array or
-                    //all the keys of an object
-                } else if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) {
-                    pathStream.write(tvp.getByteArray(), 0, tvp.getLength());
-                    this.arrayMatchLevel++;
-                    this.keysOrMembers.add(Boolean.valueOf(false));
-                    //access an object
-                } else {
-                    pathStream.write(tvp.getByteArray(), 1, tvp.getLength() - 1);
-                }
+        arrayCounters = new ArrayList<>();
+        subelements = new Object[valuePointables.size()];
+        matchedKeys = new boolean[valuePointables.size()];
+        skipping = true;
+        for (int i = 0; i < valuePointables.size(); i++) {
+            int start = valuePointables.get(i).getStartOffset() + 1;
+            int length = valuePointables.get(i).getLength() - 1;
+            if (((TaggedValuePointable) valuePointables.get(i)).getTag() == ValueTag.XS_INTEGER_TAG) {
+                // access an item of an array
+                subelements[i] = new Long(LongPointable.getLong(valuePointables.get(i).getByteArray(), start));
+            } else if (((TaggedValuePointable) valuePointables.get(i)).getTag() == ValueTag.XS_BOOLEAN_TAG) {
+                // access all the items of an array or all the keys of an object
+                subelements[i] = new Boolean(BooleanPointable.getBoolean(valuePointables.get(i).getByteArray(), start));
+            } else {
+                UTF8StringPointable sp = new UTF8StringPointable();
+                sp.set(valuePointables.get(i).getByteArray(), start, length);
+                subelements[i] = sp.toString();
             }
         }
     }
 
-    Byte[] toBytes(Integer v) {
-        Byte[] barr = ArrayUtils.toObject(ByteBuffer.allocate(9).putLong(1, v).array());
-        barr[0] = ValueTag.XS_INTEGER_TAG;
-        return barr;
-    }
-
     public int parse(Reader input, ArrayBackedValueStorage result, IFrameWriter writer, IFrameFieldAppender appender)
             throws HyracksDataException {
         this.writer = writer;
         this.appender = appender;
-        if (this.valueSeq != null) {
+        if (!valuePointables.isEmpty()) {
             return parseElements(input, result);
         } else {
             return parse(input, result);
@@ -236,9 +214,8 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws HyracksDataException {
         JsonToken token = parser.nextToken();
         checkItem = null;
 
-        this.objectMatchLevel = 0;
         this.matched = false;
-
+        matchedKeys = new boolean[valuePointables.size()];
         levelArray = 0;
         levelObject = 0;
         sb.reset(result);
@@ -267,25 +244,22 @@ public int parseElements(Reader input, ArrayBackedValueStorage result) throws HyracksDataException {
                     break;
                 case END_ARRAY:
                     //if the query doesn't ask for an atomic value
-                    if (!this.literal && this.pathMatch()) {
-                        //check if the path asked from the query includes the current path
+                    if (!matched && !skipping) {
                         abStack.get(levelArray - 1).finish();
                         if (itemStack.size() > 1) {
                             if (checkItem == itemType.ARRAY) {
-                                if (levelArray > this.arrayMatchLevel + 1) {
-                                    abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject));
-                                } else if (this.matched) {
-                                    this.matched = false;
+                                abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject));
+                                if (!matched) {
                                     items++;
                                     writeElement(abvsStack.get(levelArray + levelObject));
+                                    skipping = true;
                                 }
                             } else if (checkItem == itemType.OBJECT) {
-                                if (levelArray > this.arrayMatchLevel && !this.matched) {
-                                    obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1),
-                                            abvsStack.get(levelArray + levelObject));
-                                } else if (this.matched) {
+                                obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1),
+                                        abvsStack.get(levelArray + levelObject));
+                                if (!matched) {
                                     writeElement(abvsStack.get(levelArray + levelObject));
-                                    this.matched = false;
+                                    skipping = true;
                                     items++;
                                 }
                             }
                         }
@@ -294,30 +268,34 @@
                     if (allKeys.size() - 1 >= 0) {
                         allKeys.remove(allKeys.size() - 1);
                     }
-                    this.arrayCounters.remove(levelArray - 1);
+                    if (levelArray > 0) {
+                        this.arrayCounters.remove(levelArray - 1);
+                    }
                     itemStack.remove(itemStack.size() - 1);
-                    levelArray--;
+                    if (levelArray > 0) {
+                        levelArray--;
+                    }
                     break;
                 case END_OBJECT:
                     //if the query doesn't ask for an atomic value
-                    if (!this.literal && this.pathMatch()) {
-                        //check if the path asked from the query includes the current path
+                    if (!matched && !skipping) {
+                        //check if the path asked from the query includes the current path
                         obStack.get(levelObject - 1).finish();
                         if (itemStack.size() > 1) {
                             if (checkItem == itemType.OBJECT) {
-                                if (levelObject > this.objectMatchLevel) {
-                                    obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2),
-                                            abvsStack.get(levelArray + levelObject));
-                                } else if (this.matched) {
-                                    this.matched = false;
+                                obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2),
+                                        abvsStack.get(levelArray + levelObject));
+                                if (!this.matched) {
                                     items++;
                                     writeElement(abvsStack.get(levelArray + levelObject));
+                                    skipping = true;
                                 }
                             } else if (checkItem == itemType.ARRAY) {
                                 abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject));
-                                if (this.matched) {
+                                if (!matched) {
                                     writeElement(abvsStack.get(levelArray + levelObject));
-                                    this.matched = false;
+                                    skipping = true;
+                                    items++;
                                 }
                             }
                         }
@@ -326,7 +304,9 @@
                         allKeys.remove(allKeys.size() - 1);
                     }
                     itemStack.remove(itemStack.size() - 1);
-                    levelObject--;
+                    if (levelObject > 0) {
+                        levelObject--;
+                    }
                     break;
                 default:
                     break;
@@ -341,39 +321,34 @@
     }
 
     private boolean pathMatch() {
-        outputStream.reset();
-        for (Byte[] bb : allKeys) {
-            outputStream.write(ArrayUtils.toPrimitive(bb), 0, ArrayUtils.toPrimitive(bb).length);
+        boolean contains = true;
+        if (!allKeys.isEmpty() && allKeys.size() <= valuePointables.size()) {
+            if (allKeys.get(allKeys.size() - 1).equals(subelements[allKeys.size() - 1])) {
+                matchedKeys[allKeys.size() - 1] = true;
+            } else {
+                matchedKeys[allKeys.size() - 1] = false;
+            }
         }
-        //the path of values created by parsing the file
-        boolean contains = false;
-        this.matched = false;
-        prefixStream.reset();
-        if (pathStream.size() < outputStream.size()) {
-            prefixStream.write(outputStream.toByteArray(), 0, pathStream.size());
-            contains = Arrays.equals(prefixStream.toByteArray(), pathStream.toByteArray());
-        } else {
-            prefixStream.write(pathStream.toByteArray(), 0, outputStream.size());
-            contains = Arrays.equals(prefixStream.toByteArray(), outputStream.toByteArray());
+        for (boolean b : matchedKeys) {
+            if (!b) {
+                contains = false;
+            }
         }
-        if (pathStream.size() == outputStream.size() && contains) {
-            this.objectMatchLevel = this.levelObject;
-            this.matched = true;
-            this.literal = false;
+        if (contains) {
+            skipping = false;
         }
         return contains;
     }
 
     public void itemsInArray() {
-        if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY && !this.arrayCounters.isEmpty()) {
-            boolean addCounter = levelArray - 1 < this.keysOrMembers.size() ? this.keysOrMembers.get(levelArray - 1)
-                    : true;
+        if (!itemStack.isEmpty() && itemStack.get(itemStack.size() - 1) == itemType.ARRAY
+                && !this.arrayCounters.isEmpty()) {
+            boolean addCounter = subelements[allKeys.size()].equals(Boolean.TRUE) ? false : true;
             if (addCounter) {
                 this.arrayCounters.set(levelArray - 1, this.arrayCounters.get(levelArray - 1) + 1);
-                this.allKeys.add(this.toBytes(this.arrayCounters.get(levelArray - 1)));
+                this.allKeys.add(new Long(this.arrayCounters.get(levelArray - 1)));
             } else {
-                Byte[] bool = { (byte) 0x2B, 0x01 };
-                this.allKeys.add(bool);
+                this.allKeys.add(Boolean.TRUE);
             }
         }
     }
@@ -391,18 +366,24 @@ public void atomicValues(int tag, JsonParser parser, DataOutput out, StringValueBuilder svb, int levelArray,
         }
         if (!itemStack.isEmpty()) {
             if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) {
-                abStack.get(levelArray - 1).addItem(abvsStack.get(0));
-                if (valueSeq != null && this.matched && levelArray == this.arrayMatchLevel) {
-                    this.literal = true;
-                    this.matched = false;
+                if (levelArray > 0) {
+                    abStack.get(levelArray - 1).addItem(abvsStack.get(0));
+                }
+                if (!valuePointables.isEmpty()
+                        && allKeys.get(allKeys.size() - 1).equals(subelements[subelements.length - 1])) {
                     writeElement(abvsStack.get(0));
+                    matched = true;
+                    skipping = true;
                 }
             } else if (itemStack.get(itemStack.size() - 1) == itemType.OBJECT) {
-                obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), abvsStack.get(0));
-                if (valueSeq != null && this.matched && levelObject == this.objectMatchLevel) {
-                    this.literal = true;
-                    this.matched = false;
+                if (levelObject > 0) {
+                    obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), abvsStack.get(0));
+                }
+                if (!valuePointables.isEmpty()
+                        && allKeys.get(allKeys.size() - 1).equals(subelements[subelements.length - 1])) {
                     writeElement(abvsStack.get(0));
+                    matched = true;
+                    skipping = true;
                 }
             }
         }
@@ -410,43 +391,42 @@ public void atomicValues(int tag, JsonParser parser, DataOutput out, StringValueBuilder svb, int levelArray,
 
     public void writeElement(ArrayBackedValueStorage abvs) throws IOException {
         tempABVS.reset();
-        DataOutput out = tempABVS.getDataOutput();
-        out.write(abvs.getByteArray(), abvs.getStartOffset(), abvs.getLength());
+        DataOutput dOut = tempABVS.getDataOutput();
+        dOut.write(abvs.getByteArray(), abvs.getStartOffset(), abvs.getLength());
         FrameUtils.appendFieldToWriter(writer, appender, tempABVS.getByteArray(), tempABVS.getStartOffset(),
                 tempABVS.getLength());
     }
 
     public void startArrayOrObjects(int count) {
-        if (valueSeq != null && !this.arrayCounters.isEmpty()) {
-            boolean addCounter = levelArray - count < this.keysOrMembers.size()
-                    ? this.keysOrMembers.get(levelArray - count) : true;
+        if (!valuePointables.isEmpty() && !this.arrayCounters.isEmpty() && levelArray > 0) {
+            boolean addCounter = subelements[allKeys.size()].equals(Boolean.TRUE) ? false : true;
             if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) {
                 if (addCounter) {
                     this.arrayCounters.set(levelArray - count, this.arrayCounters.get(levelArray - count) + 1);
-                    this.allKeys.add(this.toBytes(this.arrayCounters.get(levelArray - count)));
+                    this.allKeys.add(new Long(this.arrayCounters.get(levelArray - count)));
                 } else {
-                    XDMConstants.setTrue(bp);
-                    this.allKeys.add(ArrayUtils.toObject(bp.getByteArray()));
+                    this.allKeys.add(Boolean.TRUE);
                 }
             }
         }
-        if (count == 2 && valueSeq != null) {
+        if (count == 2 && !valuePointables.isEmpty()) {
            this.arrayCounters.add(Integer.valueOf(0));
         }
     }
 
     public void startArray() throws HyracksDataException {
-        levelArray++;
-        if (levelArray > abStack.size()) {
-            abStack.add(new ArrayBuilder());
-        }
-        if (levelArray + levelObject > abvsStack.size() - 1) {
-            abvsStack.add(new ArrayBackedValueStorage());
-        }
         startArrayOrObjects(2);
         itemStack.add(itemType.ARRAY);
-        if (this.pathMatch() || this.valueSeq == null) {
+        if (this.pathMatch() || valuePointables.isEmpty() || subelements[allKeys.size()].equals(Boolean.TRUE)
+                || subelements[allKeys.size()] instanceof Long) {
+            levelArray++;
+            if (levelArray > abStack.size()) {
+                abStack.add(new ArrayBuilder());
+            }
+            if (levelArray + levelObject > abvsStack.size() - 1) {
+                abvsStack.add(new ArrayBackedValueStorage());
+            }
             abvsStack.get(levelArray + levelObject).reset();
             try {
                 abStack.get(levelArray - 1).reset(abvsStack.get(levelArray + levelObject));
@@ -457,16 +437,16 @@ public void startArray() throws HyracksDataException {
     }
 
     public void startObject() throws HyracksDataException {
-        levelObject++;
-        if (levelObject > obStack.size()) {
-            obStack.add(new ObjectBuilder());
-        }
-        if (levelArray + levelObject > abvsStack.size() - 1) {
-            abvsStack.add(new ArrayBackedValueStorage());
-        }
         startArrayOrObjects(1);
         itemStack.add(itemType.OBJECT);
-        if (this.pathMatch() || this.valueSeq == null) {
+        if (this.pathMatch() || valuePointables.isEmpty()) {
+            levelObject++;
+            if (levelObject > obStack.size()) {
+                obStack.add(new ObjectBuilder());
+            }
+            if (levelArray + levelObject > abvsStack.size() - 1) {
+                abvsStack.add(new ArrayBackedValueStorage());
+            }
             abvsStack.get(levelArray + levelObject).reset();
             try {
                 obStack.get(levelObject - 1).reset(abvsStack.get(levelArray + levelObject));
@@ -477,34 +457,39 @@ public void startObject() throws HyracksDataException {
     }
 
     public void startFieldName(JsonParser parser) throws HyracksDataException {
-        if (levelObject > spStack.size()) {
+        if (levelObject > spStack.size() || spStack.isEmpty()) {
             keyStack.add(new ArrayBackedValueStorage());
             spStack.add(new UTF8StringPointable());
         }
-        keyStack.get(levelObject - 1).reset();
-        DataOutput outk = keyStack.get(levelObject - 1).getDataOutput();
+        try {
+            allKeys.add(parser.getText());
+        } catch (IOException e1) {
+            e1.printStackTrace();
+        }
+        int keyaccess;
+        if (valuePointables.isEmpty()) {
+            keyaccess = levelObject;
+        } else {
+            keyaccess = keyStack.size();
+        }
+        keyStack.get(keyaccess - 1).reset();
+        DataOutput outk = keyStack.get(keyaccess - 1).getDataOutput();
         try {
             svb.write(parser.getText(), outk);
-            spStack.get(levelObject - 1).set(keyStack.get(levelObject - 1));
-            if (this.valueSeq != null) {
-                int length = 0;
-                byte[] barr = spStack.get(levelObject - 1).getByteArray();
-                outputStream.reset();
-                outputStream.write(barr, 0, spStack.get(levelObject - 1).getLength());
-                allKeys.add(ArrayUtils.toObject(outputStream.toByteArray()));
-                for (int i = 0; i < allKeys.size() - 1; i++) {
-                    tvp.set(ArrayUtils.toPrimitive(allKeys.get(i)), 0, ArrayUtils.toPrimitive(allKeys.get(i)).length);
-                    length += ArrayUtils.toPrimitive(allKeys.get(i)).length;
-                }
-                //if the next two bytes represent a boolean (boolean has only two bytes),
+            spStack.get(keyaccess - 1).set(keyStack.get(keyaccess - 1));
+            if (!valuePointables.isEmpty()) {
+                //if the next two bytes represent a boolean (boolean has only two bytes),
                 //it means that query asks for all the keys of the object
-                if (length <= pathStream.size() && (length + 2) <= pathStream.size()) {
-                    tvp.set(pathStream.toByteArray(), length, length + 2);
-                    if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) {
-                        abvsStack.get(0).reset();
-                        out.write(ValueTag.XS_STRING_TAG);
-                        svb.write(parser.getText(), out);
-                        writeElement(abvsStack.get(0));
+                if (allKeys.size() == valuePointables.size()) {
+                    if (allKeys.get(allKeys.size() - 2).equals(subelements[allKeys.size() - 2])) {
+                        tvp.set(valuePointables.get(allKeys.size() - 1));
+                        if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) {
+                            abvsStack.get(0).reset();
+                            out.write(ValueTag.XS_STRING_TAG);
+                            svb.write(parser.getText(), out);
+                            writeElement(abvsStack.get(0));
+                            matchedKeys[allKeys.size() - 2] = false;
+                        }
                     }
                 }
             }
@@ -515,7 +500,7 @@ public void startAtomicValues(int tag, JsonParser parser) throws HyracksDataException {
         itemsInArray();
-        if (this.pathMatch() || this.valueSeq == null) {
+        if (this.pathMatch() || valuePointables.isEmpty()) {
             try {
                 atomicValues(tag, parser, out, svb, levelArray, levelObject);
             } catch (Exception e) {
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
index 2459944d0..9c64c07e5 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
@@ -16,28 +16,35 @@
  */
 package org.apache.vxquery.metadata;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public abstract class AbstractVXQueryDataSource implements IVXQueryDataSource {
+    private static final long serialVersionUID = 1L;
+
     protected static final String DELIMITER = "\\|";
     protected int dataSourceId;
     protected String collectionName;
     protected String[] collectionPartitions;
-    protected List<Integer> childSeq;
-    protected List<Byte[]> valueSeq;
+    protected List<Integer> childSeq = new ArrayList<>();
+    protected List<Integer> valueOffsets = new ArrayList<>();
+    protected ArrayBackedValueStorage valueAbvs = new ArrayBackedValueStorage();
     protected int totalDataSources;
     protected String tag;
     protected Object[] types;
     protected IDataSourcePropertiesProvider propProvider;
-
+
+    @Override
     public INodeDomain getDomain() {
         return null;
@@ -91,20 +98,39 @@ public IDataSourcePropertiesProvider getPropertiesProvider() {
     public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
     }
 
+    @Override
     public void addChildSeq(int integer) {
         childSeq.add(integer);
     }
 
+    @Override
     public List<Integer> getChildSeq() {
         return childSeq;
     }
 
-    public void addValueSeq(Byte[] value) {
-        valueSeq.add(value);
+    public void appendValueSequence(IValueReference value) {
+        valueAbvs.append(value);
+        valueOffsets.add(valueAbvs.getLength());
+    }
+
+    public void getValueSequence(int index, IPointable value) {
+        if (index == 0) {
+            value.set(valueAbvs.getByteArray(), 0, valueOffsets.get(index));
+        } else {
+            value.set(valueAbvs.getByteArray(), valueOffsets.get(index - 1), valueOffsets.get(index));
+        }
     }
 
-    public List<Byte[]> getValueSeq() {
-        return valueSeq;
+    public byte[] getValueBytes() {
+        return valueAbvs.getByteArray();
+    }
+
+    public List<Integer> getValueOffsets() {
+        return valueOffsets;
+    }
+
+    public int getValueCount() {
+        return valueOffsets.size();
     }
 
     public String[] getPartitions() {
@@ -116,4 +142,5 @@ public void setPartitions(String[] collectionPartitions) {
     }
 
     abstract public boolean usingIndex();
+
 }
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
index c5761c5d6..7f5a48111 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
@@ -44,8 +44,6 @@ public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
             }
         };
         this.tag = null;
-        this.childSeq = new ArrayList<>();
-        this.valueSeq = new ArrayList<>();
     }
 
     public static VXQueryCollectionDataSource create(int id, String collection, Object type) {
@@ -55,7 +53,7 @@ public static VXQueryCollectionDataSource create(int id, String collection, Object type) {
     @Override
     public String toString() {
         return "VXQueryCollectionDataSource [collectionName=" + collectionName + ", childSeq=" + childSeq
-                + ", valueSeq=" + valueSeq + "]";
+                + ", valuePointableCount=" + valueOffsets.size() + "]";
     }
 
     public boolean usingIndex() {
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index a3756d5b4..441458de7 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -56,6 +56,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -64,6 +65,7 @@
 import org.apache.hyracks.hdfs.ContextFactory;
 import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory;
 import org.apache.vxquery.context.DynamicContext;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
 import org.apache.vxquery.hdfs2.HDFSFunctions;
 import org.apache.vxquery.jsonparser.JSONParser;
 import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
@@ -76,7 +78,8 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private short totalDataSources;
     private String[] collectionPartitions;
     private List<Integer> childSeq;
-    private List<Byte[]> valueSeq;
+    private List<Integer> valueOffsets;
+    private byte[] valueBytes;
     protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName());
     private HDFSFunctions hdfs;
     private String tag;
@@ -91,7 +94,8 @@ public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, AbstractVXQueryDataSource ds,
         dataSourceId = (short) ds.getDataSourceId();
         totalDataSources = (short) ds.getTotalDataSources();
         childSeq = ds.getChildSeq();
-        valueSeq = ds.getValueSeq();
+        valueBytes = ds.getValueBytes();
+        valueOffsets = ds.getValueOffsets();
         recordDescriptors[0] = rDesc;
         this.tag = ds.getTag();
         this.hdfsConf = hdfsConf;
@@ -114,7 +118,22 @@ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
         final String collectionName = collectionPartitions[partition % collectionPartitions.length];
         final XMLParser parser = new XMLParser(false, nodeIdProvider, nodeId, appender, childSeq,
                 dCtx.getStaticContext());
-        final JSONParser jparser = new JSONParser(valueSeq);
+
+        List<IPointable> valuePointables = new ArrayList<>();
+        {
+            for (int i = 0; i < valueOffsets.size(); ++i) {
+                TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+                if (i == 0) {
+                    tvp.set(valueBytes, 0, valueOffsets.get(i));
+                } else {
+                    tvp.set(valueBytes, valueOffsets.get(i - 1), valueOffsets.get(i) - valueOffsets.get(i - 1));
+                }
+                valuePointables.add(tvp);
+            }
+        }
+        final JSONParser jparser = new JSONParser(valuePointables);
+        final String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
+        final boolean hdfsQuery = collectionModifiedName.contains("hdfs:/");
 
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             @Override
@@ -127,116 +146,19 @@ public void open() throws HyracksDataException {
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 fta.reset(buffer);
-                String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
-                if (!collectionModifiedName.contains("hdfs:/")) {
-                    File collectionDirectory = new File(collectionModifiedName);
-                    // check if directory is in the local file system
-                    if (collectionDirectory.exists()) {
-                        // Go through each tuple.
-                        if (collectionDirectory.isDirectory()) {
-                            xmlAndJsonCollection(collectionDirectory);
-                        } else {
-                            throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":"
-                                    + collectionDirectory.getAbsolutePath() + ") passed to collection.");
-                        }
-                    }
+                if (!hdfsQuery) {
+                    processLocalFiles(collectionModifiedName);
                 } else {
                     // Else check in HDFS file system
                     // Get instance of the HDFS filesystem
                     FileSystem fs = hdfs.getFileSystem();
                     if (fs != null) {
-                        collectionModifiedName = collectionModifiedName.replaceAll("hdfs:/", "");
-                        Path directory = new Path(collectionModifiedName);
-                        Path xmlDocument;
+                        Path directory = new Path(collectionModifiedName.replaceAll("hdfs:/", ""));
                         if (tag != null) {
-                            hdfs.setJob(directory.toString(), tag);
-                            tag = "<" + tag + ">";
-                            Job job = hdfs.getJob();
-                            InputFormat inputFormat = hdfs.getinputFormat();
-                            try {
-                                hdfs.scheduleSplits();
-                                ArrayList schedule = hdfs
-                                        .getScheduleForNode(InetAddress.getLocalHost().getHostAddress());
-                                List splits = hdfs.getSplits();
-                                List fileSplits = new ArrayList<>();
-                                for (int i : schedule) {
-                                    fileSplits.add((FileSplit) splits.get(i));
-                                }
-                                FileSplitsFactory splitsFactory = new FileSplitsFactory(fileSplits);
-                                List inputSplits = splitsFactory.getSplits();
-                                ContextFactory ctxFactory = new ContextFactory();
-                                int size = inputSplits.size();
-                                InputStream stream;
-                                String value;
-                                RecordReader reader;
-                                TaskAttemptContext context;
-                                for (int i = 0; i < size; i++) {
-                                    // read split
-                                    context = ctxFactory.createContext(job.getConfiguration(), i);
-                                    reader = inputFormat.createRecordReader(inputSplits.get(i), context);
-                                    reader.initialize(inputSplits.get(i), context);
-                                    while (reader.nextKeyValue()) {
-                                        value = reader.getCurrentValue().toString();
-                                        // Split value if it contains more than
-                                        // one item with the tag
-                                        if (StringUtils.countMatches(value, tag) > 1) {
-                                            String[] items = value.split(tag);
-                                            for (String item : items) {
-                                                if (item.length() > 0) {
-                                                    item = START_TAG + tag + item;
-                                                    stream = new ByteArrayInputStream(
-                                                            item.getBytes(StandardCharsets.UTF_8));
-                                                    parser.parseHDFSElements(stream, writer, fta, i);
-                                                    stream.close();
-                                                }
-                                            }
-                                        } else {
-                                            value = START_TAG + value;
-                                            // create an input stream to the
-                                            // file currently reading and send
-                                            // it to parser
-                                            stream = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8));
-                                            parser.parseHDFSElements(stream, writer, fta, i);
-                                            stream.close();
-                                        }
-                                    }
-                                    reader.close();
-                                }
-
-                            } catch (Exception e) {
-                                throw new HyracksDataException(e);
-                            }
+                            processHdfsTag(fta, parser, directory);
                         } else {
-                            try {
-                                // check if the path exists and is a directory
-                                if (fs.exists(directory) && fs.isDirectory(directory)) {
-                                    for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
-                                        // read every file in the directory
-                                        RemoteIterator it = fs.listFiles(directory, true);
-                                        while (it.hasNext()) {
-                                            xmlDocument = it.next().getPath();
-                                            if (fs.isFile(xmlDocument)) {
-                                                if (LOGGER.isLoggable(Level.FINE)) {
-                                                    LOGGER.fine(
-                                                            "Starting to read XML document: " + xmlDocument.getName());
-                                                }
-                                                // create an input stream to the
-                                                // file currently reading and
-                                                // send it to parser
-                                                InputStream in = fs.open(xmlDocument).getWrappedStream();
-                                                parser.parseHDFSElements(in, writer, fta, tupleIndex);
-                                                in.close();
-                                            }
-                                        }
-                                    }
-                                } else {
-                                    throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":"
-                                            + directory + ") passed to collection.");
-                                }
-                            } catch (Exception e) {
-                                throw new HyracksDataException(e);
-                            }
+                            processHdfsFiles(fta, nodeId, parser, fs, directory);
                         }
                         try {
                             fs.close();
@@ -247,29 +169,129 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 }
             }
 
-            public void xmlAndJsonCollection(File directory) throws HyracksDataException {
-                Reader input;
-                for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
-                    Iterator it = FileUtils.iterateFiles(directory, new VXQueryIOFileFilter(),
-                            TrueFileFilter.INSTANCE);
-                    while (it.hasNext()) {
-                        File file = it.next();
-                        String fileName = file.getName().toLowerCase();
-                        if (fileName.endsWith(".xml")) {
-                            if (LOGGER.isLoggable(Level.FINE)) {
-                                LOGGER.fine("Starting to read XML document: " + file.getAbsolutePath());
+            private void processHdfsTag(final FrameTupleAccessor fta, final XMLParser parser, Path directory)
+                    throws HyracksDataException {
+                hdfs.setJob(directory.toString(), tag);
+                tag = "<" + tag + ">";
+                Job job = hdfs.getJob();
+                InputFormat inputFormat = hdfs.getinputFormat();
+                try {
+                    hdfs.scheduleSplits();
+                    ArrayList schedule = hdfs.getScheduleForNode(InetAddress.getLocalHost().getHostAddress());
+                    List splits = hdfs.getSplits();
+                    List fileSplits = new ArrayList<>();
+                    for (int i : schedule) {
+                        fileSplits.add((FileSplit) splits.get(i));
+                    }
+                    FileSplitsFactory splitsFactory = new FileSplitsFactory(fileSplits);
+                    List inputSplits = splitsFactory.getSplits();
+                    ContextFactory ctxFactory = new ContextFactory();
+                    int size = inputSplits.size();
+                    InputStream stream;
+                    String value;
+                    RecordReader reader;
+                    TaskAttemptContext context;
+                    for (int i = 0; i < size; i++) {
+                        // read split
+                        context = ctxFactory.createContext(job.getConfiguration(), i);
+                        reader = inputFormat.createRecordReader(inputSplits.get(i), context);
+                        reader.initialize(inputSplits.get(i), context);
+                        while (reader.nextKeyValue()) {
+                            value = reader.getCurrentValue().toString();
+                            // Split value if it contains more than
+                            // one item with the tag
+                            if (StringUtils.countMatches(value, tag) > 1) {
+                                String[] items = value.split(tag);
+                                for (String item : items) {
+                                    if (item.length() > 0) {
+                                        item = START_TAG + tag + item;
+                                        stream = new ByteArrayInputStream(item.getBytes(StandardCharsets.UTF_8));
+                                        parser.parseHDFSElements(stream, writer, fta, i);
+                                        stream.close();
+                                    }
+                                }
+                            } else {
+                                value = START_TAG + value;
+                                // create an input stream to the
+                                // file currently reading and send
+                                // it to parser
+                                stream = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8));
+                                parser.parseHDFSElements(stream, writer, fta, i);
+                                stream.close();
                             }
-                            parser.parseElements(file, writer, tupleIndex);
-                        } else if (fileName.endsWith(".json")) {
-                            if (LOGGER.isLoggable(Level.FINE)) {
-                                LOGGER.fine("Starting to read JSON document: " + file.getAbsolutePath());
+                        }
+                        reader.close();
+                    }
+
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            private void processHdfsFiles(final FrameTupleAccessor fta, final String nodeId, final XMLParser parser,
+                    FileSystem fs, Path directory) throws HyracksDataException {
+                Path xmlDocument;
+                try {
+                    // check if the path exists and is a directory
+                    if (!fs.exists(directory) || !fs.isDirectory(directory)) {
+                        throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":" + directory
+                                + ") passed to collection.");
+                    }
+                    for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
+                        // read every file in the directory
+                        RemoteIterator it = fs.listFiles(directory, true);
+                        while (it.hasNext()) {
+                            xmlDocument = it.next().getPath();
+                            if (fs.isFile(xmlDocument)) {
+                                if (LOGGER.isLoggable(Level.FINE)) {
+                                    LOGGER.fine("Starting to read XML document: " + xmlDocument.getName());
+                                }
+                                // create an input stream to the
+                                // file currently reading and
+                                // send it to parser
+                                InputStream in = fs.open(xmlDocument).getWrappedStream();
+                                parser.parseHDFSElements(in, writer, fta, tupleIndex);
+                                in.close();
                             }
-                            try {
-                                jsonAbvs.reset();
-                                input = new InputStreamReader(new FileInputStream(file));
-                                jparser.parse(input, jsonAbvs, writer, appender);
-                            } catch (FileNotFoundException e) {
-                                throw new HyracksDataException(e.toString());
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            private void processLocalFiles(String collectionModifiedName) throws HyracksDataException {
+                Reader input;
+                File collectionDirectory = new File(collectionModifiedName);
+                // check if directory is in the local file system
+                if (collectionDirectory.exists()) {
+                    // Go through each tuple.
+                    if (!collectionDirectory.isDirectory()) {
+                        throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":"
+                                + collectionDirectory.getAbsolutePath() + ") passed to collection.");
+                    }
+                    for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
+                        Iterator it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(),
+                                TrueFileFilter.INSTANCE);
+                        while (it.hasNext()) {
+                            File file = it.next();
+                            String fileName = file.getName().toLowerCase();
+                            if (fileName.endsWith(".xml")) {
+                                if (LOGGER.isLoggable(Level.FINE)) {
+                                    LOGGER.fine("Starting to read XML document: " + file.getAbsolutePath());
+                                }
+                                parser.parseElements(file, writer, tupleIndex);
+                            } else if (fileName.endsWith(".json")) {
+                                if (LOGGER.isLoggable(Level.FINE)) {
+                                    LOGGER.fine("Starting to read JSON document: " + file.getAbsolutePath());
+                                }
+                                try {
+                                    jsonAbvs.reset();
+                                    input = new InputStreamReader(new FileInputStream(file));
+                                    jparser.parse(input, jsonAbvs, writer, appender);
+                                } catch (FileNotFoundException e) {
+                                    throw new HyracksDataException(e);
+                                }
                             }
                         }
                     }
                 }
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
index d55530dcf..8723cad47 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
@@ -49,8 +49,6 @@ public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
             }
         };
         this.tag = null;
-        this.childSeq = new ArrayList<>();
-        this.valueSeq = new ArrayList<>();
     }
 
     public static VXQueryIndexingDataSource create(int id, String collection, Object type, String function) {
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java
index fa0a90034..71f55cfb0 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java
@@ -66,7 +66,7 @@ public class TestRunner {
     public TestRunner(XTestOptions opts) throws UnknownHostException {
         this.opts = opts;
-        this.collectionList = new ArrayList();
+        this.collectionList = new ArrayList<>();
     }
 
     public void open() throws Exception {
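
Note on the new value-sequence storage: AbstractVXQueryDataSource now packs every appended value into a single ArrayBackedValueStorage and records the running end offset, and VXQueryCollectionOperatorDescriptor.createPushRuntime rebuilds one TaggedValuePointable per entry from those offsets. The following is a minimal, self-contained sketch of that offset scheme only, using plain byte arrays instead of Hyracks' ArrayBackedValueStorage/IPointable; the class name ValueSequenceSketch is hypothetical and not part of this patch.

    import java.util.ArrayList;
    import java.util.List;

    class ValueSequenceSketch {
        private byte[] buffer = new byte[0];          // stands in for valueAbvs
        private final List<Integer> offsets = new ArrayList<>(); // stands in for valueOffsets

        // Mirrors appendValueSequence(): copy the value and record the new end offset.
        void append(byte[] value) {
            byte[] grown = new byte[buffer.length + value.length];
            System.arraycopy(buffer, 0, grown, 0, buffer.length);
            System.arraycopy(value, 0, grown, buffer.length, value.length);
            buffer = grown;
            offsets.add(buffer.length);
        }

        // Mirrors the createPushRuntime loop: entry i spans [offsets[i-1], offsets[i]).
        byte[] get(int index) {
            int start = index == 0 ? 0 : offsets.get(index - 1);
            int length = offsets.get(index) - start;
            byte[] out = new byte[length];
            System.arraycopy(buffer, start, out, 0, length);
            return out;
        }
    }

Under these assumptions, append(a); append(b); get(1) returns the bytes of b, which is the behavior the operator descriptor relies on when it reconstructs the pointables passed to JSONParser.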