-
Notifications
You must be signed in to change notification settings - Fork 358
Fix: Use drop cascade in janitor #5133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2498ffb
37fb0e3
10b0797
046b42e
057d24c
1daf71e
65fb037
879417c
242d4da
3fd21fc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -108,6 +108,7 @@ class EngineAdapter: | |
| SUPPORTS_CLONING = False | ||
| SUPPORTS_MANAGED_MODELS = False | ||
| SUPPORTS_CREATE_DROP_CATALOG = False | ||
| SUPPORTED_DROP_CASCADE_OBJECT_KINDS: t.List[str] = [] | ||
| SCHEMA_DIFFER = SchemaDiffer() | ||
| SUPPORTS_TUPLE_IN = True | ||
| HAS_VIEW_BINDING = False | ||
|
|
@@ -1037,14 +1038,14 @@ def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool = | |
| f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'" | ||
| ) | ||
|
|
||
| def drop_table(self, table_name: TableName, exists: bool = True) -> None: | ||
| def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None: | ||
| """Drops a table. | ||
|
|
||
| Args: | ||
| table_name: The name of the table to drop. | ||
| exists: If exists, defaults to True. | ||
| """ | ||
| self._drop_object(name=table_name, exists=exists) | ||
| self._drop_object(name=table_name, exists=exists, **kwargs) | ||
|
|
||
| def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None: | ||
| """Drops a managed table. | ||
|
|
@@ -1060,6 +1061,7 @@ def _drop_object( | |
| name: TableName | SchemaName, | ||
| exists: bool = True, | ||
| kind: str = "TABLE", | ||
| cascade: bool = False, | ||
| **drop_args: t.Any, | ||
| ) -> None: | ||
| """Drops an object. | ||
|
|
@@ -1070,8 +1072,13 @@ def _drop_object( | |
| name: The name of the table to drop. | ||
| exists: If exists, defaults to True. | ||
| kind: What kind of object to drop. Defaults to TABLE | ||
| cascade: Whether or not to DROP ... CASCADE. | ||
| Note that this is ignored for kind values that are not present in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS | ||
| **drop_args: Any extra arguments to set on the Drop expression | ||
| """ | ||
| if cascade and kind.upper() in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS: | ||
| drop_args["cascade"] = cascade | ||
|
Comment on lines
+1079
to
+1080
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we're expecting
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we don't really "expect" the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's fair; I was thinking that a warning could simply surface the fact that some objects were left orphan because we couldn't cascade. Although, if we don't know which objects depend on the removed thing, then its value is probably questionable anyway.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I did consider warning but then I noticed we silently do nothing elsewhere so decided to keep with that strategy |
||
|
|
||
| self.execute(exp.Drop(this=exp.to_table(name), kind=kind, exists=exists, **drop_args)) | ||
|
|
||
| def get_alter_expressions( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1207,6 +1207,10 @@ def _cleanup_snapshot( | |
| table_name, | ||
| is_table_deployable=is_table_deployable, | ||
| physical_schema=snapshot.physical_schema, | ||
| # we need to set cascade=true or we will get a 'can't drop because other objects depend on it'-style | ||
| # error on engines that enforce referential integrity, such as Postgres | ||
| # this situation can happen when a snapshot expires but downstream view snapshots that reference it have not yet expired | ||
| cascade=True, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we control this with a flag which is set if the engine supports cascade or not (maybe from the schema differ)? unless I'm doing something wrong, I tried with a BigQuery project for example to run the janitor, which stops when it tries to drop a table with the error. relevant docs: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#drop_table_statement it seems cascade is supported for schema but not table
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Imo, this should happen further downstream, e.g., in the adapter itself.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, nice catch, the downside of not running this test across all engines. I naively thought SQLGlot would not generate the I'll improve the coverage and make sure this works on all engines |
||
| ) | ||
| except Exception: | ||
| # Use `get_data_object` to check if the table exists instead of `table_exists` since the former | ||
|
|
@@ -1726,7 +1730,7 @@ def migrate( | |
|
|
||
| def delete(self, name: str, **kwargs: t.Any) -> None: | ||
| _check_table_db_is_physical_schema(name, kwargs["physical_schema"]) | ||
| self.adapter.drop_table(name) | ||
| self.adapter.drop_table(name, cascade=kwargs.pop("cascade", False)) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I deliberately just pop off the |
||
| logger.info("Dropped table '%s'", name) | ||
|
|
||
| def _replace_query_for_model( | ||
|
|
@@ -2324,15 +2328,16 @@ def migrate( | |
| ) | ||
|
|
||
| def delete(self, name: str, **kwargs: t.Any) -> None: | ||
| cascade = kwargs.pop("cascade", False) | ||
| try: | ||
| self.adapter.drop_view(name) | ||
| self.adapter.drop_view(name, cascade=cascade) | ||
| except Exception: | ||
| logger.debug( | ||
| "Failed to drop view '%s'. Trying to drop the materialized view instead", | ||
| name, | ||
| exc_info=True, | ||
| ) | ||
| self.adapter.drop_view(name, materialized=True) | ||
| self.adapter.drop_view(name, materialized=True, cascade=cascade) | ||
| logger.info("Dropped view '%s'", name) | ||
|
|
||
| def _is_materialized_view(self, model: Model) -> bool: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| from sqlmesh.core.config import load_config_from_paths | ||
| from sqlmesh.core.config.connection import ConnectionConfig | ||
| import sqlmesh.core.dialect as d | ||
| from sqlmesh.core.environment import EnvironmentSuffixTarget | ||
| from sqlmesh.core.dialect import select_from_values | ||
| from sqlmesh.core.model import Model, load_sql_based_model | ||
| from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType | ||
|
|
@@ -2333,11 +2334,7 @@ def _normalize_snowflake(name: str, prefix_regex: str = "(sqlmesh__)(.*)"): | |
| k: [_normalize_snowflake(name) for name in v] for k, v in object_names.items() | ||
| } | ||
|
|
||
| if ctx.mark.startswith("gcp_postgres"): | ||
| engine_type = "gcp_postgres" | ||
| else: | ||
| engine_type = ctx.mark.split("_")[0] | ||
| init_example_project(tmp_path, engine_type, schema_name=schema_name) | ||
| init_example_project(tmp_path, ctx.engine_type, schema_name=schema_name) | ||
|
|
||
| config = load_config_from_paths( | ||
| Config, | ||
|
|
@@ -3557,3 +3554,137 @@ def test_identifier_length_limit(ctx: TestContext): | |
| match=re.escape(match), | ||
| ): | ||
| adapter.create_table(long_table_name, {"col": exp.DataType.build("int")}) | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| "environment_suffix_target", | ||
| [ | ||
| EnvironmentSuffixTarget.TABLE, | ||
| EnvironmentSuffixTarget.SCHEMA, | ||
| EnvironmentSuffixTarget.CATALOG, | ||
| ], | ||
| ) | ||
| def test_janitor( | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test inits a project, creates a dev env, invalidates it, runs the janitor to clean it up and checks it was cleaned up
|
||
| ctx: TestContext, tmp_path: pathlib.Path, environment_suffix_target: EnvironmentSuffixTarget | ||
| ): | ||
| if ( | ||
| environment_suffix_target == EnvironmentSuffixTarget.CATALOG | ||
| and not ctx.engine_adapter.SUPPORTS_CREATE_DROP_CATALOG | ||
| ): | ||
| pytest.skip("Engine does not support catalog-based virtual environments") | ||
|
|
||
| schema = ctx.schema() # catalog.schema | ||
| parsed_schema = d.to_schema(schema) | ||
|
|
||
| init_example_project(tmp_path, ctx.engine_type, schema_name=parsed_schema.db) | ||
|
|
||
| def _set_config(_gateway: str, config: Config) -> None: | ||
| config.environment_suffix_target = environment_suffix_target | ||
| config.model_defaults.dialect = ctx.dialect | ||
|
|
||
| sqlmesh = ctx.create_context(path=tmp_path, config_mutator=_set_config) | ||
|
|
||
| sqlmesh.plan(auto_apply=True) | ||
|
|
||
| # create a new model in dev | ||
| (tmp_path / "models" / "new_model.sql").write_text(f""" | ||
| MODEL ( | ||
| name {schema}.new_model, | ||
| kind FULL | ||
| ); | ||
|
|
||
| select * from {schema}.full_model | ||
| """) | ||
| sqlmesh.load() | ||
|
|
||
| result = sqlmesh.plan(environment="dev", auto_apply=True) | ||
| assert result.context_diff.is_new_environment | ||
| assert len(result.context_diff.new_snapshots) == 1 | ||
| new_model = list(result.context_diff.new_snapshots.values())[0] | ||
| assert "new_model" in new_model.name.lower() | ||
|
|
||
| # check physical objects | ||
| snapshot_table_name = exp.to_table(new_model.table_name(), dialect=ctx.dialect) | ||
| snapshot_schema = snapshot_table_name.db | ||
|
|
||
| prod_schema = normalize_identifiers(d.to_schema(schema), dialect=ctx.dialect) | ||
| dev_env_schema = prod_schema.copy() | ||
| if environment_suffix_target == EnvironmentSuffixTarget.CATALOG: | ||
| dev_env_schema.set("catalog", exp.to_identifier(f"{prod_schema.catalog}__dev")) | ||
| else: | ||
| dev_env_schema.set("db", exp.to_identifier(f"{prod_schema.db}__dev")) | ||
| normalize_identifiers(dev_env_schema, dialect=ctx.dialect) | ||
|
|
||
| md = ctx.get_metadata_results(prod_schema) | ||
| if environment_suffix_target == EnvironmentSuffixTarget.TABLE: | ||
| assert sorted([v.lower() for v in md.views]) == [ | ||
| "full_model", | ||
| "incremental_model", | ||
| "new_model__dev", | ||
| "seed_model", | ||
| ] | ||
| else: | ||
| assert sorted([v.lower() for v in md.views]) == [ | ||
| "full_model", | ||
| "incremental_model", | ||
| "seed_model", | ||
| ] | ||
| assert not md.tables | ||
| assert not md.managed_tables | ||
|
|
||
| if environment_suffix_target != EnvironmentSuffixTarget.TABLE: | ||
| # note: this is "catalog__dev.schema" for EnvironmentSuffixTarget.CATALOG and "catalog.schema__dev" for EnvironmentSuffixTarget.SCHEMA | ||
| md = ctx.get_metadata_results(dev_env_schema) | ||
| assert [v.lower() for v in md.views] == ["new_model"] | ||
| assert not md.tables | ||
| assert not md.managed_tables | ||
|
|
||
| md = ctx.get_metadata_results(snapshot_schema) | ||
| assert not md.views | ||
| assert not md.managed_tables | ||
| assert sorted(t.split("__")[1].lower() for t in md.tables) == [ | ||
| "full_model", | ||
| "incremental_model", | ||
| "new_model", | ||
| "seed_model", | ||
| ] | ||
|
|
||
| # invalidate dev and run the janitor to clean it up | ||
| sqlmesh.invalidate_environment("dev") | ||
| assert sqlmesh.run_janitor( | ||
| ignore_ttl=True | ||
| ) # ignore_ttl to delete the new_model snapshot even though it hasn't expired yet | ||
|
|
||
| # there should be no dev environment or dev tables / schemas | ||
| md = ctx.get_metadata_results(prod_schema) | ||
| assert sorted([v.lower() for v in md.views]) == [ | ||
| "full_model", | ||
| "incremental_model", | ||
| "seed_model", | ||
| ] | ||
| assert not md.tables | ||
| assert not md.managed_tables | ||
|
|
||
| if environment_suffix_target != EnvironmentSuffixTarget.TABLE: | ||
| if environment_suffix_target == EnvironmentSuffixTarget.SCHEMA: | ||
| md = ctx.get_metadata_results(dev_env_schema) | ||
| else: | ||
| try: | ||
| md = ctx.get_metadata_results(dev_env_schema) | ||
| except Exception as e: | ||
| # Most engines will raise an error when @set_catalog tries to set a catalog that doesn't exist | ||
| # in this case, we just swallow the error. We know this call already worked before in the earlier checks | ||
| md = MetadataResults() | ||
|
|
||
| assert not md.views | ||
| assert not md.tables | ||
| assert not md.managed_tables | ||
|
|
||
| md = ctx.get_metadata_results(snapshot_schema) | ||
| assert not md.views | ||
| assert not md.managed_tables | ||
| assert sorted(t.split("__")[1].lower() for t in md.tables) == [ | ||
| "full_model", | ||
| "incremental_model", | ||
| "seed_model", | ||
| ] | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since
drop_view()already had**kwargsI figured it was ok to add it todrop_table()as well