diff --git a/README.md b/README.md
index 016f30f..b4361c2 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,31 @@
# Apache Paimon Presto Connector
+## 说明
+我的环境
+```shell
+mvn clean install -DskipTests -Dgpg.skip -Drat.skip -Papache-release -am -pl paimon-presto-0.273 -Dpresto.version=0.287
+# -Dhadoop.apache2.version=3.3.4
+```
+
+单编辑一下个模块
+```shell
+mvn install -Dgpg.skip -Drat.skip -DskipTests -Papache-release -pl paimon-presto-common
+```
+> 如果要让hive catalog查询paimon表,则需要将出的`paimon-presto-common`包中的`META-INFO/services/com.facebook.presto.spi.Plugin`文件删除掉
+
+
+
+
+
+
+
+
+
+
+
+
+
+
This repository is Presto Connector for the [Apache Paimon](https://paimon.apache.org/) project.
## About
diff --git a/paimon-presto-0.236/pom.xml b/paimon-presto-0.236/pom.xml
index a387801..2262f38 100644
--- a/paimon-presto-0.236/pom.xml
+++ b/paimon-presto-0.236/pom.xml
@@ -25,7 +25,7 @@ under the License.
paimon-presto
org.apache.paimon
- 0.7-SNAPSHOT
+ 0.9-SNAPSHOT
paimon-presto-0.236
diff --git a/paimon-presto-0.268/pom.xml b/paimon-presto-0.268/pom.xml
index d00269b..d4938e5 100644
--- a/paimon-presto-0.268/pom.xml
+++ b/paimon-presto-0.268/pom.xml
@@ -25,7 +25,7 @@ under the License.
paimon-presto
org.apache.paimon
- 0.7-SNAPSHOT
+ 0.9-SNAPSHOT
paimon-presto-0.268
diff --git a/paimon-presto-0.273/pom.xml b/paimon-presto-0.273/pom.xml
index 670b268..0cc4682 100644
--- a/paimon-presto-0.273/pom.xml
+++ b/paimon-presto-0.273/pom.xml
@@ -25,7 +25,7 @@ under the License.
paimon-presto
org.apache.paimon
- 0.7-SNAPSHOT
+ 0.9-SNAPSHOT
paimon-presto-0.273
diff --git a/paimon-presto-common/pom.xml b/paimon-presto-common/pom.xml
index 47efc2a..686d593 100644
--- a/paimon-presto-common/pom.xml
+++ b/paimon-presto-common/pom.xml
@@ -25,7 +25,7 @@ under the License.
paimon-presto
org.apache.paimon
- 0.7-SNAPSHOT
+ 0.9-SNAPSHOT
jar
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/DateUtils.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/DateUtils.java
new file mode 100644
index 0000000..6aae523
--- /dev/null
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/DateUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.presto;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/** util for date. */
+public class DateUtils {
+ private static String[] parsePatterns = {
+ "yyyy-MM-dd",
+ "yyyy-MM-dd HH",
+ "yyyy-MM-dd HH:mm",
+ "yyyy-MM-dd HH:mm:ss",
+ "yyyy-MM-dd HH:mm:ss.SSS"
+ };
+
+ /**
+ * auto format date.
+ *
+ * @param dateStr
+ * @return
+ */
+ public static LocalDateTime autoFormat(String dateStr) {
+ // date set default time
+ if (dateStr.length() == "yyyy-MM-dd".length()) {
+ dateStr += " 00:00:00";
+ }
+ int length = dateStr.length();
+
+ String exceptionMessage =
+ "Time travel param format not supported:"
+ + dateStr
+ + ",Supported formats include:"
+ + Arrays.stream(parsePatterns).collect(Collectors.joining(","));
+
+ try {
+ for (String pattern : parsePatterns) {
+ if (pattern.length() == length) {
+ return LocalDateTime.parse(dateStr, DateTimeFormatter.ofPattern(pattern));
+ }
+ }
+ } catch (DateTimeParseException exception) {
+ throw new RuntimeException(exceptionMessage);
+ }
+
+ throw new RuntimeException(exceptionMessage);
+ }
+
+ /**
+ * auto format date to timestamp.
+ *
+ * @param dateStr
+ * @return
+ */
+ public static Long autoFormatToTimestamp(String dateStr) {
+ LocalDateTime localDateTime = autoFormat(dateStr);
+ return localDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+ }
+
+}
\ No newline at end of file
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java
index 42fec7c..5c3ad12 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java
@@ -28,18 +28,21 @@
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.spi.transaction.IsolationLevel;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
+import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
+import static org.apache.paimon.presto.PrestoTableHandle.SCAN_SNAPSHOT;
+import static org.apache.paimon.presto.PrestoTableHandle.SCAN_TIMESTAMP;
/** Presto {@link Connector}. */
public abstract class PrestoConnectorBase implements Connector {
-
private final List> sessionProperties;
private final PrestoTransactionManager transactionManager;
private final PrestoSplitManager prestoSplitManager;
@@ -54,9 +57,8 @@ public PrestoConnectorBase(
PrestoPageSourceProvider prestoPageSourceProvider,
PrestoMetadata prestoMetadata,
Optional prestoPlanOptimizerProvider) {
- this.sessionProperties =
- ImmutableList.copyOf(
- requireNonNull(sessionProperties, "sessionProperties is null"));
+
+ this.sessionProperties = buildProperties(sessionProperties);
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.prestoSplitManager = requireNonNull(prestoSplitManager, "prestoSplitManager is null");
this.prestoPageSourceProvider =
@@ -66,6 +68,33 @@ public PrestoConnectorBase(
requireNonNull(prestoPlanOptimizerProvider, "prestoPlanOptimizerProvider is null");
}
+ /**
+ * add ext support props.
+ *
+ * @param sessionProperties
+ * @return
+ */
+ private List> buildProperties(List> sessionProperties) {
+ String datetimeDescription =
+ "Will automatically convert to parameter: scan_timestamp_millis";
+ List> props = new ArrayList<>();
+ props.add(PropertyMetadata.stringProperty(SCAN_TIMESTAMP, datetimeDescription, null, true));
+ props.add(
+ PropertyMetadata.longProperty(
+ PrestoTableHandle.SCAN_TIMESTAMP_MILLIS,
+ SCAN_TIMESTAMP_MILLIS.description().toString(),
+ null,
+ true));
+ props.add(
+ PropertyMetadata.longProperty(
+ SCAN_SNAPSHOT, SCAN_SNAPSHOT_ID.description().toString(), null, true));
+
+ if (sessionProperties != null) {
+ props.addAll(sessionProperties);
+ }
+ return props;
+ }
+
@Override
public ConnectorTransactionHandle beginTransaction(
IsolationLevel isolationLevel, boolean readOnly) {
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java
index 80463b7..c7c4676 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java
@@ -321,4 +321,8 @@ public Map> listTableColumns(
getTableHandle(session, table)
.columnMetadatas(typeManager)));
}
+
+ public TypeManager getTypeManager() {
+ return typeManager;
+ }
}
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceProvider.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceProvider.java
index 94eeda6..22b504b 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceProvider.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceProvider.java
@@ -18,8 +18,20 @@
package org.apache.paimon.presto;
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.hive.HiveColumnHandle;
+import com.facebook.presto.hive.HiveTableLayoutHandle;
+import com.facebook.presto.hive.HiveType;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import com.facebook.presto.spi.ColumnHandle;
@@ -34,11 +46,17 @@
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
import static org.apache.paimon.presto.ClassLoaderUtils.runWithContextClassLoader;
/** Presto {@link ConnectorPageSourceProvider}. */
public class PrestoPageSourceProvider implements ConnectorPageSourceProvider {
+ private static final Logger log = Logger.get(PrestoPageSourceProvider.class);
+
+ private PrestoMetadata paimonMetadata;
@Override
public ConnectorPageSource createPageSource(
@@ -49,25 +67,58 @@ public ConnectorPageSource createPageSource(
List columns,
SplitContext splitContext) {
return runWithContextClassLoader(
- () ->
- createPageSource(
- ((PrestoTableLayoutHandle) layout).getTableHandle(),
- (PrestoSplit) split,
- columns),
+ () -> {
+ PrestoTableLayoutHandle prestoLayout;
+ PrestoSplit prestoSplit;
+ if(layout.getClass().getSimpleName().equals("HiveTableLayoutHandle")){
+ long start=System.currentTimeMillis();
+ try{// 如果直接调用会报错:没在同一个classLoader,因此用反射
+ prestoSplit=new PrestoSplit(split.getClass().getMethod("getSplitSerialized").invoke(split)+"");
+ }catch (Exception e){
+ throw new RuntimeException(e);
+ }
+
+ // 初始化元数据
+ HiveTableLayoutHandle hiveLayout = (HiveTableLayoutHandle) layout;
+ PrestoTableHandle tableHandle = paimonMetadata.getTableHandle(hiveLayout.getSchemaTableName());
+ prestoLayout = new PrestoTableLayoutHandle(tableHandle, hiveLayout.getPartitionColumnPredicate());
+ log.info("初始化paimon表:%s相关信息,耗时:%s",hiveLayout.getSchemaTableName(),System.currentTimeMillis()-start);
+ }else{
+ prestoLayout=(PrestoTableLayoutHandle) layout;
+ prestoSplit=(PrestoSplit) split;
+ }
+
+ return createPageSource(prestoLayout.getTableHandle(),prestoSplit, columns);
+ },
PrestoPageSourceProvider.class.getClassLoader());
}
+ // hive catalog中进行设置此值
+ public void setPaimonMetadata(PrestoMetadata paimonMetadata) {
+ this.paimonMetadata = paimonMetadata;
+ }
+
private ConnectorPageSource createPageSource(
PrestoTableHandle tableHandle, PrestoSplit split, List columns) {
Table table = tableHandle.table();
ReadBuilder read = table.newReadBuilder();
RowType rowType = table.rowType();
List fieldNames = FieldNameUtils.fieldNames(rowType);
+ List prestoColumns=columns;
+ if(!columns.isEmpty() && columns.get(0).getClass().getSimpleName().equals("HiveColumnHandle")){
+ // 如果使用hive查询paimon表
+ prestoColumns =
+ columns.stream()
+ .map(HiveColumnHandle.class::cast)
+ .map((hiveColumn)-> hiveColumn2PrestoColumn(hiveColumn))
+ .collect(Collectors.toList());
+ }
List projectedFields =
- columns.stream()
+ prestoColumns.stream()
.map(PrestoColumnHandle.class::cast)
.map(PrestoColumnHandle::getColumnName)
.collect(Collectors.toList());
+
if (!fieldNames.equals(projectedFields)) {
int[] projected = projectedFields.stream().mapToInt(fieldNames::indexOf).toArray();
read.withProjection(projected);
@@ -79,9 +130,42 @@ private ConnectorPageSource createPageSource(
try {
return new PrestoPageSource(
- read.newRead().executeFilter().createReader(split.decodeSplit()), columns);
+ read.newRead().executeFilter().createReader(split.decodeSplit()), prestoColumns);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+
+ private PrestoColumnHandle hiveColumn2PrestoColumn(HiveColumnHandle hiveColumn){
+ HiveType hiveType = hiveColumn.getHiveType();
+ DataType dataType = new VarCharType();
+ if(hiveType.getHiveTypeName().equals(HiveType.HIVE_INT.getHiveTypeName())){
+ dataType=new IntType();
+ }
+ if(hiveType.getHiveTypeName().equals(HiveType.HIVE_BOOLEAN.getHiveTypeName())){
+ dataType=new BooleanType();
+ }
+ if(hiveType.getHiveTypeName().equals(HiveType.HIVE_BINARY.getHiveTypeName())){
+ dataType=new BinaryType();
+ }
+ if(hiveType.getHiveTypeName().equals(HiveType.HIVE_DATE.getHiveTypeName())){
+ dataType=new DateType();
+ }
+ if(hiveType.getHiveTypeName().equals(HiveType.HIVE_DOUBLE.getHiveTypeName())){
+ dataType=new DoubleType();
+ }
+ if(hiveType.getHiveTypeName().equals(HiveType.HIVE_FLOAT.getHiveTypeName())){
+ dataType=new FloatType();
+ }
+ if(hiveType.getHiveTypeName().equals(HiveType.HIVE_LONG.getHiveTypeName())){
+ dataType=new BigIntType();
+ }
+ if(hiveType.getHiveTypeName().equals(HiveType.HIVE_SHORT.getHiveTypeName())){
+ dataType=new SmallIntType();
+ }
+ if(hiveType.getHiveTypeName().equals(HiveType.HIVE_TIMESTAMP.getHiveTypeName())){
+ dataType=new TimestampType();
+ }
+ return PrestoColumnHandle.create(hiveColumn.getName(),dataType, paimonMetadata.getTypeManager());
+ }
}
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplit.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplit.java
index dadca6e..e4493f7 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplit.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplit.java
@@ -18,20 +18,22 @@
package org.apache.paimon.presto;
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.table.source.Split;
+import java.util.ArrayList;
+import java.util.List;
+
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
+
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.List;
+
/** Presto {@link ConnectorSplit}. */
public class PrestoSplit extends PrestoSplitBase {
-
@JsonCreator
public PrestoSplit(@JsonProperty("splitSerialized") String splitSerialized) {
super(splitSerialized);
@@ -43,6 +45,6 @@ public static PrestoSplit fromSplit(Split split) {
@Override
public List getPreferredNodes(NodeProvider nodeProvider) {
- return ImmutableList.of();
+ return new ArrayList<>();
}
}
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java
index 5ea71c0..3bd0685 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java
@@ -42,7 +42,7 @@ public ConnectorSplitSource getSplits(
SplitSchedulingContext splitSchedulingContext) {
PrestoTableHandle tableHandle = ((PrestoTableLayoutHandle) layout).getTableHandle();
- Table table = tableHandle.table();
+ Table table = tableHandle.tableWithDynamicOptions(session);
ReadBuilder readBuilder = table.newReadBuilder();
new PrestoFilterConverter(table.rowType())
.convert(tableHandle.getFilter())
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java
index a706e09..5a451af 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java
@@ -18,6 +18,7 @@
package org.apache.paimon.presto;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.InstantiationUtil;
@@ -25,6 +26,7 @@
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.SchemaTableName;
@@ -33,7 +35,9 @@
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -41,6 +45,10 @@
/** Presto {@link ConnectorTableHandle}. */
public class PrestoTableHandle implements ConnectorTableHandle {
+ public static final String SCAN_TIMESTAMP_MILLIS = "scan_timestamp_millis";
+ public static final String SCAN_TIMESTAMP = "scan_timestamp";
+ public static final String SCAN_SNAPSHOT = "scan_snapshot_id";
+
private final String schemaName;
private final String tableName;
private final byte[] serializedTable;
@@ -102,6 +110,29 @@ public PrestoTableHandle copy(Optional> projectedColumns) {
schemaName, tableName, serializedTable, filter, projectedColumns);
}
+ public Table tableWithDynamicOptions(ConnectorSession session) {
+ // see TrinoConnector.getSessionProperties
+ Map dynamicOptions = new HashMap<>();
+ Long scanTimestampMills = session.getProperty(SCAN_TIMESTAMP_MILLIS, Long.class);
+ if (scanTimestampMills != null) {
+ dynamicOptions.put(
+ CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), scanTimestampMills.toString());
+ }
+ String scanTimestampStr = session.getProperty(SCAN_TIMESTAMP, String.class);
+ //
+ if (scanTimestampStr != null) {
+ Long scanTimestamp = DateUtils.autoFormatToTimestamp(scanTimestampStr);
+ dynamicOptions.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), scanTimestamp.toString());
+ }
+
+ Long scanSnapshotId = session.getProperty(SCAN_SNAPSHOT, Long.class);
+ if (scanSnapshotId != null) {
+ dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), scanSnapshotId.toString());
+ }
+
+ return dynamicOptions.size() > 0 ? table().copy(dynamicOptions) : table();
+ }
+
public Table table() {
if (lazyTable == null) {
try {
diff --git a/paimon-prestosql-332/pom.xml b/paimon-prestosql-332/pom.xml
index 8ca5700..6ee2633 100644
--- a/paimon-prestosql-332/pom.xml
+++ b/paimon-prestosql-332/pom.xml
@@ -24,7 +24,7 @@ under the License.
paimon-presto
org.apache.paimon
- 0.7-SNAPSHOT
+ 0.9-SNAPSHOT
paimon-prestosql-332
diff --git a/paimon-prestosql-common/pom.xml b/paimon-prestosql-common/pom.xml
index 011d652..583783c 100644
--- a/paimon-prestosql-common/pom.xml
+++ b/paimon-prestosql-common/pom.xml
@@ -24,7 +24,7 @@ under the License.
paimon-presto
org.apache.paimon
- 0.7-SNAPSHOT
+ 0.9-SNAPSHOT
jar
diff --git a/pom.xml b/pom.xml
index 4f8c960..c520c97 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,8 +27,8 @@ under the License.
paimon-presto-0.273
paimon-presto-0.268
paimon-presto-0.236
- paimon-prestosql-common
- paimon-prestosql-332
+
+
@@ -40,7 +40,7 @@ under the License.
org.apache.paimon
paimon-presto
Paimon : Presto
- 0.7-SNAPSHOT
+ 0.9-SNAPSHOT
pom
@@ -167,17 +167,17 @@ under the License.
${maven.compiler.version}
-
- org.apache.maven.plugins
- maven-checkstyle-plugin
- ${maven.checkstyle.version}
-
+
+
+
+
+
-
- com.diffplug.spotless
- spotless-maven-plugin
- ${spotless.version}
-
+
+
+
+
+
org.apache.maven.plugins
@@ -393,9 +393,9 @@ under the License.
- /tools/maven/suppressions.xml
+ tools/maven/suppressions.xml
true
- /tools/maven/checkstyle.xml
+ tools/maven/checkstyle.xml
true
true