Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions app/data/repositories/layer0/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@ def fetch_table(

def fetch_raw_data(
self,
table_name: str,
table_name: str | None = None,
offset: str | None = None,
columns: list[str] | None = None,
order_column: str | None = None,
order_direction: str = "asc",
limit: int | None = None,
record_id: str | None = None,
) -> model.Layer0RawData:
return self.table_repo.fetch_raw_data(table_name, offset, columns, order_column, order_direction, limit)
return self.table_repo.fetch_raw_data(
table_name, offset, columns, order_column, order_direction, limit, record_id
)

def fetch_metadata(self, table_name: str) -> model.Layer0TableMeta:
return self.table_repo.fetch_metadata(table_name)
Expand Down
41 changes: 34 additions & 7 deletions app/data/repositories/layer0/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,33 +173,48 @@ def fetch_table(

def fetch_raw_data(
self,
table_name: str,
table_name: str | None = None,
offset: str | None = None,
columns: list[str] | None = None,
order_column: str | None = None,
order_direction: str = "asc",
limit: int | None = None,
record_id: str | None = None,
) -> model.Layer0RawData:
"""
:param table_name: Name of the raw table
:param columns: select only given columns
:param order_column: orders result by a provided column
:param order_direction: if `order_column` is specified, sets order direction. Either `asc` or `desc`.
:param offset: allows to retrieve rows starting from the `offset` record_id
:param limit: allows to retrieve no more than `limit` rows
:param offset: allows to retrieve rows starting from the `offset` record_id.
:param record_id: retrieves only the row with the given record_id. Other filters are still applied.
:param limit: allows to retrieve no more than `limit` rows.
:return: Layer0RawData
"""

if table_name is None and record_id is not None:
table_name = self._resolve_table_name(record_id)

if table_name is None:
raise ValueError("either table_name or record_id must be provided")

columns_str = ",".join(columns or ["*"])

params = []
query = f"""
SELECT {columns_str} FROM {RAWDATA_SCHEMA}."{table_name}"\n
"""
where_stmnt = []

if offset is not None:
query += f"WHERE {repositories.INTERNAL_ID_COLUMN_NAME} > %s\n"
where_stmnt.append(f"{repositories.INTERNAL_ID_COLUMN_NAME} > %s")
params.append(offset)

if record_id is not None:
where_stmnt.append(f"{INTERNAL_ID_COLUMN_NAME} = %s")
params.append(record_id)

where_clause = f"WHERE {' AND '.join(where_stmnt)}" if where_stmnt else ""

query = f'SELECT {columns_str} FROM {RAWDATA_SCHEMA}."{table_name}" {where_clause}\n'

if order_column is not None:
query += f"ORDER BY {order_column} {order_direction}\n"

Expand All @@ -210,6 +225,18 @@ def fetch_raw_data(
rows = self._storage.query(query, params=params)
return model.Layer0RawData(table_name, pandas.DataFrame(rows))

def _resolve_table_name(self, record_id: str) -> str | None:
rows = self._storage.query(
"""
SELECT t.table_name
FROM layer0.records AS o
JOIN layer0.tables AS t ON o.table_id = t.id
WHERE o.id = %s
""",
params=[record_id],
)
return rows[0]["table_name"] if rows else None

def fetch_metadata(self, table_name: str) -> model.Layer0TableMeta:
return self.fetch_metadata_by_name(table_name)

Expand Down
18 changes: 17 additions & 1 deletion app/domain/adminapi/crossmatch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import final
from typing import Any, final

import structlog
from astropy import coordinates
Expand Down Expand Up @@ -150,6 +150,20 @@ def get_record_crossmatch(self, r: adminapi.GetRecordCrossmatchRequest) -> admin
obj = processed_objects[0]
crossmatch_records = self._convert_to_record_crossmatch([obj])

original_data: dict[str, Any] | None = None
table_name = ""
try:
raw_data = self.layer0_repo.fetch_raw_data(record_id=obj.record.id)
table_name = raw_data.table_name
if not raw_data.data.empty:
original_data = raw_data.data.iloc[0].to_dict()
except Exception:
logger.warning(
"Failed to fetch original raw data for record",
record_id=obj.record.id,
error=True,
)

candidate_pgcs: list[int] = []

if isinstance(obj.processing_result, model.CIResultObjectCollision):
Expand All @@ -158,9 +172,11 @@ def get_record_crossmatch(self, r: adminapi.GetRecordCrossmatchRequest) -> admin
candidate_pgcs.append(obj.processing_result.pgc)

response = adminapi.GetRecordCrossmatchResponse(
table_name=table_name,
crossmatch=crossmatch_records[0],
candidates=[],
schema=DATA_SCHEMA,
original_data=original_data,
)

if len(candidate_pgcs) == 0:
Expand Down
2 changes: 2 additions & 0 deletions app/presentation/adminapi/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,11 @@ class PGCCandidate(pydantic.BaseModel):


class GetRecordCrossmatchResponse(pydantic.BaseModel):
table_name: str
crossmatch: RecordCrossmatch
candidates: list[PGCCandidate]
schema_: Schema = pydantic.Field(..., alias="schema")
original_data: dict[str, Any] | None = None


class Actions(abc.ABC):
Expand Down
5 changes: 4 additions & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ test: check
uv run pytest --config-file=pyproject.toml tests/unit

test-all: check
uv run pytest --config-file=pyproject.toml tests
@uv run pytest \
--config-file=pyproject.toml \
--quiet \
tests

test-regression:
uv run tests.py regression-tests
Expand Down
Loading