diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 0339f6506c..d7a2984f3a 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1429,6 +1429,7 @@ def plan_builder( explain: t.Optional[bool] = None, ignore_cron: t.Optional[bool] = None, min_intervals: t.Optional[int] = None, + always_include_local_changes: t.Optional[bool] = None, ) -> PlanBuilder: """Creates a plan builder. @@ -1467,6 +1468,8 @@ def plan_builder( diff_rendered: Whether the diff should compare raw vs rendered models min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered on every model when checking for missing intervals + always_include_local_changes: Usually when restatements are present, local changes in the filesystem are ignored. + However, it can be desirable to deploy changes + restatements in the same plan, so this flag overrides the default behaviour. Returns: The plan builder. @@ -1583,13 +1586,20 @@ def plan_builder( "Selector did not return any models. Please check your model selection and try again." ) + if always_include_local_changes is None: + # default behaviour - if restatements are detected; we operate entirely out of state and ignore local changes + force_no_diff = restate_models is not None or ( + backfill_models is not None and not backfill_models + ) + else: + force_no_diff = not always_include_local_changes + snapshots = self._snapshots(models_override) context_diff = self._context_diff( environment or c.PROD, snapshots=snapshots, create_from=create_from, - force_no_diff=restate_models is not None - or (backfill_models is not None and not backfill_models), + force_no_diff=force_no_diff, ensure_finalized_snapshots=self.config.plan.use_finalized_state, diff_rendered=diff_rendered, always_recreate_environment=self.config.plan.always_recreate_environment, @@ -1644,6 +1654,14 @@ def plan_builder( elif forward_only is None: forward_only = self.config.plan.forward_only + # When handling prod restatements, only clear intervals from other model versions if we are using full virtual environments + # If we are not, then there is no point, because none of the data in dev environments can be promoted by definition + restate_all_snapshots = ( + expanded_restate_models is not None + and not is_dev + and self.config.virtual_environment_mode.is_full + ) + return self.PLAN_BUILDER_TYPE( context_diff=context_diff, start=start, @@ -1651,6 +1669,7 @@ def plan_builder( execution_time=execution_time, apply=self.apply, restate_models=expanded_restate_models, + restate_all_snapshots=restate_all_snapshots, backfill_models=backfill_models, no_gaps=no_gaps, skip_backfill=skip_backfill, diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 79af460d1d..2eb4c54aeb 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -65,6 +65,9 @@ class PlanBuilder: restate_models: A list of models for which the data should be restated for the time range specified in this plan. Note: models defined outside SQLMesh (external) won't be a part of the restatement. + restate_all_snapshots: If restatements are present, this flag indicates whether or not the intervals + being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments). + If set to None, the default behaviour is to not clear anything unless the target environment is prod. backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan. no_gaps: Whether to ensure that new snapshots for nodes that are already a part of the target environment have no data gaps when compared against previous @@ -103,6 +106,7 @@ def __init__( execution_time: t.Optional[TimeLike] = None, apply: t.Optional[t.Callable[[Plan], None]] = None, restate_models: t.Optional[t.Iterable[str]] = None, + restate_all_snapshots: bool = False, backfill_models: t.Optional[t.Iterable[str]] = None, no_gaps: bool = False, skip_backfill: bool = False, @@ -154,6 +158,7 @@ def __init__( self._auto_categorization_enabled = auto_categorization_enabled self._include_unmodified = include_unmodified self._restate_models = set(restate_models) if restate_models is not None else None + self._restate_all_snapshots = restate_all_snapshots self._effective_from = effective_from # note: this deliberately doesnt default to now() here. @@ -277,7 +282,6 @@ def build(self) -> Plan: if self._latest_plan: return self._latest_plan - self._ensure_no_new_snapshots_with_restatements() self._ensure_new_env_with_changes() self._ensure_valid_date_range() self._ensure_no_broken_references() @@ -340,6 +344,7 @@ def build(self) -> Plan: deployability_index=deployability_index, selected_models_to_restate=self._restate_models, restatements=restatements, + restate_all_snapshots=self._restate_all_snapshots, start_override_per_model=self._start_override_per_model, end_override_per_model=end_override_per_model, selected_models_to_backfill=self._backfill_models, @@ -859,15 +864,6 @@ def _ensure_no_broken_references(self) -> None: f"""Removed {broken_references_msg} are referenced in '{snapshot.name}'. Please remove broken references before proceeding.""" ) - def _ensure_no_new_snapshots_with_restatements(self) -> None: - if self._restate_models is not None and ( - self._context_diff.new_snapshots or self._context_diff.modified_snapshots - ): - raise PlanError( - "Model changes and restatements can't be a part of the same plan. " - "Revert or apply changes before proceeding with restatements." - ) - def _ensure_new_env_with_changes(self) -> None: if ( self._is_dev diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 5ed3e4b188..3ed260791a 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -67,6 +67,8 @@ class Plan(PydanticModel, frozen=True): Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty while :restatements is still populated with dev previews """ + restate_all_snapshots: bool + """Whether or not to clear intervals from state for other versions of the models listed in :restatements""" start_override_per_model: t.Optional[t.Dict[str, datetime]] end_override_per_model: t.Optional[t.Dict[str, datetime]] @@ -268,6 +270,7 @@ def to_evaluatable(self) -> EvaluatablePlan: skip_backfill=self.skip_backfill, empty_backfill=self.empty_backfill, restatements={s.name: i for s, i in self.restatements.items()}, + restate_all_snapshots=self.restate_all_snapshots, is_dev=self.is_dev, allow_destructive_models=self.allow_destructive_models, allow_additive_models=self.allow_additive_models, @@ -312,6 +315,7 @@ class EvaluatablePlan(PydanticModel): skip_backfill: bool empty_backfill: bool restatements: t.Dict[str, Interval] + restate_all_snapshots: bool is_dev: bool allow_destructive_models: t.Set[str] allow_additive_models: t.Set[str] diff --git a/sqlmesh/core/plan/stages.py b/sqlmesh/core/plan/stages.py index 0d829a6739..0f900a9274 100644 --- a/sqlmesh/core/plan/stages.py +++ b/sqlmesh/core/plan/stages.py @@ -13,6 +13,7 @@ SnapshotTableInfo, SnapshotId, ) +from sqlmesh.utils.errors import PlanError @dataclass @@ -452,13 +453,18 @@ def _get_after_all_stage( def _get_restatement_stage( self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapshot] ) -> t.Optional[RestatementStage]: - if not plan.restatements or plan.is_dev: - # The RestatementStage to clear intervals from state across all environments is not needed for plans against dev, only prod - return None + if plan.restate_all_snapshots: + if plan.is_dev: + raise PlanError( + "Clearing intervals from state across dev model versions is only valid for prod plans" + ) - return RestatementStage( - all_snapshots=snapshots_by_name, - ) + if plan.restatements: + return RestatementStage( + all_snapshots=snapshots_by_name, + ) + + return None def _get_physical_layer_update_stage( self, diff --git a/sqlmesh_dbt/cli.py b/sqlmesh_dbt/cli.py index 370f115d61..fa75d303a1 100644 --- a/sqlmesh_dbt/cli.py +++ b/sqlmesh_dbt/cli.py @@ -90,7 +90,9 @@ def dbt( @click.option( "-f", "--full-refresh", - help="If specified, dbt will drop incremental models and fully-recalculate the incremental table from the model definition.", + is_flag=True, + default=False, + help="If specified, sqlmesh will drop incremental models and fully-recalculate the incremental table from the model definition.", ) @click.option( "--env", diff --git a/sqlmesh_dbt/operations.py b/sqlmesh_dbt/operations.py index ac7ad031f3..f95d0d931e 100644 --- a/sqlmesh_dbt/operations.py +++ b/sqlmesh_dbt/operations.py @@ -11,7 +11,7 @@ from sqlmesh.dbt.project import Project from sqlmesh_dbt.console import DbtCliConsole from sqlmesh.core.model import Model - from sqlmesh.core.plan import Plan + from sqlmesh.core.plan import Plan, PlanBuilder logger = logging.getLogger(__name__) @@ -42,8 +42,39 @@ def run( full_refresh: bool = False, empty: bool = False, ) -> Plan: - return self.context.plan( - **self._plan_options( + plan_builder = self._plan_builder( + environment=environment, + select=select, + exclude=exclude, + full_refresh=full_refresh, + empty=empty, + ) + + plan = plan_builder.build() + + self.console.plan( + plan_builder, + default_catalog=self.context.default_catalog, + # start doing work immediately (since no_diff is set, there isnt really anything for the user to say yes/no to) + auto_apply=True, + # dont output a diff of model changes + no_diff=True, + # don't throw up any prompts like "set the effective date" - use defaults + no_prompts=True, + ) + + return plan + + def _plan_builder( + self, + environment: t.Optional[str] = None, + select: t.Optional[t.List[str]] = None, + exclude: t.Optional[t.List[str]] = None, + full_refresh: bool = False, + empty: bool = False, + ) -> PlanBuilder: + return self.context.plan_builder( + **self._plan_builder_options( environment=environment, select=select, exclude=exclude, @@ -71,13 +102,15 @@ def _selected_models( return selected_models - def _plan_options( + def _plan_builder_options( self, - environment: t.Optional[str] = None, + # upstream dbt options select: t.Optional[t.List[str]] = None, exclude: t.Optional[t.List[str]] = None, empty: bool = False, full_refresh: bool = False, + # sqlmesh extra options + environment: t.Optional[str] = None, ) -> t.Dict[str, t.Any]: import sqlmesh.core.constants as c @@ -130,24 +163,38 @@ def _plan_options( # `dbt --empty` adds LIMIT 0 to the queries, resulting in empty tables. In addition, it happily clobbers existing tables regardless of if they are populated. # This *partially* lines up with --skip-backfill in SQLMesh, which indicates to not populate tables if they happened to be created/updated as part of this plan. # However, if a table already exists and has data in it, there is no change so SQLMesh will not recreate the table and thus it will not be cleared. - # So in order to fully replicate dbt's --empty, we also need --full-refresh semantics in order to replace existing tables + # Currently, SQLMesh has no way to say "restate with empty data", because --restate-model coupled with --skip-backfill ends up being a no-op options["skip_backfill"] = True - full_refresh = True + + self.console.log_warning( + "dbt's `--empty` drops the tables for all selected models and replaces them with empty ones.\n" + "This can easily result in accidental data loss, so SQLMesh limits this to only new or modified models and leaves the tables for existing unmodified models alone.\n\n" + "If you were creating empty tables to preview model changes, please consider using `--environment` to preview these changes in an isolated Virtual Data Environment instead.\n\n" + "Otherwise, if you really do want dbt's `--empty` behaviour of clearing every selected table, please file an issue on GitHub so we can better understand the use-case.\n" + ) + + if full_refresh: + # --full-refresh is implemented in terms of "add every model as a restatement" + # however, `--empty` sets skip_backfill=True, which causes the BackfillStage of the plan to be skipped. + # the re-processing of data intervals happens in the BackfillStage, so if it gets skipped, restatements become a no-op + raise ValueError("`--full-refresh` alongside `--empty` is not currently supported.") if full_refresh: - # TODO: handling this requires some updates in the engine to enable restatements+changes in the same plan without affecting prod - # if the plan targets dev - pass + options.update( + dict( + # Add every selected model as a restatement to force them to get repopulated from scratch + restate_models=list(self.context.models) + if not select_models + else select_models, + # by default in SQLMesh, restatements only operate on what has been committed to state. + # in order to emulate dbt, we need to use the local filesystem instead, so we override this default + always_include_local_changes=True, + ) + ) return dict( environment=environment, select_models=select_models, - # dont output a diff of model changes - no_diff=True, - # don't throw up any prompts like "set the effective date" - use defaults - no_prompts=True, - # start doing work immediately (since no_diff is set, there isnt really anything for the user to say yes/no to) - auto_apply=True, **options, ) diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index c9c19376d9..4f6b99a4ee 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -826,6 +826,7 @@ def test_missing_intervals_lookback(make_snapshot, mocker: MockerFixture): indirectly_modified={}, deployability_index=DeployabilityIndex.all_deployable(), restatements={}, + restate_all_snapshots=False, end_bounded=False, ensure_finalized_snapshots=False, start_override_per_model=None, @@ -1074,36 +1075,6 @@ def test_restate_missing_model(make_snapshot, mocker: MockerFixture): PlanBuilder(context_diff, restate_models=["missing"]).build() -def test_new_snapshots_with_restatements(make_snapshot, mocker: MockerFixture): - snapshot_a = make_snapshot(SqlModel(name="a", query=parse_one("select 1, ds"))) - - context_diff = ContextDiff( - environment="test_environment", - is_new_environment=True, - is_unfinalized_environment=False, - normalize_environment_name=True, - create_from="prod", - create_from_env_exists=True, - added=set(), - removed_snapshots={}, - modified_snapshots={}, - snapshots={snapshot_a.snapshot_id: snapshot_a}, - new_snapshots={snapshot_a.snapshot_id: snapshot_a}, - previous_plan_id=None, - previously_promoted_snapshot_ids=set(), - previous_finalized_snapshots=None, - previous_gateway_managed_virtual_layer=False, - gateway_managed_virtual_layer=False, - environment_statements=[], - ) - - with pytest.raises( - PlanError, - match=r"Model changes and restatements can't be a part of the same plan.*", - ): - PlanBuilder(context_diff, restate_models=["a"]).build() - - def test_end_validation(make_snapshot, mocker: MockerFixture): snapshot_a = make_snapshot( SqlModel( diff --git a/tests/core/test_plan_stages.py b/tests/core/test_plan_stages.py index 444ce1bb9b..832731352d 100644 --- a/tests/core/test_plan_stages.py +++ b/tests/core/test_plan_stages.py @@ -106,6 +106,7 @@ def test_build_plan_stages_basic( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -215,6 +216,7 @@ def test_build_plan_stages_with_before_all_and_after_all( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -325,6 +327,7 @@ def test_build_plan_stages_select_models( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -426,6 +429,7 @@ def test_build_plan_stages_basic_no_backfill( skip_backfill=skip_backfill, empty_backfill=empty_backfill, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -562,6 +566,7 @@ def test_build_plan_stages_restatement_prod_only( '"a"': (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), '"b"': (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), }, + restate_all_snapshots=True, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -718,6 +723,7 @@ def _get_snapshots(snapshot_ids: t.Iterable[SnapshotIdLike]): '"a"': (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), '"b"': (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), }, + restate_all_snapshots=True, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -881,6 +887,7 @@ def test_build_plan_stages_restatement_dev_does_not_clear_intervals( restatements={ '"a"': (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), }, + restate_all_snapshots=False, is_dev=True, allow_destructive_models=set(), allow_additive_models=set(), @@ -988,6 +995,7 @@ def test_build_plan_stages_forward_only( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -1116,6 +1124,7 @@ def test_build_plan_stages_forward_only_dev( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=True, allow_destructive_models=set(), allow_additive_models=set(), @@ -1241,6 +1250,7 @@ def _get_snapshots(snapshot_ids: t.List[SnapshotId]) -> t.Dict[SnapshotId, Snaps skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=True, allow_destructive_models=set(), allow_additive_models=set(), @@ -1378,6 +1388,7 @@ def test_build_plan_stages_forward_only_ensure_finalized_snapshots( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -1454,6 +1465,7 @@ def test_build_plan_stages_removed_model( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -1537,6 +1549,7 @@ def test_build_plan_stages_environment_suffix_target_changed( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=True, allow_destructive_models=set(), allow_additive_models=set(), @@ -1636,6 +1649,7 @@ def test_build_plan_stages_indirect_non_breaking_view_migration( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -1722,6 +1736,7 @@ def test_build_plan_stages_virtual_environment_mode_filtering( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=True, allow_destructive_models=set(), allow_additive_models=set(), @@ -1775,6 +1790,7 @@ def test_build_plan_stages_virtual_environment_mode_filtering( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -1838,6 +1854,7 @@ def test_build_plan_stages_virtual_environment_mode_filtering( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -1912,6 +1929,7 @@ def test_build_plan_stages_virtual_environment_mode_no_updates( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -1976,6 +1994,7 @@ def test_adjust_intervals_new_forward_only_dev_intervals( skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=True, # Dev environment allow_destructive_models=set(), allow_additive_models=set(), @@ -2045,6 +2064,7 @@ def test_adjust_intervals_restatement_removal( skip_backfill=False, empty_backfill=False, restatements=restatements, + restate_all_snapshots=True, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), @@ -2136,6 +2156,7 @@ def test_adjust_intervals_should_force_rebuild(make_snapshot, mocker: MockerFixt skip_backfill=False, empty_backfill=False, restatements={}, + restate_all_snapshots=False, is_dev=False, allow_destructive_models=set(), allow_additive_models=set(), diff --git a/tests/dbt/cli/conftest.py b/tests/dbt/cli/conftest.py index dfad2f0046..e555f9144a 100644 --- a/tests/dbt/cli/conftest.py +++ b/tests/dbt/cli/conftest.py @@ -3,7 +3,15 @@ import os import functools from click.testing import CliRunner, Result +from sqlmesh_dbt.operations import init_project_if_required import pytest +import uuid + + +class EmptyProjectCreator(t.Protocol): + def __call__( + self, project_name: t.Optional[str] = None, target_name: t.Optional[str] = None + ) -> Path: ... @pytest.fixture @@ -14,7 +22,7 @@ def jaffle_shop_duckdb(copy_to_temp_path: t.Callable[..., t.List[Path]]) -> t.It current_path = os.getcwd() output_path = copy_to_temp_path(paths=fixture_path)[0] - # so that we can invoke commands from the perspective of a user that is alrady in the correct directory + # so that we can invoke commands from the perspective of a user that is already in the correct directory os.chdir(output_path) yield output_path @@ -22,6 +30,55 @@ def jaffle_shop_duckdb(copy_to_temp_path: t.Callable[..., t.List[Path]]) -> t.It os.chdir(current_path) +@pytest.fixture +def create_empty_project( + copy_to_temp_path: t.Callable[..., t.List[Path]], +) -> t.Iterable[t.Callable[..., Path]]: + default_project_name = f"test_{str(uuid.uuid4())[:8]}" + default_target_name = "duckdb" + fixture_path = Path(__file__).parent / "fixtures" / "empty_project" + assert fixture_path.exists() + + current_path = os.getcwd() + + def _create_empty_project( + project_name: t.Optional[str] = None, target_name: t.Optional[str] = None + ) -> Path: + project_name = project_name or default_project_name + target_name = target_name or default_target_name + output_path = copy_to_temp_path(paths=fixture_path)[0] + + dbt_project_yml = output_path / "dbt_project.yml" + profiles_yml = output_path / "profiles.yml" + + assert dbt_project_yml.exists() + assert profiles_yml.exists() + + (output_path / "models").mkdir() + (output_path / "seeds").mkdir() + + dbt_project_yml.write_text( + dbt_project_yml.read_text().replace("empty_project", project_name) + ) + profiles_yml.write_text( + profiles_yml.read_text() + .replace("empty_project", project_name) + .replace("__DEFAULT_TARGET__", target_name) + ) + + init_project_if_required(output_path) + + # so that we can invoke commands from the perspective of a user that is already in the correct directory + os.chdir(output_path) + + return output_path + + yield _create_empty_project + + # cleanup - switch cwd back to original + os.chdir(current_path) + + @pytest.fixture def invoke_cli() -> t.Callable[..., Result]: from sqlmesh_dbt.cli import dbt diff --git a/tests/dbt/cli/fixtures/empty_project/dbt_project.yml b/tests/dbt/cli/fixtures/empty_project/dbt_project.yml new file mode 100644 index 0000000000..beceadcd33 --- /dev/null +++ b/tests/dbt/cli/fixtures/empty_project/dbt_project.yml @@ -0,0 +1,18 @@ +name: 'empty_project' + +config-version: 2 +version: '0.1' + +profile: 'empty_project' + +model-paths: ["models"] +seed-paths: ["seeds"] +test-paths: ["tests"] +analysis-paths: ["analysis"] +macro-paths: ["macros"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_modules" + - "logs" \ No newline at end of file diff --git a/tests/dbt/cli/fixtures/empty_project/profiles.yml b/tests/dbt/cli/fixtures/empty_project/profiles.yml new file mode 100644 index 0000000000..a4f9836b7e --- /dev/null +++ b/tests/dbt/cli/fixtures/empty_project/profiles.yml @@ -0,0 +1,9 @@ +empty_project: + + target: __DEFAULT_TARGET__ + + outputs: + duckdb: + type: duckdb + path: 'empty_project.duckdb' + threads: 4 diff --git a/tests/dbt/cli/test_operations.py b/tests/dbt/cli/test_operations.py index f8ce239d3b..e9c4dc0063 100644 --- a/tests/dbt/cli/test_operations.py +++ b/tests/dbt/cli/test_operations.py @@ -2,17 +2,18 @@ from pathlib import Path import pytest from sqlmesh_dbt.operations import create +from sqlmesh_dbt.console import DbtCliConsole from sqlmesh.utils import yaml from sqlmesh.utils.errors import SQLMeshError import time_machine -from sqlmesh.core.console import NoopConsole from sqlmesh.core.plan import PlanBuilder from sqlmesh.core.config.common import VirtualEnvironmentMode +from tests.dbt.cli.conftest import EmptyProjectCreator pytestmark = pytest.mark.slow -class PlanCapturingConsole(NoopConsole): +class PlanCapturingConsole(DbtCliConsole): def plan( self, plan_builder: PlanBuilder, @@ -257,3 +258,78 @@ def test_run_option_mapping_dev(jaffle_shop_duckdb: Path): '"jaffle_shop"."main"."orders"', '"jaffle_shop"."main"."stg_orders"', } + + +@pytest.mark.parametrize( + "env_name,vde_mode", + [ + ("prod", VirtualEnvironmentMode.DEV_ONLY), + ("prod", VirtualEnvironmentMode.FULL), + ("dev", VirtualEnvironmentMode.DEV_ONLY), + ("dev", VirtualEnvironmentMode.FULL), + ], +) +def test_run_option_full_refresh( + create_empty_project: EmptyProjectCreator, env_name: str, vde_mode: VirtualEnvironmentMode +): + # create config file prior to load + project_path = create_empty_project(project_name="test") + + config_path = project_path / "sqlmesh.yaml" + config = yaml.load(config_path) + config["virtual_environment_mode"] = vde_mode.value + + with config_path.open("w") as f: + yaml.dump(config, f) + + (project_path / "models" / "model_a.sql").write_text("select 1") + (project_path / "models" / "model_b.sql").write_text("select 2") + + operations = create(project_dir=project_path) + + assert operations.context.config.virtual_environment_mode == vde_mode + + console = PlanCapturingConsole() + operations.context.console = console + + plan = operations.run(environment=env_name, full_refresh=True) + + # both models added as backfills + restatements regardless of env / vde mode setting + assert plan.environment.name == env_name + assert len(plan.restatements) == 2 + assert list(plan.restatements)[0].name == '"test"."main"."model_a"' + assert list(plan.restatements)[1].name == '"test"."main"."model_b"' + + assert plan.requires_backfill + assert not plan.empty_backfill + assert not plan.skip_backfill + assert plan.models_to_backfill == set(['"test"."main"."model_a"', '"test"."main"."model_b"']) + + if vde_mode == VirtualEnvironmentMode.DEV_ONLY: + # We do not clear intervals across all model versions in the default DEV_ONLY mode, even when targeting prod, + # because dev data is hardcoded to preview only so by definition and can never be deployed + assert not plan.restate_all_snapshots + else: + if env_name == "prod": + # in FULL mode, we do it for prod + assert plan.restate_all_snapshots + else: + # but not dev + assert not plan.restate_all_snapshots + + +def test_run_option_full_refresh_with_selector(jaffle_shop_duckdb: Path): + operations = create(project_dir=jaffle_shop_duckdb) + assert len(operations.context.models) > 5 + + console = PlanCapturingConsole() + operations.context.console = console + + plan = operations.run(select=["main.stg_customers"], full_refresh=True) + assert len(plan.restatements) == 1 + assert list(plan.restatements)[0].name == '"jaffle_shop"."main"."stg_customers"' + + assert plan.requires_backfill + assert not plan.empty_backfill + assert not plan.skip_backfill + assert plan.models_to_backfill == set(['"jaffle_shop"."main"."stg_customers"']) diff --git a/tests/dbt/cli/test_run.py b/tests/dbt/cli/test_run.py index 4d80514fc8..9af1de8561 100644 --- a/tests/dbt/cli/test_run.py +++ b/tests/dbt/cli/test_run.py @@ -3,7 +3,9 @@ from pathlib import Path from click.testing import Result import time_machine +from sqlmesh_dbt.operations import create from tests.cli.test_cli import FREEZE_TIME +from tests.dbt.cli.conftest import EmptyProjectCreator pytestmark = pytest.mark.slow @@ -38,3 +40,40 @@ def test_run_with_selectors(jaffle_shop_duckdb: Path, invoke_cli: t.Callable[... assert "main.orders" not in result.output assert "Model batches executed" in result.output + + +def test_run_with_changes_and_full_refresh( + create_empty_project: EmptyProjectCreator, invoke_cli: t.Callable[..., Result] +): + project_path = create_empty_project(project_name="test") + + engine_adapter = create(project_path).context.engine_adapter + engine_adapter.execute("create table external_table as select 'foo' as a, 'bar' as b") + + (project_path / "models" / "model_a.sql").write_text("select a, b from external_table") + (project_path / "models" / "model_b.sql").write_text("select a, b from {{ ref('model_a') }}") + + # populate initial env + result = invoke_cli(["run"]) + assert result.exit_code == 0 + assert not result.exception + + assert engine_adapter.fetchall("select a, b from model_b") == [("foo", "bar")] + + engine_adapter.execute("insert into external_table (a, b) values ('baz', 'bing')") + (project_path / "models" / "model_b.sql").write_text( + "select a, b, 'changed' as c from {{ ref('model_a') }}" + ) + + # run with --full-refresh. this should: + # - fully refresh model_a (pick up the new records from external_table) + # - deploy the local change to model_b (introducing the 'changed' column) + result = invoke_cli(["run", "--full-refresh"]) + assert result.exit_code == 0 + assert not result.exception + + assert engine_adapter.fetchall("select a, b from model_a") == [("foo", "bar"), ("baz", "bing")] + assert engine_adapter.fetchall("select a, b, c from model_b") == [ + ("foo", "bar", "changed"), + ("baz", "bing", "changed"), + ]