From e98cf2c7c7bb3b69e871dec45dd53d9ae567b42b Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 5 Nov 2015 16:46:48 +0900 Subject: [PATCH 01/19] TAJO-1925: Improve hive compatibility with TIMESTAMP partition column. --- .../org/apache/tajo/algebra/TimeValue.java | 2 +- .../org/apache/tajo/datum/DatumFactory.java | 13 ++++ .../org/apache/tajo/datum/TimestampDatum.java | 7 ++ .../engine/query/TestTablePartitions.java | 61 ++++++++++++++++-- .../HashBasedColPartitionStoreExec.java | 14 +++- .../SortBasedColPartitionStoreExec.java | 17 ++++- .../rules/PartitionedTableRewriter.java | 15 ++++- .../plan/util/EvalNodeToExprConverter.java | 6 ++ .../util/PartitionFilterAlgebraVisitor.java | 64 +++++++++++-------- 9 files changed, 160 insertions(+), 39 deletions(-) diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TimeValue.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TimeValue.java index cc8cc886a5..ebff555149 100644 --- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TimeValue.java +++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TimeValue.java @@ -56,7 +56,7 @@ public boolean hasSecondsFraction() { } public void setSecondsFraction(String secondsFraction) { - this.secondsFraction = StringUtils.rightPad(secondsFraction, 3, '0'); + this.secondsFraction = StringUtils.leftPad(secondsFraction, 3, '0'); } public String getSecondsFraction() { diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java index dd4a4e440a..2727bec8ef 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java @@ -19,6 +19,9 @@ package org.apache.tajo.datum; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.common.TajoDataTypes; import 
org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.exception.InvalidValueForCastException; @@ -35,6 +38,7 @@ import java.util.TimeZone; public class DatumFactory { + protected static final Log LOG = LogFactory.getLog(DatumFactory.class); public static Class getDatumClass(Type type) { switch (type) { @@ -362,6 +366,15 @@ public static TimestampDatum createTimestamp(Datum datum, @Nullable TimeZone tz) return parseTimestamp(datum.asChars(), tz); case TIMESTAMP: return (TimestampDatum) datum; + case INT8: + TimeMeta tm = new TimeMeta(); + DateTimeUtil.toJulianTimeMeta(DateTimeUtil.javaTimeToJulianTime(datum.asInt8()), tm); + if (tz != null) { + DateTimeUtil.toUserTimezone(tm, tz); + } else { + DateTimeUtil.toUserTimezone(tm, TimeZone.getDefault()); + } + return new TimestampDatum(DateTimeUtil.toJulianTimestamp(tm)); default: throw new TajoRuntimeException(new InvalidValueForCastException(datum.type(), Type.TIMESTAMP)); } diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java index 5b4c152a51..dd7dbd6ecf 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java @@ -19,6 +19,8 @@ package org.apache.tajo.datum; import com.google.common.primitives.Longs; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.util.Bytes; @@ -30,6 +32,7 @@ import java.util.TimeZone; public class TimestampDatum extends Datum { + private static final Log LOG = LogFactory.getLog(TimestampDatum.class); public static final int SIZE = 8; private long timestamp; @@ -178,6 +181,9 @@ public byte[] asTextBytes() { public Datum equalsTo(Datum datum) { if (datum.type() == 
TajoDataTypes.Type.TIME) { return timestamp == datum.asInt8() ? BooleanDatum.TRUE : BooleanDatum.FALSE; + } else if(datum.type() == TajoDataTypes.Type.TIMESTAMP) { + TimestampDatum another = (TimestampDatum) datum; + return timestamp == another.timestamp ? BooleanDatum.TRUE : BooleanDatum.FALSE; } else if (datum.isNull()) { return datum; } else { @@ -198,6 +204,7 @@ public int compareTo(Datum datum) { } else if (datum.isNull()) { return -1; } else { + LOG.info("### 2000 ### type:" + datum.type().name() + ", value:" + datum.asChars()); throw new InvalidOperationException(datum.type()); } } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index b4bc8ee23f..7b9811191c 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -1562,18 +1562,65 @@ public final void testTimestampPartitionColumn() throws Exception { executeString( "insert overwrite into " + tableName - + " select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem"); + + " select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem " + + " where l_orderkey != 2"); } else { executeString( "create table " + tableName + "(col1 int4, col2 int4) partition by column(key timestamp) " - + " as select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem"); + + " as select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem " + + " where l_orderkey != 2"); } + executeString( + "insert overwrite into " + tableName + + " select l_orderkey, l_partkey, TIMESTAMP '1997-01-28 02:50:08.037' from lineitem " + + " where l_orderkey = 2"); + assertTrue(client.existTable(tableName)); + List partitions = catalog.getPartitionsOfTable(DEFAULT_DATABASE_NAME, tableName); + 
assertEquals(5, partitions.size()); + + // Equals + res = executeString("SELECT * FROM " + tableName + " WHERE key = cast(760147200000 as timestamp)"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02 00:00:00\n" ; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + res = executeString("SELECT * FROM " + tableName + " WHERE key = TIMESTAMP '1993-11-09 00:00:00.0'"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,3,1993-11-09 00:00:00\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + res = executeString("SELECT * FROM " + tableName + " WHERE key = to_timestamp(760147200)"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02 00:00:00\n" ; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + res = executeString("SELECT * FROM " + tableName + " WHERE key = TIMESTAMP '1997-01-28 02:50:08.037'"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "2,2,1997-01-28 02:50:08.037\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + // LessThanOrEquals res = executeString("SELECT * FROM " + tableName - + " WHERE key <= to_timestamp('1995-09-01', 'YYYY-MM-DD') order by col1, col2, key"); + + " WHERE key <= TIMESTAMP '1995-09-01 00:00:00' order by col1, col2, key"); expectedResult = "col1,col2,key\n" + "-------------------------------\n" + @@ -1585,8 +1632,8 @@ public final void testTimestampPartitionColumn() throws Exception { // LessThan and GreaterThan res = executeString("SELECT * FROM " + tableName - + " WHERE key > to_timestamp('1993-01-01', 'YYYY-MM-DD') and " + - "key < to_timestamp('1996-01-01', 'YYYY-MM-DD') order by col1, col2, key desc"); + + " WHERE key > TIMESTAMP '1993-01-01 00:00:00' and " + + "key < TIMESTAMP '1996-01-01 00:00:00' order by col1, col2, key desc"); expectedResult = 
"col1,col2,key\n" + "-------------------------------\n" + @@ -1598,8 +1645,8 @@ public final void testTimestampPartitionColumn() throws Exception { // Between res = executeString("SELECT * FROM " + tableName - + " WHERE key between to_timestamp('1993-01-01', 'YYYY-MM-DD') " + - "and to_timestamp('1997-01-01', 'YYYY-MM-DD') order by col1, col2, key desc"); + + " WHERE key between TIMESTAMP '1993-01-01 00:00:00' " + + "and TIMESTAMP '1997-01-01 00:00:00' order by col1, col2, key desc"); expectedResult = "col1,col2,key\n" + "-------------------------------\n" + diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index c7987de240..b4e99ecba1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -20,15 +20,19 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.TimestampDatum; import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.Appender; import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.datetime.DateTimeFormat; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -62,7 +66,15 @@ private Appender getAppender(ComparableTuple partitionKey, Tuple tuple) throws I } sb.append(keyNames[i]).append('='); Datum datum = tuple.asDatum(keyIds[i]); - 
sb.append(StringUtils.escapePathName(datum.asChars())); + + if (datum.type() == TajoDataTypes.Type.TIMESTAMP) { + // Converts TimeMeta to formatted string for Hive compatibility. + TimestampDatum timestampDatum = (TimestampDatum) datum; + Timestamp timestamp = new Timestamp(timestampDatum.getJavaTimestamp()); + sb.append(StringUtils.escapePathName(timestamp.toString())); + } else { + sb.append(StringUtils.escapePathName(datum.asChars())); + } } appender = getNextPartitionAppender(sb.toString()); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index 176b6fb501..359c70d4dc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -22,14 +22,21 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.statistics.StatisticsUtil; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.TimestampDatum; import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.datetime.DateTimeFormat; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.SimpleDateFormat; /** * It stores a sorted data set into a number of partition files. 
It assumes that input tuples are sorted in an @@ -54,7 +61,15 @@ private String getSubdirectory(Tuple tuple) { sb.append('/'); } sb.append(keyNames[i]).append('='); - sb.append(StringUtils.escapePathName(datum.asChars())); + + if (datum.type() == TajoDataTypes.Type.TIMESTAMP) { + // Converts TimeMeta to formatted string for Hive compatibility. + TimestampDatum timestampDatum = (TimestampDatum) datum; + Timestamp timestamp = new Timestamp(timestampDatum.getJavaTimestamp()); + sb.append(StringUtils.escapePathName(timestamp.toString())); + } else { + sb.append(StringUtils.escapePathName(datum.asChars())); + } } return sb.toString(); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index cf54f7b4c7..1e52f64aca 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -28,8 +28,10 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionsByAlgebraProto; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.TimestampDatum; import org.apache.tajo.exception.*; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; @@ -42,8 +44,10 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.datetime.DateTimeUtil; import java.io.IOException; +import java.sql.Timestamp; import java.util.*; public class PartitionedTableRewriter implements LogicalPlanRewriteRule { @@ -464,7 +468,16 @@ public static Tuple buildTupleFromPartitionPath(Schema 
partitionColumnSchema, Pa } int columnId = partitionColumnSchema.getColumnIdByName(parts[0]); Column keyColumn = partitionColumnSchema.getColumn(columnId); - tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), StringUtils.unescapePathName(parts[1]))); + + if (keyColumn.getDataType().getType() == TajoDataTypes.Type.TIMESTAMP) { + Timestamp timestamp = Timestamp.valueOf(StringUtils.unescapePathName(parts[1])); + long julianTime = DateTimeUtil.javaTimeToJulianTime(timestamp.getTime()); + TimestampDatum timestampDatum = new TimestampDatum(julianTime); + tuple.put(columnId, timestampDatum); + } else { + tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), + StringUtils.unescapePathName(parts[1]))); + } } for (; i < partitionColumnSchema.size(); i++) { tuple.put(i, NullDatum.get()); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java index 139bcc0671..0871162b99 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java @@ -18,6 +18,8 @@ package org.apache.tajo.plan.util; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.algebra.*; import org.apache.tajo.datum.DateDatum; import org.apache.tajo.datum.Datum; @@ -32,6 +34,8 @@ * */ public class EvalNodeToExprConverter extends SimpleEvalNodeVisitor { + protected final Log LOG = LogFactory.getLog(getClass()); + private Stack exprs = new Stack<>(); private String tableName; @@ -186,6 +190,8 @@ protected EvalNode visitConst(Object o, ConstEval evalNode, Stack stac timeValue = new TimeValue(""+timestampDatum.getHourOfDay() , ""+timestampDatum.getMinuteOfHour(), ""+timestampDatum.getSecondOfMinute()); + timeValue.setSecondsFraction(Integer.toString(timestampDatum.getMillisOfSecond())); + 
value = new TimestampLiteral(dateValue, timeValue); break; case TIME: diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java index 72fd93969c..276c2f40f4 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java @@ -19,6 +19,9 @@ package org.apache.tajo.plan.util; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.algebra.*; import org.apache.tajo.catalog.CatalogConstants; import org.apache.tajo.catalog.Column; @@ -30,12 +33,12 @@ import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; +import org.apache.tajo.util.datetime.DateTimeFormat; import org.apache.tajo.util.datetime.DateTimeUtil; import org.apache.tajo.util.datetime.TimeMeta; import java.sql.Date; import java.sql.Time; -import java.sql.Timestamp; import java.util.List; import java.util.Stack; import java.util.TimeZone; @@ -46,6 +49,8 @@ * */ public class PartitionFilterAlgebraVisitor extends SimpleAlgebraVisitor { + protected final Log LOG = LogFactory.getLog(getClass()); + private String tableAlias; private Column column; private boolean isHiveCatalog = false; @@ -122,38 +127,41 @@ public Expr visitDateLiteral(Object ctx, Stack stack, DateLiteral expr) th @Override public Expr visitTimestampLiteral(Object ctx, Stack stack, TimestampLiteral expr) throws TajoException { StringBuilder sb = new StringBuilder(); + DateValue dateValue = expr.getDate(); + TimeValue timeValue = expr.getTime(); + + int [] dates = ExprAnnotator.dateToIntArray(dateValue.getYears(), + dateValue.getMonths(), + dateValue.getDays()); + int [] times = ExprAnnotator.timeToIntArray(timeValue.getHours(), + 
timeValue.getMinutes(), + timeValue.getSeconds(), + timeValue.getSecondsFraction()); + + long julianTimestamp; + if (timeValue.hasSecondsFraction()) { + julianTimestamp = DateTimeUtil.toJulianTimestamp(dates[0], dates[1], dates[2], times[0], times[1], times[2], + times[3] * 1000); + } else { + julianTimestamp = DateTimeUtil.toJulianTimestamp(dates[0], dates[1], dates[2], times[0], times[1], times[2], 0); + } - if (!isHiveCatalog) { - DateValue dateValue = expr.getDate(); - TimeValue timeValue = expr.getTime(); - - int [] dates = ExprAnnotator.dateToIntArray(dateValue.getYears(), - dateValue.getMonths(), - dateValue.getDays()); - int [] times = ExprAnnotator.timeToIntArray(timeValue.getHours(), - timeValue.getMinutes(), - timeValue.getSeconds(), - timeValue.getSecondsFraction()); - - long julianTimestamp; - if (timeValue.hasSecondsFraction()) { - julianTimestamp = DateTimeUtil.toJulianTimestamp(dates[0], dates[1], dates[2], times[0], times[1], times[2], - times[3] * 1000); - } else { - julianTimestamp = DateTimeUtil.toJulianTimestamp(dates[0], dates[1], dates[2], times[0], times[1], times[2], 0); - } - - TimeMeta tm = new TimeMeta(); - DateTimeUtil.toJulianTimeMeta(julianTimestamp, tm); + TimeMeta tm = new TimeMeta(); + DateTimeUtil.toJulianTimeMeta(julianTimestamp, tm); - TimeZone tz = TimeZone.getDefault(); - DateTimeUtil.toUTCTimezone(tm, tz); + String dateFormat = DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS"); + if (tm.fsecs == 0) { + dateFormat += ".0"; + } else { + int secondsFraction = tm.fsecs / 1000; + dateFormat += "." 
+ StringUtils.leftPad(""+secondsFraction, 3, '0'); + } + if (!isHiveCatalog) { sb.append("?").append(" )"); - Timestamp timestamp = new Timestamp(DateTimeUtil.julianTimeToJavaTime(DateTimeUtil.toJulianTimestamp(tm))); - parameters.add(new Pair(Type.TIMESTAMP, timestamp)); + parameters.add(new Pair(Type.TEXT, dateFormat)); } else { - sb.append("\"").append(expr.toString()).append("\""); + throw new UnsupportedException("Timestamp type"); } queries.push(sb.toString()); From 36e9ec498b7486cb63c78b54d63e438834a51a6a Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 5 Nov 2015 16:49:54 +0900 Subject: [PATCH 02/19] Remove debug logs --- .../src/main/java/org/apache/tajo/datum/DatumFactory.java | 4 ---- .../src/main/java/org/apache/tajo/datum/TimestampDatum.java | 4 ---- 2 files changed, 8 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java index 2727bec8ef..a7c9e015d9 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java @@ -19,9 +19,6 @@ package org.apache.tajo.datum; import org.apache.commons.codec.binary.Base64; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.exception.InvalidValueForCastException; @@ -38,7 +35,6 @@ import java.util.TimeZone; public class DatumFactory { - protected static final Log LOG = LogFactory.getLog(DatumFactory.class); public static Class getDatumClass(Type type) { switch (type) { diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java index dd7dbd6ecf..cf06601c68 100644 --- 
a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java @@ -19,8 +19,6 @@ package org.apache.tajo.datum; import com.google.common.primitives.Longs; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.util.Bytes; @@ -32,7 +30,6 @@ import java.util.TimeZone; public class TimestampDatum extends Datum { - private static final Log LOG = LogFactory.getLog(TimestampDatum.class); public static final int SIZE = 8; private long timestamp; @@ -204,7 +201,6 @@ public int compareTo(Datum datum) { } else if (datum.isNull()) { return -1; } else { - LOG.info("### 2000 ### type:" + datum.type().name() + ", value:" + datum.asChars()); throw new InvalidOperationException(datum.type()); } } From 88d1335b36df19d58e2d68b7671e6405df686eec Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 5 Nov 2015 17:08:07 +0900 Subject: [PATCH 03/19] Add more descriptions for added codes --- .../src/main/java/org/apache/tajo/datum/DatumFactory.java | 2 ++ .../planner/physical/HashBasedColPartitionStoreExec.java | 3 ++- .../planner/physical/SortBasedColPartitionStoreExec.java | 3 ++- .../tajo/plan/rewrite/rules/PartitionedTableRewriter.java | 2 ++ .../apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java | 4 ++++ 5 files changed, 12 insertions(+), 2 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java index a7c9e015d9..5ea6e046aa 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java @@ -363,6 +363,8 @@ public static TimestampDatum createTimestamp(Datum datum, @Nullable TimeZone tz) case TIMESTAMP: return (TimestampDatum) datum; case INT8: + // 
TimestampDatum use UTC based Julian time microseconds. So this need to convert long number to julian time + // microseconds. And if users set their timezone, this must apply the timezone for correct query result. TimeMeta tm = new TimeMeta(); DateTimeUtil.toJulianTimeMeta(DateTimeUtil.javaTimeToJulianTime(datum.asInt8()), tm); if (tz != null) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index b4e99ecba1..07d1e0067f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -68,7 +68,8 @@ private Appender getAppender(ComparableTuple partitionKey, Tuple tuple) throws I Datum datum = tuple.asDatum(keyIds[i]); if (datum.type() == TajoDataTypes.Type.TIMESTAMP) { - // Converts TimeMeta to formatted string for Hive compatibility. + // Hive automatically converts TIMESTAMP value to STRING literals which are accepted in the format YYYY-MM-DD + // HH:MM:SS.MS. So Tajo need to convert TimestampDatum to formatted string for Hive compatibility. 
TimestampDatum timestampDatum = (TimestampDatum) datum; Timestamp timestamp = new Timestamp(timestampDatum.getJavaTimestamp()); sb.append(StringUtils.escapePathName(timestamp.toString())); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index 359c70d4dc..4ec2baa2e3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -63,7 +63,8 @@ private String getSubdirectory(Tuple tuple) { sb.append(keyNames[i]).append('='); if (datum.type() == TajoDataTypes.Type.TIMESTAMP) { - // Converts TimeMeta to formatted string for Hive compatibility. + // Hive automatically converts TIMESTAMP value to STRING literals which are accepted in the format YYYY-MM-DD + // HH:MM:SS.MS. So Tajo need to convert TimestampDatum to formatted string for Hive compatibility. TimestampDatum timestampDatum = (TimestampDatum) datum; Timestamp timestamp = new Timestamp(timestampDatum.getJavaTimestamp()); sb.append(StringUtils.escapePathName(timestamp.toString())); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 1e52f64aca..942f27452d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -470,6 +470,8 @@ public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Pa Column keyColumn = partitionColumnSchema.getColumn(columnId); if (keyColumn.getDataType().getType() == TajoDataTypes.Type.TIMESTAMP) { + // TimestampDatum use UTC based Julian time microseconds. 
So this need to convert the number of milliseconds + // to julian time microseconds. Timestamp timestamp = Timestamp.valueOf(StringUtils.unescapePathName(parts[1])); long julianTime = DateTimeUtil.javaTimeToJulianTime(timestamp.getTime()); TimestampDatum timestampDatum = new TimestampDatum(julianTime); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java index 276c2f40f4..8a4d03c0f6 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java @@ -161,6 +161,10 @@ public Expr visitTimestampLiteral(Object ctx, Stack stack, TimestampLitera sb.append("?").append(" )"); parameters.add(new Pair(Type.TEXT, dateFormat)); } else { + // Currently, Hive doesn't support to use Timestamp type using partition api. As a result, if Tajo uses Timestamp + // type, Hive will throws MetaException. Also Hive doesn't allow cast operator using partition api. So if there + // is any Timestamp type on filter conditions, Tajo must get all list of partitions of the table and build + // correct query result with the list. 
throw new UnsupportedException("Timestamp type"); } queries.push(sb.toString()); From 9856ddae8eb1022af36a8999169a5a429b652377 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 5 Nov 2015 17:16:17 +0900 Subject: [PATCH 04/19] Add more description for PartitionFilterAlgebraVisitor::visitTimestampLiteral --- .../tajo/plan/util/PartitionFilterAlgebraVisitor.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java index 8a4d03c0f6..9661e7db49 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java @@ -20,8 +20,6 @@ package org.apache.tajo.plan.util; import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.tajo.algebra.*; import org.apache.tajo.catalog.CatalogConstants; import org.apache.tajo.catalog.Column; @@ -49,8 +47,6 @@ * */ public class PartitionFilterAlgebraVisitor extends SimpleAlgebraVisitor { - protected final Log LOG = LogFactory.getLog(getClass()); - private String tableAlias; private Column column; private boolean isHiveCatalog = false; @@ -149,6 +145,9 @@ public Expr visitTimestampLiteral(Object ctx, Stack stack, TimestampLitera TimeMeta tm = new TimeMeta(); DateTimeUtil.toJulianTimeMeta(julianTimestamp, tm); + // For Hive compatibility, Tajo need to convert TimeMeta to STRING literals which are accepted in the format + // YYYY-MM-DD HH:MM:SS.MS. Also if there is no fractional seconds, we should use '.0' for nanos value in + // accordance with Hive partition naming rule. 
String dateFormat = DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS"); if (tm.fsecs == 0) { dateFormat += ".0"; From 2743bfbf63855a80bf081e83adc80b4e1fbbbd22 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 5 Nov 2015 17:18:00 +0900 Subject: [PATCH 05/19] Remove unncessary codes --- .../org/apache/tajo/plan/util/EvalNodeToExprConverter.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java index 0871162b99..9db3f5c4f7 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java @@ -18,8 +18,6 @@ package org.apache.tajo.plan.util; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.tajo.algebra.*; import org.apache.tajo.datum.DateDatum; import org.apache.tajo.datum.Datum; @@ -34,8 +32,6 @@ * */ public class EvalNodeToExprConverter extends SimpleEvalNodeVisitor { - protected final Log LOG = LogFactory.getLog(getClass()); - private Stack exprs = new Stack<>(); private String tableName; From 359d9886b61a33744ee9748a79e077e5b05a77ff Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 6 Nov 2015 12:30:01 +0900 Subject: [PATCH 06/19] Implement ColPartitionStoreExec::encodeTimestamp --- .../physical/ColPartitionStoreExec.java | 31 +++++++++++++++++++ .../HashBasedColPartitionStoreExec.java | 14 +++------ .../SortBasedColPartitionStoreExec.java | 10 +++--- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 445644b181..101b55ea9a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -39,9 +39,13 @@ import org.apache.tajo.storage.*; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.datetime.DateTimeFormat; +import org.apache.tajo.util.datetime.DateTimeUtil; +import org.apache.tajo.util.datetime.TimeMeta; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.util.TimeZone; public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { private static Log LOG = LogFactory.getLog(ColPartitionStoreExec.class); @@ -193,6 +197,33 @@ private void addPartition(String partition) throws IOException { context.addPartition(builder.build()); } + /** + * Convert TimestampDatum to formatted string for Hive compatibility with users timezone. + * + * @param tm TimeMeta + * @return + */ + protected String encodeTimestamp(TimeMeta tm) { + StringBuilder sb = new StringBuilder(); + + TimeZone tz = null; + if (context.getQueryContext().containsKey(SessionVars.TIMEZONE)) { + tz = TimeZone.getTimeZone(context.getQueryContext().get(SessionVars.TIMEZONE)); + } else { + tz = TimeZone.getDefault(); + } + DateTimeUtil.toUserTimezone(tm, tz); + + sb.append(DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS")); + if (tm.fsecs == 0) { + sb.append(".0"); + } else { + int secondsFraction = tm.fsecs / 1000; + sb.append(".").append(org.apache.commons.lang.StringUtils.leftPad("" + secondsFraction, 3, '0')); + } + return sb.toString(); + } + public void openAppender(int suffixId) throws IOException { Path actualFilePath = lastFileName; if (suffixId > 0) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index 07d1e0067f..eaf5385e29 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; @@ -29,14 +30,13 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.datetime.DateTimeFormat; +import org.apache.tajo.util.datetime.DateTimeUtil; +import org.apache.tajo.util.datetime.TimeMeta; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * This class is a physical operator to store at column partitioned table. @@ -68,11 +68,7 @@ private Appender getAppender(ComparableTuple partitionKey, Tuple tuple) throws I Datum datum = tuple.asDatum(keyIds[i]); if (datum.type() == TajoDataTypes.Type.TIMESTAMP) { - // Hive automatically converts TIMESTAMP value to STRING literals which are accepted in the format YYYY-MM-DD - // HH:MM:SS.MS. So Tajo need to convert TimestampDatum to formatted string for Hive compatibility. 
- TimestampDatum timestampDatum = (TimestampDatum) datum; - Timestamp timestamp = new Timestamp(timestampDatum.getJavaTimestamp()); - sb.append(StringUtils.escapePathName(timestamp.toString())); + sb.append(encodeTimestamp(tuple.getTimeDate(keyIds[i]))); } else { sb.append(StringUtils.escapePathName(datum.asChars())); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index 4ec2baa2e3..638e9b737f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -21,6 +21,7 @@ */ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; @@ -31,12 +32,15 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.datetime.DateTimeFormat; +import org.apache.tajo.util.datetime.DateTimeUtil; +import org.apache.tajo.util.datetime.TimeMeta; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.sql.Timestamp; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.TimeZone; /** * It stores a sorted data set into a number of partition files. It assumes that input tuples are sorted in an @@ -63,11 +67,7 @@ private String getSubdirectory(Tuple tuple) { sb.append(keyNames[i]).append('='); if (datum.type() == TajoDataTypes.Type.TIMESTAMP) { - // Hive automatically converts TIMESTAMP value to STRING literals which are accepted in the format YYYY-MM-DD - // HH:MM:SS.MS. So Tajo need to convert TimestampDatum to formatted string for Hive compatibility. 
- TimestampDatum timestampDatum = (TimestampDatum) datum; - Timestamp timestamp = new Timestamp(timestampDatum.getJavaTimestamp()); - sb.append(StringUtils.escapePathName(timestamp.toString())); + sb.append(encodeTimestamp(tuple.getTimeDate(keyIds[i]))); } else { sb.append(StringUtils.escapePathName(datum.asChars())); } From 2e09493e91bccc44c2680408da81843df4b4191d Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 6 Nov 2015 12:33:43 +0900 Subject: [PATCH 07/19] Escape the path name of timestamp partition --- .../tajo/engine/planner/physical/ColPartitionStoreExec.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 101b55ea9a..e19936bc44 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -214,7 +214,8 @@ protected String encodeTimestamp(TimeMeta tm) { } DateTimeUtil.toUserTimezone(tm, tz); - sb.append(DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS")); + sb.append(StringUtils.escapePathName(DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS"))); + if (tm.fsecs == 0) { sb.append(".0"); } else { From 84c016426a817159c3b2dc40675d5951236b40a3 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 6 Nov 2015 12:45:59 +0900 Subject: [PATCH 08/19] Apply users timezone to partition pruning. 
--- .../tajo/catalog/store/HiveCatalogStore.java | 7 ++++++ .../tajo/catalog/store/AbstractDBStore.java | 10 +++++--- .../physical/ColPartitionStoreExec.java | 2 +- .../util/PartitionFilterAlgebraVisitor.java | 23 +++++++++++++------ 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java index f8bc07e410..8d9b411035 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.tajo.BuiltinStorages; +import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.algebra.Expr; import org.apache.tajo.algebra.IsNullPredicate; @@ -975,6 +976,12 @@ private String getFilter(String databaseName, String tableName, List getPartitionsByAlgebra(PartitionsByAlgebraProto case DATE: pstmt.setDate(currentIndex, (Date) parameter.getSecond()); break; - case TIMESTAMP: - pstmt.setTimestamp(currentIndex, (Timestamp) parameter.getSecond()); - break; case TIME: pstmt.setTime(currentIndex, (Time) parameter.getSecond()); break; @@ -2263,6 +2261,12 @@ private Pair> getSelectStatementAndPartitionFil PartitionFilterAlgebraVisitor visitor = new PartitionFilterAlgebraVisitor(); visitor.setIsHiveCatalog(false); + if (conf.get(SessionVars.TIMEZONE.getConfVars().keyname()) != null) { + visitor.setTimezoneId(conf.get(SessionVars.TIMEZONE.getConfVars().keyname())); + } else { + visitor.setTimezoneId(TimeZone.getDefault().getID()); + } + Expr[] filters = AlgebraicUtil.getRearrangedCNFExpressions(tableName, 
partitionColumns, exprs); StringBuffer sb = new StringBuffer(); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index e19936bc44..a41ff019e7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -215,7 +215,7 @@ protected String encodeTimestamp(TimeMeta tm) { DateTimeUtil.toUserTimezone(tm, tz); sb.append(StringUtils.escapePathName(DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS"))); - + if (tm.fsecs == 0) { sb.append(".0"); } else { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java index 9661e7db49..6f95bf858b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java @@ -50,6 +50,7 @@ public class PartitionFilterAlgebraVisitor extends SimpleAlgebraVisitor queries = new Stack(); private List> parameters = TUtil.newList(); @@ -70,6 +71,14 @@ public void setColumn(Column column) { this.column = column; } + public String getTimezoneId() { + return timezoneId; + } + + public void setTimezoneId(String timezoneId) { + this.timezoneId = timezoneId; + } + public boolean isHiveCatalog() { return isHiveCatalog; } @@ -145,20 +154,20 @@ public Expr visitTimestampLiteral(Object ctx, Stack stack, TimestampLitera TimeMeta tm = new TimeMeta(); DateTimeUtil.toJulianTimeMeta(julianTimestamp, tm); - // For Hive compatibility, Tajo need to convert TimeMeta to STRING literals which are accepted in the format - // YYYY-MM-DD HH:MM:SS.MS. 
Also if there is no fractional seconds, we should use '.0' for nanos value in - // accordance with Hive partition naming rule. - String dateFormat = DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS"); + TimeZone tz = TimeZone.getTimeZone(getTimezoneId()); + DateTimeUtil.toUserTimezone(tm, tz); + + String dateTime = DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS"); if (tm.fsecs == 0) { - dateFormat += ".0"; + dateTime += ".0"; } else { int secondsFraction = tm.fsecs / 1000; - dateFormat += "." + StringUtils.leftPad(""+secondsFraction, 3, '0'); + dateTime += "." + StringUtils.leftPad(""+secondsFraction, 3, '0'); } if (!isHiveCatalog) { sb.append("?").append(" )"); - parameters.add(new Pair(Type.TEXT, dateFormat)); + parameters.add(new Pair(Type.TEXT, dateTime)); } else { // Currently, Hive doesn't support to use Timestamp type using partition api. As a result, if Tajo uses Timestamp // type, Hive will throws MetaException. Also Hive doesn't allow cast operator using partition api. So if there From 0dad0601edbb710453899b8c9a68350f78bb4670 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 6 Nov 2015 14:20:59 +0900 Subject: [PATCH 09/19] Apply UTC timezone for casting operation. --- .../src/main/java/org/apache/tajo/datum/DatumFactory.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java index 5ea6e046aa..fbb92dc1de 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java @@ -363,14 +363,10 @@ public static TimestampDatum createTimestamp(Datum datum, @Nullable TimeZone tz) case TIMESTAMP: return (TimestampDatum) datum; case INT8: - // TimestampDatum use UTC based Julian time microseconds. So this need to convert long number to julian time - // microseconds. 
And if users set their timezone, this must apply the timezone for correct query result. TimeMeta tm = new TimeMeta(); DateTimeUtil.toJulianTimeMeta(DateTimeUtil.javaTimeToJulianTime(datum.asInt8()), tm); if (tz != null) { - DateTimeUtil.toUserTimezone(tm, tz); - } else { - DateTimeUtil.toUserTimezone(tm, TimeZone.getDefault()); + DateTimeUtil.toUTCTimezone(tm, tz); } return new TimestampDatum(DateTimeUtil.toJulianTimestamp(tm)); default: From 491d685cbe43066082e55f4ab36507451d4697b8 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 6 Nov 2015 15:56:28 +0900 Subject: [PATCH 10/19] Escape partition and unescape partition values --- .../main/java/org/apache/tajo/master/exec/DDLExecutor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index da19625341..8e555fc28c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -665,7 +665,7 @@ private PartitionDescProto getPartitionDesc(Path tablePath, Path partitionPath, partitionName = partitionName.substring(startIndex + File.separator.length()); CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); - builder.setPartitionName(partitionName); + builder.setPartitionName(StringUtils.escapePathName(partitionName)); String[] partitionKeyPairs = partitionName.split("/"); @@ -675,7 +675,7 @@ private PartitionDescProto getPartitionDesc(Path tablePath, Path partitionPath, PartitionKeyProto.Builder keyBuilder = PartitionKeyProto.newBuilder(); keyBuilder.setColumnName(split[0]); - keyBuilder.setPartitionValue(split[1]); + keyBuilder.setPartitionValue(StringUtils.unescapePathName(split[1])); builder.addPartitionKeys(keyBuilder.build()); } From ad644c19b3b6a3c2ba8854e6ba029878ac060025 Mon Sep 17 00:00:00 2001 From: JaeHwa
Jung Date: Mon, 9 Nov 2015 02:13:52 +0900 Subject: [PATCH 11/19] Fix SecondsFraction padding bug --- .../main/java/org/apache/tajo/algebra/TimeValue.java | 2 +- .../planner/physical/ColPartitionStoreExec.java | 12 +++++++++--- .../tajo/plan/util/EvalNodeToExprConverter.java | 3 ++- .../plan/util/PartitionFilterAlgebraVisitor.java | 8 ++++---- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TimeValue.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TimeValue.java index ebff555149..cc8cc886a5 100644 --- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TimeValue.java +++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TimeValue.java @@ -56,7 +56,7 @@ public boolean hasSecondsFraction() { } public void setSecondsFraction(String secondsFraction) { - this.secondsFraction = StringUtils.leftPad(secondsFraction, 3, '0'); + this.secondsFraction = StringUtils.rightPad(secondsFraction, 3, '0'); } public String getSecondsFraction() { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index a41ff019e7..49aab34620 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -189,7 +189,7 @@ private void addPartition(String partition) throws IOException { // In CTAS, the uri would be null. So, it get the uri from staging directory.
int endIndex = storeTablePath.toString().indexOf(FileTablespace.TMP_STAGING_DIR_PREFIX); String outputPath = storeTablePath.toString().substring(0, endIndex); - builder.setPath(outputPath + partition); + builder.setPath(outputPath + partition); } else { builder.setPath(this.plan.getUri().toString() + "/" + partition); } @@ -216,11 +216,17 @@ protected String encodeTimestamp(TimeMeta tm) { sb.append(StringUtils.escapePathName(DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS"))); + sb.append("."); if (tm.fsecs == 0) { - sb.append(".0"); + sb.append("0"); } else { int secondsFraction = tm.fsecs / 1000; - sb.append(".").append(org.apache.commons.lang.StringUtils.leftPad("" + secondsFraction, 3, '0')); + + if (secondsFraction < 10) { + sb.append(secondsFraction); + } else { + sb.append(org.apache.commons.lang.StringUtils.leftPad("" + secondsFraction, 3, '0')); + } } return sb.toString(); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java index 9db3f5c4f7..a1905f57d6 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java @@ -18,6 +18,7 @@ package org.apache.tajo.plan.util; +import org.apache.commons.lang.StringUtils; import org.apache.tajo.algebra.*; import org.apache.tajo.datum.DateDatum; import org.apache.tajo.datum.Datum; @@ -186,7 +187,7 @@ protected EvalNode visitConst(Object o, ConstEval evalNode, Stack stac timeValue = new TimeValue(""+timestampDatum.getHourOfDay() , ""+timestampDatum.getMinuteOfHour(), ""+timestampDatum.getSecondOfMinute()); - timeValue.setSecondsFraction(Integer.toString(timestampDatum.getMillisOfSecond())); + timeValue.setSecondsFraction(StringUtils.leftPad("" + timestampDatum.getMillisOfSecond(), 3, '0')); value = new TimestampLiteral(dateValue, timeValue); break; diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java index 6f95bf858b..14da177b3e 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java @@ -31,6 +31,7 @@ import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; +import org.apache.tajo.util.datetime.DateTimeConstants; import org.apache.tajo.util.datetime.DateTimeFormat; import org.apache.tajo.util.datetime.DateTimeUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -157,12 +158,11 @@ public Expr visitTimestampLiteral(Object ctx, Stack stack, TimestampLitera TimeZone tz = TimeZone.getTimeZone(getTimezoneId()); DateTimeUtil.toUserTimezone(tm, tz); - String dateTime = DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS"); + String dateTime = null; if (tm.fsecs == 0) { - dateTime += ".0"; + dateTime = DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS") + ".0"; } else { - int secondsFraction = tm.fsecs / 1000; - dateTime += "." 
+ StringUtils.leftPad(""+secondsFraction, 3, '0'); + dateTime = DateTimeUtil.encodeDateTime(tm, DateTimeConstants.DateStyle.ISO_DATES); } if (!isHiveCatalog) { From fb5d7de0ca14ee41244c611b6533b979772da4f5 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 9 Nov 2015 17:27:25 +0900 Subject: [PATCH 12/19] Fix hive compatibility and remove cast operator for int8 --- .../org/apache/tajo/datum/DatumFactory.java | 7 -- .../engine/query/TestTablePartitions.java | 10 +- .../tajo/engine/util/TestTupleUtil.java | 24 ++-- .../engine/planner/physical/SeqScanExec.java | 11 +- .../apache/tajo/master/exec/DDLExecutor.java | 17 ++- .../rules/PartitionedTableRewriter.java | 112 +++++++++++++++--- .../util/PartitionFilterAlgebraVisitor.java | 3 + 7 files changed, 136 insertions(+), 48 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java index fbb92dc1de..dd4a4e440a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java @@ -362,13 +362,6 @@ public static TimestampDatum createTimestamp(Datum datum, @Nullable TimeZone tz) return parseTimestamp(datum.asChars(), tz); case TIMESTAMP: return (TimestampDatum) datum; - case INT8: - TimeMeta tm = new TimeMeta(); - DateTimeUtil.toJulianTimeMeta(DateTimeUtil.javaTimeToJulianTime(datum.asInt8()), tm); - if (tz != null) { - DateTimeUtil.toUTCTimezone(tm, tz); - } - return new TimestampDatum(DateTimeUtil.toJulianTimestamp(tm)); default: throw new TajoRuntimeException(new InvalidValueForCastException(datum.type(), Type.TIMESTAMP)); } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 7b9811191c..f3b23117d8 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.sql.ResultSet; +import java.sql.Timestamp; import java.util.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; @@ -1582,15 +1583,6 @@ public final void testTimestampPartitionColumn() throws Exception { assertEquals(5, partitions.size()); // Equals - res = executeString("SELECT * FROM " + tableName + " WHERE key = cast(760147200000 as timestamp)"); - - expectedResult = "col1,col2,key\n" + - "-------------------------------\n" + - "3,2,1994-02-02 00:00:00\n" ; - - assertEquals(expectedResult, resultSetToString(res)); - res.close(); - res = executeString("SELECT * FROM " + tableName + " WHERE key = TIMESTAMP '1993-11-09 00:00:00.0'"); expectedResult = "col1,col2,key\n" + diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java index 4a3565e184..794f7a03d9 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.util; import org.apache.hadoop.fs.Path; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.common.TajoDataTypes.Type; @@ -33,6 +34,8 @@ import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.junit.Test; +import java.util.TimeZone; + import static org.junit.Assert.*; public class TestTupleUtil { @@ -140,47 +143,48 @@ public final void testGetPartitions() { @Test public void testBuildTupleFromPartitionPath() { + String timezoneId = TimeZone.getDefault().getID(); Schema schema = new Schema(); schema.addColumn("key1", Type.INT8); schema.addColumn("key2", Type.TEXT); Path path = new Path("hdfs://tajo/warehouse/partition_test/"); - 
Tuple tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true); + Tuple tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true, timezoneId); assertNull(tuple); - tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false); + tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false, timezoneId); assertNull(tuple); path = new Path("hdfs://tajo/warehouse/partition_test/key1=123"); - tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true); + tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true, timezoneId); assertNotNull(tuple); assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); - tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false); + tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false, timezoneId); assertNotNull(tuple); assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/part-0000"); // wrong cases; - tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true); + tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true, timezoneId); assertNull(tuple); - tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false); + tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false, timezoneId); assertNull(tuple); path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc"); - tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true); + tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true, timezoneId); assertNotNull(tuple); assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); - tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false); + tuple = 
PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false, timezoneId); assertNotNull(tuple); assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc/part-0001"); - tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true); + tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true, timezoneId); assertNull(tuple); - tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false); + tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false, timezoneId); assertNotNull(tuple); assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 2572e1db8d..118cc24a75 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -44,6 +45,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.TimeZone; public class SeqScanExec extends ScanExec { @@ -100,9 +102,16 @@ private void rewriteColumnPartitionedTableSchema() throws IOException { if (fragments != null && fragments.length > 0) { List fileFragments = FragmentConvertor.convert(FileFragment.class, fragments); + String timezoneId = null; + if 
(context.getQueryContext().containsKey(SessionVars.TIMEZONE)) { + timezoneId = context.getQueryContext().get(SessionVars.TIMEZONE); + } else { + timezoneId = TimeZone.getDefault().getID(); + } + // Get a partition key value from a given path partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath( - columnPartitionSchema, fileFragments.get(0).getPath(), false); + columnPartitionSchema, fileFragments.get(0).getPath(), false, timezoneId); } // Targets or search conditions may contain column references. diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index 8e555fc28c..d96bf89492 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.tajo.SessionVars; import org.apache.tajo.algebra.AlterTableOpType; import org.apache.tajo.algebra.AlterTablespaceSetType; import org.apache.tajo.annotation.Nullable; @@ -54,6 +55,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.TimeZone; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; @@ -599,8 +601,15 @@ public void repairPartition(TajoMaster.MasterContext context, final QueryContext PartitionMethodDesc partitionDesc = tableDesc.getPartitionMethod(); Schema partitionColumns = partitionDesc.getExpressionSchema(); + String timezoneId = null; + if (queryContext.containsKey(SessionVars.TIMEZONE)) { + timezoneId = queryContext.get(SessionVars.TIMEZONE); + } else { + timezoneId = TimeZone.getDefault().getID(); + } + // Get the array of path filter, accepting all partition paths. 
- PathFilter[] filters = PartitionedTableRewriter.buildAllAcceptingPathFilters(partitionColumns); + PathFilter[] filters = PartitionedTableRewriter.buildAllAcceptingPathFilters(partitionColumns, timezoneId); // loop from one to the number of partition columns Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0])); @@ -664,8 +673,12 @@ private PartitionDescProto getPartitionDesc(Path tablePath, Path partitionPath, int startIndex = partitionName.indexOf(tablePath.toString()) + tablePath.toString().length(); partitionName = partitionName.substring(startIndex + File.separator.length()); + String[] partitionNameParts = partitionName.split("="); + String partitionColumn = partitionNameParts[0]; + String partitionValue = StringUtils.escapePathName(partitionNameParts[1]); + CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); - builder.setPartitionName(StringUtils.escapePathName(partitionName)); + builder.setPartitionName(partitionColumn + "=" + partitionValue); String[] partitionKeyPairs = partitionName.split("/"); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 942f27452d..758f3336c8 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -24,6 +24,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.*; import org.apache.tajo.OverridableConf; +import org.apache.tajo.SessionVars; +import org.apache.tajo.algebra.DateValue; +import org.apache.tajo.algebra.TimeValue; +import org.apache.tajo.algebra.TimestampLiteral; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionsByAlgebraProto; @@ -33,6 
+37,7 @@ import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.TimestampDatum; import org.apache.tajo.exception.*; +import org.apache.tajo.plan.ExprAnnotator; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; @@ -45,6 +50,7 @@ import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.datetime.DateTimeUtil; +import org.apache.tajo.util.datetime.TimeMeta; import java.io.IOException; import java.sql.Timestamp; @@ -92,15 +98,17 @@ private static class PartitionPathFilter implements PathFilter { private Schema schema; private EvalNode partitionFilter; - public PartitionPathFilter(Schema schema, EvalNode partitionFilter) { + private String timezoneId; + public PartitionPathFilter(Schema schema, EvalNode partitionFilter, String timezoneId) { this.schema = schema; this.partitionFilter = partitionFilter; + this.timezoneId = timezoneId; partitionFilter.bind(null, schema); } @Override public boolean accept(Path path) { - Tuple tuple = buildTupleFromPartitionPath(schema, path, true); + Tuple tuple = buildTupleFromPartitionPath(schema, path, true, timezoneId); if (tuple == null) { // if it is a file or not acceptable file return false; } @@ -140,12 +148,20 @@ public String toString() { FileSystem fs = tablePath.getFileSystem(queryContext.getConf()); String [] splits = CatalogUtil.splitFQTableName(tableName); List partitions = null; + String timezoneId = null; try { + if (queryContext.containsKey(SessionVars.TIMEZONE)) { + timezoneId = queryContext.get(SessionVars.TIMEZONE); + } else { + timezoneId = TimeZone.getDefault().getID(); + } + if (conjunctiveForms == null) { partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, 
tablePath, + timezoneId); } else { filteredPaths = findFilteredPathsByPartitionDesc(partitions); } @@ -155,7 +171,8 @@ public String toString() { partitions = catalog.getPartitionsByAlgebra(request); filteredPaths = findFilteredPathsByPartitionDesc(partitions); } else { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath, + timezoneId); } } } catch (UnsupportedException ue) { @@ -164,7 +181,7 @@ public String toString() { LOG.warn(ue.getMessage()); partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath, timezoneId); } else { filteredPaths = findFilteredPathsByPartitionDesc(partitions); } @@ -203,14 +220,14 @@ private Path[] findFilteredPathsByPartitionDesc(List partiti * @throws IOException */ private Path [] findFilteredPathsFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, - FileSystem fs, Path tablePath) throws IOException{ + FileSystem fs, Path tablePath, String timezoneId) throws IOException{ Path [] filteredPaths = null; PathFilter [] filters; if (conjunctiveForms == null) { - filters = buildAllAcceptingPathFilters(partitionColumns); + filters = buildAllAcceptingPathFilters(partitionColumns, timezoneId); } else { - filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms); + filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms, timezoneId); } // loop from one to the number of partition columns @@ -275,7 +292,7 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( * @return */ private static PathFilter [] buildPathFiltersForAllLevels(Schema partitionColumns, - EvalNode [] conjunctiveForms) { + EvalNode [] 
conjunctiveForms, String timezoneId) { // Building partition path filters for all levels Column target; PathFilter [] filters = new PathFilter[partitionColumns.size()]; @@ -296,7 +313,7 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF( accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()])); - filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel); + filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel, timezoneId); } return filters; @@ -307,7 +324,7 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( * @param partitionColumns The partition columns schema * @return The array of path filter, accpeting all partition paths. */ - public static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) { + public static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns, String timezoneId) { Column target; PathFilter [] filters = new PathFilter[partitionColumns.size()]; List accumulatedFilters = Lists.newArrayList(); @@ -317,7 +334,7 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF( accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()])); - filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel); + filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel, timezoneId); } return filters; } @@ -444,7 +461,7 @@ private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) { * @return The tuple transformed from a column values part. 
*/ public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Path partitionPath, - boolean beNullIfFile) { + boolean beNullIfFile, String timezoneId) { int startIdx = partitionPath.toString().indexOf(getColumnPartitionPathPrefix(partitionColumnSchema)); if (startIdx == -1) { // if there is no partition column in the patch @@ -470,12 +487,43 @@ public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Pa Column keyColumn = partitionColumnSchema.getColumn(columnId); if (keyColumn.getDataType().getType() == TajoDataTypes.Type.TIMESTAMP) { - // TimestampDatum use UTC based Julian time microseconds. So this need to convert the number of milliseconds - // to julian time microseconds. - Timestamp timestamp = Timestamp.valueOf(StringUtils.unescapePathName(parts[1])); - long julianTime = DateTimeUtil.javaTimeToJulianTime(timestamp.getTime()); - TimestampDatum timestampDatum = new TimestampDatum(julianTime); - tuple.put(columnId, timestampDatum); + String timestampStr = StringUtils.unescapePathName(parts[1]); + + String[] timestampParts = timestampStr.split(" "); + String datePart = timestampParts[0]; + String timePart = timestampParts[1]; + + DateValue dateValue = parseDate(datePart); + TimeValue timeValue = parseTime(timePart); + + try { + int [] dates = ExprAnnotator.dateToIntArray(dateValue.getYears(), + dateValue.getMonths(), + dateValue.getDays()); + int [] times = ExprAnnotator.timeToIntArray(timeValue.getHours(), + timeValue.getMinutes(), + timeValue.getSeconds(), + timeValue.getSecondsFraction()); + + long timestamp; + if (timeValue.hasSecondsFraction()) { + timestamp = DateTimeUtil.toJulianTimestamp(dates[0], dates[1], dates[2], times[0], times[1], times[2], + times[3] * 1000); + } else { + timestamp = DateTimeUtil.toJulianTimestamp(dates[0], dates[1], dates[2], times[0], times[1], times[2], 0); + } + + TimeMeta tm = new TimeMeta(); + DateTimeUtil.toJulianTimeMeta(timestamp, tm); + + TimeZone tz = 
TimeZone.getTimeZone(timezoneId); + DateTimeUtil.toUTCTimezone(tm, tz); + + TimestampDatum timestampDatum = new TimestampDatum(DateTimeUtil.toJulianTimestamp(tm)); + tuple.put(columnId, timestampDatum); + } catch (TajoException e) { + throw new TajoInternalError("TimestampDatum build Failed: \n" + e.getMessage()); + } } else { tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), StringUtils.unescapePathName(parts[1]))); @@ -487,6 +535,32 @@ public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Pa return tuple; } + + private static DateValue parseDate(String datePart) { + String[] parts = datePart.split("-"); + return new DateValue(parts[0], parts[1], parts[2]); + } + + private static TimeValue parseTime(String timePart) { + String[] parts = timePart.split(":"); + + TimeValue time; + boolean hasFractionOfSeconds = (parts.length > 2 && parts[2].indexOf('.') > 0); + if (hasFractionOfSeconds) { + String[] secondsParts = parts[2].split("\\."); + time = new TimeValue(parts[0], parts[1], secondsParts[0]); + if (secondsParts.length == 2) { + time.setSecondsFraction(secondsParts[1]); + } + } else { + time = new TimeValue(parts[0], + (parts.length > 1 ? parts[1] : "0"), + (parts.length > 2 ? parts[2] : "0")); + } + return time; + } + + /** * Get a prefix of column partition path. For example, consider a column partition (col1, col2). * Then, you will get a string 'col1='. 
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java index 14da177b3e..f3d4572deb 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java @@ -20,6 +20,8 @@ package org.apache.tajo.plan.util; import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.algebra.*; import org.apache.tajo.catalog.CatalogConstants; import org.apache.tajo.catalog.Column; @@ -48,6 +50,7 @@ * */ public class PartitionFilterAlgebraVisitor extends SimpleAlgebraVisitor { + private static final Log LOG = LogFactory.getLog(PartitionFilterAlgebraVisitor.class); private String tableAlias; private Column column; private boolean isHiveCatalog = false; From b33e2a0b4057a9ed63aff3209726e9e82059fd7a Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 9 Nov 2015 17:35:42 +0900 Subject: [PATCH 13/19] Add a description for hive compatibility --- tajo-docs/src/main/sphinx/time_zone.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tajo-docs/src/main/sphinx/time_zone.rst b/tajo-docs/src/main/sphinx/time_zone.rst index 101012f3ed..de969df16c 100644 --- a/tajo-docs/src/main/sphinx/time_zone.rst +++ b/tajo-docs/src/main/sphinx/time_zone.rst @@ -68,6 +68,10 @@ The following ways use SQL statements. So, this way is available in ``tsql``, JD SET SESSION TIMEZONE TO 'GMT+9'; +.. note:: + + When using a timestamp column a partition key with hive, you must set your time zone with the meta command. Because hive automatically converts TIMESTAMP value to STRING literals which are accepted in the format YYYY-MM-DD HH:MM:SS.MS with users local timezone. 
============ Time Zone ID ============ From f77289ba93aeb1e43983312430aa533fd86b5278 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 9 Nov 2015 17:36:57 +0900 Subject: [PATCH 14/19] Fix a typo --- tajo-docs/src/main/sphinx/time_zone.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-docs/src/main/sphinx/time_zone.rst b/tajo-docs/src/main/sphinx/time_zone.rst index de969df16c..e50f752495 100644 --- a/tajo-docs/src/main/sphinx/time_zone.rst +++ b/tajo-docs/src/main/sphinx/time_zone.rst @@ -70,7 +70,7 @@ The following ways use SQL statements. So, this way is available in ``tsql``, JD .. note:: - When using a timestamp column a partition key with hive, you must set your time zone with the meta command. Because hive automatically converts TIMESTAMP value to STRING literals which are accepted in the format YYYY-MM-DD HH:MM:SS.MS with users local timezone. + When using a timestamp column as a partition key with Hive, you must set your time zone with the meta command, because Hive automatically converts TIMESTAMP values to STRING literals, which are accepted in the format YYYY-MM-DD HH:MM:SS.MS, using the user's local time zone. 
============ Time Zone ID From 4d27894f29461166408c2fa5adb6c8f84ece34b7 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 9 Nov 2015 17:48:13 +0900 Subject: [PATCH 15/19] Remove unused codes --- .../apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java index f3d4572deb..a4ae997807 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java @@ -19,9 +19,6 @@ package org.apache.tajo.plan.util; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.tajo.algebra.*; import org.apache.tajo.catalog.CatalogConstants; import org.apache.tajo.catalog.Column; @@ -50,7 +47,6 @@ * */ public class PartitionFilterAlgebraVisitor extends SimpleAlgebraVisitor { - private static final Log LOG = LogFactory.getLog(PartitionFilterAlgebraVisitor.class); private String tableAlias; private Column column; private boolean isHiveCatalog = false; From 0094dd04a17544e631ba32b8db9115b34289570b Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 9 Nov 2015 18:39:00 +0900 Subject: [PATCH 16/19] Fix repair partition bug --- .../apache/tajo/engine/query/TestAlterTable.java | 3 +++ .../org/apache/tajo/master/exec/DDLExecutor.java | 14 ++++++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java index 4c9f367223..73604fb584 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java +++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java @@ -188,6 +188,9 @@ public final void testAlterTableRepairPartition() throws Exception { assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=2"))); assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=3"))); + List partitions = catalog.getPartitionsOfTable(getCurrentDatabase(), + simpleTableName); + executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close(); verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index d96bf89492..d556eb8bdc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -673,15 +673,10 @@ private PartitionDescProto getPartitionDesc(Path tablePath, Path partitionPath, int startIndex = partitionName.indexOf(tablePath.toString()) + tablePath.toString().length(); partitionName = partitionName.substring(startIndex + File.separator.length()); - String[] partitionNameParts = partitionName.split("="); - String partitionColumn = partitionNameParts[0]; - String partitionValue = StringUtils.escapePathName(partitionNameParts[1]); - CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); - builder.setPartitionName(partitionColumn + "=" + partitionValue); - String[] partitionKeyPairs = partitionName.split("/"); + StringBuilder sb = new StringBuilder(); for(int i = 0; i < partitionKeyPairs.length; i++) { String partitionKeyPair = partitionKeyPairs[i]; String[] split = partitionKeyPair.split("="); @@ -690,8 +685,15 @@ private PartitionDescProto getPartitionDesc(Path tablePath, Path partitionPath, keyBuilder.setColumnName(split[0]); keyBuilder.setPartitionValue(StringUtils.unescapePathName(split[1])); + if (i > 0) { + 
sb.append("/"); + } + sb.append(split[0]).append("="); + sb.append(StringUtils.escapePathName(split[1])); + builder.addPartitionKeys(keyBuilder.build()); } + builder.setPartitionName(sb.toString()); builder.setPath(partitionPath.toString()); From ff1629f70bf26202efd3c9c34dd2468bb05acd66 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Wed, 18 Nov 2015 16:11:49 +0900 Subject: [PATCH 17/19] Add a configuration for timestamp transform --- .../src/main/java/org/apache/tajo/conf/TajoConf.java | 1 + tajo-dist/src/main/conf/tajo-site.xml.template | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index a2c1fb820e..7a6eab317d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -298,6 +298,7 @@ public static enum ConfVars implements ConfigKey { // Partition PARTITION_DYNAMIC_BULK_INSERT_BATCH_SIZE("tajo.partition.dynamic.bulk-insert.batch-size", 1000), + PARTITION_TIMESTAMP_TRAMSFORM_DATEFORMAT("tajo.partition.timestamp.transform.dateformat", true), ///////////////////////////////////////////////////////////////////////////////// diff --git a/tajo-dist/src/main/conf/tajo-site.xml.template b/tajo-dist/src/main/conf/tajo-site.xml.template index a6f5dd686c..d87eb558e9 100644 --- a/tajo-dist/src/main/conf/tajo-site.xml.template +++ b/tajo-dist/src/main/conf/tajo-site.xml.template @@ -99,6 +99,14 @@ --> + +