Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 84 additions & 49 deletions py-src/data_formulator/datalake/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,77 +579,112 @@ class WorkspaceWithTempData:
Context manager that temporarily adds temp data (list of {name, rows}) to a workspace
as parquet files, yields the same workspace, and removes those files on exit.

Use when the client sends in-memory data (e.g. language == "python"): wrap the
workspace so temp tables are visible for the block and then cleaned up.

Two modes controlled by ``register_metadata``:

**register_metadata=False (default)**
    Temp files are written directly to disk WITHOUT metadata updates.
    This eliminates metadata file locking contention when multiple temp tables
    are created concurrently (the Flask/web-app path). Files use a ``.temp_``
    prefix so they can be identified for crash-recovery cleanup.
    Python code can still access them via relative paths because the sandbox
    runs with ``workspace._path`` as the working directory.

**register_metadata=True**
    Files are written with their plain sanitised name **and** registered in
    ``workspace.yaml`` via ``add_table_metadata``. On exit they are removed
    with ``delete_table`` (which cleans up both the file and the metadata
    entry). Use this when downstream code needs ``read_data_as_df`` /
    ``generate_data_summary`` to resolve tables by name – e.g. the MCP server.
"""

def __init__(
    self,
    workspace: Workspace,
    temp_data: Optional[list[dict[str, Any]]] = None,
    register_metadata: bool = False,
):
    """Wrap *workspace* so that *temp_data* tables exist only inside the context.

    Args:
        workspace: The workspace that temp tables are materialised into.
        temp_data: List of ``{"name": str, "rows": list[dict]}`` items; an
            empty/None value makes the context manager a no-op.
        register_metadata: When True, temp tables are also registered in the
            workspace metadata (and removed via ``delete_table`` on exit);
            when False (default), files are written with a ``.temp_`` prefix
            and no metadata is touched.
    """
    self._workspace = workspace
    # Normalise "no data" to None so __enter__ can short-circuit on a
    # single falsy check (empty list behaves the same as None).
    self._temp_data = temp_data if temp_data else None
    self._register_metadata = register_metadata
    # When register_metadata=False we track file *paths* for cleanup.
    # When register_metadata=True we track table *names* for delete_table().
    self._temp_files: list[Path] = []
    self._temp_table_names: list[str] = []

def __enter__(self) -> Workspace:
    """Materialise each temp table as a parquet file and return the workspace.

    Returns:
        The wrapped workspace (unchanged object identity), so the caller can
        use ``with WorkspaceWithTempData(ws, data) as ws:``.
    """
    if not self._temp_data:
        return self._workspace

    # Local imports keep module import time low and avoid cycles with the
    # datalake package.
    from datetime import datetime
    from data_formulator.datalake.parquet_utils import sanitize_table_name

    for item in self._temp_data:
        base_name = item.get("name", "table")
        safe_name = sanitize_table_name(base_name)

        # Build the DataFrame once; both branches below write it out.
        rows = item.get("rows", [])
        df = pd.DataFrame(rows) if rows else pd.DataFrame()

        if self._register_metadata:
            # ---- metadata-aware path ----
            # NOTE(review): unlike the fast path below, there is no collision
            # counter here — an existing table with the same sanitised name
            # would be overwritten. Confirm add_table_metadata's behaviour on
            # duplicate names if that matters for callers.
            filename = f"{safe_name}.parquet"
            file_path = self._workspace._path / filename
            df.to_parquet(file_path)

            from data_formulator.datalake.metadata import TableMetadata

            meta = TableMetadata(
                name=safe_name,
                source_type="upload",
                filename=filename,
                file_type="parquet",
                created_at=datetime.now(),
                row_count=len(df),
            )
            self._workspace.add_table_metadata(meta)
            self._temp_table_names.append(safe_name)
            logger.debug(
                f"Added table {safe_name} to workspace with metadata "
                f"({len(df)} rows)"
            )
        else:
            # ---- fast path (no metadata) ----
            # The .temp_ prefix marks the file as ephemeral so stale files
            # can be identified for crash-recovery cleanup.
            temp_filename = f".temp_{safe_name}.parquet"
            file_path = self._workspace._path / temp_filename

            # Resolve name conflicts by probing the filesystem directly —
            # no metadata read (and hence no lock contention) is needed.
            counter = 1
            while file_path.exists():
                temp_filename = f".temp_{safe_name}_{counter}.parquet"
                file_path = self._workspace._path / temp_filename
                counter += 1

            df.to_parquet(file_path)
            self._temp_files.append(file_path)
            logger.debug(
                f"Added temp file {file_path.name} to workspace "
                f"({len(df)} rows, no metadata update)"
            )

    return self._workspace

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Remove every temp table created in ``__enter__``.

    Cleanup is best-effort: a failure to remove one table is logged and does
    not prevent cleanup of the others, and exceptions from the ``with`` body
    are never suppressed (this method returns None).
    """
    if self._register_metadata:
        # delete_table removes both the parquet file and the metadata entry
        for name in self._temp_table_names:
            try:
                self._workspace.delete_table(name)
                logger.debug(f"Deleted table {name} (file + metadata)")
            except Exception as e:
                logger.warning(f"Failed to delete table {name}: {e}")
        self._temp_table_names.clear()
    else:
        # Fast path never touched metadata, so unlinking the files suffices.
        for file_path in self._temp_files:
            try:
                file_path.unlink(missing_ok=True)
                logger.debug(f"Removed temp file {file_path.name}")
            except Exception as e:
                logger.warning(f"Failed to remove temp file {file_path}: {e}")
        self._temp_files.clear()


# ==============================================================================
Expand Down
Loading