diff --git a/src/app.py b/src/app.py index b5ee249..4edebad 100644 --- a/src/app.py +++ b/src/app.py @@ -12,6 +12,7 @@ from src.comparejsonld import CompareJSONLD from src.compareshape import CompareShape from src.shape import Shape +from src.getjsonld import JSONLDShape app = Flask(__name__) CORS(app) @@ -77,7 +78,7 @@ def v2(): properties: list = [] statements: list = [] for schema in schema_list: - shape: Shape = Shape(schema, language) + shape: JSONLDShape = JSONLDShape(schema, language) comparison: CompareJSONLD = CompareJSONLD(shape.get_json_ld(), entity, language) names.append(shape.get_name()) general.append(comparison.get_general()) diff --git a/src/comparejsonld.py b/src/comparejsonld.py index 3a92eeb..6df9a9c 100644 --- a/src/comparejsonld.py +++ b/src/comparejsonld.py @@ -3,11 +3,13 @@ """ import json import re -from typing import Tuple, Any import requests from requests import Response +from src.compareproperties import CompareProperties +from src.comparestatements import CompareStatements + class CompareJSONLD: """ @@ -156,347 +158,3 @@ def _process_each_of(self) -> None: :return: """ pass - - -class CompareProperties: - - def __init__(self, entity: str, entities: dict, props: list, names: dict, start_shape: dict) -> None: - self._entities: dict = entities - self._names: dict = names - self._entity: str = entity - self._props: list = props - self._start_shape: dict = start_shape - - def compare_properties(self) -> dict: - """ - - :return: - """ - if "entities" not in self._entities: - return {} - if self._entity not in self._entities["entities"]: - return {} - if "claims" not in self._entities["entities"][self._entity]: - return {} - - claims: dict = self._entities["entities"][self._entity]["claims"] - properties: dict = {} - if self._start_shape is None: - return properties - utilities: Utilities = Utilities() - for prop in self._props: - child: dict = {"name": self._names[prop], - "necessity": utilities.calculate_necessity(prop, self._start_shape)} - if prop in claims: - response: str = self.check_claims_for_props(claims, prop) - else: - response: str = "missing" - if child["necessity"] != "absent": - if response != "": - child["response"] = response - elif response != "present": - child["response"] = response - properties[prop] = child - return properties - - def check_claims_for_props(self, claims: dict, prop: str) -> str: - """" - - :return: - """ - cardinality: str = "correct" - allowed: str = "present" - if "expression" not in self._start_shape: - return "present" - if "expressions" not in self._start_shape["expression"]: - return "present" - for expression in self._start_shape["expression"]["expressions"]: - if "predicate" in expression and expression["predicate"].endswith(prop): - allowed_list = self._get_allowed_list(claims, prop, expression) - cardinality2 = self._process_cardinalities(expression, allowed_list, self._start_shape, prop) - if cardinality2 not in ["", "correct"]: - cardinality = cardinality2 - if "correct" in allowed_list: - allowed = "correct" - if cardinality == "correct": - response: str = allowed - else: - response: str = cardinality - return response - - def _get_allowed_list(self, claims: dict, prop: str, expression: dict) -> list: - if prop not in claims: - return [] - - allowed_list: list = [] - for statement in claims[prop]: - is_it_allowed: str = "" - if statement["mainsnak"]["property"] == prop: - is_it_allowed = self._process_triple_constraint(statement["mainsnak"], - expression, - "") - if "extra" in self._start_shape: - for extra in self._start_shape["extra"]: - if extra.endswith(prop) and is_it_allowed == "incorrect": - is_it_allowed = "allowed" - allowed_list.append(is_it_allowed) - return allowed_list - - def _process_cardinalities(self, expression: dict, allowed_list: list, shape: dict, prop: str) -> str: - if "predicate" not in expression: - return "" - if not expression["predicate"].endswith(prop): - return "" - occurrences: int = allowed_list.count("correct") - occurrences += allowed_list.count("present") - cardinality: str = "correct" - for expression in shape["expression"]["expressions"]: - if "predicate" in expression and expression["predicate"].endswith(prop): - cardinality = self._get_cardinalities(occurrences, expression) - predicate: str = f'http://www.wikidata.org/prop/direct/{prop}' - if "extra" in shape and predicate in shape["extra"] and cardinality == "too many statements": - cardinality = "correct" - return cardinality - - @staticmethod - def _get_cardinalities(occurrences: int, expression: dict) -> str: - cardinality: str = "correct" - min_cardinality: bool = True - max_cardinality: bool = True - max_card: int = 1 - min_card: int = 1 - if "max" in expression: - max_card = expression["max"] - if "min" in expression: - min_card = expression["min"] - if max_card < occurrences: - max_cardinality = False - if min_card > occurrences: - min_cardinality = False - if max_card == -1: - max_cardinality = True - if min_card == -1: - min_cardinality = True - if min_cardinality and not max_cardinality: - cardinality = "too many statements" - if max_cardinality and not min_cardinality: - cardinality = "not enough correct statements" - return cardinality - - @staticmethod - def _process_triple_constraint(statement: dict, expression: dict, allowed: str) -> str: - """ - Processes triple constraint expression types in the shape - - :param dict statement: The entity's statement to be assessed - :param dict expression: The expression from the shape to be assessed against - :param str allowed: Whether the statement is allowed by the expression or not currently - :return: allowed - """ - if "property" not in statement: - return allowed - if "predicate" not in expression: - return allowed - - if expression["predicate"].endswith(statement["property"]): - allowed = "present" - try: - if expression["valueExpr"]["type"] == "NodeConstraint": - allowed = Utilities.process_node_constraint(statement, - expression["valueExpr"], - allowed) - except (KeyError, TypeError): - pass - return allowed - - -class CompareStatements: - - def __init__(self, entities: dict, entity: str, start_shape: dict) -> None: - self._entities: dict = entities - self._entity: str = entity - self.start_shape: dict = start_shape - - def compare_statements(self) -> dict: - """ - Compares the statements with the shape - - :return: statements - """ - if "entities" not in self._entities: - return {} - - statements: dict = {} - claims: dict = self._entities["entities"][self._entity]['claims'] - for claim in claims: - property_statement_results: list = [] - for statement in claims[claim]: - child: dict = {"property": claim} - utilities: Utilities = Utilities() - necessity = utilities.calculate_necessity(statement["mainsnak"]["property"], self.start_shape) - if necessity != "absent": - child["necessity"] = necessity - child, allowed = self._process_shape(statement["mainsnak"], self.start_shape, child) - statements[statement["id"]] = child - if allowed.startswith("missing"): - allowed = "incorrect" - property_statement_results.append(allowed) - return statements - - def _process_shape(self, statement: dict, shape: dict, child: dict) -> Tuple[Any, str]: - """ - Processes a full shape - - :param statement: The entity's statement to be assessed - :param shape: The shape to be assessed against - :param child: The current response from the assessment - :return: child and allowed - """ - expressions: dict = {} - if "expression" in shape and "expressions" in shape["expression"]: - expressions = shape["expression"]["expressions"] - allowed: str = "not in schema" - for expression in expressions: - allowed = self.process_expressions(expression, shape, statement, allowed) - if allowed != "": - child["response"] = allowed - return child, allowed - - def process_expressions(self, expression: dict, shape: dict, statement: dict, allowed: str) -> str: - if "type" not in expression: - return allowed - if "predicate" not in expression: - return allowed - if "property" not in statement: - return allowed - - if expression["type"] == "TripleConstraint" and expression["predicate"].endswith(statement["property"]): - allowed = self._process_triple_constraint(statement, - expression, - allowed) - if "extra" in shape: - for extra in shape["extra"]: - if extra.endswith(statement["property"]) and allowed == "incorrect": - allowed = "allowed" - return allowed - - @staticmethod - def _process_triple_constraint(statement: dict, expression: dict, allowed: str) -> str: - """ - Processes triple constraint expression types in the shape - - :param statement: The entity's statement to be assessed - :param expression: The expression from the shape to be assessed against - :param allowed: Whether the statement is allowed by the expression or not currently - :return: allowed - """ - if "property" not in statement: - return allowed - if "predicate" not in expression: - return allowed - - if expression["predicate"].endswith(statement["property"]): - allowed = "allowed" - Utilities.process_cardinalities(expression, {"mainsnak": statement}) - try: - if expression["valueExpr"]["type"] == "NodeConstraint": - allowed = Utilities.process_node_constraint(statement, - expression["valueExpr"], - allowed) - except (KeyError, TypeError): - pass - return allowed - - -class Utilities: - - def calculate_necessity(self, prop: str, shape: dict) -> str: - """ - Check if a property is required, optional or absent from a shape - - :param str prop: the property to be checked - :param dict shape: the shape to check against - :return: necessity - """ - necessity: str = "absent" - list_of_expressions: list = [] - - if "expression" not in shape: - return necessity - - if "expressions" in shape["expression"]: - for expression in shape["expression"]["expressions"]: - list_of_expressions.append(expression) - else: - list_of_expressions.append(shape["expression"]) - - for expression in list_of_expressions: - if "predicate" in expression and expression["predicate"].endswith(prop): - necessity = self.required_or_absent(expression) - return necessity - - @staticmethod - def required_or_absent(expression: dict) -> str: - necessity: str = "optional" - if ("min" in expression and expression["min"] > 0) or ("min" not in expression and "max" not in expression): - necessity = "required" - if "min" in expression and "max" in expression and expression["min"] == 0 and expression["max"] == 0: - necessity = "absent" - return necessity - - @staticmethod - def process_cardinalities(expression: dict, claim: dict) -> str: - """ - Processes cardinalities in expressions - - :return: cardinality - """ - cardinality: str = "correct" - min_cardinality: bool = True - max_cardinality: bool = True - max_card: int = 1 - min_card: int = 1 - if "max" in expression: - max_card = expression["max"] - if "min" in expression: - min_card = expression["min"] - if max_card < len(claim): - max_cardinality = False - if min_card > len(claim): - min_cardinality = False - if max_card == -1: - max_cardinality = True - if min_card == -1: - min_cardinality = True - if min_cardinality and not max_cardinality: - cardinality = "too many statements" - if max_cardinality and not min_cardinality: - cardinality = "not enough correct statements" - return cardinality - - @staticmethod - def process_node_constraint(statement: dict, expression: dict, allowed: str) -> str: - """ - Processes node constraint expression types in the shape - - :param dict statement: The entity's statement to be assessed - :param dict expression: The expression from the shape to be assessed against - :param str allowed: Whether the statement is allowed by the expression or not currently - :return: allowed - """ - if "snaktype" not in statement: - return allowed - if "datavalue" not in statement: - return allowed - if "type" not in statement["datavalue"]: - return allowed - - if statement["snaktype"] == "value" and \ - statement["datavalue"]["type"] == "wikibase-entityid": - obj = f'http://www.wikidata.org/entity/{statement["datavalue"]["value"]["id"]}' - if "values" in expression: - if obj in expression["values"]: - allowed = "correct" - else: - allowed = "incorrect" - return allowed diff --git a/src/compareproperties.py b/src/compareproperties.py new file mode 100644 index 0000000..287baeb --- /dev/null +++ b/src/compareproperties.py @@ -0,0 +1,153 @@ +from src.utilities import Utilities + + +class CompareProperties: + + def __init__(self, entity: str, entities: dict, props: list, names: dict, start_shape: dict) -> None: + self._entities: dict = entities + self._names: dict = names + self._entity: str = entity + self._props: list = props + self._start_shape: dict = start_shape + + def compare_properties(self) -> dict: + """ + + :return: + """ + if "entities" not in self._entities: + return {} + if self._entity not in self._entities["entities"]: + return {} + if "claims" not in self._entities["entities"][self._entity]: + return {} + + claims: dict = self._entities["entities"][self._entity]["claims"] + properties: dict = {} + if self._start_shape is None: + return properties + utilities: Utilities = Utilities() + for prop in self._props: + child: dict = {"name": self._names[prop], + "necessity": utilities.calculate_necessity(prop, self._start_shape)} + if prop in claims: + response: str = self.check_claims_for_props(claims, prop) + else: + response: str = "missing" + if child["necessity"] != "absent": + if response != "": + child["response"] = response + elif response != "present": + child["response"] = response + properties[prop] = child + return properties + + def check_claims_for_props(self, claims: dict, prop: str) -> str: + """" + + :return: + """ + cardinality: str = "correct" + allowed: str = "present" + if "expression" not in self._start_shape: + return "present" + if "expressions" not in self._start_shape["expression"]: + return "present" + for expression in self._start_shape["expression"]["expressions"]: + if "predicate" in expression and expression["predicate"].endswith(prop): + allowed_list = self._get_allowed_list(claims, prop, expression) + cardinality2 = self._process_cardinalities(expression, allowed_list, self._start_shape, prop) + if cardinality2 not in ["", "correct"]: + cardinality = cardinality2 + if "correct" in allowed_list: + allowed = "correct" + if cardinality == "correct": + response: str = allowed + else: + response: str = cardinality + return response + + def _get_allowed_list(self, claims: dict, prop: str, expression: dict) -> list: + if prop not in claims: + return [] + + allowed_list: list = [] + for statement in claims[prop]: + is_it_allowed: str = "" + if statement["mainsnak"]["property"] == prop: + is_it_allowed = self._process_triple_constraint(statement["mainsnak"], + expression, + "") + if "extra" in self._start_shape: + for extra in self._start_shape["extra"]: + if extra.endswith(prop) and is_it_allowed == "incorrect": + is_it_allowed = "allowed" + allowed_list.append(is_it_allowed) + return allowed_list + + def _process_cardinalities(self, expression: dict, allowed_list: list, shape: dict, prop: str) -> str: + if "predicate" not in expression: + return "" + if not expression["predicate"].endswith(prop): + return "" + occurrences: int = allowed_list.count("correct") + occurrences += allowed_list.count("present") + cardinality: str = "correct" + for expression in shape["expression"]["expressions"]: + if "predicate" in expression and expression["predicate"].endswith(prop): + cardinality = self._get_cardinalities(occurrences, expression) + predicate: str = f'http://www.wikidata.org/prop/direct/{prop}' + if "extra" in shape and predicate in shape["extra"] and cardinality == "too many statements": + cardinality = "correct" + return cardinality + + @staticmethod + def _get_cardinalities(occurrences: int, expression: dict) -> str: + cardinality: str = "correct" + min_cardinality: bool = True + max_cardinality: bool = True + max_card: int = 1 + min_card: int = 1 + if "max" in expression: + max_card = expression["max"] + if "min" in expression: + min_card = expression["min"] + if max_card < occurrences: + max_cardinality = False + if min_card > occurrences: + min_cardinality = False + if max_card == -1: + max_cardinality = True + if min_card == -1: + min_cardinality = True + if min_cardinality and not max_cardinality: + cardinality = "too many statements" + if max_cardinality and not min_cardinality: + cardinality = "not enough correct statements" + return cardinality + + @staticmethod + def _process_triple_constraint(statement: dict, expression: dict, allowed: str) -> str: + """ + Processes triple constraint expression types in the shape + + :param dict statement: The entity's statement to be assessed + :param dict expression: The expression from the shape to be assessed against + :param str allowed: Whether the statement is allowed by the expression or not currently + :return: allowed + """ + if "property" not in statement: + return allowed + if "predicate" not in expression: + return allowed + + if expression["predicate"].endswith(statement["property"]): + allowed = "present" + try: + if expression["valueExpr"]["type"] == "NodeConstraint": + allowed = Utilities.process_node_constraint(statement, + expression["valueExpr"], + allowed) + except (KeyError, TypeError): + pass + return allowed diff --git a/src/comparestatements.py b/src/comparestatements.py new file mode 100644 index 0000000..46eb425 --- /dev/null +++ b/src/comparestatements.py @@ -0,0 +1,101 @@ +from typing import Tuple, Any + +from src.utilities import Utilities + + +class CompareStatements: + + def __init__(self, entities: dict, entity: str, start_shape: dict) -> None: + self._entities: dict = entities + self._entity: str = entity + self.start_shape: dict = start_shape + + def compare_statements(self) -> dict: + """ + Compares the statements with the shape + + :return: statements + """ + if "entities" not in self._entities: + return {} + + statements: dict = {} + claims: dict = self._entities["entities"][self._entity]['claims'] + for claim in claims: + property_statement_results: list = [] + for statement in claims[claim]: + child: dict = {"property": claim} + utilities: Utilities = Utilities() + necessity = utilities.calculate_necessity(statement["mainsnak"]["property"], self.start_shape) + if necessity != "absent": + child["necessity"] = necessity + child, allowed = self._process_shape(statement["mainsnak"], self.start_shape, child) + statements[statement["id"]] = child + if allowed.startswith("missing"): + allowed = "incorrect" + property_statement_results.append(allowed) + return statements + + def _process_shape(self, statement: dict, shape: dict, child: dict) -> Tuple[Any, str]: + """ + Processes a full shape + + :param statement: The entity's statement to be assessed + :param shape: The shape to be assessed against + :param child: The current response from the assessment + :return: child and allowed + """ + expressions: dict = {} + if "expression" in shape and "expressions" in shape["expression"]: + expressions = shape["expression"]["expressions"] + allowed: str = "not in schema" + for expression in expressions: + allowed = self.process_expressions(expression, shape, statement, allowed) + if allowed != "": + child["response"] = allowed + return child, allowed + + def process_expressions(self, expression: dict, shape: dict, statement: dict, allowed: str) -> str: + if "type" not in expression: + return allowed + if "predicate" not in expression: + return allowed + if "property" not in statement: + return allowed + + if expression["type"] == "TripleConstraint" and expression["predicate"].endswith(statement["property"]): + allowed = self._process_triple_constraint(statement, + expression, + allowed) + if "extra" in shape: + for extra in shape["extra"]: + if extra.endswith(statement["property"]) and allowed == "incorrect": + allowed = "allowed" + return allowed + + @staticmethod + def _process_triple_constraint(statement: dict, expression: dict, allowed: str) -> str: + """ + Processes triple constraint expression types in the shape + + :param statement: The entity's statement to be assessed + :param expression: The expression from the shape to be assessed against + :param allowed: Whether the statement is allowed by the expression or not currently + :return: allowed + """ + if "property" not in statement: + return allowed + if "predicate" not in expression: + return allowed + + if expression["predicate"].endswith(statement["property"]): + allowed = "allowed" + Utilities.process_cardinalities(expression, {"mainsnak": statement}) + try: + if expression["valueExpr"]["type"] == "NodeConstraint": + allowed = Utilities.process_node_constraint(statement, + expression["valueExpr"], + allowed) + except (KeyError, TypeError): + pass + return allowed diff --git a/src/getjsonld.py b/src/getjsonld.py new file mode 100644 index 0000000..6a7da2f --- /dev/null +++ b/src/getjsonld.py @@ -0,0 +1,50 @@ +import json + +import requests +from jsonasobj import as_json +from pyshexc.parser_impl.generate_shexj import parse + + +class JSONLDShape: + """ + Produces a shape in the form of a json for a wikidata entityschema (e.g. E10) + + :param schema: The identifier of the entityschema to be processed + :param language: The language to get the schema name in + + :return name: the name of the entityschema + :return shape: a json representation of the entityschema + """ + def __init__(self, schema: str, language: str) -> None: + self._language: str = language + self._get_schema_json(schema) + + def get_json_ld(self) -> dict: + """ + Gets the JSON_LD form of the Schema + """ + try: + return json.loads(as_json(parse(self._json_text["schemaText"]))) + except (KeyError, IndexError, AttributeError, ValueError): + return {} + + def _get_schema_json(self, schema) -> None: + """ + Downloads the schema from wikidata + + :param schema: the entityschema to be downloaded + """ + url: str = f"https://www.wikidata.org/wiki/EntitySchema:{schema}?action=raw" + response = requests.get(url=url, + headers={'User-Agent': 'Userscript Entityshape by User:Teester'}) + self._json_text: dict = response.json() + + def get_name(self) -> str: + """ + Gets the name of the schema + + :return: the name of the schema + """ + if self._language in self._json_text["labels"]: + return self._json_text["labels"][self._language] + return "" \ No newline at end of file diff --git a/src/shape.py b/src/shape.py index 4fadbc1..4c8a882 100644 --- a/src/shape.py +++ b/src/shape.py @@ -1,13 +1,10 @@ """ Converts entityschema to json suitable for comparing with a wikidata item """ -import json import os import re from typing import Optional, Match, Union, Pattern, Any -from jsonasobj import as_json -from pyshexc.parser_impl.generate_shexj import parse import requests @@ -54,15 +51,6 @@ def get_name(self) -> str: return self._json_text["labels"][self._language] return "" - def get_json_ld(self) -> dict: - """ - Gets the JSON_LD form of the Schema - """ - try: - return json.loads(as_json(parse(self._json_text["schemaText"]))) - except (KeyError, IndexError, AttributeError, ValueError): - return {} - def _translate_schema(self) -> None: """ Converts the entityschema to a json representation diff --git a/src/utilities.py b/src/utilities.py new file mode 100644 index 0000000..a6a9a3e --- /dev/null +++ b/src/utilities.py @@ -0,0 +1,92 @@ +class Utilities: + + def calculate_necessity(self, prop: str, shape: dict) -> str: + """ + Check if a property is required, optional or absent from a shape + + :param str prop: the property to be checked + :param dict shape: the shape to check against + :return: necessity + """ + necessity: str = "absent" + list_of_expressions: list = [] + + if "expression" not in shape: + return necessity + + if "expressions" in shape["expression"]: + for expression in shape["expression"]["expressions"]: + list_of_expressions.append(expression) + else: + list_of_expressions.append(shape["expression"]) + + for expression in list_of_expressions: + if "predicate" in expression and expression["predicate"].endswith(prop): + necessity = self.required_or_absent(expression) + return necessity + + @staticmethod + def required_or_absent(expression: dict) -> str: + necessity: str = "optional" + if ("min" in expression and expression["min"] > 0) or ("min" not in expression and "max" not in expression): + necessity = "required" + if "min" in expression and "max" in expression and expression["min"] == 0 and expression["max"] == 0: + necessity = "absent" + return necessity + + @staticmethod + def process_cardinalities(expression: dict, claim: dict) -> str: + """ + Processes cardinalities in expressions + + :return: cardinality + """ + cardinality: str = "correct" + min_cardinality: bool = True + max_cardinality: bool = True + max_card: int = 1 + min_card: int = 1 + if "max" in expression: + max_card = expression["max"] + if "min" in expression: + min_card = expression["min"] + if max_card < len(claim): + max_cardinality = False + if min_card > len(claim): + min_cardinality = False + if max_card == -1: + max_cardinality = True + if min_card == -1: + min_cardinality = True + if min_cardinality and not max_cardinality: + cardinality = "too many statements" + if max_cardinality and not min_cardinality: + cardinality = "not enough correct statements" + return cardinality + + @staticmethod + def process_node_constraint(statement: dict, expression: dict, allowed: str) -> str: + """ + Processes node constraint expression types in the shape + + :param dict statement: The entity's statement to be assessed + :param dict expression: The expression from the shape to be assessed against + :param str allowed: Whether the statement is allowed by the expression or not currently + :return: allowed + """ + if "snaktype" not in statement: + return allowed + if "datavalue" not in statement: + return allowed + if "type" not in statement["datavalue"]: + return allowed + + if statement["snaktype"] == "value" and \ + statement["datavalue"]["type"] == "wikibase-entityid": + obj = f'http://www.wikidata.org/entity/{statement["datavalue"]["value"]["id"]}' + if "values" in expression: + if obj in expression["values"]: + allowed = "correct" + else: + allowed = "incorrect" + return allowed diff --git a/tests/test_jsonld.py b/tests/test_jsonld.py index 68abffc..175569a 100644 --- a/tests/test_jsonld.py +++ b/tests/test_jsonld.py @@ -6,6 +6,7 @@ from src.comparejsonld import CompareJSONLD from src.compareshape import CompareShape from src.shape import Shape +from src.getjsonld import JSONLDShape class JSONLDCase(unittest.TestCase): @@ -18,8 +19,9 @@ def setUpClass(cls) -> None: schema: str = "E236" entity: str = "Q1728820" shape: Shape = Shape(schema, language) + shape2: JSONLDShape = JSONLDShape(schema, language) cls.comparison: CompareShape = CompareShape(shape.get_schema_shape(), entity, language) - cls.comparison2: CompareJSONLD = CompareJSONLD(shape.get_json_ld(), entity, language) + cls.comparison2: CompareJSONLD = CompareJSONLD(shape2.get_json_ld(), entity, language) def test_compare_property_names(self) -> None: """ diff --git a/tests/test_utilities.py b/tests/test_utilities.py index cde7595..0225a9c 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py @@ -1,6 +1,6 @@ import unittest -from src.comparejsonld import Utilities +from src.utilities import Utilities class UtilitiesTests(unittest.TestCase):