Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions sqlmesh/cli/project_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ def _gen_config(
# https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#virtual-data-environment-modes
virtual_environment_mode: {VirtualEnvironmentMode.DEV_ONLY.lower()}

# --- Plan Defaults ---
# https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#plan
plan:
# For Virtual Data Environments, this ensures that any changes are always considered against prod,
# rather than the previous state of that environment
always_recreate_environment: True

# --- Model Defaults ---
# https://sqlmesh.readthedocs.io/en/stable/reference/model_configuration/#model-defaults
model_defaults:
Expand Down
7 changes: 5 additions & 2 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1518,8 +1518,11 @@ def plan_builder(
include_unmodified = self.config.plan.include_unmodified

if skip_backfill and not no_gaps and not is_dev:
raise ConfigError(
"When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
# note: we deliberately don't mention the --no-gaps flag in case the plan came from the sqlmesh_dbt command
# todo: perhaps we could have better error messages if we check sys.argv[0] for which cli is running?
self.console.log_warning(
"Skipping the backfill stage for production can lead to unexpected results, such as tables being empty or incremental data with non-contiguous time ranges being made available.\n"
"If you are doing this deliberately to create an empty version of a table to test a change, please consider using Virtual Data Environments instead."
)

if not skip_linter:
Expand Down
17 changes: 15 additions & 2 deletions sqlmesh_dbt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,24 @@ def dbt(
"--full-refresh",
help="If specified, dbt will drop incremental models and fully-recalculate the incremental table from the model definition.",
)
@click.option(
"--env",
"--environment",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add an --env shortcut? Or maybe make it an argument instead of an option, like we do for plan? That feels more intuitive. Plus, it doesn't look like dbt run has any arguments.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would feel intuitive for a sqlmesh user, but people used to running dbt run are used to specifying a boatload of options, right?

dbt run --help reveals that the original command has no arguments, so the established practice appears to be to keep adding options

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did add --env as an alias

help="Run against a specific Virtual Data Environment (VDE) instead of the main environment",
)
@click.option(
"--empty/--no-empty", default=False, help="If specified, limit input refs and sources"
)
@vars_option
@click.pass_context
def run(ctx: click.Context, vars: t.Optional[t.Dict[str, t.Any]], **kwargs: t.Any) -> None:
def run(
ctx: click.Context,
vars: t.Optional[t.Dict[str, t.Any]],
env: t.Optional[str] = None,
**kwargs: t.Any,
) -> None:
"""Compile SQL and execute against the current target database."""
_get_dbt_operations(ctx, vars).run(**kwargs)
_get_dbt_operations(ctx, vars).run(environment=env, **kwargs)


@dbt.command(name="list")
Expand Down
110 changes: 98 additions & 12 deletions sqlmesh_dbt/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sqlmesh.dbt.project import Project
from sqlmesh_dbt.console import DbtCliConsole
from sqlmesh.core.model import Model
from sqlmesh.core.plan import Plan

logger = logging.getLogger(__name__)

Expand All @@ -35,21 +36,20 @@ def list_(

def run(
self,
environment: t.Optional[str] = None,
select: t.Optional[t.List[str]] = None,
exclude: t.Optional[t.List[str]] = None,
full_refresh: bool = False,
) -> None:
select_models = None

if sqlmesh_selector := selectors.to_sqlmesh(select or [], exclude or []):
select_models = [sqlmesh_selector]

self.context.plan(
select_models=select_models,
run=True,
no_diff=True,
no_prompts=True,
auto_apply=True,
empty: bool = False,
) -> Plan:
return self.context.plan(
**self._plan_options(
environment=environment,
select=select,
exclude=exclude,
full_refresh=full_refresh,
empty=empty,
)
)

def _selected_models(
Expand All @@ -71,6 +71,86 @@ def _selected_models(

return selected_models

def _plan_options(
    self,
    environment: t.Optional[str] = None,
    select: t.Optional[t.List[str]] = None,
    exclude: t.Optional[t.List[str]] = None,
    empty: bool = False,
    full_refresh: bool = False,
) -> t.Dict[str, t.Any]:
    """Build the keyword arguments for a SQLMesh plan from dbt-style run options.

    Args:
        environment: Name of the target environment. A falsy value, or the
            prod constant, is treated as a plan against prod; anything else
            is treated as a dev plan.
        select: dbt ``--select`` expressions; converted, together with
            ``exclude``, into a single SQLMesh selector expression.
        exclude: dbt ``--exclude`` expressions.
        empty: Mirrors dbt's ``--empty``. Sets ``skip_backfill`` and implies
            ``full_refresh``.
        full_refresh: Mirrors dbt's ``--full-refresh``. Currently contributes
            nothing to the returned options (see the TODO below).

    Returns:
        A dict of plan keyword arguments. ``environment``, ``select_models``,
        ``no_diff``, ``no_prompts`` and ``auto_apply`` are always present;
        the remaining keys depend on the target environment and flags.
    """
    import sqlmesh.core.constants as c

    # Convert --select and --exclude into one expression for the SQLMesh selector engine.
    selector_expression = selectors.to_sqlmesh(select or [], exclude or [])
    select_models = [selector_expression] if selector_expression else None

    targets_dev = bool(environment) and environment != c.PROD

    plan_kwargs: t.Dict[str, t.Any] = {
        "environment": environment,
        "select_models": select_models,
        # Don't output a diff of model changes.
        "no_diff": True,
        # Don't throw up any prompts like "set the effective date" - use defaults.
        "no_prompts": True,
        # Start doing work immediately (since no_diff is set, there isn't really
        # anything for the user to say yes/no to).
        "auto_apply": True,
    }

    if not targets_dev or select_models:
        # Prod plans should "catch up" before applying the changes, so that prod is
        # as current as it can be once the command finishes.
        # Dev plans *with* selectors do the same: the user is saying "specifically
        # update these models to the latest".
        # Dev plans *without* selectors keep the defaults of never exceeding prod: the
        # user is saying "just create this env" without focusing on specific models.
        #
        # run: always catch the data up to latest rather than only operating on what
        # has been loaded before.
        plan_kwargs["run"] = True
        # ignore_cron: don't take cron schedules into account when deciding which
        # models to run; do everything even if it just ran.
        plan_kwargs["ignore_cron"] = True

    if targets_dev:
        # Don't create views for all of prod in the dev environment.
        plan_kwargs["include_unmodified"] = False
        # Always plan from scratch against prod. This is coupled with the
        # `always_recreate_environment=True` setting in the default config file:
        # rather than planning against the previous state of an existing dev
        # environment, the full scope of changes vs prod is always shown.
        plan_kwargs["create_from"] = c.PROD
        # Always enable dev previews for incremental / forward-only models.
        # Due to how DBT does incrementals (INCREMENTAL_UNMANAGED on the SQLMesh
        # engine), this results in the full model being refreshed with the entire
        # dataset, which can potentially be very large. If this is undesirable,
        # users have two options:
        #  - work around it using jinja to conditionally add extra filters to the
        #    WHERE clause, or a LIMIT, to the model query
        #  - upgrade to SQLMesh's incremental models, which have variables for the
        #    start/end date and inject leak guards to limit the data backfilled
        #
        # Note: enable_preview=True is *different* behaviour to the `sqlmesh` CLI,
        # which uses enable_preview=None. The `sqlmesh` CLI therefore only enables
        # dev previews for dbt projects if the target adapter supports cloning,
        # whereas it is enabled unconditionally here.
        plan_kwargs["enable_preview"] = True

    if empty:
        # `dbt --empty` adds LIMIT 0 to the queries, resulting in empty tables. It also
        # happily clobbers existing tables regardless of whether they are populated.
        # This *partially* lines up with --skip-backfill in SQLMesh, which says not to
        # populate tables that happen to be created/updated as part of this plan.
        # However, if a table already exists and has data in it, there is no change,
        # so SQLMesh will not recreate the table and it will not be cleared.
        # To fully replicate dbt's --empty we therefore also need --full-refresh
        # semantics in order to replace existing tables.
        plan_kwargs["skip_backfill"] = True
        full_refresh = True

    if full_refresh:
        # TODO: handling this requires some updates in the engine to enable
        # restatements+changes in the same plan without affecting prod if the plan
        # targets dev
        pass

    return plan_kwargs

@property
def console(self) -> DbtCliConsole:
console = self.context.console
Expand Down Expand Up @@ -103,6 +183,12 @@ def create(
from sqlmesh_dbt.console import DbtCliConsole
from sqlmesh.utils.errors import SQLMeshError

# clear any existing handlers set up by click/rich as defaults so that once SQLMesh logging config is applied,
# we don't get duplicate messages logged from things like console.log_warning()
root_logger = logging.getLogger()
while root_logger.hasHandlers():
root_logger.removeHandler(root_logger.handlers[0])

configure_logging(force_debug=debug)
set_console(DbtCliConsole())

Expand Down
5 changes: 1 addition & 4 deletions tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,7 @@ def test_plan_skip_backfill(runner, tmp_path, flag):
# plan for `prod` errors if `--skip-backfill` is passed without --no-gaps
result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", flag])
assert result.exit_code == 1
assert (
"Error: When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
in result.output
)
assert "Skipping the backfill stage for production can lead to unexpected" in result.output

# plan executes virtual update without executing model batches
# Input: `y` to perform virtual update
Expand Down
167 changes: 167 additions & 0 deletions tests/dbt/cli/test_operations.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,36 @@
import typing as t
from pathlib import Path
import pytest
from sqlmesh_dbt.operations import create
from sqlmesh.utils import yaml
from sqlmesh.utils.errors import SQLMeshError
import time_machine
from sqlmesh.core.console import NoopConsole
from sqlmesh.core.plan import PlanBuilder
from sqlmesh.core.config.common import VirtualEnvironmentMode

pytestmark = pytest.mark.slow


class PlanCapturingConsole(NoopConsole):
    """Console test double that records the arguments given to ``plan()``.

    Tests read the recorded attributes back to check that the plan was
    requested with the expected parameters.
    """

    def plan(
        self,
        plan_builder: PlanBuilder,
        auto_apply: bool,
        default_catalog: t.Optional[str],
        no_diff: bool = False,
        no_prompts: bool = False,
    ) -> None:
        """Store every argument on the instance and do nothing else."""
        # A normal console would start applying the plan at this point; this one
        # doesn't, because the only goal is to capture the parameters so the tests
        # can check they were set correctly.
        self.plan_builder, self.auto_apply, self.default_catalog = (
            plan_builder,
            auto_apply,
            default_catalog,
        )
        self.no_diff, self.no_prompts = no_diff, no_prompts


def test_create_sets_and_persists_default_start_date(jaffle_shop_duckdb: Path):
with time_machine.travel("2020-01-02 00:00:00 UTC"):
from sqlmesh.utils.date import yesterday_ds, to_ds
Expand Down Expand Up @@ -71,6 +94,18 @@ def test_create_can_specify_profile_and_target(jaffle_shop_duckdb: Path):
assert dbt_project.context.target_name == "dev"


def test_default_options(jaffle_shop_duckdb: Path):
    """Check the default configuration values exposed by a freshly created operations object."""
    ops = create()

    sqlmesh_config = ops.context.config
    project = ops.project

    # Plan and virtual environment defaults.
    assert sqlmesh_config.plan.always_recreate_environment is True
    assert sqlmesh_config.virtual_environment_mode == VirtualEnvironmentMode.DEV_ONLY

    # Model defaults: a start date is set and the dialect follows the dbt target.
    assert sqlmesh_config.model_defaults.start is not None
    assert sqlmesh_config.model_defaults.dialect == project.context.target.dialect


def test_create_can_set_project_variables(jaffle_shop_duckdb: Path):
(jaffle_shop_duckdb / "models" / "test_model.sql").write_text("""
select '{{ var('foo') }}' as a
Expand All @@ -83,3 +118,135 @@ def test_create_can_set_project_variables(jaffle_shop_duckdb: Path):
query = test_model.render_query()
assert query is not None
assert query.sql() == "SELECT 'bar' AS \"a\""


def test_run_option_mapping(jaffle_shop_duckdb: Path):
    """Check how ``run()`` arguments map onto the resulting plan when targeting prod."""
    operations = create(project_dir=jaffle_shop_duckdb)
    console = PlanCapturingConsole()
    operations.context.console = console

    customers = '"jaffle_shop"."main"."customers"'
    orders = '"jaffle_shop"."main"."orders"'
    stg_orders = '"jaffle_shop"."main"."stg_orders"'

    def project_snapshot_names() -> t.Set[str]:
        # Re-read on every call so each comparison sees the context's current state.
        return set(operations.context.snapshots)

    def run_prod_plan(expect_skip_backfill: bool = False, **run_kwargs: t.Any):
        # Run with the given options and check the invariants shared by every prod scenario.
        plan = operations.run(**run_kwargs)
        assert plan.environment.name == "prod"
        assert console.no_prompts is True
        assert console.no_diff is True
        assert console.auto_apply is True
        assert plan.end_bounded is False
        assert plan.ignore_cron is True
        assert plan.skip_backfill is expect_skip_backfill
        return plan

    # No selectors: the whole project is in scope.
    plan = run_prod_plan()
    assert plan.selected_models_to_backfill is None
    assert {s.name for s in plan.snapshots} == project_snapshot_names()

    # --select only.
    plan = run_prod_plan(select=["main.stg_orders+"])
    assert plan.selected_models_to_backfill == {customers, orders, stg_orders}
    assert {s.name for s in plan.snapshots} == plan.selected_models_to_backfill

    # --select combined with --exclude.
    plan = run_prod_plan(select=["main.stg_orders+"], exclude=["main.customers"])
    assert plan.selected_models_to_backfill == {orders, stg_orders}
    assert {s.name for s in plan.snapshots} == plan.selected_models_to_backfill

    # --exclude only.
    plan = run_prod_plan(exclude=["main.customers"])
    assert plan.selected_models_to_backfill == project_snapshot_names() - {customers}
    assert {s.name for s in plan.snapshots} == plan.selected_models_to_backfill

    # --empty: backfill is skipped, nothing is specifically selected.
    plan = run_prod_plan(expect_skip_backfill=True, empty=True)
    assert plan.selected_models_to_backfill is None
    assert {s.name for s in plan.snapshots} == project_snapshot_names()


def test_run_option_mapping_dev(jaffle_shop_duckdb: Path):
    """Check how ``run()`` arguments map onto the resulting plan when targeting a dev environment."""
    # Create prod first so that dev has something to compare against.
    create(project_dir=jaffle_shop_duckdb).run()

    (jaffle_shop_duckdb / "models" / "new_model.sql").write_text("select 1")

    operations = create(project_dir=jaffle_shop_duckdb)

    console = PlanCapturingConsole()
    operations.context.console = console

    new_model = '"jaffle_shop"."main"."new_model"'

    def run_dev_plan(**run_kwargs: t.Any):
        # Run against "dev" and check the invariants shared by every dev scenario.
        plan = operations.run(environment="dev", **run_kwargs)
        assert plan.environment.name == "dev"
        assert console.no_prompts is True
        assert console.no_diff is True
        assert console.auto_apply is True
        assert plan.include_unmodified is False
        assert plan.context_diff.create_from == "prod"
        assert plan.context_diff.is_new_environment is True
        assert console.plan_builder._enable_preview is True
        return plan

    # Plain dev plan.
    plan = run_dev_plan()
    assert plan.end_bounded is True
    assert plan.ignore_cron is False
    assert plan.skip_backfill is False
    assert plan.selected_models_to_backfill == {new_model}

    # Dev plan with --empty.
    plan = run_dev_plan(empty=True)
    assert plan.end_bounded is True
    assert plan.ignore_cron is False
    assert plan.skip_backfill is True
    assert plan.selected_models_to_backfill == {new_model}

    # Dev plans with --select have run=True, ignore_cron=True set,
    # as opposed to dev plans that don't have a specific selector.
    plan = run_dev_plan(select=["main.stg_orders+"])
    assert plan.end_bounded is False
    assert plan.ignore_cron is True
    assert plan.skip_backfill is False
    # Note: the new model in the dev environment is ignored in favour of the explicitly selected ones.
    assert plan.selected_models_to_backfill == {
        '"jaffle_shop"."main"."customers"',
        '"jaffle_shop"."main"."orders"',
        '"jaffle_shop"."main"."stg_orders"',
    }