diff --git a/.circleci/manage-test-db.sh b/.circleci/manage-test-db.sh index f44bd54845..ba1d1070fb 100755 --- a/.circleci/manage-test-db.sh +++ b/.circleci/manage-test-db.sh @@ -51,7 +51,7 @@ databricks_init() { # Note: the cluster doesnt need to be running to create / drop catalogs, but it does need to be running to run the integration tests echo "Ensuring cluster is running" - databricks clusters start $CLUSTER_ID || true + databricks clusters start $CLUSTER_ID } databricks_up() { diff --git a/Makefile b/Makefile index 3fea757169..bad2cf2907 100644 --- a/Makefile +++ b/Makefile @@ -174,7 +174,7 @@ athena-test: guard-AWS_ACCESS_KEY_ID guard-AWS_SECRET_ACCESS_KEY guard-ATHENA_S3 pytest -n auto -m "athena" --retries 3 --junitxml=test-results/junit-athena.xml fabric-test: guard-FABRIC_HOST guard-FABRIC_CLIENT_ID guard-FABRIC_CLIENT_SECRET guard-FABRIC_DATABASE engine-fabric-install - pytest -n auto -m "fabric" --retries 3 --junitxml=test-results/junit-fabric.xml + pytest -n auto -m "fabric" --retries 3 --junitxml=test-results/junit-fabric.xml gcp-postgres-test: guard-GCP_POSTGRES_INSTANCE_CONNECTION_STRING guard-GCP_POSTGRES_USER guard-GCP_POSTGRES_PASSWORD guard-GCP_POSTGRES_KEYFILE_JSON engine-gcppostgres-install pytest -n auto -m "gcp_postgres" --retries 3 --junitxml=test-results/junit-gcp-postgres.xml diff --git a/pyproject.toml b/pyproject.toml index 4e201a734d..e125bfb281 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -259,6 +259,7 @@ markers = [ "mssql: test for MSSQL", "mysql: test for MySQL", "postgres: test for Postgres", + "gcp_postgres: test for Postgres on GCP", "redshift: test for Redshift", "snowflake: test for Snowflake", "spark: test for Spark", @@ -267,7 +268,7 @@ markers = [ ] addopts = "-n 0 --dist=loadgroup" asyncio_default_fixture_loop_scope = "session" -log_cli = false # Set this to true to enable logging during tests +log_cli = true # Set this to true to enable logging during tests log_cli_format = "%(asctime)s.%(msecs)03d %(filename)s:%(lineno)d %(levelname)s %(message)s" log_cli_level = "INFO" filterwarnings = [ diff --git a/sqlmesh/core/config/loader.py b/sqlmesh/core/config/loader.py index a7b997e303..fe9deed0c2 100644 --- a/sqlmesh/core/config/loader.py +++ b/sqlmesh/core/config/loader.py @@ -83,6 +83,7 @@ def load_config_from_paths( personal_paths: t.Optional[t.List[Path]] = None, config_name: str = "config", load_from_env: bool = True, + variables: t.Optional[t.Dict[str, t.Any]] = None, **kwargs: t.Any, ) -> C: project_paths = project_paths or [] @@ -116,7 +117,7 @@ def load_config_from_paths( "YAML configs do not support multiple configs. Use Python instead.", ) yaml_config_path = path.resolve() - non_python_configs.append(load_config_from_yaml(path)) + non_python_configs.append(load_config_from_yaml(path, variables)) elif extension == "py": try: python_config = load_config_from_python_module( @@ -194,8 +195,10 @@ def load_config_from_paths( return non_python_config -def load_config_from_yaml(path: Path) -> t.Dict[str, t.Any]: - content = yaml_load(path) +def load_config_from_yaml( + path: Path, variables: t.Optional[t.Dict[str, t.Any]] = None +) -> t.Dict[str, t.Any]: + content = yaml_load(path, variables=variables) if not isinstance(content, dict): raise ConfigError( f"Invalid YAML configuration: expected a dictionary but got {type(content).__name__}. " diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 6f0123d022..6d7d40c3bb 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -3,6 +3,7 @@ import typing as t import logging import requests +import time from functools import cached_property from sqlglot import exp from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result @@ -15,6 +16,7 @@ from sqlmesh.utils.errors import SQLMeshError from sqlmesh.utils.connection_pool import ConnectionPool + if t.TYPE_CHECKING: from sqlmesh.core._typing import TableName @@ -172,8 +174,17 @@ def __init__(self, tenant_id: str, workspace_id: str, client_id: str, client_sec self.client_secret = client_secret self.workspace_id = workspace_id - def create_warehouse(self, warehouse_name: str) -> None: + def create_warehouse( + self, warehouse_name: str, if_not_exists: bool = True, attempt: int = 0 + ) -> None: """Create a catalog (warehouse) in Microsoft Fabric via REST API.""" + + # attempt count is arbitrary, it essentially equates to 5 minutes of 30 second waits + if attempt > 10: + raise SQLMeshError( + f"Gave up waiting for Fabric warehouse {warehouse_name} to become available" + ) + logger.info(f"Creating Fabric warehouse: {warehouse_name}") request_data = { @@ -182,7 +193,34 @@ def create_warehouse(self, warehouse_name: str) -> None: } response = self.session.post(self._endpoint_url("warehouses"), json=request_data) - response.raise_for_status() + + if ( + if_not_exists + and response.status_code == 400 + and (errorCode := response.json().get("errorCode", None)) + ): + if errorCode == "ItemDisplayNameAlreadyInUse": + logger.warning(f"Fabric warehouse {warehouse_name} already exists") + return + if errorCode == "ItemDisplayNameNotAvailableYet": + logger.warning(f"Fabric warehouse {warehouse_name} is still spinning up; waiting") + # Fabric error message is something like: + # - "Requested 'circleci_51d7087e__dev' is not available yet and is expected to become available in the upcoming minutes." + # This seems to happen if a catalog is dropped and then a new one with the same name is immediately created. + # There appears to be some delayed async process on the Fabric side that actually drops the warehouses and frees up the names to be used again + time.sleep(30) + return self.create_warehouse( + warehouse_name=warehouse_name, if_not_exists=if_not_exists, attempt=attempt + 1 + ) + + try: + response.raise_for_status() + except: + # the important information to actually debug anything is in the response body which Requests never prints + logger.exception( + f"Failed to create warehouse {warehouse_name}. status: {response.status_code}, body: {response.text}" + ) + raise # Handle direct success (201) or async creation (202) if response.status_code == 201: @@ -197,11 +235,12 @@ def create_warehouse(self, warehouse_name: str) -> None: logger.error(f"Unexpected response from Fabric API: {response}\n{response.text}") raise SQLMeshError(f"Unable to create warehouse: {response}") - def delete_warehouse(self, warehouse_name: str) -> None: + def delete_warehouse(self, warehouse_name: str, if_exists: bool = True) -> None: """Drop a catalog (warehouse) in Microsoft Fabric via REST API.""" logger.info(f"Deleting Fabric warehouse: {warehouse_name}") # Get the warehouse ID by listing warehouses + # TODO: handle continuationUri for pagination, ref: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP#warehouses response = self.session.get(self._endpoint_url("warehouses")) response.raise_for_status() @@ -213,9 +252,12 @@ def delete_warehouse(self, warehouse_name: str) -> None: warehouse_id = warehouse_name_to_id.get(warehouse_name, None) if not warehouse_id: - logger.error( + logger.warning( f"Fabric warehouse does not exist: {warehouse_name}\n(available warehouses: {', '.join(warehouse_name_to_id)})" ) + if if_exists: + return + raise SQLMeshError( f"Unable to delete Fabric warehouse {warehouse_name} as it doesnt exist" ) diff --git a/tests/conftest.py b/tests/conftest.py index e5bbc4f425..e4911de80c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -212,6 +212,39 @@ def pytest_collection_modifyitems(items, *args, **kwargs): item.add_marker("fast") +@pytest.hookimpl(hookwrapper=True, tryfirst=True) +def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo): + # The tmp_path fixture frequently throws errors like: + # - KeyError: <_pytest.stash.StashKey object at 0x79ba385fe1a0> + # in its teardown. This causes pytest to mark the test as failed even though we have zero control over this behaviour. + # So we log/swallow that particular error here rather than raising it + + # note: the hook always has to yield + outcome = yield + + # we only care about tests that used the tmp_path fixture + if "tmp_path" not in getattr(item, "fixturenames", []): + return + + result: pytest.TestReport = outcome.get_result() + + if result.when != "teardown": + return + + # If we specifically failed with a StashKey error in teardown, mark the test as passed + if result.failed: + exception = call.excinfo + if ( + exception + and isinstance(exception.value, KeyError) + and "_pytest.stash.StashKey" in repr(exception) + ): + result.outcome = "passed" + item.add_report_section( + "teardown", "stderr", f"Ignored tmp_path teardown error: {exception}" + ) + + # Ignore all local config files @pytest.fixture(scope="session", autouse=True) def ignore_local_config_files(): diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py index baf45efa9c..c5377e309a 100644 --- a/tests/core/engine_adapter/integration/__init__.py +++ b/tests/core/engine_adapter/integration/__init__.py @@ -193,6 +193,7 @@ def __init__( engine_adapter: EngineAdapter, mark: str, gateway: str, + tmp_path: pathlib.Path, is_remote: bool = False, columns_to_types: t.Optional[t.Dict[str, t.Union[str, exp.DataType]]] = None, ): @@ -210,6 +211,7 @@ def __init__( self._catalogs: t.List[ str ] = [] # keep track of any catalogs created via self.create_catalog() so we can drop them at the end + self.tmp_path = tmp_path @property def test_type(self) -> str: @@ -655,6 +657,7 @@ def create_context( private_sqlmesh_dir / "config.yml", private_sqlmesh_dir / "config.yaml", ], + variables={"tmp_path": str(path or self.tmp_path)}, ) if config_mutator: config_mutator(self.gateway, config) diff --git a/tests/core/engine_adapter/integration/config.yaml b/tests/core/engine_adapter/integration/config.yaml index b75efc762b..8e87b2c3c8 100644 --- a/tests/core/engine_adapter/integration/config.yaml +++ b/tests/core/engine_adapter/integration/config.yaml @@ -5,7 +5,7 @@ gateways: type: duckdb catalogs: memory: ':memory:' - testing: 'testing.duckdb' + testing: "{{ var('tmp_path') }}/testing.duckdb" # Databases with docker images available inttest_trino_hive: diff --git a/tests/core/engine_adapter/integration/conftest.py b/tests/core/engine_adapter/integration/conftest.py index 4d374cfdbc..eafdf2fe1d 100644 --- a/tests/core/engine_adapter/integration/conftest.py +++ b/tests/core/engine_adapter/integration/conftest.py @@ -27,15 +27,15 @@ logger = logging.getLogger(__name__) -@pytest.fixture(scope="session") -def config() -> Config: +@pytest.fixture +def config(tmp_path: pathlib.Path) -> Config: return load_config_from_paths( Config, project_paths=[ - pathlib.Path("examples/wursthall/config.yaml"), pathlib.Path(os.path.join(os.path.dirname(__file__), "config.yaml")), ], personal_paths=[pathlib.Path("~/.sqlmesh/config.yaml").expanduser()], + variables={"tmp_path": str(tmp_path)}, ) @@ -89,7 +89,9 @@ def _create(engine_name: str, gateway: str) -> EngineAdapter: @pytest.fixture def create_test_context( - request: FixtureRequest, create_engine_adapter: t.Callable[[str, str], EngineAdapter] + request: FixtureRequest, + create_engine_adapter: t.Callable[[str, str], EngineAdapter], + tmp_path: pathlib.Path, ) -> t.Callable[[IntegrationTestEngine, str, str, str], t.Iterable[TestContext]]: def _create( engine: IntegrationTestEngine, gateway: str, test_type: str, table_format: str @@ -103,6 +105,7 @@ def _create( engine_adapter, f"{engine.engine}_{table_format}", gateway, + tmp_path=tmp_path, is_remote=is_remote, ) diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index ec5c6b4208..1960848e24 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -1,7 +1,6 @@ # type: ignore from __future__ import annotations -import os import pathlib import re import sys @@ -19,7 +18,6 @@ from sqlmesh import Config, Context from sqlmesh.cli.project_init import init_example_project -from sqlmesh.core.config import load_config_from_paths from sqlmesh.core.config.connection import ConnectionConfig import sqlmesh.core.dialect as d from sqlmesh.core.environment import EnvironmentSuffixTarget @@ -1936,49 +1934,16 @@ def test_transaction(ctx: TestContext): ctx.compare_with_current(table, input_data) -def test_sushi(ctx: TestContext, tmp_path_factory: pytest.TempPathFactory): +def test_sushi(ctx: TestContext, tmp_path: pathlib.Path): if ctx.mark == "athena_hive": pytest.skip( "Sushi end-to-end tests only need to run once for Athena because sushi needs a hybrid of both Hive and Iceberg" ) - tmp_path = tmp_path_factory.mktemp(f"sushi_{ctx.test_id}") - sushi_test_schema = ctx.add_test_suffix("sushi") sushi_state_schema = ctx.add_test_suffix("sushi_state") raw_test_schema = ctx.add_test_suffix("raw") - config = load_config_from_paths( - Config, - project_paths=[ - pathlib.Path(os.path.join(os.path.dirname(__file__), "config.yaml")), - ], - personal_paths=[pathlib.Path("~/.sqlmesh/config.yaml").expanduser()], - ) - before_all = [ - f"CREATE SCHEMA IF NOT EXISTS {raw_test_schema}", - f"DROP VIEW IF EXISTS {raw_test_schema}.demographics", - f"CREATE VIEW {raw_test_schema}.demographics AS (SELECT 1 AS customer_id, '00000' AS zip)", - ] - config.before_all = [ - quote_identifiers( - parse_one(e, dialect=config.model_defaults.dialect), - dialect=config.model_defaults.dialect, - ).sql(dialect=config.model_defaults.dialect) - for e in before_all - ] - - # To enable parallelism in integration tests - config.gateways = {ctx.gateway: config.gateways[ctx.gateway]} - current_gateway_config = config.gateways[ctx.gateway] - current_gateway_config.state_schema = sushi_state_schema - - if ctx.dialect == "athena": - # Ensure that this test is using the same s3_warehouse_location as TestContext (which includes the testrun_id) - current_gateway_config.connection.s3_warehouse_location = ( - ctx.engine_adapter.s3_warehouse_location - ) - # Copy sushi example to tmpdir shutil.copytree(pathlib.Path("./examples/sushi"), tmp_path, dirs_exist_ok=True) @@ -2000,7 +1965,23 @@ def test_sushi(ctx: TestContext, tmp_path_factory: pytest.TempPathFactory): contents = contents.replace(search, replace) f.write_text(contents) - context = Context(paths=tmp_path, config=config, gateway=ctx.gateway) + before_all = [ + f"CREATE SCHEMA IF NOT EXISTS {raw_test_schema}", + f"DROP VIEW IF EXISTS {raw_test_schema}.demographics", + f"CREATE VIEW {raw_test_schema}.demographics AS (SELECT 1 AS customer_id, '00000' AS zip)", + ] + + def _mutate_config(gateway: str, config: Config) -> None: + config.gateways[gateway].state_schema = sushi_state_schema + config.before_all = [ + quote_identifiers( + parse_one(e, dialect=config.model_defaults.dialect), + dialect=config.model_defaults.dialect, + ).sql(dialect=config.model_defaults.dialect) + for e in before_all + ] + + context = ctx.create_context(_mutate_config, path=tmp_path, ephemeral_state_connection=False) end = now() start = to_date(end - timedelta(days=7)) @@ -2355,9 +2336,7 @@ def validate_no_comments( ctx._schemas.append(schema) -def test_init_project(ctx: TestContext, tmp_path_factory: pytest.TempPathFactory): - tmp_path = tmp_path_factory.mktemp(f"init_project_{ctx.test_id}") - +def test_init_project(ctx: TestContext, tmp_path: pathlib.Path): schema_name = ctx.add_test_suffix(TEST_SCHEMA) state_schema = ctx.add_test_suffix("sqlmesh_state") @@ -2383,33 +2362,15 @@ def _normalize_snowflake(name: str, prefix_regex: str = "(sqlmesh__)(.*)"): init_example_project(tmp_path, ctx.engine_type, schema_name=schema_name) - config = load_config_from_paths( - Config, - project_paths=[ - pathlib.Path(os.path.join(os.path.dirname(__file__), "config.yaml")), - ], - personal_paths=[pathlib.Path("~/.sqlmesh/config.yaml").expanduser()], - ) - - # ensure default dialect comes from init_example_project and not ~/.sqlmesh/config.yaml - if config.model_defaults.dialect != ctx.dialect: - config.model_defaults = config.model_defaults.copy(update={"dialect": ctx.dialect}) - - # To enable parallelism in integration tests - config.gateways = {ctx.gateway: config.gateways[ctx.gateway]} - current_gateway_config = config.gateways[ctx.gateway] - - if ctx.dialect == "athena": - # Ensure that this test is using the same s3_warehouse_location as TestContext (which includes the testrun_id) - current_gateway_config.connection.s3_warehouse_location = ( - ctx.engine_adapter.s3_warehouse_location - ) + def _mutate_config(gateway: str, config: Config): + # ensure default dialect comes from init_example_project and not ~/.sqlmesh/config.yaml + if config.model_defaults.dialect != ctx.dialect: + config.model_defaults = config.model_defaults.copy(update={"dialect": ctx.dialect}) - # Ensure the state schema is unique to this test - config.gateways[ctx.gateway].state_schema = state_schema + # Ensure the state schema is unique to this test (since we deliberately use the warehouse as the state connection) + config.gateways[gateway].state_schema = state_schema - context = Context(paths=tmp_path, config=config, gateway=ctx.gateway) - ctx.engine_adapter = context.engine_adapter + context = ctx.create_context(_mutate_config, path=tmp_path, ephemeral_state_connection=False) if ctx.default_table_format: # if the default table format is explicitly set, ensure its being used @@ -3607,6 +3568,7 @@ def test_identifier_length_limit(ctx: TestContext): EnvironmentSuffixTarget.CATALOG, ], ) +@pytest.mark.xdist_group("serial") def test_janitor( ctx: TestContext, tmp_path: pathlib.Path, environment_suffix_target: EnvironmentSuffixTarget ): @@ -3621,9 +3583,10 @@ def test_janitor( init_example_project(tmp_path, ctx.engine_type, schema_name=parsed_schema.db) - def _set_config(_gateway: str, config: Config) -> None: + def _set_config(gateway: str, config: Config) -> None: config.environment_suffix_target = environment_suffix_target config.model_defaults.dialect = ctx.dialect + config.gateways[gateway].connection.concurrent_tasks = 1 sqlmesh = ctx.create_context(path=tmp_path, config_mutator=_set_config) @@ -3648,7 +3611,10 @@ def _set_config(_gateway: str, config: Config) -> None: # check physical objects snapshot_table_name = exp.to_table(new_model.table_name(), dialect=ctx.dialect) - snapshot_schema = snapshot_table_name.db + snapshot_schema = parsed_schema.copy() + snapshot_schema.set( + "db", exp.to_identifier(snapshot_table_name.db) + ) # we need this to be catalog.schema and not just schema for environment_suffix_target: catalog prod_schema = normalize_identifiers(d.to_schema(schema), dialect=ctx.dialect) dev_env_schema = prod_schema.copy() diff --git a/tests/core/engine_adapter/integration/test_integration_fabric.py b/tests/core/engine_adapter/integration/test_integration_fabric.py new file mode 100644 index 0000000000..a272005bdc --- /dev/null +++ b/tests/core/engine_adapter/integration/test_integration_fabric.py @@ -0,0 +1,41 @@ +import typing as t +import pytest +from pytest import FixtureRequest +from sqlmesh.core.engine_adapter import FabricEngineAdapter +from tests.core.engine_adapter.integration import TestContext + +from tests.core.engine_adapter.integration import ( + TestContext, + generate_pytest_params, + ENGINES_BY_NAME, + IntegrationTestEngine, +) + + +@pytest.fixture( + params=list(generate_pytest_params(ENGINES_BY_NAME["fabric"], show_variant_in_test_id=False)) +) +def ctx( + request: FixtureRequest, + create_test_context: t.Callable[[IntegrationTestEngine, str, str], t.Iterable[TestContext]], +) -> t.Iterable[TestContext]: + yield from create_test_context(*request.param) + + +@pytest.fixture +def engine_adapter(ctx: TestContext) -> FabricEngineAdapter: + assert isinstance(ctx.engine_adapter, FabricEngineAdapter) + return ctx.engine_adapter + + +def test_create_drop_catalog(ctx: TestContext, engine_adapter: FabricEngineAdapter): + catalog_name = ctx.add_test_suffix("test_catalog") + + try: + ctx.create_catalog(catalog_name) + # if already exists, should be no-op, not error + ctx.create_catalog(catalog_name) + ctx.drop_catalog(catalog_name) + finally: + # if doesnt exist, should be no-op, not error + ctx.drop_catalog(catalog_name)