Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from src.comparejsonld import CompareJSONLD
from src.compareshape import CompareShape
from src.shape import Shape
from src.getjsonld import JSONLDShape

app = Flask(__name__)
CORS(app)
Expand Down Expand Up @@ -77,7 +78,7 @@ def v2():
properties: list = []
statements: list = []
for schema in schema_list:
shape: Shape = Shape(schema, language)
shape: JSONLDShape = JSONLDShape(schema, language)
comparison: CompareJSONLD = CompareJSONLD(shape.get_json_ld(), entity, language)
names.append(shape.get_name())
general.append(comparison.get_general())
Expand Down
348 changes: 3 additions & 345 deletions src/comparejsonld.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
"""
import json
import re
from typing import Tuple, Any

import requests
from requests import Response

from src.compareproperties import CompareProperties
from src.comparestatements import CompareStatements


class CompareJSONLD:
"""
Expand Down Expand Up @@ -156,347 +158,3 @@ def _process_each_of(self) -> None:
:return:
"""
pass


class CompareProperties:

def __init__(self, entity: str, entities: dict, props: list, names: dict, start_shape: dict) -> None:
self._entities: dict = entities
self._names: dict = names
self._entity: str = entity
self._props: list = props
self._start_shape: dict = start_shape

def compare_properties(self) -> dict:
"""

:return:
"""
if "entities" not in self._entities:
return {}
if self._entity not in self._entities["entities"]:
return {}
if "claims" not in self._entities["entities"][self._entity]:
return {}

claims: dict = self._entities["entities"][self._entity]["claims"]
properties: dict = {}
if self._start_shape is None:
return properties
utilities: Utilities = Utilities()
for prop in self._props:
child: dict = {"name": self._names[prop],
"necessity": utilities.calculate_necessity(prop, self._start_shape)}
if prop in claims:
response: str = self.check_claims_for_props(claims, prop)
else:
response: str = "missing"
if child["necessity"] != "absent":
if response != "":
child["response"] = response
elif response != "present":
child["response"] = response
properties[prop] = child
return properties

def check_claims_for_props(self, claims: dict, prop: str) -> str:
""""

:return:
"""
cardinality: str = "correct"
allowed: str = "present"
if "expression" not in self._start_shape:
return "present"
if "expressions" not in self._start_shape["expression"]:
return "present"
for expression in self._start_shape["expression"]["expressions"]:
if "predicate" in expression and expression["predicate"].endswith(prop):
allowed_list = self._get_allowed_list(claims, prop, expression)
cardinality2 = self._process_cardinalities(expression, allowed_list, self._start_shape, prop)
if cardinality2 not in ["", "correct"]:
cardinality = cardinality2
if "correct" in allowed_list:
allowed = "correct"
if cardinality == "correct":
response: str = allowed
else:
response: str = cardinality
return response

def _get_allowed_list(self, claims: dict, prop: str, expression: dict) -> list:
if prop not in claims:
return []

allowed_list: list = []
for statement in claims[prop]:
is_it_allowed: str = ""
if statement["mainsnak"]["property"] == prop:
is_it_allowed = self._process_triple_constraint(statement["mainsnak"],
expression,
"")
if "extra" in self._start_shape:
for extra in self._start_shape["extra"]:
if extra.endswith(prop) and is_it_allowed == "incorrect":
is_it_allowed = "allowed"
allowed_list.append(is_it_allowed)
return allowed_list

def _process_cardinalities(self, expression: dict, allowed_list: list, shape: dict, prop: str) -> str:
if "predicate" not in expression:
return ""
if not expression["predicate"].endswith(prop):
return ""
occurrences: int = allowed_list.count("correct")
occurrences += allowed_list.count("present")
cardinality: str = "correct"
for expression in shape["expression"]["expressions"]:
if "predicate" in expression and expression["predicate"].endswith(prop):
cardinality = self._get_cardinalities(occurrences, expression)
predicate: str = f'http://www.wikidata.org/prop/direct/{prop}'
if "extra" in shape and predicate in shape["extra"] and cardinality == "too many statements":
cardinality = "correct"
return cardinality

@staticmethod
def _get_cardinalities(occurrences: int, expression: dict) -> str:
cardinality: str = "correct"
min_cardinality: bool = True
max_cardinality: bool = True
max_card: int = 1
min_card: int = 1
if "max" in expression:
max_card = expression["max"]
if "min" in expression:
min_card = expression["min"]
if max_card < occurrences:
max_cardinality = False
if min_card > occurrences:
min_cardinality = False
if max_card == -1:
max_cardinality = True
if min_card == -1:
min_cardinality = True
if min_cardinality and not max_cardinality:
cardinality = "too many statements"
if max_cardinality and not min_cardinality:
cardinality = "not enough correct statements"
return cardinality

@staticmethod
def _process_triple_constraint(statement: dict, expression: dict, allowed: str) -> str:
"""
Processes triple constraint expression types in the shape

:param dict statement: The entity's statement to be assessed
:param dict expression: The expression from the shape to be assessed against
:param str allowed: Whether the statement is allowed by the expression or not currently
:return: allowed
"""
if "property" not in statement:
return allowed
if "predicate" not in expression:
return allowed

if expression["predicate"].endswith(statement["property"]):
allowed = "present"
try:
if expression["valueExpr"]["type"] == "NodeConstraint":
allowed = Utilities.process_node_constraint(statement,
expression["valueExpr"],
allowed)
except (KeyError, TypeError):
pass
return allowed


class CompareStatements:

def __init__(self, entities: dict, entity: str, start_shape: dict) -> None:
self._entities: dict = entities
self._entity: str = entity
self.start_shape: dict = start_shape

def compare_statements(self) -> dict:
"""
Compares the statements with the shape

