Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@
/**
* @author yuexie
* @date 2024/11/8 15:51
* Arrow type utility for parsing Kuscia column types to Arrow types
**/
public class ArrowUtil {

/**
* Parse Kuscia column type to Arrow type
* @param type Column type string (e.g., "int32", "interval_year_month", "large_string")
* @return ArrowType
*/
public static ArrowType parseKusciaColumnType(String type) {
// string integer float datetime timestamp
return switch (type) {
String typeLower = type.toLowerCase();
return switch (typeLower) {
// Integer types
case "int8" -> Types.MinorType.TINYINT.getType();
case "int16" -> Types.MinorType.SMALLINT.getType();
case "int32" -> Types.MinorType.INT.getType();
Expand All @@ -38,13 +45,49 @@ public static ArrowType parseKusciaColumnType(String type) {
case "uint16" -> Types.MinorType.UINT2.getType();
case "uint32" -> Types.MinorType.UINT4.getType();
case "uint64" -> Types.MinorType.UINT8.getType();

// Floating point types
case "float32" -> Types.MinorType.FLOAT4.getType();
case "float64", "float" -> Types.MinorType.FLOAT8.getType();

// Date types
case "date32" -> Types.MinorType.DATEDAY.getType();
case "date64" -> Types.MinorType.DATEMILLI.getType();

// Time types
case "time32" -> Types.MinorType.TIMEMILLI.getType();
case "time64" -> Types.MinorType.TIMEMICRO.getType();

// Timestamp types
case "timestamp" -> Types.MinorType.TIMESTAMPMICRO.getType();
case "timestamp_us" -> Types.MinorType.TIMESTAMPMICRO.getType();
case "timestamp_ms" -> Types.MinorType.TIMESTAMPMILLI.getType();
case "timestamp_ns" -> Types.MinorType.TIMESTAMPNANO.getType();
case "timestamp_tz" -> Types.MinorType.TIMESTAMPMICROTZ.getType();

// Boolean types
case "bool" -> Types.MinorType.BIT.getType();

// String types
case "string", "str" -> Types.MinorType.VARCHAR.getType();
case "large_string", "large_utf8", "utf8_large" -> Types.MinorType.LARGEVARCHAR.getType();

// Binary types
case "binary" -> Types.MinorType.VARBINARY.getType();
case "large_binary", "large_varbinary", "varbinary_large" -> Types.MinorType.LARGEVARBINARY.getType();

// Decimal types
// Note: Types.MinorType.DECIMAL.getType() throws UnsupportedOperationException
// Decimal requires precision/scale, must use new ArrowType.Decimal(precision, scale, bitWidth)
case "decimal" -> new ArrowType.Decimal(38, 10, 128);

// Interval types
case "interval_year_month", "interval_ym" ->
Types.MinorType.INTERVALYEAR.getType();
case "interval_day_time", "interval_dt" ->
Types.MinorType.INTERVALDAY.getType();
case "interval" -> Types.MinorType.INTERVALYEAR.getType();

default -> throw DataproxyException.of(DataproxyErrorCode.PARAMS_UNRELIABLE, "Unsupported field types: " + type);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

package org.secretflow.dataproxy.core.visitor;

import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import java.math.BigDecimal;

/**
* @author yuexie
* @date 2024/11/1 20:25
**/
@Slf4j
public class ByteValueVisitor implements ValueVisitor<Byte>{

@Override
Expand Down Expand Up @@ -54,4 +58,40 @@ public Byte visit(@Nonnull Double value) {
return value.byteValue();
}

@Override
public Byte visit(@Nonnull String value) {
try {
return Byte.valueOf(value);
} catch (NumberFormatException e) {
log.warn("Failed to parse string '{}' as Byte, using 0", value);
return (byte) 0;
}
}

@Override
public Byte visit(@Nonnull BigDecimal value) {
return value.byteValue();
}

@Override
public Byte visit(@Nonnull Object value) {
// Directly Byte type, return directly
if (value instanceof Byte byteValue) {
return byteValue;
}

// Number type (including BigDecimal, Integer, Long, Short, Float, Double, etc.)
if (value instanceof Number number) {
return number.byteValue();
}

// String type, call dedicated visit(String) method
if (value instanceof String stringValue) {
return visit(stringValue);
}

// Other types: try to convert to string then parse
return visit(value.toString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import java.sql.Time;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -87,7 +88,21 @@ public Integer visit(@Nonnull Integer value) {

@Override
public Integer visit(@Nonnull Date value) {
return (int) value.getTime();
// Handle java.sql.Time: Time32Vector needs milliseconds since midnight
if (value instanceof Time sqlTime) {
// java.sql.Time.getTime() returns milliseconds since Unix epoch
// But Time32Vector needs milliseconds since midnight of the day
// Convert to LocalTime then calculate milliseconds
return (int) (sqlTime.toLocalTime().toNanoOfDay() / 1_000_000);
}

// Handle java.sql.Date: DateDayVector needs days since 1970-01-01
if (value instanceof java.sql.Date sqlDate) {
return (int) sqlDate.toLocalDate().toEpochDay();
}

// For java.util.Date, assume it's a date type, convert milliseconds to days
return (int) (value.getTime() / (24 * 60 * 60 * 1000L));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.sql.Time;
import java.time.*;
import java.util.Date;

/**
Expand All @@ -50,6 +46,8 @@ public Long visit(@Nonnull Object value) {

if (value instanceof Long longValue) {
return visit(longValue);
} else if (value instanceof Time sqlTime) {
return this.visit(sqlTime);
} else if (value instanceof Date dateValue) {
return this.visit(dateValue);
} else if (value instanceof LocalDateTime localDateTime) {
Expand Down Expand Up @@ -102,7 +100,10 @@ public Long visit(@Nonnull ZonedDateTime value) {

@Override
public Long visit(@Nonnull LocalDateTime value) {
return value.toInstant(ZoneOffset.of(ZoneId.systemDefault().getId())).toEpochMilli();
// LocalDateTime has no timezone information, treat it as local time in system default timezone.
// Use atZone() instead of toInstant(ZoneOffset.of(...)) because ZoneOffset.of() requires
// an offset (e.g., "+08:00"), cannot directly use zone ID
return value.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
}

@Override
Expand All @@ -115,4 +116,12 @@ public Long visit(@Nonnull Instant value) {
log.debug("visit instant: {}", value.toEpochMilli());
return value.toEpochMilli();
}

public Long visit(@Nonnull Time value) {
long nanosSinceMidnight = value.toLocalTime().toNanoOfDay();
long microsSinceMidnight = nanosSinceMidnight / 1_000;
log.debug("Converting java.sql.Time {} (toLocalTime: {}) to microseconds since midnight: {} (nanos: {})",
value, value.toLocalTime(), microsSinceMidnight, nanosSinceMidnight);
return microsSinceMidnight;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.secretflow.dataproxy.core.visitor;

import javax.annotation.Nonnull;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -65,6 +66,10 @@ default T visit(@Nonnull byte[] value) {
throw new UnsupportedOperationException("byte[] not supported");
}

default T visit(@Nonnull BigDecimal value) {
throw new UnsupportedOperationException("BigDecimal not supported");
}

default T visit(@Nonnull Object value) {
throw new UnsupportedOperationException("Object not supported");
}
Expand Down
22 changes: 22 additions & 0 deletions dataproxy-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
<groupId>org.secretflow</groupId>
<artifactId>dataproxy-plugin-odps</artifactId>
</dependency>
<dependency>
<groupId>org.secretflow</groupId>
<artifactId>dataproxy-plugin-dameng</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
Expand Down Expand Up @@ -67,6 +71,24 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

<!-- Testcontainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
<version>8.1.3.62</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading
Loading