diff --git a/.gitignore b/.gitignore index 68bc17f..8dae204 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,9 @@ __pycache__/ # C extensions *.so +.DS_Store + + # Distribution / packaging .Python build/ diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cell.py b/src/cell.py new file mode 100644 index 0000000..6c7c35c --- /dev/null +++ b/src/cell.py @@ -0,0 +1,75 @@ +""" +File Name: cell.py + +Copyright (c) 2023 - 2024 IndoorJson + +Author: Ziwei Xiang +Create Date: 2024/3/14 +""" + +import json +from functools import wraps +from typing import Dict +from shapely.wkt import loads +from shapely.geometry.base import BaseGeometry + + +def type_check(func): + @wraps(func) + def wrapper(self, cell_id: str, properties: Dict, space: BaseGeometry, node: BaseGeometry, *args, **kwargs): + if not isinstance(cell_id, str): + raise TypeError("cell_id must be a string") + if not isinstance(properties, dict): + raise TypeError("properties must be a dictionary") + if not isinstance(space, BaseGeometry): + raise TypeError("space must be a BaseGeometry instance") + if not isinstance(node, BaseGeometry): + raise TypeError("node must be a BaseGeometry instance") + return func(self, cell_id, properties, space, node, *args, **kwargs) + return wrapper + + +class Cell: + + @type_check + def __init__(self, cell_id: str, properties: Dict, space: BaseGeometry, node: BaseGeometry): + self.__id: str = cell_id + self.__properties: Dict = properties + self.__space: BaseGeometry = space + self.__node: BaseGeometry = node + + @property + def id(self) -> str: + return self.__id + + @property + def properties(self) -> Dict: + return self.__properties + + @property + def space(self) -> BaseGeometry: + return self.__space + + @property + def node(self) -> BaseGeometry: + return self.__node + + def to_json(self) -> Dict: + result = {} + for key, value in self.__dict__.items(): + json_key = key.split('__')[-1] + if isinstance(value, BaseGeometry): + result[json_key] = value.wkt + elif json_key == 'id': + json_key = '$id' + result[json_key] = value + else: + result[json_key] = value + return result + + @classmethod + def from_json(cls, json_dict: Dict) -> 'Cell': + json_dict['cell_id'] = json_dict.pop('$id') + json_dict['space'] = loads(json_dict['space']) + json_dict['node'] = loads(json_dict['node']) + return cls(**json_dict) diff --git a/src/connection.py b/src/connection.py new file mode 100644 index 0000000..d14ab8c --- /dev/null +++ b/src/connection.py @@ -0,0 +1,91 @@ +""" +File Name: connection.py + +Copyright (c) 2023 - 2024 IndoorJson + +Author: Ziwei Xiang +Create Date: 2024/3/14 +""" + +import json +from functools import wraps +from typing import Dict +from shapely.wkt import loads +from shapely.geometry.base import BaseGeometry + + +def type_check(func): + @wraps(func) + def wrapper(self, connection_id: str, properties: dict, fr: str, to: str, + bound: BaseGeometry, edge: BaseGeometry, *args, **kwargs): + if not isinstance(connection_id, str): + raise TypeError("connection_id must be a string") + if not isinstance(properties, dict): + raise TypeError("properties must be a dictionary") + if not isinstance(fr, str): + raise TypeError("source must be a string") + if not isinstance(to, str): + raise TypeError("target must be a string") + if not isinstance(bound, BaseGeometry): + raise TypeError("bound must be a BaseGeometry instance") + if not isinstance(edge, BaseGeometry): + raise TypeError("edge must be a BaseGeometry instance") + return func(self, connection_id, properties, fr, to, bound, edge, *args, **kwargs) + return wrapper + + +class Connection: + + @type_check + def __init__(self, connections_id: str, properties: Dict, fr: str, to: str, + bound: BaseGeometry, edge: BaseGeometry): + self.__id: str = connections_id + self.__properties: Dict = properties + self.__fr: str = fr + self.__to: str = to + self.__bound: BaseGeometry = bound + self.__edge: BaseGeometry = edge + + @property + def id(self) -> str: + return self.__id + + @property + def properties(self) -> dict: + return self.__properties + + @property + def source(self) -> str: + return self.__fr + + @property + def target(self) -> str: + return self.__to + + @property + def bound(self) -> BaseGeometry: + return self.__bound + + @property + def edge(self) -> BaseGeometry: + return self.__edge + + def to_json(self) -> Dict: + result = {} + for key, value in self.__dict__.items(): + json_key = key.split('__')[-1] + if isinstance(value, BaseGeometry): + result[json_key] = value.wkt + elif json_key == 'id': + json_key = '$id' + result[json_key] = value + else: + result[json_key] = value + return result + + @classmethod + def from_json(cls, json_dict: Dict) -> 'Connection': + json_dict['connection_id'] = json_dict.pop('$id') + json_dict['bound'] = loads(json_dict['bound']) + json_dict['edge'] = loads(json_dict['edge']) + return cls(**json_dict) diff --git a/src/indoorspace.py b/src/indoorspace.py new file mode 100644 index 0000000..574debc --- /dev/null +++ b/src/indoorspace.py @@ -0,0 +1,183 @@ +""" +File Name: indoorspace.py + +Copyright (c) 2023 - 2024 IndoorJson + +Author: Ziwei Xiang +Create Date: 2024/3/14 +""" + +import json +import numpy as np +from cell import Cell +from connection import Connection +from layer import Layer +from rlines import Rlines +from typing import List, Dict + + +class IndoorSpace: + + def __init__(self): + self._properties: Dict = {} + self._cells: List[Cell] = [] + self._connections: List[Connection] = [] + self._layers: List[Layer] = [] + self._rlineses: List[Rlines] = [] + self._hypergraph: Dict = {} + + @property + def properties(self) -> Dict: + return self._properties + + @property + def cells(self) -> List[Cell]: + return self._cells + + @property + def connections(self) -> List[Connection]: + return self._connections + + @property + def layers(self) -> List[Layer]: + return self._layers + + @property + def rlineses(self) -> List[Rlines]: + return self._rlineses + + @property + def hypergraph(self) -> Dict: + return self._hypergraph + + def set_properties(self, properties: Dict): + self._properties = properties + + def add_cell(self, cell: Cell): + if cell.id not in [c.id for c in self._cells]: + self._cells.append(cell) + else: + raise ValueError('Cell id already exists') + + def add_connection(self, connection: Connection): + if connection.id not in [c.id for c in self._connections]: + if connection.source in [ + c.id for c in self._cells + ] and connection.target in [c.id for c in self._cells]: + self._connections.append(connection) + elif connection.source not in [ + c.id for c in self._cells + ] and connection.target in [c.id for c in self._cells]: + raise ValueError('Source cell does not exist') + elif connection.source in [ + c.id for c in self._cells + ] and connection.target not in [c.id for c in self._cells]: + raise ValueError('Target cell does not exist') + else: + raise ValueError('Source and target cell do not exist') + else: + raise ValueError('Connection id already exists') + + def set_layers(self, layers: Layer): + self._layers.append(layers) + + def set_rlineses(self, rlineses: Rlines): + self._rlineses.append(rlineses) + + def get_incident_matrix(self): + cells = self.cells + connections = self.connections + incident_matrix = np.zeros((len(cells), len(connections)), dtype=int) + for j, connection in enumerate(connections): + source = self.get_cell_from_id(connection.source) + target = self.get_cell_from_id(connection.target) + source_index = cells.index(source) + target_index = cells.index(target) + incident_matrix[source_index, j] = 1 + incident_matrix[target_index, j] = -1 + return incident_matrix + + def get_hypergraph_incidence_matrix(self): + return self.get_incident_matrix().T + + def get_hypergraph(self): + cells = self.cells + connections = self.connections + rlineses = self.rlineses + hypergraph = self._hypergraph + hypergraph['hyperNodes'] = [] + hypergraph['hyperEdges'] = [] + incident_matrix = self.get_incident_matrix() + incident_matrix_transpose = incident_matrix.T + + for hyperNode in connections: + hypergraph['hyperNodes'].append(hyperNode.to_json()) + + for j in range(incident_matrix_transpose.shape[1]): + hyperEdge = {} + inner_edge_id = {'ins': [], 'outs': []} + for i in range(incident_matrix_transpose.shape[0]): + if incident_matrix_transpose[i, j] != 0: + if incident_matrix_transpose[i, j] == -1: + inner_edge_ins_id = connections[i].id + inner_edge_id['ins'].append(inner_edge_ins_id) + elif incident_matrix_transpose[i, j] == 1: + inner_edge_outs_id = connections[i].id + inner_edge_id['outs'].append(inner_edge_outs_id) + else: + raise ValueError('Incident matrix error') + hyperEdge['id'] = cells[j].id + hyperEdge['properties'] = cells[j].properties + hyperEdge['space'] = cells[j].space.wkt + hyperEdge['node'] = cells[j].node.wkt + hyperEdge['inner_nodeset'] = inner_edge_id + + for rlines in rlineses: + if rlines.cell == cells[j].id: + hyperEdge['closure'] = rlines.closure + break + + hypergraph['hyperEdges'].append(hyperEdge) + + self.set_hypergraph(hypergraph) + + return hypergraph + + def set_hypergraph(self, hypergraph): + self._hypergraph = hypergraph + + def get_cell_from_id(self, cell_id): + for cell in self.cells: + if cell.id == cell_id: + return cell + return None + + def get_connection_from_id(self, connection_id): + for connection in self.connections: + if connection.id == connection_id: + return connection + return None + + def to_json(self) -> Dict: + result = {} + for key, value in self.__dict__.items(): + if key == '_hypergraph': + continue + elif key == '_properties': + result[key.strip('_')] = value + else: + result[key.strip('_')] = [item.to_json() for item in value] + return result + + @classmethod + def from_json(cls, json_str: str) -> 'IndoorSpace': + json_data = json.loads(json_str) + instance = cls() + for key, value in json_data.items(): + if key == 'properties': + setattr(instance, f"_{key}", value) + elif key == 'rlineses': + setattr(instance, f"_{key}", [eval(key.capitalize()[:-2]).from_json(item) for item in value]) + else: + setattr(instance, f"_{key}", [eval(key.capitalize()[:-1]).from_json(item) for item in value]) + return instance diff --git a/src/layer.py b/src/layer.py new file mode 100644 index 0000000..78d6433 --- /dev/null +++ b/src/layer.py @@ -0,0 +1,53 @@ +""" +File Name: layer.py + +Copyright (c) 2023 - 2024 IndoorJson + +Author: Ziwei Xiang +Create Date: 2024/3/14 +""" + +import json +from functools import wraps +from typing import List, Dict + + +def type_check(func): + @wraps(func) + def wrapper(self, layer_id: str, cells: List[str], *args, **kwargs): + if not isinstance(layer_id, str): + raise TypeError("layer_id must be a string") + if not isinstance(cells, list): + raise TypeError("cells must be a list") + return func(self, layer_id, cells, *args, **kwargs) + return wrapper + + +class Layer: + + @type_check + def __init__(self, layer_id: str, cells: List[str]): + self.__id: str = layer_id + self.__cells: List[str] = cells + + @property + def id(self) -> str: + return self.__id + + @property + def cells(self) -> List[str]: + return self.__cells + + def to_json(self) -> Dict: + result = {} + for key, value in self.__dict__.items(): + json_key = key.split('__')[-1] + if json_key == 'id': + json_key = '$id' + result[json_key] = value + return result + + @classmethod + def from_json(cls, json_dict: Dict) -> 'Layer': + json_dict['layer_id'] = json_dict.pop('$id') + return cls(**json_dict) diff --git a/src/rlines.py b/src/rlines.py new file mode 100644 index 0000000..a0be022 --- /dev/null +++ b/src/rlines.py @@ -0,0 +1,74 @@ +""" +File Name: rlines.py + +Copyright (c) 2023 - 2024 IndoorJson + +Author: Ziwei Xiang +Create Date: 2024/3/14 +""" + +import json +from functools import wraps +from typing import List, Dict + + +def type_check(func): + @wraps(func) + def wrapper(self, rlines_id: str, cell: str, ins: List[str], outs: List[str], closure: List[str], *args, **kwargs): + if not isinstance(rlines_id, str): + raise TypeError("rlines_id must be a string") + if not isinstance(cell, str): + raise TypeError("cells must be a string") + if not isinstance(ins, list): + raise TypeError("ins must be a list") + if not isinstance(outs, list): + raise TypeError("outs must be a list") + if not isinstance(closure, list): + raise TypeError("closure must be a list") + return func(self, rlines_id, cell, ins, outs, closure, *args, **kwargs) + return wrapper + + +class Rlines: + + @type_check + def __init__(self, rlines_id: str, cell: str, ins: List[str], outs: List[str], closure: List[str]): + self.__id: str = rlines_id + self.__cell: str = cell + self.__ins: List[str] = ins + self.__outs: List[str] = outs + self.__closure: List[str] = closure + + @property + def id(self) -> str: + return self.__id + + @property + def cell(self) -> str: + return self.__cell + + @property + def ins(self) -> List[str]: + return self.__ins + + @property + def outs(self) -> List[str]: + return self.__outs + + @property + def closure(self) -> List[str]: + return self.__closure + + def to_json(self) -> Dict: + result = {} + for key, value in self.__dict__.items(): + json_key = key.split('__')[-1] + if json_key == 'id': + json_key = '$id' + result[json_key] = value + return result + + @classmethod + def from_json(cls, json_dict: Dict) -> 'Rlines': + json_dict['rlines_id'] = json_dict.pop('$id') + return cls(**json_dict) diff --git a/src/serialization.py b/src/serialization.py new file mode 100644 index 0000000..dd78f66 --- /dev/null +++ b/src/serialization.py @@ -0,0 +1,24 @@ +""" +File Name: serialization.py + +Copyright (c) 2023 - 2024 IndoorJson + +Author: Ziwei Xiang +Create Date: 2024/3/13 +""" + +import json +from indoorspace import IndoorSpace + + +def serialization(filepath: str, indoorspace: IndoorSpace): + indoorSpace_jsondata = json.dumps(indoorspace.to_json(), indent=4, ensure_ascii=False) + with open(filepath, 'w', encoding='utf-8') as file: + file.write(indoorSpace_jsondata) + return indoorSpace_jsondata + + +def deserialization(filepath: str) -> IndoorSpace: + with open(filepath, 'r', encoding='utf-8') as file: + indoorSpace_str = file.read() + return IndoorSpace().from_json(indoorSpace_str) diff --git a/src/visualization.py b/src/visualization.py new file mode 100644 index 0000000..9051483 --- /dev/null +++ b/src/visualization.py @@ -0,0 +1,145 @@ +""" +File Name: visualization.py + +Copyright (c) 2023 - 2024 IndoorJson + +Author: Ziwei Xiang +Create Date: 2024/4/18 +""" + +import plotly.graph_objs as go +from plotly.offline import plot +from shapely import geometry as geo +from shapely.wkt import loads + +from indoorspace import IndoorSpace + + +def graph_visualize(indoorSpace: IndoorSpace, filename: str = 'graph.html'): + fig = go.Figure() + + for cell in indoorSpace.cells: + cell_space = geo.Polygon(cell.space) + cell_node = geo.Point(cell.node) + x1, y1 = cell_space.exterior.xy + x2, y2 = cell_node.xy + + fig.add_trace( + go.Scatter(x=list(x1), + y=list(y1), + fill='toself', + fillcolor='#C1DDDB', + line=dict(color='#81B3A9', width=2), + name='Space')) + fig.add_trace( + go.Scatter(x=list(x2), + y=list(y2), + mode='markers', + marker=dict(size=10, color='#81B3A9'), + name='Cell', + text=str(cell.properties), + hoverinfo='text')) + + for connection in indoorSpace.connections: + connection_bound = geo.LineString(connection.bound) + connection_edge = geo.LineString(connection.edge) + x1, y1 = connection_bound.xy + x2, y2 = connection_edge.xy + + fig.add_trace( + go.Scatter(x=list(x1), + y=list(y1), + mode='lines', + line=dict(color='#81B3A9', width=2), + name='Boundary', + text=str(connection.properties), + hoverinfo='text')) + fig.add_trace( + go.Scatter(x=list(x2), + y=list(y2), + mode='lines', + line=dict(color='#81B3A9', width=2), + name='Edge', + text=str(connection.properties), + hoverinfo='text')) + + fig.update_layout(showlegend=False) + + plot(fig, filename=filename) + + +def hypergraph_visualize(indoorSpace: IndoorSpace, filename: str = 'hypergraph.html'): + fig = go.Figure() + + hypergraph = indoorSpace.get_hypergraph() + + for hyperEdge in hypergraph['hyperEdges']: + cell = indoorSpace.get_cell_from_id(hyperEdge['id']) + ins = hyperEdge['inner_nodeset']['ins'] + outs = hyperEdge['inner_nodeset']['outs'] + rlines = [] + rlines_group = geo.Polygon(cell.space) + + x_rlinesGroup, y_rlinesGroup = rlines_group.exterior.xy + + fig.add_trace( + go.Scatter(x=list(x_rlinesGroup), + y=list(y_rlinesGroup), + fill='toself', + fillcolor='#C1DDDB', + line=dict(color='#81B3A9', width=2), + name='Rline Group', + text=str(cell.properties), + hoverinfo='text')) + + for ins_id in ins: + insConnectionPoint = indoorSpace.get_connection_from_id( + ins_id).bound.centroid + + for outs_id in outs: + outsConnectionPoint = indoorSpace.get_connection_from_id( + outs_id).bound.centroid + rline = geo.LineString( + [insConnectionPoint, outsConnectionPoint]) + rlines.append(rline) + + if 'closure' in hyperEdge: + rlines_closure = hyperEdge['closure'] + for rlines_pairs in rlines_closure: + insConnectionPoint = indoorSpace.get_connection_from_id( + rlines_pairs[0]).bound.centroid + outsConnectionPoint = indoorSpace.get_connection_from_id( + rlines_pairs[1]).bound.centroid + rline_closure = geo.LineString( + [insConnectionPoint, outsConnectionPoint]) + + for rline in rlines: + if rline == rline_closure: + rlines.remove(rline) + break + + for rline in rlines: + x_rline, y_rline = rline.xy + fig.add_trace( + go.Scatter(x=list(x_rline), + y=list(y_rline), + mode='lines', + line=dict(color='#81B3A9'), + name='Rline')) + + for hyperNode in hypergraph['hyperNodes']: + connectionPoint = loads(hyperNode['bound']) + x_hyperNode, y_hyperNode = geo.LineString(connectionPoint).centroid.xy + + fig.add_trace( + go.Scatter(x=list(x_hyperNode), + y=list(y_hyperNode), + mode='markers', + marker=dict(size=10, color='#81B3A9'), + name='Connection Point', + text=str(hyperNode['properties']), + hoverinfo='text')) + + fig.update_layout(showlegend=False) + + plot(fig, filename=filename) diff --git a/test/example.json b/test/example.json new file mode 100644 index 0000000..ff44232 --- /dev/null +++ b/test/example.json @@ -0,0 +1,68 @@ +{ + "properties": { + "name": "indoorjson3-cpp", + "labels": ["indoorgml", "GIS"], + "language": ["English", "中文", "한국어"], + "author": { + "name": "Kunlin Yu", + "email": "yukunlin@syriusrobotics.com" + } + }, + "cells": [ + { + "$id": "c1", + "properties": {"roomNumber": "1101"}, + "space": "POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))", + "node": "POINT (0.5 0.5)" + }, + { + "$id": "c2", + "properties": {"roomNumber": "1102"}, + "space": "POLYGON ((1 0, 2 0, 2 1, 1 1, 1 0))", + "node": "POINT (1.5 0.5)" + }, + { + "$id": "c3", + "properties": {"roomNumber": "1103"}, + "space": "POLYGON ((0 1, 1 1, 1 2, 0 2, 0 1))", + "node": "POINT (0.5 1.5)" + } + ], + "connections": [ + { + "$id": "conn1-2", + "properties": { + "type": "door", + "开放时间": "全天", + "오픈 시간": "하루 종일" + }, + "fr": "c1", + "to": "c2", + "bound": "LINESTRING (1 0, 1 1)", + "edge": "LINESTRING (0.5 0.5, 1.5 0.5)" + }, + { + "$id": "conn3-1", + "properties": {"type": "window"}, + "fr": "c3", + "to": "c1", + "bound": "LINESTRING (1 0, 1 1)", + "edge": "LINESTRING (0.5 0.5, 1.5 0.5)" + } + ], + "layers": [ + { + "$id": "layer", + "cells": ["c1", "c2"] + } + ], + "rlineses": [ + { + "$id": "rlines1", + "cell": "c1", + "ins": ["conn3-1"], + "outs": ["conn1-2"], + "closure": [] + } + ] +} diff --git a/test/test_deserialization.json b/test/test_deserialization.json new file mode 100644 index 0000000..5b46ef3 --- /dev/null +++ b/test/test_deserialization.json @@ -0,0 +1,90 @@ +{ + "properties": { + "name": "indoorjson3-cpp", + "labels": [ + "indoorgml", + "GIS" + ], + "language": [ + "English", + "中文", + "한국어" + ], + "author": { + "name": "Kunlin Yu", + "email": "yukunlin@syriusrobotics.com" + } + }, + "cells": [ + { + "$id": "c1", + "properties": { + "roomNumber": "1101" + }, + "space": "POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))", + "node": "POINT (0.5 0.5)" + }, + { + "$id": "c2", + "properties": { + "roomNumber": "1102" + }, + "space": "POLYGON ((1 0, 2 0, 2 1, 1 1, 1 0))", + "node": "POINT (1.5 0.5)" + }, + { + "$id": "c3", + "properties": { + "roomNumber": "1103" + }, + "space": "POLYGON ((0 1, 1 1, 1 2, 0 2, 0 1))", + "node": "POINT (0.5 1.5)" + } + ], + "connections": [ + { + "$id": "conn1-2", + "properties": { + "type": "door", + "开放时间": "全天", + "오픈 시간": "하루 종일" + }, + "source": "c1", + "target": "c2", + "bound": "LINESTRING (1 0, 1 1)", + "edge": "LINESTRING (0.5 0.5, 1.5 0.5)" + }, + { + "$id": "conn3-1", + "properties": { + "type": "window" + }, + "source": "c3", + "target": "c1", + "bound": "LINESTRING (1 0, 1 1)", + "edge": "LINESTRING (0.5 0.5, 1.5 0.5)" + } + ], + "layers": [ + { + "$id": "layer", + "cells": [ + "c1", + "c2" + ] + } + ], + "rlineses": [ + { + "$id": "rlines1", + "cell": "c1", + "ins": [ + "conn3-1" + ], + "outs": [ + "conn1-2" + ], + "closure": [] + } + ] +} \ No newline at end of file diff --git a/test/test_deserialization.py b/test/test_deserialization.py new file mode 100644 index 0000000..84da642 --- /dev/null +++ b/test/test_deserialization.py @@ -0,0 +1,35 @@ +""" +File Name: test_deserialization.py + +Copyright (c) 2023 - 2024 IndoorJson + +Author: Ziwei Xiang +Create Date: 2024/3/13 +""" + +import json +import sys +import os +import unittest + +sys.path.append(os.path.abspath('../src')) + +from serialization import serialization, deserialization + + +class TestDeserialization(unittest.TestCase): + + def test_deserialization(self): + + with open('example.json', 'r') as file: + original_json = json.load(file) + + indoorSpace = deserialization('example.json') + + generated_json = indoorSpace.to_json() + + self.assertEqual(original_json, generated_json) + + +if __name__ == '__main__': + unittest.main()