From 5ed0a26d388f51a7b94b598eb5410256d71d6c12 Mon Sep 17 00:00:00 2001 From: Jihun Hong Date: Wed, 24 Feb 2021 05:27:33 -0800 Subject: [PATCH] Implement DiGraph class, add pytest --- snapx/snapx/classes/digraph.py | 83 +++++-- snapx/snapx/classes/graph.py | 9 +- snapx/snapx/classes/tests/test_digraph.py | 281 ++++++++++++++++++++++ 3 files changed, 351 insertions(+), 22 deletions(-) create mode 100644 snapx/snapx/classes/tests/test_digraph.py diff --git a/snapx/snapx/classes/digraph.py b/snapx/snapx/classes/digraph.py index 15c78166..96c54b2f 100644 --- a/snapx/snapx/classes/digraph.py +++ b/snapx/snapx/classes/digraph.py @@ -3,6 +3,8 @@ from copy import deepcopy import snapx as sx +import snap + from snapx.classes.graph import Graph from snapx.classes.coreviews import AdjacencyView from snapx.classes.reportviews import ( @@ -323,11 +325,12 @@ def __init__(self, incoming_graph_data=None, **attr): {'day': 'Friday'} """ - self._graph = TNEANet.New() + self._graph = snap.TNEANet.New() # Because we are emulating an directed graph using multigraph, # we need a counter to keep track of edges self._num_edges = 0 + # Supplementary dicts to keep data not supported by snap graph self._node_extra_attr = {} self._edge_extra_attr = {} @@ -435,7 +438,7 @@ def add_node(self, node_for_adding, **attr): NetworkX Graphs, though one should be careful that the hash doesn't change on mutables. """ - super.add_node(node_for_adding, **attr) + super().add_node(node_for_adding, **attr) def add_nodes_from(self, nodes_for_adding, **attr): """Add multiple nodes. @@ -481,7 +484,7 @@ def add_nodes_from(self, nodes_for_adding, **attr): 11 """ - super.add_nodes_from(nodes_for_adding, **attr) + super().add_nodes_from(nodes_for_adding, **attr) def remove_node(self, n): """Remove node n. @@ -513,7 +516,7 @@ def remove_node(self, n): [] """ - super.remove_node(n) + super().remove_node(n) def remove_nodes_from(self, nodes): """Remove multiple nodes. @@ -539,7 +542,7 @@ def remove_nodes_from(self, nodes): [] """ - super.remove_nodes_from(nodes) + super().remove_nodes_from(nodes) def add_edge(self, u_of_edge, v_of_edge, **attr): """Add an edge between u and v. @@ -651,7 +654,7 @@ def add_edges_from(self, ebunch_to_add, **attr): >>> G.add_edges_from([(1, 2), (2, 3)], weight=3) >>> G.add_edges_from([(3, 4), (1, 4)], label='WN2898') """ - super.add_edges_from(ebunch_to_add, **attr) + super().add_edges_from(ebunch_to_add, **attr) def remove_edge(self, u, v): """Remove the edge between u and v. @@ -680,11 +683,14 @@ def remove_edge(self, u, v): >>> e = (2, 3, {'weight':7}) # an edge with attribute data >>> G.remove_edge(*e[:2]) # select first part of edge tuple """ + if not self.has_edge(u, v): + return + try: - del self._succ[u][v] - del self._pred[v][u] - except KeyError as e: - raise NetworkXError("The edge {u}-{v} not in graph.".format(u, v)) from e + self._graph.DelEdge(u, v) + self._num_edges -= 1 + except TypeError: + raise SnapXTypeError("SNAP only supports int as edge keys.") def remove_edges_from(self, ebunch): """Remove all edges specified in ebunch. @@ -712,7 +718,46 @@ def remove_edges_from(self, ebunch): >>> ebunch = [(1, 2), (2, 3)] >>> G.remove_edges_from(ebunch) """ - raise NotImplementedError("TODO") + super().remove_edges_from(ebunch) + + def has_edge(self, u, v): + """PORTED FROM NETWORKX + Returns True if the edge (u, v) is in the graph. + This is the same as `v in G[u]` without KeyError exceptions. + Parameters + ---------- + u, v : nodes + Nodes can be, for example, strings or numbers. + Nodes must be hashable (and not None) Python objects. + Returns + ------- + edge_ind : bool + True if edge is in the graph, False otherwise. + Examples + -------- + >>> G = nx.path_graph(4) # or DiGraph, MultiGraph, MultiDiGraph, etc + >>> G.has_edge(0, 1) # using two nodes + True + >>> e = (0, 1) + >>> G.has_edge(*e) # e is a 2-tuple (u, v) + True + >>> e = (0, 1, {'weight':7}) + >>> G.has_edge(*e[:2]) # e is a 3-tuple (u, v, data_dictionary) + True + The following syntax are equivalent: + >>> G.has_edge(0, 1) + True + >>> 1 in G[0] # though this gives KeyError if 0 not in G + True + """ + # Avoid snap runtime error by first checking + # if the nodes exist + if u not in self or v not in self: + return False + try: + return self._graph.IsEdge(u, v) + except TypeError: + return False def has_successor(self, u, v): """Returns True if node u has successor v. @@ -758,7 +803,10 @@ def successors(self, n): # return iter(self._succ[n]) # except KeyError as e: # raise NetworkXError(f"The node {n} is not in the digraph.") from e - raise NotImplementedError("TODO") + try: + return iter(self.adj[n]) + except KeyError: + raise SnapXError("The node {} is not in the graph.".format(n)) # digraph definitions neighbors = successors @@ -1042,7 +1090,7 @@ def clear(self): [] """ - super.clear() + super().clear() def clear_edges(self): """Remove all edges from the graph without altering nodes. @@ -1057,10 +1105,11 @@ def clear_edges(self): [] """ - for predecessor_dict in self._pred.values(): - predecessor_dict.clear() - for successor_dict in self._succ.values(): - successor_dict.clear() + # for predecessor_dict in self._pred.values(): + # predecessor_dict.clear() + # for successor_dict in self._succ.values(): + # successor_dict.clear() + raise NotImplementedError("TODO") def is_multigraph(self): """Returns True if graph is a multigraph, False otherwise.""" diff --git a/snapx/snapx/classes/graph.py b/snapx/snapx/classes/graph.py index c394bcc0..b41b7834 100644 --- a/snapx/snapx/classes/graph.py +++ b/snapx/snapx/classes/graph.py @@ -2,8 +2,7 @@ """ import snapx as sx - -from snap import TNEANet, TNEANetNodeI, Nodes, Edges, TSIn +import snap from snapx.classes.reportviews import NodeView, EdgeView from snapx.classes.coreviews import AdjacencyView @@ -88,7 +87,7 @@ def __init__(self, incoming_graph_data=None, **attr): >>> G.graph {'day': 'Friday'} """ - self._graph = TNEANet.New() + self._graph = snap.TNEANet.New() # Because we are emulating an undirected graph using multigraph, # we need a counter to keep track of edges @@ -97,7 +96,7 @@ def __init__(self, incoming_graph_data=None, **attr): self._node_extra_attr = {} self._edge_extra_attr = {} if incoming_graph_data is not None: - if type(incoming_graph_data) == type(TNEANet.New()): + if type(incoming_graph_data) == type(snap.TNEANet.New()): self._graph = incoming_graph_data self._num_edges = self._graph.GetEdges() else: @@ -731,7 +730,7 @@ def remove_edges_from(self, ebunch): >>> ebunch = [(1, 2), (2, 3)] >>> G.remove_edges_from(ebunch) """ - for e in ebunch_to_add: + for e in ebunch: ne = len(e) if ne == 3: u, v, _ = e diff --git a/snapx/snapx/classes/tests/test_digraph.py b/snapx/snapx/classes/tests/test_digraph.py new file mode 100644 index 00000000..3844a484 --- /dev/null +++ b/snapx/snapx/classes/tests/test_digraph.py @@ -0,0 +1,281 @@ +import pytest + +import snapx as sx +from snapx.testing.utils import assert_nodes_equal +from .test_graph import BaseGraphTester, BaseAttrGraphTester + + +class BaseDiGraphTester(BaseGraphTester): + # def test_has_successor(self): + # G = self.K3 + # assert G.has_successor(0, 1) + # assert not G.has_successor(0, -1) + + # def test_successors(self): + # G = self.K3 + # assert sorted(G.successors(0)) == [1, 2] + # with pytest.raises(sx.SnapXError): + # G.successors(-1) + + # def test_has_predecessor(self): + # G = self.K3 + # assert G.has_predecessor(0, 1) + # assert not G.has_predecessor(0, -1) + + # def test_predecessors(self): + # G = self.K3 + # assert sorted(G.predecessors(0)) == [1, 2] + # with pytest.raises(sx.SnapXError): + # G.predecessors(-1) + + def test_edges(self): + G = self.K3 + assert sorted(G.edges()) == [(0, 1), (0, 2), (1, 2)] + assert sorted(G.edges(0)) == [(0, 1), (0, 2)] + assert sorted(G.edges([0, 1])) == [(0, 1), (0, 2), (1, 2)] + with pytest.raises(sx.SnapXError): + G.edges(-1) + + def test_out_edges(self): + G = self.K3 + assert sorted(G.out_edges()) == [(0, 1), (0, 2), (1, 2)] + assert sorted(G.out_edges(0)) == [(0, 1), (0, 2)] + with pytest.raises(sx.SnapXError): + G.out_edges(-1) + + def test_out_edges_dir(self): + G = self.P3 + assert sorted(G.out_edges()) == [(0, 1), (1, 2)] + assert sorted(G.out_edges(0)) == [(0, 1)] + assert sorted(G.out_edges(2)) == [] + +# def test_out_edges_data(self): +# G = sx.DiGraph([(0, 1, {"data": 0}), (1, 0, {})]) +# assert sorted(G.out_edges(data=True)) == [(0, 1, {"data": 0}), (1, 0, {})] +# assert sorted(G.out_edges(0, data=True)) == [(0, 1, {"data": 0})] +# assert sorted(G.out_edges(data="data")) == [(0, 1, 0), (1, 0, None)] +# assert sorted(G.out_edges(0, data="data")) == [(0, 1, 0)] + +# def test_in_edges_dir(self): +# G = self.P3 +# assert sorted(G.in_edges()) == [(0, 1), (1, 2)] +# assert sorted(G.in_edges(0)) == [] +# assert sorted(G.in_edges(2)) == [(1, 2)] + +# def test_in_edges_data(self): +# G = sx.DiGraph([(0, 1, {"data": 0}), (1, 0, {})]) +# assert sorted(G.in_edges(data=True)) == [(0, 1, {"data": 0}), (1, 0, {})] +# assert sorted(G.in_edges(1, data=True)) == [(0, 1, {"data": 0})] +# assert sorted(G.in_edges(data="data")) == [(0, 1, 0), (1, 0, None)] +# assert sorted(G.in_edges(1, data="data")) == [(0, 1, 0)] + +# def test_degree(self): +# G = self.K3 +# assert sorted(G.degree()) == [(0, 4), (1, 4), (2, 4)] +# assert dict(G.degree()) == {0: 4, 1: 4, 2: 4} +# assert G.degree(0) == 4 +# assert list(G.degree(iter([0]))) == [(0, 4)] # run through iterator + +# def test_in_degree(self): +# G = self.K3 +# assert sorted(G.in_degree()) == [(0, 2), (1, 2), (2, 2)] +# assert dict(G.in_degree()) == {0: 2, 1: 2, 2: 2} +# assert G.in_degree(0) == 2 +# assert list(G.in_degree(iter([0]))) == [(0, 2)] # run through iterator + +# def test_out_degree(self): +# G = self.K3 +# assert sorted(G.out_degree()) == [(0, 2), (1, 2), (2, 2)] +# assert dict(G.out_degree()) == {0: 2, 1: 2, 2: 2} +# assert G.out_degree(0) == 2 +# assert list(G.out_degree(iter([0]))) == [(0, 2)] + + def test_size(self): + G = self.K3 + assert G.size() == 3 + assert G.number_of_edges() == 3 + +# def test_to_undirected_reciprocal(self): +# G = self.Graph() +# G.add_edge(1, 2) +# assert G.to_undirected().has_edge(1, 2) +# assert not G.to_undirected(reciprocal=True).has_edge(1, 2) +# G.add_edge(2, 1) +# assert G.to_undirected(reciprocal=True).has_edge(1, 2) + +# def test_reverse_copy(self): +# G = sx.DiGraph([(0, 1), (1, 2)]) +# R = G.reverse() +# assert sorted(R.edges()) == [(1, 0), (2, 1)] +# R.remove_edge(1, 0) +# assert sorted(R.edges()) == [(2, 1)] +# assert sorted(G.edges()) == [(0, 1), (1, 2)] + +# def test_reverse_nocopy(self): +# G = sx.DiGraph([(0, 1), (1, 2)]) +# R = G.reverse(copy=False) +# assert sorted(R.edges()) == [(1, 0), (2, 1)] +# with pytest.raises(sx.SnapXError): +# R.remove_edge(1, 0) + + +class BaseAttrDiGraphTester(BaseDiGraphTester, BaseAttrGraphTester): + pass +# def test_edges_data(self): +# G = self.K3 +# all_edges = [ +# (0, 1, {}), +# (0, 2, {}), +# (1, 0, {}), +# (1, 2, {}), +# (2, 0, {}), +# (2, 1, {}), +# ] +# assert sorted(G.edges(data=True)) == all_edges +# assert sorted(G.edges(0, data=True)) == all_edges[:2] +# assert sorted(G.edges([0, 1], data=True)) == all_edges[:4] +# with pytest.raises(sx.SnapXError): +# G.edges(-1, True) + +# def test_in_degree_weighted(self): +# G = self.K3.copy() +# G.add_edge(0, 1, weight=0.3, other=1.2) +# assert sorted(G.in_degree(weight="weight")) == [(0, 2), (1, 1.3), (2, 2)] +# assert dict(G.in_degree(weight="weight")) == {0: 2, 1: 1.3, 2: 2} +# assert G.in_degree(1, weight="weight") == 1.3 +# assert sorted(G.in_degree(weight="other")) == [(0, 2), (1, 2.2), (2, 2)] +# assert dict(G.in_degree(weight="other")) == {0: 2, 1: 2.2, 2: 2} +# assert G.in_degree(1, weight="other") == 2.2 +# assert list(G.in_degree(iter([1]), weight="other")) == [(1, 2.2)] + +# def test_out_degree_weighted(self): +# G = self.K3.copy() +# G.add_edge(0, 1, weight=0.3, other=1.2) +# assert sorted(G.out_degree(weight="weight")) == [(0, 1.3), (1, 2), (2, 2)] +# assert dict(G.out_degree(weight="weight")) == {0: 1.3, 1: 2, 2: 2} +# assert G.out_degree(0, weight="weight") == 1.3 +# assert sorted(G.out_degree(weight="other")) == [(0, 2.2), (1, 2), (2, 2)] +# assert dict(G.out_degree(weight="other")) == {0: 2.2, 1: 2, 2: 2} +# assert G.out_degree(0, weight="other") == 2.2 +# assert list(G.out_degree(iter([0]), weight="other")) == [(0, 2.2)] + + +class TestDiGraph(BaseAttrDiGraphTester): + """Tests specific to dict-of-dict-of-dict digraph data structure""" + + def setup_method(self): + self.Graph = sx.DiGraph + # build dict-of-dict-of-dict K3 + self.k3edges = [(0, 1), (0, 2), (1, 2)] + self.k3nodes = [0, 1, 2] + self.K3 = self.Graph() + + self.K3.add_edges_from(self.k3edges) + self.K3.add_nodes_from(self.k3nodes) + + self.p3edges = [(0, 1), (1, 2)] + self.p3nodes = [0, 1, 2] + self.P3 = self.Graph() + + self.P3.add_edges_from(self.p3edges) + self.P3.add_nodes_from(self.p3nodes) + +# def test_data_input(self): +# G = self.Graph({1: [2], 2: [1]}, name="test") +# assert G.name == "test" +# assert sorted(G.adj.items()) == [(1, {2: {}}), (2, {1: {}})] +# assert sorted(G.succ.items()) == [(1, {2: {}}), (2, {1: {}})] +# assert sorted(G.pred.items()) == [(1, {2: {}}), (2, {1: {}})] + + def test_add_edge(self): + G = self.Graph() + G.add_edge(0, 1) + assert G.adj == {0: {1: {}}, 1: {}} + assert G.succ == {0: {1: {}}, 1: {}} + # assert G.pred == {0: {}, 1: {0: {}}} + G = self.Graph() + G.add_edge(*(0, 1)) + assert G.adj == {0: {1: {}}, 1: {}} + assert G.succ == {0: {1: {}}, 1: {}} + # assert G.pred == {0: {}, 1: {0: {}}} + + def test_add_edges_from(self): + G = self.Graph() + G.add_edges_from([(0, 1), (0, 2, {"data": 3})], data=2) + assert G.adj == {0: {1: {"data": 2}, 2: {"data": 3}}, 1: {}, 2: {}} + assert G.succ == {0: {1: {"data": 2}, 2: {"data": 3}}, 1: {}, 2: {}} + # assert G.pred == {0: {}, 1: {0: {"data": 2}}, 2: {0: {"data": 3}}} + + with pytest.raises(sx.SnapXError): + G.add_edges_from([(0,)]) # too few in tuple + with pytest.raises(sx.SnapXError): + G.add_edges_from([(0, 1, 2, 3)]) # too many in tuple + with pytest.raises(TypeError): + G.add_edges_from([0]) # not a tuple + + def test_remove_edge(self): + G = self.K3.copy() + G.remove_edge(0, 1) + assert G.succ == {0: {2: {}}, 1: {2: {}}, 2: {}} + # assert G.pred == {0: {1: {}, 2: {}}, 1: {2: {}}, 2: {0: {}, 1: {}}} + # with pytest.raises(sx.SnapXError): + # G.remove_edge(-1, 0) + + def test_remove_edges_from(self): + G = self.K3.copy() + G.remove_edges_from([(0, 1)]) + assert G.succ == {0: {2: {}}, 1: {2: {}}, 2: {}} + # assert G.pred == {0: {1: {}, 2: {}}, 1: {2: {}}, 2: {0: {}, 1: {}}} + G.remove_edges_from([(0, 0)]) # silent fail + + def test_clear(self): + G = self.K3 + G.graph["name"] = "K3" + G.clear() + assert list(G.nodes) == [] + assert G.succ == {} + # assert G.pred == {} + assert G.graph == {} + +# def test_clear_edges(self): +# G = self.K3 +# G.graph["name"] = "K3" +# nodes = list(G.nodes) +# G.clear_edges() +# assert list(G.nodes) == nodes +# expected = {0: {}, 1: {}, 2: {}} +# assert G.succ == expected +# assert G.pred == expected +# assert list(G.edges) == [] +# assert G.graph["name"] == "K3" + + +# class TestEdgeSubgraph(_TestGraphEdgeSubgraph): +# """Unit tests for the :meth:`DiGraph.edge_subgraph` method.""" +# +# def setup_method(self): +# # Create a doubly-linked path graph on five nodes. +# G = sx.DiGraph(sx.path_graph(5)) +# # Add some node, edge, and graph attributes. +# for i in range(5): +# G.nodes[i]["name"] = f"node{i}" +# G.edges[0, 1]["name"] = "edge01" +# G.edges[3, 4]["name"] = "edge34" +# G.graph["name"] = "graph" +# # Get the subgraph induced by the first and last edges. +# self.G = G +# self.H = G.edge_subgraph([(0, 1), (3, 4)]) +# +# def test_pred_succ(self): +# """Test that nodes are added to predecessors and successors. +# +# For more information, see GitHub issue #2370. +# +# """ +# G = sx.DiGraph() +# G.add_edge(0, 1) +# H = G.edge_subgraph([(0, 1)]) +# assert list(H.predecessors(0)) == [] +# assert list(H.successors(0)) == [1] +# assert list(H.predecessors(1)) == [0] +# assert list(H.successors(1)) == []