Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tajo-common/src/main/java/org/apache/tajo/SessionVars.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public enum SessionVars implements ConfigKey {
"limited size for hash inner join (mb)", DEFAULT, Long.class, Validators.min("0")),
OUTER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash outer join (mb)",
DEFAULT, Long.class, Validators.min("0")),
CROSS_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_CROSS_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash cross join (mb)",
DEFAULT, Long.class, Validators.min("0")),
HASH_GROUPBY_SIZE_LIMIT(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD, "limited size for hash groupby (mb)",
DEFAULT, Long.class, Validators.min("0")),
MAX_OUTPUT_FILE_SIZE(ConfVars.$MAX_OUTPUT_FILE_SIZE, "Maximum per-output file size (mb). 0 means infinite.", DEFAULT,
Expand Down
2 changes: 2 additions & 0 deletions tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ public static enum ConfVars implements ConfigKey {
(long)256 * 1048576),
$EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-bytes",
(long)256 * 1048576),
$EXECUTOR_CROSS_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.cross.in-memory-hash-threshold-bytes",
(long)256 * 1048576),
$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-bytes",
(long)256 * 1048576),
$MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ public interface PhysicalPlanner {
public PhysicalExec createPlan(TaskAttemptContext context,
LogicalNode logicalPlan)
throws InternalException;

enum INPUT { LEFT, RIGHT }
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,28 +256,38 @@ public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) th
return size;
}

@VisibleForTesting
public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left)
public boolean isInnerJoinHashApplicable(TaskAttemptContext context, LogicalNode node, INPUT input)
throws IOException {
String [] lineage = PlannerUtil.getRelationLineage(node);
long volume = estimateSizeRecursive(context, lineage);
boolean inMemoryInnerJoinFlag = false;
return isHashApplicable(context, node, SessionVars.INNER_HASH_JOIN_SIZE_LIMIT, input);
}

QueryContext queryContext = context.getQueryContext();
/**
 * Checks whether the given input of an outer join qualifies for hashing.
 * This simply forwards to {@code isHashApplicable}, selecting
 * {@code SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT} as the session variable that
 * supplies the size threshold.
 *
 * @param context task attempt context forwarded to the shared check
 * @param node    logical node forwarded to the shared check
 * @param input   which join input ({@code LEFT} or {@code RIGHT}) the node represents
 * @return the result of {@code isHashApplicable} for the outer-join limit
 * @throws IOException propagated from {@code isHashApplicable}
 */
public boolean isOuterJoinHashApplicable(TaskAttemptContext context, LogicalNode node, INPUT input)
    throws IOException {
  final SessionVars limitVar = SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT;
  return isHashApplicable(context, node, limitVar, input);
}

if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) {
inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT);
} else {
inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
}
/**
 * Checks whether the given input of a cross join qualifies for hashing.
 * This simply forwards to {@code isHashApplicable}, selecting
 * {@code SessionVars.CROSS_HASH_JOIN_SIZE_LIMIT} as the session variable that
 * supplies the size threshold.
 *
 * @param context task attempt context forwarded to the shared check
 * @param node    logical node forwarded to the shared check
 * @param input   which join input ({@code LEFT} or {@code RIGHT}) the node represents
 * @return the result of {@code isHashApplicable} for the cross-join limit
 * @throws IOException propagated from {@code isHashApplicable}
 */
public boolean isCrossJoinHashApplicable(TaskAttemptContext context, LogicalNode node, INPUT input)
    throws IOException {
  final SessionVars limitVar = SessionVars.CROSS_HASH_JOIN_SIZE_LIMIT;
  return isHashApplicable(context, node, limitVar, input);
}

private boolean isHashApplicable(TaskAttemptContext context, LogicalNode node,
SessionVars sessionVar, INPUT input) throws IOException {
String [] lineage = PlannerUtil.getRelationLineage(node);
long volume = estimateSizeRecursive(context, lineage);

LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.",
boolean applicable = volume <= getThreshold(context.getQueryContext(), sessionVar);
LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main memory.",
context.getTaskId().toString(),
(left ? "Left" : "Right"),
input.name().toLowerCase(),
TUtil.arrayToString(lineage),
FileUtil.humanReadableByteCount(volume, false),
(inMemoryInnerJoinFlag ? "" : "not ")));
return inMemoryInnerJoinFlag;
(applicable ? "" : "not ")));
return applicable;
}

