Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/data/model/records.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dataclasses import dataclass

from app.data.model import interface
from app.lib.storage import enums


@dataclass
Expand Down Expand Up @@ -38,6 +39,7 @@ def get[T](self, t: type[T]) -> T | None:
class RecordCrossmatch:
record: Record
processing_result: CIResult
triage_status: enums.RecordTriageStatus = enums.RecordTriageStatus.PENDING


@dataclass
Expand Down
26 changes: 20 additions & 6 deletions app/data/repositories/layer0/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def get_processed_records(
offset: str | None = None,
table_name: str | None = None,
status: Sequence[enums.RecordCrossmatchStatus] | None = None,
triage_status: Sequence[enums.RecordTriageStatus] | None = None,
record_id: str | None = None,
) -> list[model.RecordCrossmatch]:
params = []
Expand All @@ -65,13 +66,19 @@ def get_processed_records(
where_stmnt.append("c.status = ANY(%s)")
params.append([s.value for s in statuses])

if triage_status is not None:
triage_statuses = list(triage_status)
if triage_statuses:
where_stmnt.append("c.triage_status = ANY(%s)")
params.append([s.value for s in triage_statuses])

if record_id is not None:
where_stmnt.append("o.id = %s")
params.append(record_id)

where_clause = f"WHERE {' AND '.join(where_stmnt)}" if where_stmnt else ""

query = f"""SELECT o.id, c.status, c.metadata
query = f"""SELECT o.id, c.status, c.triage_status, c.metadata
{join_tables}
{where_clause}
ORDER BY o.id
Expand Down Expand Up @@ -102,6 +109,7 @@ def get_processed_records(
[],
),
ci_result,
triage_status=row["triage_status"],
)
)

Expand Down Expand Up @@ -158,32 +166,38 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics:
)

def add_crossmatch_result(self, data: dict[str, model.CIResult]) -> None:
query = "INSERT INTO layer0.crossmatch (record_id, status, metadata) VALUES "
query = "INSERT INTO layer0.crossmatch (record_id, status, triage_status, metadata) VALUES "
params = []
values = []

for record_id, result in data.items():
values.append("(%s, %s, %s)")
values.append("(%s, %s, %s, %s)")

status = None
triage = enums.RecordTriageStatus.PENDING
meta = {}

if isinstance(result, model.CIResultObjectNew):
status = enums.RecordCrossmatchStatus.NEW
triage = enums.RecordTriageStatus.RESOLVED
meta = {}
elif isinstance(result, model.CIResultObjectExisting):
status = enums.RecordCrossmatchStatus.EXISTING
triage = enums.RecordTriageStatus.RESOLVED
meta = {"pgc": result.pgc}
else:
status = enums.RecordCrossmatchStatus.COLLIDED
possible_pgcs = list(result.pgcs)

meta = {"possible_matches": possible_pgcs}

params.extend([record_id, status, json.dumps(meta)])
params.extend([record_id, status, triage, json.dumps(meta)])

query += ",".join(values)
query += " ON CONFLICT (record_id) DO UPDATE SET status = EXCLUDED.status, metadata = EXCLUDED.metadata"
query += (
" ON CONFLICT (record_id) DO UPDATE SET "
"status = EXCLUDED.status, triage_status = EXCLUDED.triage_status, "
"metadata = EXCLUDED.metadata"
)

self._storage.exec(query, params=params)

Expand Down
3 changes: 2 additions & 1 deletion app/data/repositories/layer0/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ def get_processed_records(
offset: str | None = None,
table_name: str | None = None,
status: Sequence[enums.RecordCrossmatchStatus] | None = None,
triage_status: Sequence[enums.RecordTriageStatus] | None = None,
record_id: str | None = None,
) -> list[model.RecordCrossmatch]:
return self.records_repo.get_processed_records(limit, offset, table_name, status, record_id)
return self.records_repo.get_processed_records(limit, offset, table_name, status, triage_status, record_id)

def add_crossmatch_result(self, data: dict[str, model.CIResult]) -> None:
return self.records_repo.add_crossmatch_result(data)
Expand Down
1 change: 1 addition & 0 deletions app/domain/adminapi/crossmatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def _convert_to_record_crossmatch(self, records: list[model.RecordCrossmatch]) -
adminapi.RecordCrossmatch(
record_id=obj.record.id,
status=status,
triage_status=obj.triage_status,
metadata=metadata,
catalogs=catalogs,
)
Expand Down
5 changes: 5 additions & 0 deletions app/lib/storage/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ class RecordCrossmatchStatus(enum.StrEnum):
NEW = "new"
COLLIDED = "collided"
EXISTING = "existing"


class RecordTriageStatus(enum.StrEnum):
PENDING = "pending"
RESOLVED = "resolved"
1 change: 1 addition & 0 deletions app/lib/storage/postgres/postgres_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def dump(self, obj: Any) -> bytes | bytearray | memoryview:
DEFAULT_ENUMS = [
(enums.DataType, "common.datatype"),
(enums.RecordCrossmatchStatus, "layer0.crossmatch_status"),
(enums.RecordTriageStatus, "layer0.triage_status"),
]


Expand Down
1 change: 1 addition & 0 deletions app/presentation/adminapi/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class Catalogs(pydantic.BaseModel):
class RecordCrossmatch(pydantic.BaseModel):
record_id: str
status: enums.RecordCrossmatchStatus
triage_status: enums.RecordTriageStatus
metadata: RecordCrossmatchMetadata
catalogs: Catalogs

Expand Down
1 change: 1 addition & 0 deletions app/tasks/submit_crossmatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def run(self):
lambda d, _: d[-1].record.id,
table_name=self.table_name,
status=[enums.RecordCrossmatchStatus.NEW, enums.RecordCrossmatchStatus.EXISTING],
triage_status=[enums.RecordTriageStatus.RESOLVED],
batch_size=self.batch_size,
):
batch_new = sum(1 for obj in data if isinstance(obj.processing_result, model.CIResultObjectNew))
Expand Down
10 changes: 10 additions & 0 deletions postgres/migrations/V017__crossmatch_triage_status.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/* pgmigrate-encoding: utf-8 */

CREATE TYPE layer0.triage_status AS ENUM ('pending', 'resolved');

ALTER TABLE layer0.crossmatch
ADD COLUMN triage_status layer0.triage_status NOT NULL DEFAULT 'pending';

UPDATE layer0.crossmatch
SET triage_status = 'resolved'
WHERE status IN ('new', 'existing');
Loading