Skip to content

Commit 990c979

Browse files
committed
fix incremental append
1 parent 6468afb commit 990c979

File tree

2 files changed

+51
-21
lines changed

2 files changed

+51
-21
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1515,19 +1515,6 @@ def demote(self, view_name: str, **kwargs: t.Any) -> None:
15151515

15161516

15171517
class MaterializableStrategy(PromotableStrategy, abc.ABC):
1518-
def append(
1519-
self,
1520-
table_name: str,
1521-
query_or_df: QueryOrDF,
1522-
model: Model,
1523-
render_kwargs: t.Dict[str, t.Any],
1524-
**kwargs: t.Any,
1525-
) -> None:
1526-
self.adapter.insert_append(
1527-
table_name,
1528-
query_or_df,
1529-
)
1530-
15311518
def create(
15321519
self,
15331520
table_name: str,
@@ -1666,7 +1653,27 @@ def _get_target_and_source_columns(
16661653
return target_column_to_types, source_columns
16671654

16681655

1669-
class IncrementalByPartitionStrategy(MaterializableStrategy):
1656+
class IncrementalStrategy(MaterializableStrategy, abc.ABC):
1657+
def append(
1658+
self,
1659+
table_name: str,
1660+
query_or_df: QueryOrDF,
1661+
model: Model,
1662+
render_kwargs: t.Dict[str, t.Any],
1663+
**kwargs: t.Any,
1664+
) -> None:
1665+
columns_to_types, source_columns = self._get_target_and_source_columns(
1666+
model, table_name, render_kwargs=render_kwargs
1667+
)
1668+
self.adapter.insert_append(
1669+
table_name,
1670+
query_or_df,
1671+
target_columns_to_types=columns_to_types,
1672+
source_columns=source_columns,
1673+
)
1674+
1675+
1676+
class IncrementalByPartitionStrategy(IncrementalStrategy):
16701677
def insert(
16711678
self,
16721679
table_name: str,
@@ -1691,7 +1698,7 @@ def insert(
16911698
)
16921699

16931700

1694-
class IncrementalByTimeRangeStrategy(MaterializableStrategy):
1701+
class IncrementalByTimeRangeStrategy(IncrementalStrategy):
16951702
def insert(
16961703
self,
16971704
table_name: str,
@@ -1716,7 +1723,7 @@ def insert(
17161723
)
17171724

17181725

1719-
class IncrementalByUniqueKeyStrategy(MaterializableStrategy):
1726+
class IncrementalByUniqueKeyStrategy(IncrementalStrategy):
17201727
def insert(
17211728
self,
17221729
table_name: str,
@@ -1776,7 +1783,7 @@ def append(
17761783
)
17771784

17781785

1779-
class IncrementalUnmanagedStrategy(MaterializableStrategy):
1786+
class IncrementalUnmanagedStrategy(IncrementalStrategy):
17801787
def append(
17811788
self,
17821789
table_name: str,
@@ -1832,6 +1839,20 @@ def insert(
18321839

18331840

18341841
class FullRefreshStrategy(MaterializableStrategy):
1842+
def append(
1843+
self,
1844+
table_name: str,
1845+
query_or_df: QueryOrDF,
1846+
model: Model,
1847+
render_kwargs: t.Dict[str, t.Any],
1848+
**kwargs: t.Any,
1849+
) -> None:
1850+
self.adapter.insert_append(
1851+
table_name,
1852+
query_or_df,
1853+
target_columns_to_types=model.columns_to_types,
1854+
)
1855+
18351856
def insert(
18361857
self,
18371858
table_name: str,
@@ -1893,8 +1914,19 @@ def insert(
18931914
# Data has already been inserted at the time of table creation.
18941915
pass
18951916

1917+
def append(
1918+
self,
1919+
table_name: str,
1920+
query_or_df: QueryOrDF,
1921+
model: Model,
1922+
render_kwargs: t.Dict[str, t.Any],
1923+
**kwargs: t.Any,
1924+
) -> None:
1925+
# Data has already been inserted at the time of table creation.
1926+
pass
1927+
18961928

1897-
class SCDType2Strategy(MaterializableStrategy):
1929+
class SCDType2Strategy(IncrementalStrategy):
18981930
def create(
18991931
self,
19001932
table_name: str,
@@ -2181,7 +2213,7 @@ def _is_materialized_view(self, model: Model) -> bool:
21812213
C = t.TypeVar("C", bound=CustomKind)
21822214

21832215

2184-
class CustomMaterialization(MaterializableStrategy, t.Generic[C]):
2216+
class CustomMaterialization(IncrementalStrategy, t.Generic[C]):
21852217
"""Base class for custom materializations."""
21862218

21872219
def insert(

tests/core/test_integration.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7948,7 +7948,6 @@ def test_incremental_unmanaged_model_ignore_destructive_change(tmp_path: Path):
79487948
data_dir = tmp_path / "data"
79497949
data_dir.mkdir()
79507950
data_filepath = data_dir / "test.duckdb"
7951-
set_console(TerminalConsole())
79527951

79537952
config = Config(
79547953
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
@@ -8058,7 +8057,6 @@ def test_scd_type_2_by_time_ignore_destructive_change(tmp_path: Path):
80588057
data_dir = tmp_path / "data"
80598058
data_dir.mkdir()
80608059
data_filepath = data_dir / "test.duckdb"
8061-
set_console(TerminalConsole())
80628060

80638061
config = Config(
80648062
model_defaults=ModelDefaultsConfig(dialect="duckdb"),

0 commit comments

Comments
 (0)