/**
 * Looks up the size threshold stored under {@code sessionVar} in the query context.
 * The value of {@code SessionVars.HASH_JOIN_SIZE_LIMIT} is always read first and
 * handed to {@code getLong} as its second argument (presumably the default used
 * when {@code sessionVar} is not set -- confirm against QueryContext#getLong).
 *
 * @param context    query context to read from
 * @param sessionVar join-type specific limit variable
 * @return the long value returned by {@code context.getLong(sessionVar, fallback)}
 */
private long getThreshold(QueryContext context, SessionVars sessionVar) {
  final long genericLimit = context.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
  return context.getLong(sessionVar, genericLimit);
}

public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
Expand Down Expand Up @@ -325,6 +335,9 @@ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode pl
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();

switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
LOG.info("Join (" + plan.getPID() +") chooses [Hash Join]");
return new HashCrossJoinExec(context, plan, leftExec, rightExec);
case NESTED_LOOP_JOIN:
LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
return new NLJoinExec(context, plan, leftExec, rightExec);
Expand All @@ -336,10 +349,19 @@ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode pl
LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
return new BNLJoinExec(context, plan, leftExec, rightExec);
}
}

} else {
return new BNLJoinExec(context, plan, leftExec, rightExec);
boolean inMemoryHashJoin =
isCrossJoinHashApplicable(context, plan.getLeftChild(), INPUT.LEFT) ||
isCrossJoinHashApplicable(context, plan.getRightChild(), INPUT.RIGHT);

if (inMemoryHashJoin) {
LOG.info("Join (" + plan.getPID() +") chooses [Hash Join]");
// returns two PhysicalExec. smaller one is 0, and larger one is 1.
PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
return new HashCrossJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
}
return new BNLJoinExec(context, plan, leftExec, rightExec);
}

private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
Expand Down Expand Up @@ -417,11 +439,9 @@ private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode pl

private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
boolean inMemoryHashJoin = false;
if (checkIfInMemoryInnerJoinIsPossible(context, plan.getLeftChild(), true)
|| checkIfInMemoryInnerJoinIsPossible(context, plan.getRightChild(), false)) {
inMemoryHashJoin = true;
}
boolean inMemoryHashJoin =
isInnerJoinHashApplicable(context, plan.getLeftChild(), INPUT.LEFT) ||
isInnerJoinHashApplicable(context, plan.getRightChild(), INPUT.RIGHT);

if (inMemoryHashJoin) {
LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
Expand Down Expand Up @@ -480,19 +500,7 @@ private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNod

private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
long rightTableVolume = estimateSizeRecursive(context, rightLineage);
boolean hashJoin;

QueryContext queryContext = context.getQueryContext();

if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
hashJoin = rightTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
} else {
hashJoin = rightTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
}

