Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 3 additions & 62 deletions app/domain/dataapi/actions.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
from typing import final

import astropy.units as u
from astropy import coordinates as coords

from app.data import model, repositories
from app.data.repositories import layer2
from app.domain import expressions, responders
from app.domain.dataapi import parameterized_query
from app.domain import responders
from app.domain.dataapi import parameterized_query, search_parsers
from app.presentation import dataapi

ENABLED_CATALOGS = [
Expand All @@ -29,17 +25,14 @@ def __init__(
)

def query(self, query: dataapi.QueryRequest) -> dataapi.QueryResponse:
expression = expressions.parse_expression(query.q)
filters, search_params = expression_to_filter(expression)

filters, search_params = search_parsers.query_to_filters(query.q, search_parsers.DEFAULT_PARSERS)
objects = self.layer2_repo.query(
ENABLED_CATALOGS,
filters,
search_params,
query.page_size,
query.page,
)

responder = responders.JSONResponder()
pgc_objects = responder.build_response(objects)
return dataapi.QueryResponse(objects=pgc_objects)
Expand All @@ -49,55 +42,3 @@ def query_fits(self, query: dataapi.FITSRequest) -> bytes:

def query_simple(self, query: dataapi.QuerySimpleRequest) -> dataapi.QuerySimpleResponse:
return self.parameterized_query_manager.query_simple(query)


def parse_coordinates(coord_str: str) -> coords.SkyCoord:
try:
if coord_str.startswith(("J", "B")):
return coords.SkyCoord(coord_str[1:], unit=(u.Unit("hourangle"), u.Unit("deg")))

if coord_str.startswith("G"):
long, lat = map(float, coord_str[1:].split("+"))
coord = coords.SkyCoord(l=long * u.Unit("deg"), b=lat * u.Unit("deg"), frame="galactic")
return coord.transform_to("icrs")

return coords.SkyCoord(coord_str, unit=(u.Unit("hourangle"), u.Unit("deg")))
except Exception as e:
raise ValueError(f"Invalid coordinate format: {coord_str}") from e


def parse_function_node(node: expressions.FunctionNode) -> tuple[layer2.Filter, layer2.SearchParams]:
if node.function == expressions.FunctionName.PGC:
try:
pgc = int(node.value)
except ValueError as e:
raise ValueError(f"Invalid PGC value: '{node.value}'") from e

return layer2.PGCOneOfFilter([pgc]), layer2.CombinedSearchParams([])
if node.function == expressions.FunctionName.NAME:
return layer2.DesignationCloseFilter(2), layer2.DesignationSearchParams(node.value)
if node.function == expressions.FunctionName.POS:
return layer2.ICRSCoordinatesInRadiusFilter(1 * u.Unit("arcsec")), layer2.ICRSSearchParams(
coords=parse_coordinates(node.value)
)

raise ValueError(f"Unsupported function: {node.function}")


def expression_to_filter(expr: expressions.Node) -> tuple[layer2.Filter, layer2.SearchParams]:
if isinstance(expr, expressions.AndNode):
left_filter, left_search_params = expression_to_filter(expr.left)
right_filter, right_search_params = expression_to_filter(expr.right)

return layer2.AndFilter([left_filter, right_filter]), layer2.CombinedSearchParams(
[left_search_params, right_search_params]
)
if isinstance(expr, expressions.OrNode):
left_filter, left_search_params = expression_to_filter(expr.left)
right_filter, right_search_params = expression_to_filter(expr.right)

return layer2.OrFilter([left_filter, right_filter]), layer2.CombinedSearchParams(
[left_search_params, right_search_params]
)

return parse_function_node(expr)
107 changes: 107 additions & 0 deletions app/domain/dataapi/search_parsers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import abc
import re
from typing import final

import astropy.units as u
from astropy import coordinates as coords

from app.data.repositories import layer2

RADIUS_ARCSEC = 1 * u.Unit("arcsec")

HMS_DMS_PATTERN = re.compile(r"^(\d+h\d+m[\d.]+s)([+-]\d+d\d+m[\d.]+s)$", re.IGNORECASE)
J_COORD_PATTERN = re.compile(r"^J(\d{2})(\d{2})([\d.]+)([+-])(\d{2})(\d{2})([\d.]+)$", re.IGNORECASE)


class SearchParser(abc.ABC):
@abc.abstractmethod
def parse(self, query: str) -> tuple[layer2.Filter, layer2.SearchParams] | None:
pass


@final
class NameSearchParser(SearchParser):
def parse(self, query: str) -> tuple[layer2.Filter, layer2.SearchParams] | None:
return (
layer2.DesignationLikeFilter(),
layer2.DesignationSearchParams(query.strip()),
)


@final
class HMSDMSCoordinateParser(SearchParser):
def parse(self, query: str) -> tuple[layer2.Filter, layer2.SearchParams] | None:
query = query.strip()
m = HMS_DMS_PATTERN.match(query)
if m is None:
return None
try:
ra_str = m.group(1).replace("h", ":").replace("m", ":").replace("s", "")
dec_str = m.group(2).replace("d", ":").replace("m", ":").replace("s", "")
sky_coord = coords.SkyCoord(
ra_str + " " + dec_str,
unit=(u.Unit("hourangle"), u.Unit("deg")),
)
except Exception:
return None
return (
layer2.ICRSCoordinatesInRadiusFilter(RADIUS_ARCSEC),
layer2.ICRSSearchParams(coords=sky_coord),
)


def _parse_j_coord_to_skycoord(query: str) -> coords.SkyCoord:
m = J_COORD_PATTERN.match(query.strip())
if m is None:
raise ValueError("Does not match J coordinate format")
ra_h, ra_m, ra_s, dec_sign, dec_d, dec_m, dec_s = m.groups()
ra_str = f"{ra_h}:{ra_m}:{ra_s}"
dec_str = f"{dec_sign}{dec_d}:{dec_m}:{dec_s}"
return coords.SkyCoord(
ra_str + " " + dec_str,
unit=(u.Unit("hourangle"), u.Unit("deg")),
)


@final
class JCoordinateParser(SearchParser):
def parse(self, query: str) -> tuple[layer2.Filter, layer2.SearchParams] | None:
query = query.strip()
if not query.upper().startswith("J"):
return None
try:
sky_coord = _parse_j_coord_to_skycoord(query)
except (ValueError, Exception):
return None
return (
layer2.ICRSCoordinatesInRadiusFilter(RADIUS_ARCSEC),
layer2.ICRSSearchParams(coords=sky_coord),
)


DEFAULT_PARSERS: list[SearchParser] = [
NameSearchParser(),
HMSDMSCoordinateParser(),
JCoordinateParser(),
]


def query_to_filters(
query: str,
parsers: list[SearchParser],
) -> tuple[layer2.Filter, layer2.SearchParams]:
results: list[tuple[layer2.Filter, layer2.SearchParams]] = []
for parser in parsers:
parsed = parser.parse(query)
if parsed is not None:
results.append(parsed)
if not results:
return (
layer2.DesignationLikeFilter(),
layer2.DesignationSearchParams(query.strip()),
)
if len(results) == 1:
return results[0]
filters = [f for f, _ in results]
params = [p for _, p in results]
return layer2.OrFilter(filters), layer2.CombinedSearchParams(params)
4 changes: 0 additions & 4 deletions app/domain/expressions/__init__.py

This file was deleted.

100 changes: 0 additions & 100 deletions app/domain/expressions/parser.py

This file was deleted.

Loading
Loading