From 4cfe5c5335fa272df9638038b729f0bd50dbf72f Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Tue, 7 Apr 2015 13:09:53 +0900 Subject: [PATCH] TAJO-1533 Provide hint to children execs whether it will require rescan on them --- .../planner/physical/AggregationExec.java | 4 +- .../engine/planner/physical/BNLJoinExec.java | 5 ++ .../planner/physical/BSTIndexScanExec.java | 4 +- .../planner/physical/BinaryPhysicalExec.java | 13 +++-- .../physical/ColPartitionStoreExec.java | 4 +- .../planner/physical/CommonJoinExec.java | 51 ++++++++++++++++-- .../DistinctGroupbyFirstAggregationExec.java | 4 +- .../DistinctGroupbyHashAggregationExec.java | 4 +- .../DistinctGroupbySecondAggregationExec.java | 4 +- .../DistinctGroupbySortAggregationExec.java | 12 ++--- .../DistinctGroupbyThirdAggregationExec.java | 4 +- .../engine/planner/physical/EvalExprExec.java | 4 +- .../planner/physical/ExternalSortExec.java | 34 ++++++------ .../HashBasedColPartitionStoreExec.java | 4 +- .../physical/HashLeftOuterJoinExec.java | 53 ++----------------- .../physical/HashShuffleFileWriteExec.java | 4 +- .../engine/planner/physical/HavingExec.java | 4 +- .../engine/planner/physical/LimitExec.java | 3 +- .../engine/planner/physical/MemSortExec.java | 4 +- .../engine/planner/physical/NLJoinExec.java | 5 ++ .../planner/physical/NLLeftOuterJoinExec.java | 5 ++ .../physical/PartitionMergeScanExec.java | 16 +++--- .../engine/planner/physical/PhysicalExec.java | 7 ++- .../planner/physical/ProjectionExec.java | 4 +- .../physical/RangeShuffleFileWriteExec.java | 4 +- .../engine/planner/physical/ScanExec.java | 4 +- .../planner/physical/SelectionExec.java | 4 +- .../engine/planner/physical/SeqScanExec.java | 4 +- .../SortBasedColPartitionStoreExec.java | 4 +- .../planner/physical/StoreTableExec.java | 4 +- .../planner/physical/UnaryPhysicalExec.java | 8 +-- .../planner/physical/WindowAggExec.java | 4 +- .../NonForwardQueryResultFileScanner.java | 2 +- .../NonForwardQueryResultSystemScanner.java | 4 +- .../tajo/master/exec/QueryExecutor.java | 2 +- .../java/org/apache/tajo/worker/Task.java | 2 +- .../planner/physical/TestBNLJoinExec.java | 4 +- .../planner/physical/TestBSTIndexExec.java | 2 +- .../physical/TestExternalSortExec.java | 2 +- .../physical/TestFullOuterHashJoinExec.java | 8 +-- .../physical/TestFullOuterMergeJoinExec.java | 12 ++--- .../physical/TestHashAntiJoinExec.java | 2 +- .../planner/physical/TestHashJoinExec.java | 2 +- .../physical/TestHashSemiJoinExec.java | 2 +- .../physical/TestLeftOuterHashJoinExec.java | 10 ++-- .../physical/TestLeftOuterNLJoinExec.java | 10 ++-- .../planner/physical/TestMergeJoinExec.java | 2 +- .../planner/physical/TestNLJoinExec.java | 4 +- .../planner/physical/TestPhysicalPlanner.java | 46 ++++++++-------- .../TestProgressExternalSortExec.java | 2 +- .../physical/TestRightOuterHashJoinExec.java | 6 +-- .../physical/TestRightOuterMergeJoinExec.java | 12 ++--- .../engine/planner/physical/TestSortExec.java | 2 +- .../apache/tajo/plan/expr/AlgebraicUtil.java | 5 ++ 54 files changed, 232 insertions(+), 203 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java index 6d9e38a796..0f7d3ce5bf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java @@ -60,8 +60,8 @@ public AggregationExec(final TaskAttemptContext context, GroupbyNode plan, } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); for (EvalNode aggFunction : aggFunctions) { aggFunction.bind(inSchema); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java index 6e1a553039..ac556700e2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java @@ -68,6 +68,11 @@ public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan, outputTuple = new VTuple(outSchema.size()); } + @Override + public void init(boolean rescan) throws IOException { + init(rescan, true); + } + public Tuple next() throws IOException { if (leftTupleSlots.isEmpty()) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index be6c046356..a9601339fd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -68,8 +68,8 @@ public BSTIndexScanExec(TaskAttemptContext context, ScanNode scanNode , } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); progress = 0.0f; if (qual != null) { qual.bind(inSchema); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java index 03ec396314..10f870543b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java @@ -47,13 +47,16 @@ public PhysicalExec getRightChild() { return rightChild; } - @Override - public void init() throws IOException { - leftChild.init(); - rightChild.init(); + protected void init(boolean leftRescan, boolean rightRescan) throws IOException { + leftChild.init(leftRescan); + rightChild.init(rightRescan); progress = 0.0f; + super.init(leftRescan || rightRescan); + } - super.init(); + @Override + public void init(boolean needsRescan) throws IOException { + init(needsRescan, needsRescan); } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 4481569fc8..733760010e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -123,8 +123,8 @@ public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, Ph } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); storeTablePath = context.getOutputPath(); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java index 0781041998..7f9e0f280f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java @@ -18,13 +18,18 @@ package org.apache.tajo.engine.planner.physical; +import com.google.common.collect.Lists; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.engine.planner.Projector; +import org.apache.tajo.plan.expr.AlgebraicUtil; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.expr.EvalTreeUtil; import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.util.Pair; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.util.List; // common join exec except HashLeftOuterJoinExec public abstract class CommonJoinExec extends BinaryPhysicalExec { @@ -32,30 +37,64 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec { // from logical plan protected JoinNode plan; protected final boolean hasJoinQual; + protected final boolean hasJoinFilter; - protected EvalNode joinQual; + protected EvalNode joinQual; // ex) a.id = b.id + protected EvalNode joinFilter; // ex) a > 10 // projection protected Projector projector; public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { + this(context, plan, outer, inner, false); + } + + public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, + PhysicalExec inner, boolean extractJoinFilter) { super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()), plan.getOutSchema(), outer, inner); this.plan = plan; - this.joinQual = plan.getJoinQual(); - this.hasJoinQual = plan.hasJoinQual(); + if (plan.hasJoinQual() && extractJoinFilter) { + Pair extracted = extractJoinConditions(plan.getJoinQual()); + joinQual = extracted.getFirst(); + joinFilter = extracted.getSecond(); + } else { + joinQual = plan.getJoinQual(); + } + this.hasJoinQual = joinQual != null; + this.hasJoinFilter = joinFilter != null; // for projection this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); } + private Pair extractJoinConditions(EvalNode joinQual) { + List joinQuals = Lists.newArrayList(); + List joinFilters = Lists.newArrayList(); + for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(joinQual)) { + if (EvalTreeUtil.isJoinQual(eachQual, true)) { + joinQuals.add(eachQual); + } else { + joinFilters.add(eachQual); + } + } + + return new Pair( + joinQuals.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(joinQuals), + joinFilters.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(joinFilters) + ); + } + @Override - public void init() throws IOException { - super.init(); + public void init(boolean leftRescan, boolean rightRescan) throws IOException { + super.init(leftRescan, rightRescan); if (hasJoinQual) { joinQual.bind(inSchema); } + if (hasJoinFilter) { + joinFilter.bind(inSchema); + } } @Override @@ -63,6 +102,7 @@ protected void compile() { if (hasJoinQual) { joinQual = context.getPrecompiledEval(inSchema, joinQual); } + // compile filter? } public JoinNode getPlan() { @@ -74,6 +114,7 @@ public void close() throws IOException { super.close(); plan = null; joinQual = null; + joinFilter = null; projector = null; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java index 37bc5a7523..971021a27a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java @@ -107,8 +107,8 @@ public DistinctGroupbyFirstAggregationExec(TaskAttemptContext context, DistinctG } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); // finding grouping column index Column[] groupingColumns = plan.getGroupingColumns(); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java index e96e750b61..c1cfa8c4fb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java @@ -57,8 +57,8 @@ public DistinctGroupbyHashAggregationExec(TaskAttemptContext context, DistinctGr } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); List distinctGroupingKeyIdList = new ArrayList(); for (Column col: plan.getGroupingColumns()) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java index 7b01a9b277..6f445e9e6b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java @@ -93,8 +93,8 @@ public DistinctGroupbySecondAggregationExec(TaskAttemptContext context, Distinct } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); numGroupingColumns = plan.getGroupingColumns().length; List groupbyNodes = plan.getSubPlans(); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java index 9ff479b5da..dc951b887d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java @@ -22,7 +22,6 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.logical.DistinctGroupbyNode; import org.apache.tajo.plan.logical.GroupbyNode; import org.apache.tajo.storage.Tuple; @@ -66,9 +65,12 @@ public DistinctGroupbySortAggregationExec(final TaskAttemptContext context, Dist for(int i = 0; i < resultColumnIds.length; i++) { resultColumnIdIndexes[resultColumnIds[i]] = i; } + } - for (SortAggregateExec eachExec: aggregateExecs) { - eachExec.init(); + @Override + public void init(boolean needsScan) throws IOException { + for (int i = 0; i < aggregateExecs.length; i++) { + aggregateExecs[i].init(i < groupbyNodeNum); } } @@ -171,10 +173,6 @@ public void close() throws IOException { } } - @Override - public void init() throws IOException { - } - @Override public void rescan() throws IOException { finished = false; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java index 7bd71e25ec..71830cf24a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java @@ -58,8 +58,8 @@ public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctG } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); numGroupingColumns = plan.getGroupingColumns().length; resultTupleLength = numGroupingColumns; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java index 32ec772248..c90f922cd4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java @@ -37,8 +37,8 @@ public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) { } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); progress = 0.0f; for (Target target : plan.getTargets()) { target.getEvalTree().bind(inSchema); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 355f015ccc..08f40db16b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -149,9 +149,9 @@ public void setSortBufferBytesNum(int sortBufferBytesNum) { this.sortBufferBytesNum = sortBufferBytesNum; } - public void init() throws IOException { + public void init(boolean needsRescan) throws IOException { inputStats = new TableStats(); - super.init(); + super.init(needsRescan); } public SortNode getPlan() { @@ -279,8 +279,7 @@ public Tuple next() throws IOException { info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec"); if (memoryResident) { // if all sorted data reside in a main-memory table. - TupleSorter sorter = getSorter(inMemoryTable); - result = new MemTableScanner(sorter.sort(), inMemoryTable.size(), sortAndStoredBytes); + result = new MemTableScanner(); } else { // if input data exceeds main-memory at least once try { @@ -533,10 +532,9 @@ private Scanner createKWayMergerInternal(final Scanner [] sources, final int sta } } - private static class MemTableScanner extends AbstractScanner { - final Iterable iterable; - final long sortAndStoredBytes; - final int totalRecords; + private class MemTableScanner extends AbstractScanner { + Iterable iterable; // be warn, this can hold references to tuples inside of inMemoryTable + int totalRecords; Iterator iterator; // for input stats @@ -544,14 +542,16 @@ private static class MemTableScanner extends AbstractScanner { int numRecords; TableStats scannerTableStats; - public MemTableScanner(Iterable iterable, int length, long inBytes) { - this.iterable = iterable; - this.totalRecords = length; - this.sortAndStoredBytes = inBytes; + public MemTableScanner() { + this.iterable = getSorter(inMemoryTable).sort(); + this.totalRecords = inMemoryTable.size(); } @Override public void init() throws IOException { + if (iterable == null) { + throw new IllegalStateException("Backing memory is released already"); + } iterator = iterable.iterator(); scannerProgress = 0.0f; @@ -566,12 +566,14 @@ public void init() throws IOException { @Override public Tuple next() throws IOException { - if (iterator.hasNext()) { + if (iterator != null && iterator.hasNext()) { numRecords++; return iterator.next(); - } else { - return null; } + if (!parentNeedsRescan) { + close(); + } + return null; } @Override @@ -582,6 +584,8 @@ public void reset() throws IOException { @Override public void close() throws IOException { iterator = null; + iterable = null; + inMemoryTable.clear(); scannerProgress = 1.0f; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index e94bc262f5..f6935a28db 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -44,8 +44,8 @@ public HashBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode super(context, plan, child); } - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); } private Appender getAppender(String partition) throws IOException { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index fa9ba940ba..bbfb85295f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -18,20 +18,14 @@ package org.apache.tajo.engine.planner.physical; -import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.utils.CacheHolder; import org.apache.tajo.engine.utils.TableCacheKey; import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.catalog.SchemaUtil; -import org.apache.tajo.plan.expr.AlgebraicUtil; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.expr.EvalTreeUtil; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; @@ -43,11 +37,7 @@ import java.util.*; -public class HashLeftOuterJoinExec extends BinaryPhysicalExec { - // from logical plan - protected JoinNode plan; - protected EvalNode joinQual; // ex) a.id = b.id - protected EvalNode joinFilter; // ex) a > 10 +public class HashLeftOuterJoinExec extends CommonJoinExec { protected List joinKeyPairs; @@ -66,35 +56,13 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { protected boolean finished = false; protected boolean shouldGetLeftTuple = true; - // projection - protected Projector projector; - private int rightNumCols; private TableStats cachedRightTableStats; private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class); public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild, PhysicalExec rightChild) { - super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()), - plan.getOutSchema(), leftChild, rightChild); - this.plan = plan; - - List joinQuals = Lists.newArrayList(); - List joinFilters = Lists.newArrayList(); - for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(plan.getJoinQual())) { - if (EvalTreeUtil.isJoinQual(eachQual, true)) { - joinQuals.add(eachQual); - } else { - joinFilters.add(eachQual); - } - } - - this.joinQual = AlgebraicUtil.createSingletonExprFromCNF(joinQuals.toArray(new EvalNode[joinQuals.size()])); - if (joinFilters.size() > 0) { - this.joinFilter = AlgebraicUtil.createSingletonExprFromCNF(joinFilters.toArray(new EvalNode[joinFilters.size()])); - } else { - this.joinFilter = null; - } + super(context, plan, leftChild, rightChild, true); // HashJoin only can manage equi join key pairs. this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), @@ -111,25 +79,12 @@ public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, Physical rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName()); } - // for projection - this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - // for join frameTuple = new FrameTuple(); outTuple = new VTuple(outSchema.size()); leftKeyTuple = new VTuple(leftKeyList.length); rightNumCols = rightChild.getSchema().size(); - - joinQual.bind(inSchema); - if (joinFilter != null) { - joinFilter.bind(inSchema); - } - } - - @Override - protected void compile() { - joinQual = context.getPrecompiledEval(inSchema, joinQual); } protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) { @@ -182,7 +137,7 @@ public Tuple next() throws IOException { frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples // if there is no join filter, it is always true. - boolean satisfiedWithFilter = joinFilter == null || joinFilter.eval(frameTuple).isTrue(); + boolean satisfiedWithFilter = !hasJoinFilter || joinFilter.eval(frameTuple).isTrue(); boolean satisfiedWithJoinCondition = joinQual.eval(frameTuple).isTrue(); // if a composited tuple satisfies with both join filter and join condition @@ -287,8 +242,6 @@ public void close() throws IOException { iterator = null; plan = null; joinQual = null; - joinFilter = null; - projector = null; } public JoinNode getPlan() { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index f1e2fe5060..353075e4b2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -78,8 +78,8 @@ public HashShuffleFileWriteExec(TaskAttemptContext context, } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); } private HashShuffleAppender getAppender(int partId) throws IOException { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java index b71c770c9e..b1018d67c5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java @@ -37,8 +37,8 @@ public HavingExec(TaskAttemptContext context, } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); qual.bind(inSchema); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java index 14e236637b..83ab6d2ee6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java @@ -48,8 +48,9 @@ public Tuple next() throws IOException { return tuple; } + @Override public void rescan() throws IOException { - super.init(); + super.init(false); fetchCount = 0; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java index f76e3561ac..55c5265d4e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java @@ -40,8 +40,8 @@ public MemSortExec(final TaskAttemptContext context, this.plan = plan; } - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); this.tupleSlots = new ArrayList(10000); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java index 964a5232f8..b7197fdcce 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java @@ -44,6 +44,11 @@ public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, outTuple = new VTuple(outSchema.size()); } + @Override + public void init(boolean rescan) throws IOException { + init(rescan, true); + } + public Tuple next() throws IOException { while (!context.isStopped()) { if (needNewOuter) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java index 735623dbdf..b714b22553 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java @@ -50,6 +50,11 @@ public NLLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalEx rightNumCols = rightChild.getSchema().size(); } + @Override + public void init(boolean rescan) throws IOException { + init(rescan, true); + } + public Tuple next() throws IOException { while (!context.isStopped()) { if (needNextRightTuple) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java index a1eaa48f8b..af24299e78 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java @@ -57,15 +57,17 @@ public PartitionMergeScanExec(TaskAttemptContext context, } @Override - public void init() throws IOException { + public void init(boolean needsRescan) throws IOException { for (CatalogProtos.FragmentProto fragment : fragments) { SeqScanExec scanExec = new SeqScanExec(context, (ScanNode) PlannerUtil.clone(null, plan), new CatalogProtos.FragmentProto[]{fragment}); + scanExec.init(true); scanners.add(scanExec); } progress = 0.0f; rescan(); - super.init(); + + super.init(true); } @Override @@ -83,7 +85,7 @@ public Tuple next() throws IOException { currentScanner.close(); } currentScanner = iterator.next(); - currentScanner.init(); + currentScanner.rescan(); } else { break; } @@ -96,7 +98,7 @@ public void rescan() throws IOException { if (scanners.size() > 0) { iterator = scanners.iterator(); currentScanner = iterator.next(); - currentScanner.init(); + currentScanner.rescan(); } } @@ -105,9 +107,9 @@ public void close() throws IOException { inputStats.reset(); for (SeqScanExec scanner : scanners) { scanner.close(); - TableStats scannerTableStsts = scanner.getInputStats(); - if (scannerTableStsts != null) { - inputStats.merge(scannerTableStsts); + TableStats scannerTableStats = scanner.getInputStats(); + if (scannerTableStats != null) { + inputStats.merge(scannerTableStats); } } iterator = null; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java index de14c9a9d7..d316d12fac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java @@ -37,6 +37,9 @@ public abstract class PhysicalExec implements SchemaObject { protected Schema outSchema; protected int outColumnNum; + // some exec's hold data in memory. in high-memory pressure situation, we need paging on them + protected boolean parentNeedsRescan; + public PhysicalExec(final TaskAttemptContext context, final Schema inSchema, final Schema outSchema) { this.context = context; @@ -49,7 +52,9 @@ public final Schema getSchema() { return outSchema; } - public void init() throws IOException { + // needsRescan=true : parent might rescan on this + public void init(boolean needsRescan) throws IOException { + parentNeedsRescan = needsRescan; if (context.getQueryContext().getBool(SessionVars.CODEGEN)) { this.compile(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java index 72a667dbba..638ab61e8b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java @@ -42,8 +42,8 @@ public ProjectionExec(TaskAttemptContext context, Projectable plan, this.plan = plan; } - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); this.outTuple = new VTuple(outSchema.size()); this.projector = new Projector(context, inSchema, outSchema, this.plan.getTargets()); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index 8da1a03638..60c57e4842 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -58,8 +58,8 @@ public RangeShuffleFileWriteExec(final TaskAttemptContext context, this.sortSpecs = sortSpecs; } - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); indexKeys = new int[sortSpecs.length]; keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java index 86874ba3cc..9e3351608a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java @@ -43,10 +43,10 @@ public ScanExec(TaskAttemptContext context, Schema inSchema, Schema outSchema) { public abstract CatalogProtos.FragmentProto[] getFragments(); @Override - public void init() throws IOException { + public void init(boolean needsRescan) throws IOException { canBroadcast = checkIfBroadcast(); - super.init(); + super.init(needsRescan); } public boolean canBroadcast() { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java index c090fa75f0..43aafe71c5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java @@ -37,8 +37,8 @@ public SelectionExec(TaskAttemptContext context, } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); qual.bind(inSchema); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 671555c791..611c703fdc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -135,7 +135,7 @@ private void rewriteColumnPartitionedTableSchema() throws IOException { } @Override - public void init() throws IOException { + public void init(boolean needsRescan) throws IOException { Schema projected; if (plan.hasTargets()) { @@ -160,7 +160,7 @@ public void init() throws IOException { } initScanner(projected); - super.init(); + super.init(needsRescan); if (plan.hasQual()) { qual.bind(inSchema); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index ca90b0e78e..2356f36e19 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -44,8 +44,8 @@ public SortBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode super(context, plan, child); } - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); currentKey = new VTuple(keyNum); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index 562269918e..e2df72968c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -61,8 +61,8 @@ public StoreTableExec(TaskAttemptContext context, PersistentStoreNode plan, Phys this.plan = plan; } - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); if (plan.hasOptions()) { meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions()); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java index 2c63ea6f52..38b029a4ed 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java @@ -37,6 +37,7 @@ public UnaryPhysicalExec(TaskAttemptContext context, this.child = child; } + @SuppressWarnings("unchecked") public T getChild() { return (T) this.child; } @@ -47,13 +48,12 @@ public void setChild(PhysicalExec child){ } @Override - public void init() throws IOException { + public void init(boolean needsRescan) throws IOException { progress = 0.0f; if (child != null) { - child.init(); + child.init(needsRescan); } - - super.init(); + super.init(needsRescan); } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java index 2f1fc467f0..da6bcc54d3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java @@ -180,8 +180,8 @@ public WindowAggExec(TaskAttemptContext context, WindowAggNode plan, PhysicalExe } @Override - public void init() throws IOException { - super.init(); + public void init(boolean needsRescan) throws IOException { + super.init(needsRescan); for (EvalNode functionEval : functions) { functionEval.bind(inSchema); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index 804821bf77..2d2a11afec 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -122,7 +122,7 @@ private void initSeqScanExec() throws IOException { } catch (CloneNotSupportedException e) { throw new IOException(e.getMessage(), e); } - scanExec.init(); + scanExec.init(false); currentFragmentIndex += fragments.size(); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 958c252304..00e886be6b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -124,7 +124,7 @@ public void init() throws IOException { outSchema = physicalExec.getSchema(); encoder = RowStoreUtil.createEncoder(getLogicalSchema()); - physicalExec.init(); + physicalExec.init(false); } @Override @@ -665,6 +665,8 @@ public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) { projector = new Projector(context, inSchema, outSchema, scanNode.getTargets()); } + // this class rescans but it's not from children. so we need not to init with flag needsRescan=true + @Override public Tuple next() throws IOException { Tuple aTuple; diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 2eb3c5f752..41a4510193 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -322,7 +322,7 @@ private void insertNonFromQuery(QueryContext queryContext, EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild()); StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec); try { - exec.init(); + exec.init(false); exec.next(); } finally { exec.close(); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index b08af2b3a7..80cf6d3280 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -402,7 +402,7 @@ public void run() throws Exception { this.executor = executionBlockContext.getTQueryEngine(). createPlan(context, plan); - this.executor.init(); + this.executor.init(false); while(!context.isStopped() && executor.next() != null) { } diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java index 842a120791..18878837e5 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java @@ -169,7 +169,7 @@ public final void testBNLCrossJoin() throws IOException, PlanningException { assertTrue(proj.getChild() instanceof BNLJoinExec); int i = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { i++; } @@ -210,7 +210,7 @@ public final void testBNLInnerJoin() throws IOException, PlanningException { Tuple tuple; int i = 1; int count = 0; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { count++; assertTrue(i == tuple.get(0).asInt4()); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java index b22a87e168..cbe5f67720 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java @@ -176,7 +176,7 @@ public void testEqual() throws Exception { int tupleCount = this.randomValues.get(rndKey); int counter = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { counter ++; } diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java index 946e0f30bf..fa1ca0952b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -148,7 +148,7 @@ public final void testNext() throws IOException, PlanningException { Tuple preVal = null; Tuple curVal; int cnt = 0; - exec.init(); + exec.init(false); long start = System.currentTimeMillis(); BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(), new SortSpec[]{ diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java index 780e698a4b..0ba598870b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java @@ -285,7 +285,7 @@ public final void testFullOuterHashJoinExec0() throws IOException, PlanningExcep assertTrue(proj.getChild() instanceof HashFullOuterJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents @@ -324,7 +324,7 @@ public final void testFullOuterHashJoinExec1() throws IOException, PlanningExcep assertTrue(proj.getChild() instanceof HashFullOuterJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents @@ -362,7 +362,7 @@ public final void testFullOuterHashJoinExec2() throws IOException, PlanningExcep assertTrue(proj.getChild() instanceof HashFullOuterJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents @@ -402,7 +402,7 @@ public final void testFullOuterHashJoinExec3() throws IOException, PlanningExcep assertTrue(proj.getChild() instanceof HashFullOuterJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents count = count + 1; diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java index ef6f1ceb71..95760c9985 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java @@ -333,7 +333,7 @@ public final void testFullOuterMergeJoin0() throws IOException, PlanningExceptio ProjectionExec proj = (ProjectionExec) exec; assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents @@ -371,7 +371,7 @@ public final void testFullOuterMergeJoin1() throws IOException, PlanningExceptio assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents @@ -409,7 +409,7 @@ public final void testFullOuterMergeJoin2() throws IOException, PlanningExceptio int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents @@ -448,7 +448,7 @@ public final void testFullOuterMergeJoin3() throws IOException, PlanningExceptio assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents @@ -487,7 +487,7 @@ public final void testFullOuterMergeJoin4() throws IOException, PlanningExceptio assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents @@ -526,7 +526,7 @@ public final void testFullOuterMergeJoin5() throws IOException, PlanningExceptio assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java index 64efa28ec3..be4aad05fd 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java @@ -197,7 +197,7 @@ public final void testHashAntiJoin() throws IOException, PlanningException { Tuple tuple; int count = 0; int i = 0; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { count++; assertTrue(i == tuple.get(0).asInt4()); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java index 578b58607d..5492ca8810 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java @@ -171,7 +171,7 @@ public final void testHashInnerJoin() throws IOException, PlanningException { Tuple tuple; int count = 0; int i = 1; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { count++; assertTrue(i == tuple.get(0).asInt4()); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java index 42a018bdac..b9ab0ddd75 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java @@ -202,7 +202,7 @@ public final void testHashSemiJoin() throws IOException, PlanningException { Tuple tuple; int count = 0; int i = 1; - exec.init(); + exec.init(false); // expect result without duplicated tuples. while ((tuple = exec.next()) != null) { count++; diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java index 5747e58aed..dfedc2b6be 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java @@ -289,7 +289,7 @@ public final void testLeftOuterHashJoinExec0() throws IOException, PlanningExcep assertTrue(proj.getChild() instanceof HashLeftOuterJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents count = count + 1; @@ -328,7 +328,7 @@ public final void testLeftOuter_HashJoinExec1() throws IOException, PlanningExce Tuple tuple; int count = 0; int i = 1; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { //TODO check contents @@ -370,7 +370,7 @@ public final void testLeftOuter_HashJoinExec2() throws IOException, PlanningExce Tuple tuple; int count = 0; int i = 1; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { //TODO check contents @@ -412,7 +412,7 @@ public final void testLeftOuter_HashJoinExec3() throws IOException, PlanningExce Tuple tuple; int count = 0; int i = 1; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { //TODO check contents @@ -452,7 +452,7 @@ public final void testLeftOuter_HashJoinExec4() throws IOException, PlanningExce } else{ int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java index ab4b88161f..a212fbb5fc 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java @@ -283,7 +283,7 @@ public final void testLeftOuterNLJoinExec0() throws IOException, PlanningExcepti } int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents count = count + 1; @@ -329,7 +329,7 @@ public final void testLeftOuterNLJoinExec1() throws IOException, PlanningExcepti Tuple tuple; int i = 1; int count = 0; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { //TODO check contents count = count + 1; @@ -373,7 +373,7 @@ public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningExcept Tuple tuple; int i = 1; int count = 0; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { //TODO check contents count = count + 1; @@ -418,7 +418,7 @@ public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningExcept Tuple tuple; int i = 1; int count = 0; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { //TODO check contents count = count + 1; @@ -462,7 +462,7 @@ public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningExcept Tuple tuple; int i = 1; int count = 0; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { //TODO check contents count = count + 1; diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java index 1cc4e6358e..ac7b971510 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java @@ -182,7 +182,7 @@ public final void testMergeInnerJoin() throws IOException, PlanningException { Tuple tuple; int count = 0; int i = 1; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { count++; assertTrue(i == tuple.get(0).asInt4()); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java index 240e7fddeb..e3da88ea25 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java @@ -163,7 +163,7 @@ public final void testNLCrossJoin() throws IOException, PlanningException { PhysicalExec exec = phyPlanner.createPlan(ctx, plan); int i = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { i++; } @@ -194,7 +194,7 @@ public final void testNLInnerJoin() throws IOException, PlanningException { Tuple tuple; int i = 1; int count = 0; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { count++; assertTrue(i == tuple.get(0).asInt4()); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index e2216b91f5..9aceddad98 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -256,7 +256,7 @@ public final void testCreateScanPlan() throws IOException, PlanningException { Tuple tuple; int i = 0; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { assertTrue(tuple.contains(0)); assertTrue(tuple.contains(1)); @@ -287,7 +287,7 @@ public final void testCreateScanWithFilterPlan() throws IOException, PlanningExc Tuple tuple; int i = 0; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { assertTrue(tuple.contains(0)); i++; @@ -315,7 +315,7 @@ public final void testGroupByPlan() throws IOException, PlanningException { int i = 0; Tuple tuple; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { assertEquals(6, tuple.get(2).asInt4()); // sum assertEquals(3, tuple.get(3).asInt4()); // max @@ -346,7 +346,7 @@ public final void testHashGroupByPlanWithALLField() throws IOException, Planning int i = 0; Tuple tuple; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { assertEquals(12, tuple.get(1).asInt4()); // sum assertEquals(3, tuple.get(2).asInt4()); // max @@ -390,7 +390,7 @@ public final void testSortGroupByPlan() throws IOException, PlanningException { int i = 0; Tuple tuple; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { assertEquals(6, tuple.get(2).asInt4()); // sum assertEquals(3, tuple.get(3).asInt4()); // max @@ -439,7 +439,7 @@ public final void testStorePlan() throws IOException, PlanningException { PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); exec.next(); exec.close(); @@ -490,7 +490,7 @@ public final void testStorePlanWithMaxOutputFileSize() throws IOException, Plann // executing StoreTableExec PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); exec.next(); exec.close(); @@ -536,7 +536,7 @@ public final void testStorePlanWithRCFile() throws IOException, PlanningExceptio PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); exec.next(); exec.close(); @@ -653,7 +653,7 @@ public final void testPartitionedStorePlan() throws IOException, PlanningExcepti PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); exec.next(); exec.close(); ctx.getHashShuffleAppenderManager().close(ebId); @@ -717,7 +717,7 @@ public final void testPartitionedStorePlanWithMaxFileSize() throws IOException, // Executing CREATE TABLE PARTITION BY PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); exec.next(); exec.close(); @@ -787,7 +787,7 @@ public final void testPartitionedStorePlanWithEmptyGroupingSet() PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); exec.next(); exec.close(); ctx.getHashShuffleAppenderManager().close(ebId); @@ -847,7 +847,7 @@ public final void testAggregationFunction() throws IOException, PlanningExceptio PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); Tuple tuple = exec.next(); assertEquals(30, tuple.get(0).asInt8()); assertEquals(3, tuple.get(1).asInt4()); @@ -877,7 +877,7 @@ public final void testCountFunction() throws IOException, PlanningException { PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); Tuple tuple = exec.next(); assertEquals(30, tuple.get(0).asInt8()); assertNull(exec.next()); @@ -901,7 +901,7 @@ public final void testGroupByWithNullValue() throws IOException, PlanningExcepti PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); int count = 0; - exec.init(); + exec.init(false); while(exec.next() != null) { count++; } @@ -931,7 +931,7 @@ public final void testUnionPlan() throws IOException, PlanningException, CloneNo PhysicalExec exec = phyPlanner.createPlan(ctx, root); int count = 0; - exec.init(); + exec.init(false); while(exec.next() != null) { count++; } @@ -952,7 +952,7 @@ public final void testEvalExpr() throws IOException, PlanningException { PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); Tuple tuple; - exec.init(); + exec.init(false); tuple = exec.next(); exec.close(); assertEquals(true, tuple.get(0).asBool()); @@ -964,7 +964,7 @@ public final void testEvalExpr() throws IOException, PlanningException { phyPlanner = new PhysicalPlannerImpl(conf); exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); tuple = exec.next(); exec.close(); assertEquals(DatumFactory.createBool(true), tuple.get(0)); @@ -988,7 +988,7 @@ public final void testCreateIndex() throws IOException, PlanningException { PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); while (exec.next() != null) { } exec.close(); @@ -1022,7 +1022,7 @@ public final void testDuplicateEliminate() throws IOException, PlanningException int cnt = 0; Set expected = Sets.newHashSet( "name_1", "name_2", "name_3", "name_4", "name_5"); - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { assertTrue(expected.contains(tuple.get(0).asChars())); cnt++; @@ -1057,7 +1057,7 @@ public final void testSortEnforcer() throws IOException, PlanningException { PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); exec.next(); exec.close(); @@ -1079,7 +1079,7 @@ public final void testSortEnforcer() throws IOException, PlanningException { phyPlanner = new PhysicalPlannerImpl(conf); exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); exec.next(); exec.close(); @@ -1107,7 +1107,7 @@ public final void testGroupByEnforcer() throws IOException, PlanningException { PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); exec.next(); exec.close(); @@ -1129,7 +1129,7 @@ public final void testGroupByEnforcer() throws IOException, PlanningException { phyPlanner = new PhysicalPlannerImpl(conf); exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); + exec.init(false); exec.next(); exec.close(); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java index 161b0d8851..ca78238346 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java @@ -167,7 +167,7 @@ private void testProgress(int sortBufferBytesNum) throws Exception { Tuple preVal = null; Tuple curVal; int cnt = 0; - exec.init(); + exec.init(true); BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(), new SortSpec[]{ new SortSpec(new Column("managerid", TajoDataTypes.Type.INT4)), diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java index 06ab6c8781..6b63ebbcfc 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java @@ -257,7 +257,7 @@ public final void testRightOuter_HashJoinExec0() throws IOException, PlanningExc Tuple tuple; int count = 0; int i = 1; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { //TODO check contents @@ -298,7 +298,7 @@ public final void testRightOuter_HashJoinExec1() throws IOException, PlanningExc Tuple tuple; int count = 0; int i = 1; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { //TODO check contents @@ -339,7 +339,7 @@ public final void testRightOuter_HashJoinExec2() throws IOException, PlanningExc Tuple tuple; int count = 0; int i = 1; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { //TODO check contents diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java index 30fd999762..3dcfad1b7d 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java @@ -333,7 +333,7 @@ public final void testRightOuterMergeJoin0() throws IOException, PlanningExcepti assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents count = count + 1; @@ -370,7 +370,7 @@ public final void testRightOuter_MergeJoin1() throws IOException, PlanningExcept assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents count = count + 1; @@ -405,7 +405,7 @@ public final void testRightOuterMergeJoin2() throws IOException, PlanningExcepti assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents count = count + 1; @@ -442,7 +442,7 @@ public final void testRightOuter_MergeJoin3() throws IOException, PlanningExcept assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents @@ -479,7 +479,7 @@ public final void testRightOuter_MergeJoin4() throws IOException, PlanningExcept assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents @@ -516,7 +516,7 @@ public final void testRightOuterMergeJoin5() throws IOException, PlanningExcepti assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec); int count = 0; - exec.init(); + exec.init(false); while (exec.next() != null) { //TODO check contents diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java index f2c2b1a393..56a6b5b0a6 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java @@ -127,7 +127,7 @@ public final void testNext() throws IOException, PlanningException { Tuple tuple; Datum preVal = null; Datum curVal; - exec.init(); + exec.init(false); while ((tuple = exec.next()) != null) { curVal = tuple.get(0); if (preVal != null) { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java index fb05f33b72..140a4ef715 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java @@ -21,6 +21,7 @@ import org.apache.tajo.catalog.Column; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Stack; @@ -327,6 +328,10 @@ public static boolean isIndexableOperator(EvalNode expr) { (expr.getType() == EvalType.LIKE && !((LikePredicateEval)expr).isLeadingWildCard()); } + public static EvalNode createSingletonExprFromCNF(Collection cnfExprs) { + return createSingletonExprFromCNF(cnfExprs.toArray(new EvalNode[cnfExprs.size()])); + } + /** * Convert a list of conjunctive normal forms into a singleton expression. *