Skip to content

Commit 057d24c

Browse files
committed
Add janitor test across all adapters and fix drop cascade in BigQuery
1 parent 046b42e commit 057d24c

File tree

4 files changed

+185
-9
lines changed

4 files changed

+185
-9
lines changed

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1260,6 +1260,21 @@ def _native_df_to_pandas_df(
12601260

12611261
return super()._native_df_to_pandas_df(query_or_df)
12621262

1263+
def _drop_object(
    self,
    name: TableName | SchemaName,
    exists: bool = True,
    kind: str = "TABLE",
    **drop_args: t.Any,
) -> None:
    """Drop an object, stripping the CASCADE modifier for tables.

    BigQuery does not support ``DROP TABLE ... CASCADE``
    (ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#drop_table_statement),
    so for tables we force ``cascade`` off before delegating so SQLGlot
    never renders a CASCADE clause. Schemas are unaffected.
    """
    dropping_table = kind.upper() == "TABLE"
    if dropping_table and "cascade" in drop_args:
        # Overwrite rather than pop so any downstream code that expects the
        # key to be present still sees it — just with CASCADE disabled.
        drop_args["cascade"] = False

    super()._drop_object(name=name, exists=exists, kind=kind, **drop_args)
1277+
12631278
@property
12641279
def _query_data(self) -> t.Any:
12651280
return self._connection_pool.get_attribute("query_data")

tests/core/engine_adapter/integration/__init__.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from _pytest.mark.structures import ParameterSet
2828

2929
if t.TYPE_CHECKING:
30-
from sqlmesh.core._typing import TableName
30+
from sqlmesh.core._typing import TableName, SchemaName
3131
from sqlmesh.core.engine_adapter._typing import Query
3232

3333
TEST_SCHEMA = "test_schema"
@@ -222,6 +222,13 @@ def df_type(self) -> t.Optional[str]:
222222
return self._test_type.split("-", maxsplit=1)[1]
223223
return None
224224

225+
@property
def engine_type(self) -> str:
    """Engine name derived from the pytest mark, e.g. ``"duckdb"`` from ``"duckdb_..."``.

    ``gcp_postgres`` itself contains an underscore, so naive splitting on
    ``"_"`` would truncate it — it gets a dedicated early return.
    """
    mark = self.mark
    if mark.startswith("gcp_postgres"):
        return "gcp_postgres"
    return mark.split("_")[0]
231+
225232
@property
226233
def columns_to_types(self):
227234
if self._columns_to_types is None:
@@ -307,7 +314,7 @@ def default_table_format(self) -> t.Optional[str]:
307314
def add_test_suffix(self, value: str) -> str:
308315
return f"{value}_{self.test_id}"
309316

310-
def get_metadata_results(self, schema: t.Optional[SchemaName] = None) -> MetadataResults:
    """Return the data objects in *schema* (default: the test schema) as ``MetadataResults``."""
    target_schema = schema if schema else self.schema(TEST_SCHEMA)
    data_objects = self.engine_adapter.get_data_objects(target_schema)
    return MetadataResults.from_data_objects(data_objects)
313320

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 136 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from sqlmesh.core.config import load_config_from_paths
2222
from sqlmesh.core.config.connection import ConnectionConfig
2323
import sqlmesh.core.dialect as d
24+
from sqlmesh.core.environment import EnvironmentSuffixTarget
2425
from sqlmesh.core.dialect import select_from_values
2526
from sqlmesh.core.model import Model, load_sql_based_model
2627
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
@@ -2333,11 +2334,7 @@ def _normalize_snowflake(name: str, prefix_regex: str = "(sqlmesh__)(.*)"):
23332334
k: [_normalize_snowflake(name) for name in v] for k, v in object_names.items()
23342335
}
23352336

2336-
if ctx.mark.startswith("gcp_postgres"):
2337-
engine_type = "gcp_postgres"
2338-
else:
2339-
engine_type = ctx.mark.split("_")[0]
2340-
init_example_project(tmp_path, engine_type, schema_name=schema_name)
2337+
init_example_project(tmp_path, ctx.engine_type, schema_name=schema_name)
23412338

