Skip to content

Commit 72bfc06

Browse files
lafirm and themisvaltinos
authored
Fix: Handle duplicate key error when we add a project var or change the project name (#4569)
Co-authored-by: lafirm <lafir.malim@emishealth.com> Co-authored-by: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com>
1 parent a775184 commit 72bfc06

File tree

2 files changed

+110
-2
lines changed

2 files changed

+110
-2
lines changed

sqlmesh/core/context.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -692,8 +692,11 @@ def load(self, update_schemas: bool = True) -> GenericContext[C]:
692692
if snapshot.node.project in self._projects:
693693
uncached.add(snapshot.name)
694694
else:
695-
store = self._standalone_audits if snapshot.is_audit else self._models
696-
store[snapshot.name] = snapshot.node # type: ignore
695+
local_store = self._standalone_audits if snapshot.is_audit else self._models
696+
if snapshot.name in local_store:
697+
uncached.add(snapshot.name)
698+
else:
699+
local_store[snapshot.name] = snapshot.node # type: ignore
697700

698701
for model in self._models.values():
699702
self.dag.add(model.fqn, model.depends_on)

tests/core/integration/test_multi_repo.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,111 @@ def test_multi_hybrid(mocker):
421421
validate_apply_basics(context, c.PROD, plan.snapshots.values())
422422

423423

424+
def test_multi_repo_no_project_to_project(copy_to_temp_path):
    """Check that a repo deployed without a project name can later adopt one.

    The test first removes the ``project`` key from repo_1's config and applies
    a plan, so every snapshot in prod carries an empty project. It then restores
    ``project: repo_1`` and builds a second context on the same state sync. The
    locally loaded models must report the new project name, and planning and
    applying with that context must succeed.
    """
    paths = copy_to_temp_path("examples/multi")
    repo_1_path = f"{paths[0]}/repo_1"
    repo_1_config_path = f"{repo_1_path}/config.yaml"

    # Drop the project name so the first deployment happens without one.
    # The encoding is explicit so the rewrite does not depend on the platform locale.
    with open(repo_1_config_path, "r", encoding="utf-8") as f:
        config_content = f.read()
    with open(repo_1_config_path, "w", encoding="utf-8") as f:
        f.write(config_content.replace("project: repo_1\n", ""))

    context = Context(paths=[repo_1_path], gateway="memory")
    context._new_state_sync().reset(default_catalog=context.default_catalog)
    plan = context.plan_builder().build()
    context.apply(plan)

    # initially models in prod have no project
    prod_snapshots = context.state_reader.get_snapshots(
        context.state_reader.get_environment(c.PROD).snapshots
    )
    # Guard against the loop below passing vacuously on an empty result.
    assert prod_snapshots
    for snapshot in prod_snapshots.values():
        assert snapshot.node.project == ""

    # we now adopt multi project by adding a project name
    with open(repo_1_config_path, "r", encoding="utf-8") as f:
        config_content = f.read()
    with open(repo_1_config_path, "w", encoding="utf-8") as f:
        f.write("project: repo_1\n" + config_content)

    context_with_project = Context(
        paths=[repo_1_path],
        state_sync=context.state_sync,
        gateway="memory",
    )
    # Reuse the first context's engine adapter.
    # NOTE(review): presumably this makes both contexts talk to the same in-memory
    # database, and deleting `engine_adapters` drops a cached value so it is
    # rebuilt from the shared adapter - confirm against Context.
    context_with_project._engine_adapter = context.engine_adapter
    del context_with_project.engine_adapters

    # local models should take precedence to pick up the new project name
    local_model_a = context_with_project.get_model("bronze.a")
    assert local_model_a.project == "repo_1"
    local_model_b = context_with_project.get_model("bronze.b")
    assert local_model_b.project == "repo_1"

    # also verify the plan works
    plan = context_with_project.plan_builder().build()
    context_with_project.apply(plan)
    validate_apply_basics(context_with_project, c.PROD, plan.snapshots.values())
469+
470+
471+
def test_multi_repo_local_model_overrides_prod_from_other_project(copy_to_temp_path):
    """Check that a local model wins over a same-named prod model from another project.

    Both repos are deployed together first, so ``silver.c`` lives in prod under
    ``repo_2``. A new definition of ``silver.c`` is then written into repo_1 and
    a context is created for repo_1 only, on the same state sync. The local
    definition must be the one loaded and rendered, the plan must mark both
    ``silver.c`` and ``silver.d`` as directly modified with missing intervals,
    and applying it must materialise the new ``col_b`` column.
    """
    paths = copy_to_temp_path("examples/multi")
    repo_1_path = f"{paths[0]}/repo_1"
    repo_2_path = f"{paths[0]}/repo_2"

    context = Context(paths=[repo_1_path, repo_2_path], gateway="memory")
    context._new_state_sync().reset(default_catalog=context.default_catalog)
    plan = context.plan_builder().build()
    assert len(plan.new_snapshots) == 5
    context.apply(plan)

    prod_model_c = context.get_model("silver.c")
    assert prod_model_c.project == "repo_2"

    # Write a competing definition of silver.c into repo_1.
    # The encoding is explicit so the file does not depend on the platform locale.
    with open(f"{repo_1_path}/models/c.sql", "w", encoding="utf-8") as f:
        f.write(
            dedent("""\
                MODEL (
                    name silver.c,
                    kind FULL
                );

                SELECT DISTINCT col_a, col_b
                FROM bronze.a
            """)
        )

    # silver.c exists locally in repo 1 now AND in prod under repo_2
    context_repo1 = Context(
        paths=[repo_1_path],
        state_sync=context.state_sync,
        gateway="memory",
    )
    # Reuse the first context's engine adapter.
    # NOTE(review): presumably this makes both contexts talk to the same in-memory
    # database, and deleting `engine_adapters` drops a cached value so it is
    # rebuilt from the shared adapter - confirm against Context.
    context_repo1._engine_adapter = context.engine_adapter
    del context_repo1.engine_adapters

    # local model should take precedence and its project should reflect the new project name
    local_model_c = context_repo1.get_model("silver.c")
    assert local_model_c.project == "repo_1"

    rendered = context_repo1.render("silver.c").sql()
    assert "col_b" in rendered

    # its downstream dependencies though should still be picked up
    plan = context_repo1.plan_builder().build()
    directly_modified_names = {snapshot.name for snapshot in plan.directly_modified}
    assert '"memory"."silver"."c"' in directly_modified_names
    assert '"memory"."silver"."d"' in directly_modified_names
    missing_interval_names = {s.snapshot_id.name for s in plan.missing_intervals}
    assert '"memory"."silver"."c"' in missing_interval_names
    assert '"memory"."silver"."d"' in missing_interval_names

    context_repo1.apply(plan)
    validate_apply_basics(context_repo1, c.PROD, plan.snapshots.values())
    result = context_repo1.fetchdf("SELECT * FROM memory.silver.c")
    assert "col_b" in result.columns
527+
528+
424529
def test_engine_adapters_multi_repo_all_gateways_gathered(copy_to_temp_path):
425530
paths = copy_to_temp_path("examples/multi")
426531
repo_1_path = paths[0] / "repo_1"

0 commit comments

Comments
 (0)