if (hashJoin) {
if (isOuterJoinHashApplicable(context, plan, INPUT.RIGHT)) {
// we can implement left outer join using hash join, using the right operand as the build relation
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
Expand All @@ -508,19 +516,7 @@ private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNod
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
//if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
// blocking, but merge join is blocking as well)
String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
long leftTableVolume = estimateSizeRecursive(context, outerLineage4);
boolean hashJoin;

QueryContext queryContext = context.getQueryContext();

if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
hashJoin = leftTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
} else {
hashJoin = leftTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
}

if (hashJoin){
if (isOuterJoinHashApplicable(context, plan, INPUT.LEFT)){
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tajo.engine.planner.physical;

import org.apache.tajo.plan.logical.JoinNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

/**
 * Cross join executor built on top of {@link HashJoinExec}.
 *
 * For every tuple read from the left child it emits one output tuple per tuple
 * stored in the inherited {@code tupleSlots} table, visiting each bucket of the
 * table in turn. No key lookup is performed; the table is used purely as an
 * in-memory store of the right side.
 */
public class HashCrossJoinExec extends HashJoinExec {

  // Cursor over the buckets (tuple lists) of tupleSlots for the current left tuple.
  private Iterator<List<Tuple>> bucketCursor;

  public HashCrossJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
                           PhysicalExec rightExec) {
    super(context, plan, leftExec, rightExec);
  }

  /**
   * Produces the next joined tuple.
   *
   * @return a new {@link VTuple} copy of the projected output, or {@code null} when
   *         the table is empty, the left child is exhausted, the executor is already
   *         finished, or the task context reports it has been stopped
   * @throws IOException propagated from the child executors
   */
  @Override
  public Tuple next() throws IOException {
    if (first) {
      // NOTE(review): presumably fills tupleSlots from the right child and clears
      // 'first' -- confirm against HashJoinExec#loadRightToHashTable.
      loadRightToHashTable();
    }
    if (tupleSlots.isEmpty()) {
      return null;
    }

    for (;;) {
      if (context.isStopped() || finished) {
        return null;
      }

      if (shouldGetLeftTuple) {
        leftTuple = leftChild.next();
        if (leftTuple == null) {
          // Left side exhausted: nothing more to emit.
          finished = true;
          return null;
        }
        // Restart the scan over the table for the new left tuple. tupleSlots is
        // known to be non-empty here, so the first bucket always exists.
        bucketCursor = tupleSlots.values().iterator();
        iterator = bucketCursor.next().iterator();
        shouldGetLeftTuple = false;
      }

      // Skip forward over drained buckets until one still has tuples or none remain.
      while (!iterator.hasNext() && bucketCursor.hasNext()) {
        iterator = bucketCursor.next().iterator();
      }

      if (iterator.hasNext()) {
        frameTuple.set(leftTuple, iterator.next());
        projector.eval(frameTuple, outTuple);
        // Return a copy because outTuple is reused for every output row.
        return new VTuple(outTuple);
      }

      // Every bucket has been consumed for this left tuple; move on to the next one.
      shouldGetLeftTuple = true;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,26 @@ public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left
super(context, plan, leftExec, rightExec);

// HashJoin only can manage equi join key pairs.
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(),
rightExec.getSchema(), false);
if (hasJoinQual) {
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(),
rightExec.getSchema(), false);

leftKeyList = new int[joinKeyPairs.size()];
rightKeyList = new int[joinKeyPairs.size()];
leftKeyList = new int[joinKeyPairs.size()];
rightKeyList = new int[joinKeyPairs.size()];

for (int i = 0; i < joinKeyPairs.size(); i++) {
leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
}
for (int i = 0; i < joinKeyPairs.size(); i++) {
leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
}

for (int i = 0; i < joinKeyPairs.size(); i++) {
rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
for (int i = 0; i < joinKeyPairs.size(); i++) {
rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
}
leftKeyTuple = new VTuple(leftKeyList.length);
}

// for join
frameTuple = new FrameTuple();
outTuple = new VTuple(outSchema.size());
leftKeyTuple = new VTuple(leftKeyList.length);
}

protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.io.IOException;

import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
import static org.apache.tajo.engine.planner.PhysicalPlanner.INPUT.LEFT;
import static org.apache.tajo.engine.planner.PhysicalPlanner.INPUT.RIGHT;
import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
import static org.junit.Assert.*;

Expand Down Expand Up @@ -269,11 +271,11 @@ private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext c
}

if (leftSmaller) {
assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
assertTrue(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getLeftChild(), LEFT));
assertFalse(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getRightChild(), RIGHT));
} else {
assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
assertFalse(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getLeftChild(), LEFT));
assertTrue(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getRightChild(), RIGHT));
}

return leftSmaller;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Available Session Variables:
\set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
\set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)
\set OUTER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash outer join (mb)
\set CROSS_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash cross join (mb)
\set HASH_GROUPBY_SIZE_LIMIT [long value] - limited size for hash groupby (mb)
\set MAX_OUTPUT_FILE_SIZE [int value] - Maximum per-output file size (mb). 0 means infinite.
\set NULL_CHAR [text value] - null char of text file output
Expand Down