23422339
config = load_config_from_paths(
23432340
Config,
@@ -3557,3 +3554,137 @@ def test_identifier_length_limit(ctx: TestContext):
35573554
match=re.escape(match),
35583555
):
35593556
adapter.create_table(long_table_name, {"col": exp.DataType.build("int")})
3557+
3558+
3559+
@pytest.mark.parametrize(
    "environment_suffix_target",
    [
        EnvironmentSuffixTarget.TABLE,
        EnvironmentSuffixTarget.SCHEMA,
        EnvironmentSuffixTarget.CATALOG,
    ],
)
def test_janitor(
    ctx: TestContext, tmp_path: pathlib.Path, environment_suffix_target: EnvironmentSuffixTarget
):
    """End-to-end janitor test: create a dev environment, invalidate it, run the
    janitor, and verify every dev view/schema/catalog and expired snapshot table
    is gone while prod objects survive. Runs for all three suffix targets."""
    if (
        environment_suffix_target == EnvironmentSuffixTarget.CATALOG
        and not ctx.engine_adapter.SUPPORTS_CREATE_DROP_CATALOG
    ):
        pytest.skip("Engine does not support catalog-based virtual environments")

    schema = ctx.schema()  # catalog.schema
    parsed_schema = d.to_schema(schema)

    init_example_project(tmp_path, ctx.engine_type, schema_name=parsed_schema.db)

    def _set_config(_gateway: str, config: Config) -> None:
        config.environment_suffix_target = environment_suffix_target
        config.model_defaults.dialect = ctx.dialect

    sqlmesh = ctx.create_context(path=tmp_path, config_mutator=_set_config)

    sqlmesh.plan(auto_apply=True)

    # create a new model in dev
    (tmp_path / "models" / "new_model.sql").write_text(f"""
    MODEL (
        name {schema}.new_model,
        kind FULL
    );

    select * from {schema}.full_model
    """)
    sqlmesh.load()

    result = sqlmesh.plan(environment="dev", auto_apply=True)
    assert result.context_diff.is_new_environment
    assert len(result.context_diff.new_snapshots) == 1
    new_model = list(result.context_diff.new_snapshots.values())[0]
    assert "new_model" in new_model.name.lower()

    # check physical objects
    snapshot_table_name = exp.to_table(new_model.table_name(), dialect=ctx.dialect)
    snapshot_schema = snapshot_table_name.db

    prod_schema = normalize_identifiers(d.to_schema(schema), dialect=ctx.dialect)
    dev_env_schema = prod_schema.copy()
    if environment_suffix_target == EnvironmentSuffixTarget.CATALOG:
        dev_env_schema.set("catalog", exp.to_identifier(f"{prod_schema.catalog}__dev"))
    else:
        dev_env_schema.set("db", exp.to_identifier(f"{prod_schema.db}__dev"))
    normalize_identifiers(dev_env_schema, dialect=ctx.dialect)

    md = ctx.get_metadata_results(prod_schema)
    if environment_suffix_target == EnvironmentSuffixTarget.TABLE:
        # table-suffixed dev views live alongside prod views in the same schema
        assert sorted([v.lower() for v in md.views]) == [
            "full_model",
            "incremental_model",
            "new_model__dev",
            "seed_model",
        ]
    else:
        assert sorted([v.lower() for v in md.views]) == [
            "full_model",
            "incremental_model",
            "seed_model",
        ]
    assert not md.tables
    assert not md.managed_tables

    if environment_suffix_target != EnvironmentSuffixTarget.TABLE:
        # note: this is "catalog__dev.schema" for EnvironmentSuffixTarget.CATALOG and "catalog.schema__dev" for EnvironmentSuffixTarget.SCHEMA
        md = ctx.get_metadata_results(dev_env_schema)
        assert [v.lower() for v in md.views] == ["new_model"]
        assert not md.tables
        assert not md.managed_tables

    md = ctx.get_metadata_results(snapshot_schema)
    assert not md.views
    assert not md.managed_tables
    # NOTE: use `table_name`, not `t`, so we don't shadow the typing alias `t`
    assert sorted(table_name.split("__")[1].lower() for table_name in md.tables) == [
        "full_model",
        "incremental_model",
        "new_model",
        "seed_model",
    ]

    # invalidate dev and run the janitor to clean it up
    sqlmesh.invalidate_environment("dev")
    assert sqlmesh.run_janitor(
        ignore_ttl=True
    )  # ignore_ttl to delete the new_model snapshot even though it hasn't expired yet

    # there should be no dev environment or dev tables / schemas
    md = ctx.get_metadata_results(prod_schema)
    assert sorted([v.lower() for v in md.views]) == [
        "full_model",
        "incremental_model",
        "seed_model",
    ]
    assert not md.tables
    assert not md.managed_tables

    if environment_suffix_target != EnvironmentSuffixTarget.TABLE:
        if environment_suffix_target == EnvironmentSuffixTarget.SCHEMA:
            md = ctx.get_metadata_results(dev_env_schema)
        else:
            try:
                md = ctx.get_metadata_results(dev_env_schema)
            except Exception:
                # Most engines will raise an error when @set_catalog tries to set a catalog that doesn't exist
                # in this case, we just swallow the error. We know this call already worked before in the earlier checks
                md = MetadataResults()

        assert not md.views
        assert not md.tables
        assert not md.managed_tables

    md = ctx.get_metadata_results(snapshot_schema)
    assert not md.views
    assert not md.managed_tables
    assert sorted(table_name.split("__")[1].lower() for table_name in md.tables) == [
        "full_model",
        "incremental_model",
        "seed_model",
    ]

