Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
package org.apache.spark.sql.catalyst.util

import java.text.SimpleDateFormat
import java.time.{LocalDate, ZoneId}
import java.time.{DateTimeException, LocalDate, ZoneId}
import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.LENIENT_SIMPLE_DATE_FORMAT
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._

Expand Down Expand Up @@ -84,6 +83,23 @@ class Iso8601DateFormatter(
}
}

class PartitionDateFormatter(zoneId: ZoneId) extends Iso8601DateFormatter(
DateFormatter.defaultPattern,
zoneId,
TimestampFormatter.defaultLocale,
LENIENT_SIMPLE_DATE_FORMAT,
isParsing = false) {

override def parse(s: String): Int = {
convertSpecialDate(s.trim, zoneId) match {
case Some(_) =>
throw new DateTimeException(
s"$s is a special date which is not valid as a partition date")
case None => super.parse(s)
}
}
}

trait LegacyDateFormatter extends DateFormatter {
def parseToDate(s: String): Date

Expand Down Expand Up @@ -196,4 +212,12 @@ object DateFormatter {
def apply(zoneId: ZoneId): DateFormatter = {
getFormatter(None, zoneId, isParsing = false)
}

def getPartitioningFormatter(zoneId: ZoneId): DateFormatter = {
if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
getLegacyFormatter(defaultPattern, zoneId, defaultLocale, LENIENT_SIMPLE_DATE_FORMAT)
} else {
new PartitionDateFormatter(zoneId)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,24 @@ class FractionTimestampFormatter(zoneId: ZoneId)
}
}


class PartitionTimestampFormatter(zoneId: ZoneId) extends Iso8601TimestampFormatter(
TimestampFormatter.partitionPattern,
zoneId,
TimestampFormatter.defaultLocale,
isParsing = true) {

override def parse(s: String): Long = {
convertSpecialTimestamp(s.trim, zoneId) match {
case Some(x) =>
throw new DateTimeException(
s"$s is a special timestamp which is not valid as a partition timestamp")
case None => super.parse(s)
}
}
}


/**
* The custom sub-class of `GregorianCalendar` is needed to get access to
* protected `fields` immediately after parsing. We cannot use
Expand Down Expand Up @@ -284,6 +302,8 @@ object TimestampFormatter {

def defaultPattern(): String = s"${DateFormatter.defaultPattern} HH:mm:ss"

def partitionPattern(): String = "yyyy-MM-dd HH:mm:ss[.S]"

private def getFormatter(
format: Option[String],
zoneId: ZoneId,
Expand Down Expand Up @@ -347,4 +367,12 @@ object TimestampFormatter {
def getFractionFormatter(zoneId: ZoneId): TimestampFormatter = {
new FractionTimestampFormatter(zoneId)
}

def getPartitioningFormatter(zoneId: ZoneId): TimestampFormatter = {
if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
getLegacyFormatter(partitionPattern, zoneId, defaultLocale, LENIENT_SIMPLE_DATE_FORMAT)
} else {
new PartitionTimestampFormatter(zoneId)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,9 @@ object PartitioningUtils {
Map.empty[String, String]
}

val dateFormatter = DateFormatter(zoneId)
val timestampFormatter = TimestampFormatter(
timestampPartitionPattern,
zoneId,
isParsing = true)
val dateFormatter = DateFormatter.getPartitioningFormatter(zoneId)
val timestampFormatter = TimestampFormatter.getPartitioningFormatter(zoneId)

// First, we need to parse every partition's path and see if we can find partition values.
val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
Expand Down