Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ def state_sync(self) -> StateSync:
self._state_sync = self._new_state_sync()

if self._state_sync.get_versions(validate=False).schema_version == 0:
self.console.log_status_update("Initializing new project state...")
self._state_sync.migrate(default_catalog=self.default_catalog)
self._state_sync.get_versions()
self._state_sync = CachingStateSync(self._state_sync) # type: ignore
Expand Down
10 changes: 6 additions & 4 deletions sqlmesh/core/state_sync/db/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,13 @@ def get_environment_statements(self, environment: str) -> t.List[EnvironmentStat
return []

def _environment_from_row(self, row: t.Tuple[str, ...]) -> Environment:
return Environment(**{field: row[i] for i, field in enumerate(Environment.all_fields())})
return Environment(
**{field: row[i] for i, field in enumerate(sorted(Environment.all_fields()))}
)

def _environment_summmary_from_row(self, row: t.Tuple[str, ...]) -> EnvironmentSummary:
return EnvironmentSummary(
**{field: row[i] for i, field in enumerate(EnvironmentSummary.all_fields())}
**{field: row[i] for i, field in enumerate(sorted(EnvironmentSummary.all_fields()))}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is sorting needed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because all_fields() returns a set

)

def _environments_query(
Expand All @@ -298,7 +300,7 @@ def _environments_query(
lock_for_update: bool = False,
required_fields: t.Optional[t.List[str]] = None,
) -> exp.Select:
query_fields = required_fields if required_fields else Environment.all_fields()
query_fields = required_fields if required_fields else sorted(Environment.all_fields())
query = (
exp.select(*(exp.to_identifier(field) for field in query_fields))
.from_(self.environments_table)
Expand Down Expand Up @@ -328,7 +330,7 @@ def _fetch_environment_summaries(
self.engine_adapter,
self._environments_query(
where=where,
required_fields=list(EnvironmentSummary.all_fields()),
required_fields=sorted(EnvironmentSummary.all_fields()),
),
)
]
Expand Down
7 changes: 6 additions & 1 deletion sqlmesh/core/state_sync/db/migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,14 @@ def _apply_migrations(

snapshot_count_before = self.snapshot_state.count() if versions.schema_version else None

state_table_exist = any(self.engine_adapter.table_exists(t) for t in self._state_tables)

for migration in migrations:
logger.info(f"Applying migration {migration}")
migration.migrate(state_sync, default_catalog=default_catalog)
migration.migrate_schemas(state_sync, default_catalog=default_catalog)
if state_table_exist:
# No need to run DML for the initial migration since all tables are empty
migration.migrate_rows(state_sync, default_catalog=default_catalog)

snapshot_count_after = self.snapshot_state.count()

Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0001_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
Expand Down Expand Up @@ -58,3 +58,7 @@ def migrate(state_sync, **kwargs): # type: ignore
"sqlglot_version": exp.DataType.build("text"),
},
)


def migrate_rows(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0002_remove_identify.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Remove identify=True kwarg for rendering sql"""


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0003_move_batch_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from sqlglot import exp


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
snapshots_table = "_snapshots"
if state_sync.schema:
snapshots_table = f"{state_sync.schema}.{snapshots_table}"
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0004_environmnent_add_finalized_at.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sqlglot import exp


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
environments_table = "_environments"
if state_sync.schema:
Expand All @@ -21,3 +21,7 @@ def migrate(state_sync, **kwargs): # type: ignore
)

engine_adapter.execute(alter_table_exp)


def migrate_rows(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0005_create_seed_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
seeds_table = "_seeds"
if state_sync.schema:
Expand All @@ -22,3 +22,7 @@ def migrate(state_sync, **kwargs): # type: ignore
},
primary_key=("name", "identifier"),
)


def migrate_rows(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0006_change_seed_hash.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Seed hashes moved from to_string to to_json for performance."""


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0007_env_table_info_to_kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ def _hash(data): # type: ignore
return str(zlib.crc32(";".join("" if d is None else d for d in data).encode("utf-8")))


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0008_create_intervals_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
intervals_table = "_intervals"
if state_sync.schema:
Expand Down Expand Up @@ -36,3 +36,7 @@ def migrate(state_sync, **kwargs): # type: ignore
engine_adapter.create_index(
intervals_table, "name_identifier_idx", ("name", "identifier", "created_ts")
)


def migrate_rows(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0009_remove_pre_post_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0010_seed_hash_batch_size.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Seed metadata hashes now correctly include the batch_size."""


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
pass
16 changes: 13 additions & 3 deletions sqlmesh/migrations/v0011_add_model_kind_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
import pandas as pd

def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
Expand All @@ -30,6 +28,18 @@ def migrate(state_sync, **kwargs): # type: ignore
)
engine_adapter.execute(alter_table_exp)


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
if schema:
snapshots_table = f"{schema}.{snapshots_table}"

index_type = index_text_type(engine_adapter.dialect)

new_snapshots = []

for name, identifier, version, snapshot in engine_adapter.fetchall(
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0012_update_jinja_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0013_serde_using_model_dialects.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0014_fix_dev_intervals.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""Fix snapshot intervals that have been erroneously marked as dev."""


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
schema = state_sync.schema
intervals_table = "_intervals"
if schema:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from sqlmesh.utils.migration import blob_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
environments_table = "_environments"
if state_sync.schema:
Expand All @@ -24,3 +24,7 @@ def migrate(state_sync, **kwargs): # type: ignore
)

engine_adapter.execute(alter_table_exp)


def migrate_rows(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0016_fix_windows_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0017_fix_windows_seed_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0018_rename_snapshot_model_to_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
9 changes: 8 additions & 1 deletion sqlmesh/migrations/v0019_add_env_suffix_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sqlglot import exp


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
environments_table = "_environments"
if state_sync.schema:
Expand All @@ -21,6 +21,13 @@ def migrate(state_sync, **kwargs): # type: ignore
)
engine_adapter.execute(alter_table_exp)


def migrate_rows(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
environments_table = "_environments"
if state_sync.schema:
environments_table = f"{state_sync.schema}.{environments_table}"

state_sync.engine_adapter.update_table(
environments_table,
{"suffix_target": "schema"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0021_fix_table_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0022_move_project_to_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_schemas(state_sync, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from sqlmesh.utils.dag import DAG


def migrate(state_sync: t.Any, **kwargs) -> None: # type: ignore
def migrate_schemas(state_sync: t.Any, **kwargs) -> None: # type: ignore
pass


def migrate_rows(state_sync: t.Any, **kwargs) -> None: # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
Expand Down
Loading