Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@
# Apache Paimon Presto Connector

## 说明
在我的环境中使用以下命令进行编译:
```shell
mvn clean install -DskipTests -Dgpg.skip -Drat.skip -Papache-release -am -pl paimon-presto-0.273 -Dpresto.version=0.287
# -Dhadoop.apache2.version=3.3.4
```

单独编译某一个模块:
```shell
mvn install -Dgpg.skip -Drat.skip -DskipTests -Papache-release -pl paimon-presto-common
```
> 如果要让 hive catalog 查询 paimon 表,则需要将编译出的 `paimon-presto-common` 包中的 `META-INF/services/com.facebook.presto.spi.Plugin` 文件删除掉














This repository is Presto Connector for the [Apache Paimon](https://paimon.apache.org/) project.

## About
Expand Down
2 changes: 1 addition & 1 deletion paimon-presto-0.236/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-presto</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.7-SNAPSHOT</version>
<version>0.9-SNAPSHOT</version>
</parent>

<artifactId>paimon-presto-0.236</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-presto-0.268/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-presto</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.7-SNAPSHOT</version>
<version>0.9-SNAPSHOT</version>
</parent>

<artifactId>paimon-presto-0.268</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-presto-0.273/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-presto</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.7-SNAPSHOT</version>
<version>0.9-SNAPSHOT</version>
</parent>

<artifactId>paimon-presto-0.273</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-presto-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-presto</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.7-SNAPSHOT</version>
<version>0.9-SNAPSHOT</version>
</parent>

<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.presto;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.stream.Collectors;

/** Utilities for parsing the date-time strings accepted by the time-travel parameters. */
public class DateUtils {

    /**
     * Supported input patterns. Every pattern has a distinct length, and that length is what
     * selects the pattern for a given input.
     */
    private static final String[] PARSE_PATTERNS = {
        "yyyy-MM-dd",
        "yyyy-MM-dd HH",
        "yyyy-MM-dd HH:mm",
        "yyyy-MM-dd HH:mm:ss",
        "yyyy-MM-dd HH:mm:ss.SSS"
    };

    /**
     * Formatters for {@link #PARSE_PATTERNS}, index-aligned with it. Built once because {@link
     * DateTimeFormatter} is immutable and thread-safe.
     */
    private static final DateTimeFormatter[] FORMATTERS =
            Arrays.stream(PARSE_PATTERNS)
                    .map(DateTimeFormatter::ofPattern)
                    .toArray(DateTimeFormatter[]::new);

    /** Time appended to a date-only input so that it can be parsed as a date-time. */
    private static final String MIDNIGHT_SUFFIX = " 00:00:00";

    /**
     * Parses a date-time string by picking the supported pattern whose length equals the length
     * of the input. A date-only input ({@code yyyy-MM-dd}) is interpreted as midnight of that day.
     *
     * @param dateStr the text to parse, in one of the supported patterns
     * @return the parsed local date-time
     * @throws IllegalArgumentException if {@code dateStr} is null, has a length matching no
     *     supported pattern, or cannot be parsed with the matching pattern; in the last case the
     *     underlying {@link DateTimeParseException} is attached as the cause
     */
    public static LocalDateTime autoFormat(String dateStr) {
        if (dateStr == null) {
            throw new IllegalArgumentException(unsupportedFormatMessage(null));
        }

        // A date-only value cannot be resolved to a LocalDateTime, so default its time part.
        String candidate =
                dateStr.length() == PARSE_PATTERNS[0].length()
                        ? dateStr + MIDNIGHT_SUFFIX
                        : dateStr;

        for (int i = 0; i < PARSE_PATTERNS.length; i++) {
            if (PARSE_PATTERNS[i].length() != candidate.length()) {
                continue;
            }
            try {
                return LocalDateTime.parse(candidate, FORMATTERS[i]);
            } catch (DateTimeParseException e) {
                // Keep the parser's diagnosis as the cause instead of discarding it.
                throw new IllegalArgumentException(unsupportedFormatMessage(dateStr), e);
            }
        }

        throw new IllegalArgumentException(unsupportedFormatMessage(dateStr));
    }

    /**
     * Parses a date-time string with {@link #autoFormat(String)} and converts it to epoch
     * milliseconds, interpreting the local date-time in the JVM's default time zone.
     *
     * @param dateStr the text to parse, in one of the supported patterns
     * @return milliseconds since the epoch for the parsed date-time in the system default zone
     * @throws IllegalArgumentException under the same conditions as {@link #autoFormat(String)}
     */
    public static Long autoFormatToTimestamp(String dateStr) {
        LocalDateTime localDateTime = autoFormat(dateStr);
        return localDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
    }

    /** Builds the error text, reporting the input exactly as the caller supplied it. */
    private static String unsupportedFormatMessage(String input) {
        return "Time travel param format not supported:"
                + input
                + ",Supported formats include:"
                + Arrays.stream(PARSE_PATTERNS).collect(Collectors.joining(","));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.spi.transaction.IsolationLevel;
import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
import static org.apache.paimon.presto.PrestoTableHandle.SCAN_SNAPSHOT;
import static org.apache.paimon.presto.PrestoTableHandle.SCAN_TIMESTAMP;

/** Presto {@link Connector}. */
public abstract class PrestoConnectorBase implements Connector {

private final List<PropertyMetadata<?>> sessionProperties;
private final PrestoTransactionManager transactionManager;
private final PrestoSplitManager prestoSplitManager;
Expand All @@ -54,9 +57,8 @@ public PrestoConnectorBase(
PrestoPageSourceProvider prestoPageSourceProvider,
PrestoMetadata prestoMetadata,
Optional<PrestoPlanOptimizerProvider> prestoPlanOptimizerProvider) {
this.sessionProperties =
ImmutableList.copyOf(
requireNonNull(sessionProperties, "sessionProperties is null"));

this.sessionProperties = buildProperties(sessionProperties);
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.prestoSplitManager = requireNonNull(prestoSplitManager, "prestoSplitManager is null");
this.prestoPageSourceProvider =
Expand All @@ -66,6 +68,33 @@ public PrestoConnectorBase(
requireNonNull(prestoPlanOptimizerProvider, "prestoPlanOptimizerProvider is null");
}

/**
 * Builds the session property list for this connector: the three scan properties added here
 * ({@code SCAN_TIMESTAMP}, {@code PrestoTableHandle.SCAN_TIMESTAMP_MILLIS} and {@code
 * SCAN_SNAPSHOT}) followed by any properties supplied by the caller.
 *
 * @param sessionProperties additional properties to append after the built-in ones; may be
 *     null, in which case only the built-in ones are returned
 * @return an unmodifiable list of the built-in properties followed by the supplied ones
 */
private List<PropertyMetadata<?>> buildProperties(List<PropertyMetadata<?>> sessionProperties) {
    String datetimeDescription =
            "Will automatically convert to parameter: scan_timestamp_millis";
    List<PropertyMetadata<?>> props = new ArrayList<>();
    // NOTE(review): the trailing (null, true) arguments are presumably "no default value" and
    // "hidden" per the Presto SPI PropertyMetadata factories; confirm against the SPI version.
    props.add(PropertyMetadata.stringProperty(SCAN_TIMESTAMP, datetimeDescription, null, true));
    props.add(
            PropertyMetadata.longProperty(
                    PrestoTableHandle.SCAN_TIMESTAMP_MILLIS,
                    SCAN_TIMESTAMP_MILLIS.description().toString(),
                    null,
                    true));
    props.add(
            PropertyMetadata.longProperty(
                    SCAN_SNAPSHOT, SCAN_SNAPSHOT_ID.description().toString(), null, true));

    if (sessionProperties != null) {
        props.addAll(sessionProperties);
    }
    // The result is stored in a final field; hand out a read-only view so it cannot be
    // modified after construction. Fully qualified to avoid touching the import block.
    return java.util.Collections.unmodifiableList(props);
}

@Override
public ConnectorTransactionHandle beginTransaction(
IsolationLevel isolationLevel, boolean readOnly) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,8 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(
getTableHandle(session, table)
.columnMetadatas(typeManager)));
}

/**
 * Returns the type manager held by this metadata instance.
 *
 * @return the {@code typeManager} field of this object
 */
public TypeManager getTypeManager() {
    return this.typeManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,20 @@

package org.apache.paimon.presto;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HiveTableLayoutHandle;
import com.facebook.presto.hive.HiveType;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.FloatType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;

import com.facebook.presto.spi.ColumnHandle;
Expand All @@ -34,11 +46,17 @@
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;

import static org.apache.paimon.presto.ClassLoaderUtils.runWithContextClassLoader;

/** Presto {@link ConnectorPageSourceProvider}. */
public class PrestoPageSourceProvider implements ConnectorPageSourceProvider {
private static final Logger log = Logger.get(PrestoPageSourceProvider.class);

private PrestoMetadata paimonMetadata;

@Override
public ConnectorPageSource createPageSource(
Expand All @@ -49,25 +67,58 @@ public ConnectorPageSource createPageSource(
List<ColumnHandle> columns,
SplitContext splitContext) {
return runWithContextClassLoader(
() ->
createPageSource(
((PrestoTableLayoutHandle) layout).getTableHandle(),
(PrestoSplit) split,
columns),
() -> {
PrestoTableLayoutHandle prestoLayout;
PrestoSplit prestoSplit;
if(layout.getClass().getSimpleName().equals("HiveTableLayoutHandle")){
long start=System.currentTimeMillis();
try{// 如果直接调用会报错:没在同一个classLoader,因此用反射
prestoSplit=new PrestoSplit(split.getClass().getMethod("getSplitSerialized").invoke(split)+"");
}catch (Exception e){
throw new RuntimeException(e);
}

// 初始化元数据
HiveTableLayoutHandle hiveLayout = (HiveTableLayoutHandle) layout;
PrestoTableHandle tableHandle = paimonMetadata.getTableHandle(hiveLayout.getSchemaTableName());
prestoLayout = new PrestoTableLayoutHandle(tableHandle, hiveLayout.getPartitionColumnPredicate());
log.info("初始化paimon表:%s相关信息,耗时:%s",hiveLayout.getSchemaTableName(),System.currentTimeMillis()-start);
}else{
prestoLayout=(PrestoTableLayoutHandle) layout;
prestoSplit=(PrestoSplit) split;
}

return createPageSource(prestoLayout.getTableHandle(),prestoSplit, columns);
},
PrestoPageSourceProvider.class.getClassLoader());
}

/**
 * Stores the Paimon metadata instance used by this provider.
 *
 * <p>Per the original author's note, this value is set from the Hive catalog.
 *
 * @param paimonMetadata the metadata instance to keep in the {@code paimonMetadata} field
 */
public void setPaimonMetadata(PrestoMetadata paimonMetadata) {
this.paimonMetadata = paimonMetadata;
}

private ConnectorPageSource createPageSource(
PrestoTableHandle tableHandle, PrestoSplit split, List<ColumnHandle> columns) {
Table table = tableHandle.table();
ReadBuilder read = table.newReadBuilder();
RowType rowType = table.rowType();
List<String> fieldNames = FieldNameUtils.fieldNames(rowType);
List<ColumnHandle> prestoColumns=columns;
if(!columns.isEmpty() && columns.get(0).getClass().getSimpleName().equals("HiveColumnHandle")){
// 如果使用hive查询paimon表
prestoColumns =
columns.stream()
.map(HiveColumnHandle.class::cast)
.map((hiveColumn)-> hiveColumn2PrestoColumn(hiveColumn))
.collect(Collectors.toList());
}
List<String> projectedFields =
columns.stream()
prestoColumns.stream()
.map(PrestoColumnHandle.class::cast)
.map(PrestoColumnHandle::getColumnName)
.collect(Collectors.toList());

if (!fieldNames.equals(projectedFields)) {
int[] projected = projectedFields.stream().mapToInt(fieldNames::indexOf).toArray();
read.withProjection(projected);
Expand All @@ -79,9 +130,42 @@ private ConnectorPageSource createPageSource(

try {
return new PrestoPageSource(
read.newRead().executeFilter().createReader(split.decodeSplit()), columns);
read.newRead().executeFilter().createReader(split.decodeSplit()), prestoColumns);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
 * Converts a Hive column handle into a Paimon Presto column handle.
 *
 * <p>The Paimon data type is chosen by comparing the column's Hive type name with the names of
 * the known {@link HiveType} constants. A type name that matches none of them falls back to
 * {@link VarCharType}.
 *
 * @param hiveColumn the Hive column to convert
 * @return a column handle with the same name, the mapped Paimon type, and the type manager
 *     obtained from {@code paimonMetadata}
 */
private PrestoColumnHandle hiveColumn2PrestoColumn(HiveColumnHandle hiveColumn) {
    final String typeName = hiveColumn.getHiveType().getHiveTypeName();

    // Checked in reverse of the original statement order so that, should two names ever
    // coincide, the same (later) mapping still wins.
    final DataType paimonType;
    if (typeName.equals(HiveType.HIVE_TIMESTAMP.getHiveTypeName())) {
        paimonType = new TimestampType();
    } else if (typeName.equals(HiveType.HIVE_SHORT.getHiveTypeName())) {
        paimonType = new SmallIntType();
    } else if (typeName.equals(HiveType.HIVE_LONG.getHiveTypeName())) {
        paimonType = new BigIntType();
    } else if (typeName.equals(HiveType.HIVE_FLOAT.getHiveTypeName())) {
        paimonType = new FloatType();
    } else if (typeName.equals(HiveType.HIVE_DOUBLE.getHiveTypeName())) {
        paimonType = new DoubleType();
    } else if (typeName.equals(HiveType.HIVE_DATE.getHiveTypeName())) {
        paimonType = new DateType();
    } else if (typeName.equals(HiveType.HIVE_BINARY.getHiveTypeName())) {
        paimonType = new BinaryType();
    } else if (typeName.equals(HiveType.HIVE_BOOLEAN.getHiveTypeName())) {
        paimonType = new BooleanType();
    } else if (typeName.equals(HiveType.HIVE_INT.getHiveTypeName())) {
        paimonType = new IntType();
    } else {
        // Anything not recognised above is read as a string column.
        paimonType = new VarCharType();
    }

    return PrestoColumnHandle.create(
            hiveColumn.getName(), paimonType, paimonMetadata.getTypeManager());
}
}
Loading