diff --git a/sqlmesh/cli/project_init.py b/sqlmesh/cli/project_init.py index 87d77da4b9..81ff534dc4 100644 --- a/sqlmesh/cli/project_init.py +++ b/sqlmesh/cli/project_init.py @@ -121,6 +121,13 @@ def _gen_config( # https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#virtual-data-environment-modes virtual_environment_mode: {VirtualEnvironmentMode.DEV_ONLY.lower()} +# --- Plan Defaults --- +# https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#plan +plan: + # For Virtual Data Environments, this ensures that any changes are always considered against prod, + # rather than the previous state of that environment + always_recreate_environment: True + # --- Model Defaults --- # https://sqlmesh.readthedocs.io/en/stable/reference/model_configuration/#model-defaults model_defaults: diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 1a5375183c..4c1ffb1e92 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1518,8 +1518,11 @@ def plan_builder( include_unmodified = self.config.plan.include_unmodified if skip_backfill and not no_gaps and not is_dev: - raise ConfigError( - "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)." + # note: we deliberately don't mention the --no-gaps flag in case the plan came from the sqlmesh_dbt command + # todo: perhaps we could have better error messages if we check sys.argv[0] for which cli is running? + self.console.log_warning( + "Skipping the backfill stage for production can lead to unexpected results, such as tables being empty or incremental data with non-contiguous time ranges being made available.\n" + "If you are doing this deliberately to create an empty version of a table to test a change, please consider using Virtual Data Environments instead." ) if not skip_linter: diff --git a/sqlmesh_dbt/cli.py b/sqlmesh_dbt/cli.py index c215663f0a..370f115d61 100644 --- a/sqlmesh_dbt/cli.py +++ b/sqlmesh_dbt/cli.py @@ -92,11 +92,24 @@ def dbt( "--full-refresh", help="If specified, dbt will drop incremental models and fully-recalculate the incremental table from the model definition.", ) +@click.option( + "--env", + "--environment", + help="Run against a specific Virtual Data Environment (VDE) instead of the main environment", +) +@click.option( + "--empty/--no-empty", default=False, help="If specified, limit input refs and sources" +) @vars_option @click.pass_context -def run(ctx: click.Context, vars: t.Optional[t.Dict[str, t.Any]], **kwargs: t.Any) -> None: +def run( + ctx: click.Context, + vars: t.Optional[t.Dict[str, t.Any]], + env: t.Optional[str] = None, + **kwargs: t.Any, +) -> None: """Compile SQL and execute against the current target database.""" - _get_dbt_operations(ctx, vars).run(**kwargs) + _get_dbt_operations(ctx, vars).run(environment=env, **kwargs) @dbt.command(name="list") diff --git a/sqlmesh_dbt/operations.py b/sqlmesh_dbt/operations.py index 270bba6511..ac7ad031f3 100644 --- a/sqlmesh_dbt/operations.py +++ b/sqlmesh_dbt/operations.py @@ -11,6 +11,7 @@ from sqlmesh.dbt.project import Project from sqlmesh_dbt.console import DbtCliConsole from sqlmesh.core.model import Model + from sqlmesh.core.plan import Plan logger = logging.getLogger(__name__) @@ -35,21 +36,20 @@ def list_( def run( self, + environment: t.Optional[str] = None, select: t.Optional[t.List[str]] = None, exclude: t.Optional[t.List[str]] = None, full_refresh: bool = False, - ) -> None: - select_models = None - - if sqlmesh_selector := selectors.to_sqlmesh(select or [], exclude or []): - select_models = [sqlmesh_selector] - - self.context.plan( - select_models=select_models, - run=True, - no_diff=True, - no_prompts=True, - auto_apply=True, + empty: bool = False, + ) -> Plan: + return self.context.plan( + **self._plan_options( + environment=environment, + select=select, + exclude=exclude, + full_refresh=full_refresh, + empty=empty, + ) ) def _selected_models( @@ -71,6 +71,86 @@ def _selected_models( return selected_models + def _plan_options( + self, + environment: t.Optional[str] = None, + select: t.Optional[t.List[str]] = None, + exclude: t.Optional[t.List[str]] = None, + empty: bool = False, + full_refresh: bool = False, + ) -> t.Dict[str, t.Any]: + import sqlmesh.core.constants as c + + # convert --select and --exclude to a selector expression for the SQLMesh selector engine + select_models = None + if sqlmesh_selector := selectors.to_sqlmesh(select or [], exclude or []): + select_models = [sqlmesh_selector] + + is_dev = environment and environment != c.PROD + is_prod = not is_dev + + options: t.Dict[str, t.Any] = {} + + if is_prod or (is_dev and select_models): + # prod plans should "catch up" before applying the changes so that after the command finishes prod is the latest it can be + # dev plans *with* selectors should do the same as the user is saying "specifically update these models to the latest" + # dev plans *without* selectors should just have the defaults of never exceeding prod as the user is saying "just create this env" without focusing on any specific models + options.update( + dict( + # always catch the data up to latest rather than only operating on what has been loaded before + run=True, + # don't taking cron schedules into account when deciding what models to run, do everything even if it just ran + ignore_cron=True, + ) + ) + + if is_dev: + options.update( + dict( + # don't create views for all of prod in the dev environment + include_unmodified=False, + # always plan from scratch against prod. note that this is coupled with the `always_recreate_environment=True` setting in the default config file. + # the result is that rather than planning against the previous state of an existing dev environment, the full scope of changes vs prod are always shown + create_from=c.PROD, + # Always enable dev previews for incremental / forward-only models. + # Due to how DBT does incrementals (INCREMENTAL_UNMANAGED on the SQLMesh engine), this will result in the full model being refreshed + # with the entire dataset, which can potentially be very large. If this is undesirable, users have two options: + # - work around this using jinja to conditionally add extra filters to the WHERE clause or a LIMIT to the model query + # - upgrade to SQLMesh's incremental models, where we have variables for the start/end date and inject leak guards to + # limit the amount of data backfilled + # + # Note: enable_preview=True is *different* behaviour to the `sqlmesh` CLI, which uses enable_preview=None. + # This means the `sqlmesh` CLI will only enable dev previews for dbt projects if the target adapter supports cloning, + # whereas we enable it unconditionally here + enable_preview=True, + ) + ) + + if empty: + # `dbt --empty` adds LIMIT 0 to the queries, resulting in empty tables. In addition, it happily clobbers existing tables regardless of if they are populated. + # This *partially* lines up with --skip-backfill in SQLMesh, which indicates to not populate tables if they happened to be created/updated as part of this plan. + # However, if a table already exists and has data in it, there is no change so SQLMesh will not recreate the table and thus it will not be cleared. + # So in order to fully replicate dbt's --empty, we also need --full-refresh semantics in order to replace existing tables + options["skip_backfill"] = True + full_refresh = True + + if full_refresh: + # TODO: handling this requires some updates in the engine to enable restatements+changes in the same plan without affecting prod + # if the plan targets dev + pass + + return dict( + environment=environment, + select_models=select_models, + # dont output a diff of model changes + no_diff=True, + # don't throw up any prompts like "set the effective date" - use defaults + no_prompts=True, + # start doing work immediately (since no_diff is set, there isnt really anything for the user to say yes/no to) + auto_apply=True, + **options, + ) + @property def console(self) -> DbtCliConsole: console = self.context.console @@ -103,6 +183,12 @@ def create( from sqlmesh_dbt.console import DbtCliConsole from sqlmesh.utils.errors import SQLMeshError + # clear any existing handlers set up by click/rich as defaults so that once SQLMesh logging config is applied, + # we dont get duplicate messages logged from things like console.log_warning() + root_logger = logging.getLogger() + while root_logger.hasHandlers(): + root_logger.removeHandler(root_logger.handlers[0]) + configure_logging(force_debug=debug) set_console(DbtCliConsole()) diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 433e2165d8..ef5b80e151 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -260,10 +260,7 @@ def test_plan_skip_backfill(runner, tmp_path, flag): # plan for `prod` errors if `--skip-backfill` is passed without --no-gaps result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", flag]) assert result.exit_code == 1 - assert ( - "Error: When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)." - in result.output - ) + assert "Skipping the backfill stage for production can lead to unexpected" in result.output # plan executes virtual update without executing model batches # Input: `y` to perform virtual update diff --git a/tests/dbt/cli/test_operations.py b/tests/dbt/cli/test_operations.py index 9b5b3113b3..15051542c6 100644 --- a/tests/dbt/cli/test_operations.py +++ b/tests/dbt/cli/test_operations.py @@ -1,13 +1,36 @@ +import typing as t from pathlib import Path import pytest from sqlmesh_dbt.operations import create from sqlmesh.utils import yaml from sqlmesh.utils.errors import SQLMeshError import time_machine +from sqlmesh.core.console import NoopConsole +from sqlmesh.core.plan import PlanBuilder +from sqlmesh.core.config.common import VirtualEnvironmentMode pytestmark = pytest.mark.slow +class PlanCapturingConsole(NoopConsole): + def plan( + self, + plan_builder: PlanBuilder, + auto_apply: bool, + default_catalog: t.Optional[str], + no_diff: bool = False, + no_prompts: bool = False, + ) -> None: + self.plan_builder = plan_builder + self.auto_apply = auto_apply + self.default_catalog = default_catalog + self.no_diff = no_diff + self.no_prompts = no_prompts + + # normal console starts applying the plan here; we dont because we just want to capture the parameters + # and check they were set correctly + + def test_create_sets_and_persists_default_start_date(jaffle_shop_duckdb: Path): with time_machine.travel("2020-01-02 00:00:00 UTC"): from sqlmesh.utils.date import yesterday_ds, to_ds @@ -71,6 +94,18 @@ def test_create_can_specify_profile_and_target(jaffle_shop_duckdb: Path): assert dbt_project.context.target_name == "dev" +def test_default_options(jaffle_shop_duckdb: Path): + operations = create() + + config = operations.context.config + dbt_project = operations.project + + assert config.plan.always_recreate_environment is True + assert config.virtual_environment_mode == VirtualEnvironmentMode.DEV_ONLY + assert config.model_defaults.start is not None + assert config.model_defaults.dialect == dbt_project.context.target.dialect + + def test_create_can_set_project_variables(jaffle_shop_duckdb: Path): (jaffle_shop_duckdb / "models" / "test_model.sql").write_text(""" select '{{ var('foo') }}' as a @@ -83,3 +118,135 @@ def test_create_can_set_project_variables(jaffle_shop_duckdb: Path): query = test_model.render_query() assert query is not None assert query.sql() == "SELECT 'bar' AS \"a\"" + + +def test_run_option_mapping(jaffle_shop_duckdb: Path): + operations = create(project_dir=jaffle_shop_duckdb) + console = PlanCapturingConsole() + operations.context.console = console + + plan = operations.run() + assert plan.environment.name == "prod" + assert console.no_prompts is True + assert console.no_diff is True + assert console.auto_apply is True + assert plan.end_bounded is False + assert plan.ignore_cron is True + assert plan.skip_backfill is False + assert plan.selected_models_to_backfill is None + assert {s.name for s in plan.snapshots} == {k for k in operations.context.snapshots} + + plan = operations.run(select=["main.stg_orders+"]) + assert plan.environment.name == "prod" + assert console.no_prompts is True + assert console.no_diff is True + assert console.auto_apply is True + assert plan.end_bounded is False + assert plan.ignore_cron is True + assert plan.skip_backfill is False + assert plan.selected_models_to_backfill == { + '"jaffle_shop"."main"."customers"', + '"jaffle_shop"."main"."orders"', + '"jaffle_shop"."main"."stg_orders"', + } + assert {s.name for s in plan.snapshots} == plan.selected_models_to_backfill + + plan = operations.run(select=["main.stg_orders+"], exclude=["main.customers"]) + assert plan.environment.name == "prod" + assert console.no_prompts is True + assert console.no_diff is True + assert console.auto_apply is True + assert plan.end_bounded is False + assert plan.ignore_cron is True + assert plan.skip_backfill is False + assert plan.selected_models_to_backfill == { + '"jaffle_shop"."main"."orders"', + '"jaffle_shop"."main"."stg_orders"', + } + assert {s.name for s in plan.snapshots} == plan.selected_models_to_backfill + + plan = operations.run(exclude=["main.customers"]) + assert plan.environment.name == "prod" + assert console.no_prompts is True + assert console.no_diff is True + assert console.auto_apply is True + assert plan.end_bounded is False + assert plan.ignore_cron is True + assert plan.skip_backfill is False + assert plan.selected_models_to_backfill == {k for k in operations.context.snapshots} - { + '"jaffle_shop"."main"."customers"' + } + assert {s.name for s in plan.snapshots} == plan.selected_models_to_backfill + + plan = operations.run(empty=True) + assert plan.environment.name == "prod" + assert console.no_prompts is True + assert console.no_diff is True + assert console.auto_apply is True + assert plan.end_bounded is False + assert plan.ignore_cron is True + assert plan.skip_backfill is True + assert plan.selected_models_to_backfill is None + assert {s.name for s in plan.snapshots} == {k for k in operations.context.snapshots} + + +def test_run_option_mapping_dev(jaffle_shop_duckdb: Path): + # create prod so that dev has something to compare against + operations = create(project_dir=jaffle_shop_duckdb) + operations.run() + + (jaffle_shop_duckdb / "models" / "new_model.sql").write_text("select 1") + + operations = create(project_dir=jaffle_shop_duckdb) + + console = PlanCapturingConsole() + operations.context.console = console + + plan = operations.run(environment="dev") + assert plan.environment.name == "dev" + assert console.no_prompts is True + assert console.no_diff is True + assert console.auto_apply is True + assert plan.include_unmodified is False + assert plan.context_diff.create_from == "prod" + assert plan.context_diff.is_new_environment is True + assert console.plan_builder._enable_preview is True + assert plan.end_bounded is True + assert plan.ignore_cron is False + assert plan.skip_backfill is False + assert plan.selected_models_to_backfill == {'"jaffle_shop"."main"."new_model"'} + + plan = operations.run(environment="dev", empty=True) + assert plan.environment.name == "dev" + assert console.no_prompts is True + assert console.no_diff is True + assert console.auto_apply is True + assert plan.include_unmodified is False + assert plan.context_diff.create_from == "prod" + assert plan.context_diff.is_new_environment is True + assert console.plan_builder._enable_preview is True + assert plan.end_bounded is True + assert plan.ignore_cron is False + assert plan.skip_backfill is True + assert plan.selected_models_to_backfill == {'"jaffle_shop"."main"."new_model"'} + + plan = operations.run(environment="dev", select=["main.stg_orders+"]) + assert plan.environment.name == "dev" + assert console.no_prompts is True + assert console.no_diff is True + assert console.auto_apply is True + assert plan.include_unmodified is False + assert plan.context_diff.create_from == "prod" + assert plan.context_diff.is_new_environment is True + assert console.plan_builder._enable_preview is True + # dev plans with --select have run=True, ignore_cron=True set + # as opposed to dev plans that dont have a specific selector + assert plan.end_bounded is False + assert plan.ignore_cron is True + assert plan.skip_backfill is False + # note: the new model in the dev environment is ignored in favour of the explicitly selected ones + assert plan.selected_models_to_backfill == { + '"jaffle_shop"."main"."customers"', + '"jaffle_shop"."main"."orders"', + '"jaffle_shop"."main"."stg_orders"', + }