diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index 5cca413c8e..c44a951c73 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -116,6 +116,8 @@ public enum SessionVars implements ConfigKey { "limited size for hash inner join (mb)", DEFAULT, Long.class, Validators.min("0")), OUTER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash outer join (mb)", DEFAULT, Long.class, Validators.min("0")), + CROSS_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_CROSS_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash cross join (mb)", + DEFAULT, Long.class, Validators.min("0")), HASH_GROUPBY_SIZE_LIMIT(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD, "limited size for hash groupby (mb)", DEFAULT, Long.class, Validators.min("0")), MAX_OUTPUT_FILE_SIZE(ConfVars.$MAX_OUTPUT_FILE_SIZE, "Maximum per-output file size (mb). 0 means infinite.", DEFAULT, diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index e74d842e98..b9e915b49b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -328,6 +328,8 @@ public static enum ConfVars implements ConfigKey { (long)256 * 1048576), $EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-bytes", (long)256 * 1048576), + $EXECUTOR_CROSS_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.cross.in-memory-hash-threshold-bytes", + (long)256 * 1048576), $EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-bytes", (long)256 * 1048576), $MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java index d4c57db220..e5992c751a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java @@ -33,4 +33,6 @@ public interface PhysicalPlanner { public PhysicalExec createPlan(TaskAttemptContext context, LogicalNode logicalPlan) throws InternalException; + + enum INPUT { LEFT, RIGHT } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index ac1c9adb5f..acf64c5715 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -256,28 +256,38 @@ public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) th return size; } - @VisibleForTesting - public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left) + public boolean isInnerJoinHashApplicable(TaskAttemptContext context, LogicalNode node, INPUT input) throws IOException { - String [] lineage = PlannerUtil.getRelationLineage(node); - long volume = estimateSizeRecursive(context, lineage); - boolean inMemoryInnerJoinFlag = false; + return isHashApplicable(context, node, SessionVars.INNER_HASH_JOIN_SIZE_LIMIT, input); + } - QueryContext queryContext = context.getQueryContext(); + public boolean isOuterJoinHashApplicable(TaskAttemptContext context, LogicalNode node, INPUT input) + throws IOException { + return isHashApplicable(context, node, SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT, input); + } - if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) { - inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT); - } else { - inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT); - } + public boolean isCrossJoinHashApplicable(TaskAttemptContext context, LogicalNode node, INPUT input) + throws IOException { + return isHashApplicable(context, node, SessionVars.CROSS_HASH_JOIN_SIZE_LIMIT, input); + } + + private boolean isHashApplicable(TaskAttemptContext context, LogicalNode node, + SessionVars sessionVar, INPUT input) throws IOException { + String [] lineage = PlannerUtil.getRelationLineage(node); + long volume = estimateSizeRecursive(context, lineage); - LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.", + boolean applicable = volume <= getThreshold(context.getQueryContext(), sessionVar); + LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main memory.", context.getTaskId().toString(), - (left ? "Left" : "Right"), + input.name().toLowerCase(), TUtil.arrayToString(lineage), FileUtil.humanReadableByteCount(volume, false), - (inMemoryInnerJoinFlag ? "" : "not "))); - return inMemoryInnerJoinFlag; + (applicable ? "" : "not "))); + return applicable; + } + + private long getThreshold(QueryContext context, SessionVars sessionVar) { + return context.getLong(sessionVar, context.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT)); } public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec, @@ -325,6 +335,9 @@ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode pl JoinAlgorithm algorithm = property.getJoin().getAlgorithm(); switch (algorithm) { + case IN_MEMORY_HASH_JOIN: + LOG.info("Join (" + plan.getPID() +") chooses [Hash Join]"); + return new HashCrossJoinExec(context, plan, leftExec, rightExec); case NESTED_LOOP_JOIN: LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]"); return new NLJoinExec(context, plan, leftExec, rightExec); @@ -336,10 +349,19 @@ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode pl LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name()); return new BNLJoinExec(context, plan, leftExec, rightExec); } + } - } else { - return new BNLJoinExec(context, plan, leftExec, rightExec); + boolean inMemoryHashJoin = + isCrossJoinHashApplicable(context, plan.getLeftChild(), INPUT.LEFT) || + isCrossJoinHashApplicable(context, plan.getRightChild(), INPUT.RIGHT); + + if (inMemoryHashJoin) { + LOG.info("Join (" + plan.getPID() +") chooses [Hash Join]"); + // returns two PhysicalExec. smaller one is 0, and larger one is 1. + PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec); + return new HashCrossJoinExec(context, plan, orderedChilds[1], orderedChilds[0]); } + return new BNLJoinExec(context, plan, leftExec, rightExec); } private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan, @@ -417,11 +439,9 @@ private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode pl private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { - boolean inMemoryHashJoin = false; - if (checkIfInMemoryInnerJoinIsPossible(context, plan.getLeftChild(), true) - || checkIfInMemoryInnerJoinIsPossible(context, plan.getRightChild(), false)) { - inMemoryHashJoin = true; - } + boolean inMemoryHashJoin = + isInnerJoinHashApplicable(context, plan.getLeftChild(), INPUT.LEFT) || + isInnerJoinHashApplicable(context, plan.getRightChild(), INPUT.RIGHT); if (inMemoryHashJoin) { LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]"); @@ -480,19 +500,7 @@ private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNod private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { - String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild()); - long rightTableVolume = estimateSizeRecursive(context, rightLineage); - boolean hashJoin; - - QueryContext queryContext = context.getQueryContext(); - - if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) { - hashJoin = rightTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT); - } else { - hashJoin = rightTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT); - } - - if (hashJoin) { + if (isOuterJoinHashApplicable(context, plan, INPUT.RIGHT)) { // we can implement left outer join using hash join, using the right operand as the build relation LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join]."); return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec); @@ -508,19 +516,7 @@ private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNod PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { //if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note: // blocking, but merge join is blocking as well) - String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild()); - long leftTableVolume = estimateSizeRecursive(context, outerLineage4); - boolean hashJoin; - - QueryContext queryContext = context.getQueryContext(); - - if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) { - hashJoin = leftTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT); - } else { - hashJoin = leftTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT); - } - - if (hashJoin){ + if (isOuterJoinHashApplicable(context, plan, INPUT.LEFT)){ LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join]."); return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec); } else { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashCrossJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashCrossJoinExec.java new file mode 100644 index 0000000000..b0144c8185 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashCrossJoinExec.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class HashCrossJoinExec extends HashJoinExec { + + private Iterator> outIterator; + + public HashCrossJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec, + PhysicalExec rightExec) { + super(context, plan, leftExec, rightExec); + } + + @Override + public Tuple next() throws IOException { + if (first) { + loadRightToHashTable(); + } + if (tupleSlots.isEmpty()) { + return null; + } + + while (!context.isStopped() && !finished) { + if (shouldGetLeftTuple) { + leftTuple = leftChild.next(); + if (leftTuple == null) { + finished = true; + return null; + } + outIterator = tupleSlots.values().iterator(); + iterator = outIterator.next().iterator(); + shouldGetLeftTuple = false; + } + + // getting a next right tuple on in-memory hash table. + while (!iterator.hasNext() && outIterator.hasNext()) { + iterator = outIterator.next().iterator(); + } + if (!iterator.hasNext()) { + shouldGetLeftTuple = true; + continue; + } + frameTuple.set(leftTuple, iterator.next()); + projector.eval(frameTuple, outTuple); + return new VTuple(outTuple); + } + + return null; + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index 48f3682610..4c427b6325 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -59,24 +59,26 @@ public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left super(context, plan, leftExec, rightExec); // HashJoin only can manage equi join key pairs. - this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(), - rightExec.getSchema(), false); + if (hasJoinQual) { + this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(), + rightExec.getSchema(), false); - leftKeyList = new int[joinKeyPairs.size()]; - rightKeyList = new int[joinKeyPairs.size()]; + leftKeyList = new int[joinKeyPairs.size()]; + rightKeyList = new int[joinKeyPairs.size()]; - for (int i = 0; i < joinKeyPairs.size(); i++) { - leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName()); - } + for (int i = 0; i < joinKeyPairs.size(); i++) { + leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName()); + } - for (int i = 0; i < joinKeyPairs.size(); i++) { - rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName()); + for (int i = 0; i < joinKeyPairs.size(); i++) { + rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName()); + } + leftKeyTuple = new VTuple(leftKeyList.length); } // for join frameTuple = new FrameTuple(); outTuple = new VTuple(outSchema.size()); - leftKeyTuple = new VTuple(leftKeyList.length); } protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) { diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java index 578b58607d..1f8463a827 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java @@ -52,6 +52,8 @@ import java.io.IOException; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.apache.tajo.engine.planner.PhysicalPlanner.INPUT.LEFT; +import static org.apache.tajo.engine.planner.PhysicalPlanner.INPUT.RIGHT; import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm; import static org.junit.Assert.*; @@ -269,11 +271,11 @@ private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext c } if (leftSmaller) { - assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true)); - assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false)); + assertTrue(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getLeftChild(), LEFT)); + assertFalse(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getRightChild(), RIGHT)); } else { - assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true)); - assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false)); + assertFalse(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getLeftChild(), LEFT)); + assertTrue(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getRightChild(), RIGHT)); } return leftSmaller; diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index b5b7c229c9..81719bd86f 100644 --- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -30,6 +30,7 @@ Available Session Variables: \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb) \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb) \set OUTER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash outer join (mb) +\set CROSS_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash cross join (mb) \set HASH_GROUPBY_SIZE_LIMIT [long value] - limited size for hash groupby (mb) \set MAX_OUTPUT_FILE_SIZE [int value] - Maximum per-output file size (mb). 0 means infinite. \set NULL_CHAR [text value] - null char of text file output