tests/core/engine_adapter/test_bigquery.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121

2222
@pytest.fixture
def adapter(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture) -> BigQueryEngineAdapter:
    """Mocked BigQueryEngineAdapter whose ``execute`` is patched so tests can
    inspect the SQL that would have been sent to the engine."""
    bq_adapter = make_mocked_engine_adapter(BigQueryEngineAdapter)
    mocker.patch("sqlmesh.core.engine_adapter.bigquery.BigQueryEngineAdapter.execute")
    return bq_adapter
2527

2628

2729
def test_insert_overwrite_by_time_partition_query(
@@ -575,6 +577,8 @@ def test_begin_end_session(mocker: MockerFixture):
575577

576578

577579
def _to_sql_calls(execute_mock: t.Any, identify: bool = True) -> t.List[str]:
580+
if isinstance(execute_mock, BigQueryEngineAdapter):
581+
execute_mock = execute_mock.execute
578582
output = []
579583
for call in execute_mock.call_args_list:
580584
value = call[0][0]
@@ -1150,3 +1154,22 @@ def test_job_cancellation_on_keyboard_interrupt_job_already_done(mocker: MockerF
11501154
# Verify job status was checked but cancellation was NOT called
11511155
mock_job.done.assert_called_once()
11521156
mock_job.cancel.assert_not_called()
1157+
1158+
1159+
def test_drop_cascade(adapter: BigQueryEngineAdapter):
    """DROP TABLE must never emit CASCADE on BigQuery; DROP SCHEMA still may."""
    for with_cascade in (True, False):
        adapter.drop_table("foo", cascade=with_cascade)

    # BigQuery doesn't support DROP CASCADE for tables
    # ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#drop_table_statement
    assert _to_sql_calls(adapter) == ["DROP TABLE IF EXISTS `foo`", "DROP TABLE IF EXISTS `foo`"]
    adapter.execute.reset_mock()  # type: ignore

    # But, it does for schemas
    for with_cascade in (True, False):
        adapter.drop_schema("foo", cascade=with_cascade)

    assert _to_sql_calls(adapter) == [
        "DROP SCHEMA IF EXISTS `foo` CASCADE",
        "DROP SCHEMA IF EXISTS `foo`",
    ]

0 commit comments

Comments
 (0)