:return: statements
"""
if "entities" not in self._entities:
return {}

statements: dict = {}
claims: dict = self._entities["entities"][self._entity]['claims']
for claim in claims:
property_statement_results: list = []
for statement in claims[claim]:
child: dict = {"property": claim}
utilities: Utilities = Utilities()
necessity = utilities.calculate_necessity(statement["mainsnak"]["property"], self.start_shape)
if necessity != "absent":
child["necessity"] = necessity
child, allowed = self._process_shape(statement["mainsnak"], self.start_shape, child)
statements[statement["id"]] = child
if allowed.startswith("missing"):
allowed = "incorrect"
property_statement_results.append(allowed)
return statements

def _process_shape(self, statement: dict, shape: dict, child: dict) -> Tuple[Any, str]:
"""
Processes a full shape

:param statement: The entity's statement to be assessed
:param shape: The shape to be assessed against
:param child: The current response from the assessment
:return: child and allowed
"""
expressions: dict = {}
if "expression" in shape and "expressions" in shape["expression"]:
expressions = shape["expression"]["expressions"]
allowed: str = "not in schema"
for expression in expressions:
allowed = self.process_expressions(expression, shape, statement, allowed)
if allowed != "":
child["response"] = allowed
return child, allowed

def process_expressions(self, expression: dict, shape: dict, statement: dict, allowed: str) -> str:
if "type" not in expression:
return allowed
if "predicate" not in expression:
return allowed
if "property" not in statement:
return allowed

if expression["type"] == "TripleConstraint" and expression["predicate"].endswith(statement["property"]):
allowed = self._process_triple_constraint(statement,
expression,
allowed)
if "extra" in shape:
for extra in shape["extra"]:
if extra.endswith(statement["property"]) and allowed == "incorrect":
allowed = "allowed"
return allowed

@staticmethod
def _process_triple_constraint(statement: dict, expression: dict, allowed: str) -> str:
"""
Processes triple constraint expression types in the shape

:param statement: The entity's statement to be assessed
:param expression: The expression from the shape to be assessed against
:param allowed: Whether the statement is allowed by the expression or not currently
:return: allowed
"""
if "property" not in statement:
return allowed
if "predicate" not in expression:
return allowed

if expression["predicate"].endswith(statement["property"]):
allowed = "allowed"
Utilities.process_cardinalities(expression, {"mainsnak": statement})
try:
if expression["valueExpr"]["type"] == "NodeConstraint":
allowed = Utilities.process_node_constraint(statement,
expression["valueExpr"],
allowed)
except (KeyError, TypeError):
pass
return allowed


class Utilities:

def calculate_necessity(self, prop: str, shape: dict) -> str:
"""
Check if a property is required, optional or absent from a shape

:param str prop: the property to be checked
:param dict shape: the shape to check against
:return: necessity
"""
necessity: str = "absent"
list_of_expressions: list = []

if "expression" not in shape:
return necessity

if "expressions" in shape["expression"]:
for expression in shape["expression"]["expressions"]:
list_of_expressions.append(expression)
else:
list_of_expressions.append(shape["expression"])

for expression in list_of_expressions:
if "predicate" in expression and expression["predicate"].endswith(prop):
necessity = self.required_or_absent(expression)
return necessity

@staticmethod
def required_or_absent(expression: dict) -> str:
necessity: str = "optional"
if ("min" in expression and expression["min"] > 0) or ("min" not in expression and "max" not in expression):
necessity = "required"
if "min" in expression and "max" in expression and expression["min"] == 0 and expression["max"] == 0:
necessity = "absent"
return necessity

@staticmethod
def process_cardinalities(expression: dict, claim: dict) -> str:
"""
Processes cardinalities in expressions

:return: cardinality
"""
cardinality: str = "correct"
min_cardinality: bool = True
max_cardinality: bool = True
max_card: int = 1
min_card: int = 1
if "max" in expression:
max_card = expression["max"]
if "min" in expression:
min_card = expression["min"]
if max_card < len(claim):
max_cardinality = False
if min_card > len(claim):
min_cardinality = False
if max_card == -1:
max_cardinality = True
if min_card == -1:
min_cardinality = True
if min_cardinality and not max_cardinality:
cardinality = "too many statements"
if max_cardinality and not min_cardinality:
cardinality = "not enough correct statements"
return cardinality

@staticmethod
def process_node_constraint(statement: dict, expression: dict, allowed: str) -> str:
"""
Processes node constraint expression types in the shape

:param dict statement: The entity's statement to be assessed
:param dict expression: The expression from the shape to be assessed against
:param str allowed: Whether the statement is allowed by the expression or not currently
:return: allowed
"""
if "snaktype" not in statement:
return allowed
if "datavalue" not in statement:
return allowed
if "type" not in statement["datavalue"]:
return allowed

if statement["snaktype"] == "value" and \
statement["datavalue"]["type"] == "wikibase-entityid":
obj = f'http://www.wikidata.org/entity/{statement["datavalue"]["value"]["id"]}'
if "values" in expression:
if obj in expression["values"]:
allowed = "correct"
else:
allowed = "incorrect"
return allowed
Loading