Skip to content

Commit 25822fe

Browse files
committed
feedback
1 parent 05ad6ef commit 25822fe

File tree

84 files changed

+778
-659
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+778
-659
lines changed

sqlmesh/core/dialect.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,7 +1122,7 @@ def select_from_values(
11221122
for i in range(0, num_rows, batch_size):
11231123
yield select_from_values_for_batch_range(
11241124
values=values,
1125-
columns_to_types=columns_to_types,
1125+
target_columns_to_types=columns_to_types,
11261126
batch_start=i,
11271127
batch_end=min(i + batch_size, num_rows),
11281128
alias=alias,
@@ -1131,14 +1131,14 @@ def select_from_values(
11311131

11321132
def select_from_values_for_batch_range(
11331133
values: t.List[t.Tuple[t.Any, ...]],
1134-
columns_to_types: t.Dict[str, exp.DataType],
1134+
target_columns_to_types: t.Dict[str, exp.DataType],
11351135
batch_start: int,
11361136
batch_end: int,
11371137
alias: str = "t",
11381138
source_columns: t.Optional[t.List[str]] = None,
11391139
) -> exp.Select:
1140-
source_columns = source_columns or list(columns_to_types)
1141-
source_columns_to_types = get_source_columns_to_types(columns_to_types, source_columns)
1140+
source_columns = source_columns or list(target_columns_to_types)
1141+
source_columns_to_types = get_source_columns_to_types(target_columns_to_types, source_columns)
11421142

11431143
if not values:
11441144
# Ensures we don't generate an empty VALUES clause & forces a zero-row output
@@ -1166,11 +1166,13 @@ def select_from_values_for_batch_range(
11661166

11671167
casted_columns = [
11681168
exp.alias_(
1169-
exp.cast(exp.column(column) if column in source_columns else exp.Null(), to=kind),
1169+
exp.cast(
1170+
exp.column(column) if column in source_columns_to_types else exp.Null(), to=kind
1171+
),
11701172
column,
11711173
copy=False,
11721174
)
1173-
for column, kind in columns_to_types.items()
1175+
for column, kind in target_columns_to_types.items()
11741176
]
11751177
return exp.select(*casted_columns).from_(values_exp, copy=False).where(where, copy=False)
11761178

sqlmesh/core/engine_adapter/athena.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,12 @@ def catalog_support(self) -> CatalogSupport:
8484
def create_state_table(
8585
self,
8686
table_name: str,
87-
columns_to_types: t.Dict[str, exp.DataType],
87+
target_columns_to_types: t.Dict[str, exp.DataType],
8888
primary_key: t.Optional[t.Tuple[str, ...]] = None,
8989
) -> None:
9090
self.create_table(
9191
table_name,
92-
columns_to_types,
92+
target_columns_to_types,
9393
primary_key=primary_key,
9494
# it's painfully slow, but it works
9595
table_format="iceberg",
@@ -178,7 +178,7 @@ def _build_create_table_exp(
178178
expression: t.Optional[exp.Expression],
179179
exists: bool = True,
180180
replace: bool = False,
181-
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
181+
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
182182
table_description: t.Optional[str] = None,
183183
table_kind: t.Optional[str] = None,
184184
partitioned_by: t.Optional[t.List[exp.Expression]] = None,
@@ -198,7 +198,7 @@ def _build_create_table_exp(
198198
properties = self._build_table_properties_exp(
199199
table=table,
200200
expression=expression,
201-
columns_to_types=columns_to_types,
201+
target_columns_to_types=target_columns_to_types,
202202
partitioned_by=partitioned_by,
203203
table_properties=table_properties,
204204
table_description=table_description,
@@ -237,7 +237,7 @@ def _build_table_properties_exp(
237237
partition_interval_unit: t.Optional[IntervalUnit] = None,
238238
clustered_by: t.Optional[t.List[exp.Expression]] = None,
239239
table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
240-
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
240+
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
241241
table_description: t.Optional[str] = None,
242242
table_kind: t.Optional[str] = None,
243243
table: t.Optional[exp.Table] = None,
@@ -265,12 +265,12 @@ def _build_table_properties_exp(
265265

266266
if partitioned_by:
267267
schema_expressions: t.List[exp.Expression] = []
268-
if is_hive and columns_to_types:
268+
if is_hive and target_columns_to_types:
269269
# For Hive-style tables, you cannot include the partitioned by columns in the main set of columns
270270
# In the PARTITIONED BY expression, you also cant just include the column names, you need to include the data type as well
271271
# ref: https://docs.aws.amazon.com/athena/latest/ug/partitions.html
272272
for match_name, match_dtype in self._find_matching_columns(
273-
partitioned_by, columns_to_types
273+
partitioned_by, target_columns_to_types
274274
):
275275
column_def = exp.ColumnDef(this=exp.to_identifier(match_name), kind=match_dtype)
276276
schema_expressions.append(column_def)
@@ -431,7 +431,7 @@ def replace_query(
431431
self,
432432
table_name: TableName,
433433
query_or_df: QueryOrDF,
434-
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
434+
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
435435
table_description: t.Optional[str] = None,
436436
column_descriptions: t.Optional[t.Dict[str, str]] = None,
437437
source_columns: t.Optional[t.List[str]] = None,
@@ -445,7 +445,7 @@ def replace_query(
445445
return super().replace_query(
446446
table_name=table,
447447
query_or_df=query_or_df,
448-
columns_to_types=columns_to_types,
448+
target_columns_to_types=target_columns_to_types,
449449
table_description=table_description,
450450
column_descriptions=column_descriptions,
451451
source_columns=source_columns,
@@ -456,7 +456,7 @@ def _insert_overwrite_by_time_partition(
456456
self,
457457
table_name: TableName,
458458
source_queries: t.List[SourceQuery],
459-
columns_to_types: t.Dict[str, exp.DataType],
459+
target_columns_to_types: t.Dict[str, exp.DataType],
460460
where: exp.Condition,
461461
**kwargs: t.Any,
462462
) -> None:
@@ -467,7 +467,7 @@ def _insert_overwrite_by_time_partition(
467467
if table_type == "iceberg":
468468
# Iceberg tables work as expected, we can use the default behaviour
469469
return super()._insert_overwrite_by_time_partition(
470-
table, source_queries, columns_to_types, where, **kwargs
470+
table, source_queries, target_columns_to_types, where, **kwargs
471471
)
472472

473473
# For Hive tables, we need to drop all the partitions covered by the query and delete the data from S3
@@ -477,7 +477,7 @@ def _insert_overwrite_by_time_partition(
477477
return super()._insert_overwrite_by_time_partition(
478478
table,
479479
source_queries,
480-
columns_to_types,
480+
target_columns_to_types,
481481
where,
482482
insert_overwrite_strategy_override=InsertOverwriteStrategy.INTO_IS_OVERWRITE, # since we already cleared the data
483483
**kwargs,

0 commit comments

Comments
 (0)