Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/manage-test-db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ databricks_init() {

# Note: the cluster doesnt need to be running to create / drop catalogs, but it does need to be running to run the integration tests
echo "Ensuring cluster is running"
databricks clusters start $CLUSTER_ID || true
databricks clusters start $CLUSTER_ID
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to make Databricks fail early with the cluster start error, rather than timing out for 40 minutes while trying to run the tests.

}

databricks_up() {
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ athena-test: guard-AWS_ACCESS_KEY_ID guard-AWS_SECRET_ACCESS_KEY guard-ATHENA_S3
pytest -n auto -m "athena" --retries 3 --junitxml=test-results/junit-athena.xml

fabric-test: guard-FABRIC_HOST guard-FABRIC_CLIENT_ID guard-FABRIC_CLIENT_SECRET guard-FABRIC_DATABASE engine-fabric-install
pytest -n auto -m "fabric" --retries 3 --junitxml=test-results/junit-fabric.xml
pytest -n auto -m "fabric" --retries 3 --junitxml=test-results/junit-fabric.xml

gcp-postgres-test: guard-GCP_POSTGRES_INSTANCE_CONNECTION_STRING guard-GCP_POSTGRES_USER guard-GCP_POSTGRES_PASSWORD guard-GCP_POSTGRES_KEYFILE_JSON engine-gcppostgres-install
pytest -n auto -m "gcp_postgres" --retries 3 --junitxml=test-results/junit-gcp-postgres.xml
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ markers = [
"mssql: test for MSSQL",
"mysql: test for MySQL",
"postgres: test for Postgres",
"gcp_postgres: test for Postgres on GCP",
"redshift: test for Redshift",
"snowflake: test for Snowflake",
"spark: test for Spark",
Expand All @@ -267,7 +268,7 @@ markers = [
]
addopts = "-n 0 --dist=loadgroup"
asyncio_default_fixture_loop_scope = "session"
log_cli = false # Set this to true to enable logging during tests
log_cli = true # Set this to true to enable logging during tests
log_cli_format = "%(asctime)s.%(msecs)03d %(filename)s:%(lineno)d %(levelname)s %(message)s"
log_cli_level = "INFO"
filterwarnings = [
Expand Down
9 changes: 6 additions & 3 deletions sqlmesh/core/config/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def load_config_from_paths(
personal_paths: t.Optional[t.List[Path]] = None,
config_name: str = "config",
load_from_env: bool = True,
variables: t.Optional[t.Dict[str, t.Any]] = None,
**kwargs: t.Any,
) -> C:
project_paths = project_paths or []
Expand Down Expand Up @@ -116,7 +117,7 @@ def load_config_from_paths(
"YAML configs do not support multiple configs. Use Python instead.",
)
yaml_config_path = path.resolve()
non_python_configs.append(load_config_from_yaml(path))
non_python_configs.append(load_config_from_yaml(path, variables))
elif extension == "py":
try:
python_config = load_config_from_python_module(
Expand Down Expand Up @@ -194,8 +195,10 @@ def load_config_from_paths(
return non_python_config


def load_config_from_yaml(path: Path) -> t.Dict[str, t.Any]:
content = yaml_load(path)
def load_config_from_yaml(
path: Path, variables: t.Optional[t.Dict[str, t.Any]] = None
) -> t.Dict[str, t.Any]:
content = yaml_load(path, variables=variables)
if not isinstance(content, dict):
raise ConfigError(
f"Invalid YAML configuration: expected a dictionary but got {type(content).__name__}. "
Expand Down
50 changes: 46 additions & 4 deletions sqlmesh/core/engine_adapter/fabric.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import typing as t
import logging
import requests
import time
from functools import cached_property
from sqlglot import exp
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result
Expand All @@ -15,6 +16,7 @@
from sqlmesh.utils.errors import SQLMeshError
from sqlmesh.utils.connection_pool import ConnectionPool


if t.TYPE_CHECKING:
from sqlmesh.core._typing import TableName

Expand Down Expand Up @@ -172,8 +174,17 @@ def __init__(self, tenant_id: str, workspace_id: str, client_id: str, client_sec
self.client_secret = client_secret
self.workspace_id = workspace_id

def create_warehouse(self, warehouse_name: str) -> None:
def create_warehouse(
self, warehouse_name: str, if_not_exists: bool = True, attempt: int = 0
) -> None:
"""Create a catalog (warehouse) in Microsoft Fabric via REST API."""

# attempt count is arbitrary, it essentially equates to 5 minutes of 30 second waits
if attempt > 10:
raise SQLMeshError(
f"Gave up waiting for Fabric warehouse {warehouse_name} to become available"
)

logger.info(f"Creating Fabric warehouse: {warehouse_name}")

request_data = {
Expand All @@ -182,7 +193,34 @@ def create_warehouse(self, warehouse_name: str) -> None:
}

response = self.session.post(self._endpoint_url("warehouses"), json=request_data)
response.raise_for_status()

if (
if_not_exists
and response.status_code == 400
and (errorCode := response.json().get("errorCode", None))
):
if errorCode == "ItemDisplayNameAlreadyInUse":
logger.warning(f"Fabric warehouse {warehouse_name} already exists")
return
if errorCode == "ItemDisplayNameNotAvailableYet":
logger.warning(f"Fabric warehouse {warehouse_name} is still spinning up; waiting")
# Fabric error message is something like:
# - "Requested 'circleci_51d7087e__dev' is not available yet and is expected to become available in the upcoming minutes."
# This seems to happen if a catalog is dropped and then a new one with the same name is immediately created.
# There appears to be some delayed async process on the Fabric side that actually drops the warehouses and frees up the names to be used again
time.sleep(30)
return self.create_warehouse(
warehouse_name=warehouse_name, if_not_exists=if_not_exists, attempt=attempt + 1
)

try:
response.raise_for_status()
except:
# the important information to actually debug anything is in the response body which Requests never prints
logger.exception(
f"Failed to create warehouse {warehouse_name}. status: {response.status_code}, body: {response.text}"
)
raise

# Handle direct success (201) or async creation (202)
if response.status_code == 201:
Expand All @@ -197,11 +235,12 @@ def create_warehouse(self, warehouse_name: str) -> None:
logger.error(f"Unexpected response from Fabric API: {response}\n{response.text}")
raise SQLMeshError(f"Unable to create warehouse: {response}")

def delete_warehouse(self, warehouse_name: str) -> None:
def delete_warehouse(self, warehouse_name: str, if_exists: bool = True) -> None:
"""Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
logger.info(f"Deleting Fabric warehouse: {warehouse_name}")

# Get the warehouse ID by listing warehouses
# TODO: handle continuationUri for pagination, ref: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP#warehouses
response = self.session.get(self._endpoint_url("warehouses"))
response.raise_for_status()

Expand All @@ -213,9 +252,12 @@ def delete_warehouse(self, warehouse_name: str) -> None:
warehouse_id = warehouse_name_to_id.get(warehouse_name, None)

if not warehouse_id:
logger.error(
logger.warning(
f"Fabric warehouse does not exist: {warehouse_name}\n(available warehouses: {', '.join(warehouse_name_to_id)})"
)
if if_exists:
return

raise SQLMeshError(
f"Unable to delete Fabric warehouse {warehouse_name} as it doesnt exist"
)
Expand Down
33 changes: 33 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,39 @@ def pytest_collection_modifyitems(items, *args, **kwargs):
item.add_marker("fast")


@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo):
Copy link
Collaborator Author

@erindru erindru Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be the only hook that can catch the StashKey errors. Fixtures that yield their values (like tmp_path) appear to hit different codepaths than other types of fixtures.

I tried both pytest_fixture_post_finalizer and pytest_runtest_teardown before resorting to this hook.

# The tmp_path fixture frequently throws errors like:
# - KeyError: <_pytest.stash.StashKey object at 0x79ba385fe1a0>
# in its teardown. This causes pytest to mark the test as failed even though we have zero control over this behaviour.
# So we log/swallow that particular error here rather than raising it

# note: the hook always has to yield
outcome = yield

# we only care about tests that used the tmp_path fixture
if "tmp_path" not in getattr(item, "fixturenames", []):
return

result: pytest.TestReport = outcome.get_result()

if result.when != "teardown":
return

# If we specifically failed with a StashKey error in teardown, mark the test as passed
if result.failed:
exception = call.excinfo
if (
exception
and isinstance(exception.value, KeyError)
and "_pytest.stash.StashKey" in repr(exception)
):
result.outcome = "passed"
item.add_report_section(
"teardown", "stderr", f"Ignored tmp_path teardown error: {exception}"
)


# Ignore all local config files
@pytest.fixture(scope="session", autouse=True)
def ignore_local_config_files():
Expand Down
3 changes: 3 additions & 0 deletions tests/core/engine_adapter/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def __init__(
engine_adapter: EngineAdapter,
mark: str,
gateway: str,
tmp_path: pathlib.Path,
is_remote: bool = False,
columns_to_types: t.Optional[t.Dict[str, t.Union[str, exp.DataType]]] = None,
):
Expand All @@ -210,6 +211,7 @@ def __init__(
self._catalogs: t.List[
str
] = [] # keep track of any catalogs created via self.create_catalog() so we can drop them at the end
self.tmp_path = tmp_path

@property
def test_type(self) -> str:
Expand Down Expand Up @@ -655,6 +657,7 @@ def create_context(
private_sqlmesh_dir / "config.yml",
private_sqlmesh_dir / "config.yaml",
],
variables={"tmp_path": str(path or self.tmp_path)},
)
if config_mutator:
config_mutator(self.gateway, config)
Expand Down
2 changes: 1 addition & 1 deletion tests/core/engine_adapter/integration/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ gateways:
type: duckdb
catalogs:
memory: ':memory:'
testing: 'testing.duckdb'
testing: "{{ var('tmp_path') }}/testing.duckdb"
Copy link
Collaborator Author

@erindru erindru Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was causing flakiness in style_and_cicd_tests, example

duckdb.duckdb.IOException: IO Error: Could not set lock on file "testing.duckdb": Conflicting lock is held in /home/circleci/.pyenv/versions/3.12.11/bin/python3.12 (PID 3249). See also https://duckdb.org/docs/stable/connect/concurrency

The duckdb integration tests get run as part of that and without prefixing the path, testing.duckdb gets created in the sqlmesh root dir rather than the unique dir for each test.

This causes it to be re-used between tests and potentially accessed in parallel from multiple workers, although there is an xdist_group that forces most of the tests to run sequentially.


# Databases with docker images available
inttest_trino_hive:
Expand Down
11 changes: 7 additions & 4 deletions tests/core/engine_adapter/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
logger = logging.getLogger(__name__)


@pytest.fixture(scope="session")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was session-scoped because its predecessor was session scoped and the session scope got retained through an earlier refactor.

However, I was encountering some very hard-to-pin-down issues that I have seen a bunch of times regarding StashKey, example:

teardown failed on attempt 2! Exiting immediately!
        Traceback (most recent call last):
          File "/home/********/.pyenv/versions/3.12.11/lib/python3.12/site-packages/_pytest/runner.py", line 344, in from_call
            result: TResult | None = func()
                                     ^^^^^^
        KeyError: <_pytest.stash.StashKey object at 0x7f1666072320>

That looks like a concurrency issue, and I had a theory it was related to session- and function-scoped fixtures being mixed.

So I've made this fixture function-scoped because I couldn't see a strong reason for it to be session-scoped.

def config() -> Config:
@pytest.fixture
def config(tmp_path: pathlib.Path) -> Config:
return load_config_from_paths(
Config,
project_paths=[
pathlib.Path("examples/wursthall/config.yaml"),
pathlib.Path(os.path.join(os.path.dirname(__file__), "config.yaml")),
],
personal_paths=[pathlib.Path("~/.sqlmesh/config.yaml").expanduser()],
variables={"tmp_path": str(tmp_path)},
)


Expand Down Expand Up @@ -89,7 +89,9 @@ def _create(engine_name: str, gateway: str) -> EngineAdapter:

@pytest.fixture
def create_test_context(
request: FixtureRequest, create_engine_adapter: t.Callable[[str, str], EngineAdapter]
request: FixtureRequest,
create_engine_adapter: t.Callable[[str, str], EngineAdapter],
tmp_path: pathlib.Path,
) -> t.Callable[[IntegrationTestEngine, str, str, str], t.Iterable[TestContext]]:
def _create(
engine: IntegrationTestEngine, gateway: str, test_type: str, table_format: str
Expand All @@ -103,6 +105,7 @@ def _create(
engine_adapter,
f"{engine.engine}_{table_format}",
gateway,
tmp_path=tmp_path,
is_remote=is_remote,
)

Expand Down
Loading
Loading