summaryrefslogtreecommitdiffstats
path: root/python/altgraph/altgraph_tests
diff options
context:
space:
mode:
Diffstat (limited to 'python/altgraph/altgraph_tests')
-rw-r--r--python/altgraph/altgraph_tests/__init__.py1
-rw-r--r--python/altgraph/altgraph_tests/test_altgraph.py45
-rw-r--r--python/altgraph/altgraph_tests/test_dot.py370
-rw-r--r--python/altgraph/altgraph_tests/test_graph.py644
-rw-r--r--python/altgraph/altgraph_tests/test_graphstat.py70
-rw-r--r--python/altgraph/altgraph_tests/test_graphutil.py140
-rw-r--r--python/altgraph/altgraph_tests/test_object_graph.py349
7 files changed, 1619 insertions, 0 deletions
diff --git a/python/altgraph/altgraph_tests/__init__.py b/python/altgraph/altgraph_tests/__init__.py
new file mode 100644
index 000000000..6890389df
--- /dev/null
+++ b/python/altgraph/altgraph_tests/__init__.py
@@ -0,0 +1 @@
+""" altgraph tests """
diff --git a/python/altgraph/altgraph_tests/test_altgraph.py b/python/altgraph/altgraph_tests/test_altgraph.py
new file mode 100644
index 000000000..2ca6b251e
--- /dev/null
+++ b/python/altgraph/altgraph_tests/test_altgraph.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env py.test
+import os
+import sys
+
+from altgraph import Graph, GraphAlgo
+import unittest
+
+class BasicTests (unittest.TestCase):
+ def setUp(self):
+ self.edges = [
+ (1,2), (2,4), (1,3), (2,4), (3,4), (4,5), (6,5), (6,14), (14,15),
+ (6, 15), (5,7), (7, 8), (7,13), (12,8), (8,13), (11,12), (11,9),
+ (13,11), (9,13), (13,10)
+ ]
+
+ # these are the edges
+ self.store = {}
+ self.g = Graph.Graph()
+ for head, tail in self.edges:
+ self.store[head] = self.store[tail] = None
+ self.g.add_edge(head, tail)
+
+ def test_num_edges(self):
+ # check the parameters
+ self.assertEqual(self.g.number_of_nodes(), len(self.store))
+ self.assertEqual(self.g.number_of_edges(), len(self.edges))
+
+ def test_forw_bfs(self):
+ # do a forward bfs
+ self.assertEqual( self.g.forw_bfs(1),
+ [1, 2, 3, 4, 5, 7, 8, 13, 11, 10, 12, 9])
+
+
+ def test_get_hops(self):
+ # diplay the hops and hop numbers between nodes
+ self.assertEqual(self.g.get_hops(1, 8),
+ [(1, 0), (2, 1), (3, 1), (4, 2), (5, 3), (7, 4), (8, 5)])
+
+ def test_shortest_path(self):
+ self.assertEqual(GraphAlgo.shortest_path(self.g, 1, 12),
+ [1, 2, 4, 5, 7, 13, 11, 12])
+
+
+if __name__ == "__main__": # pragma: no cover
+ unittest.main()
diff --git a/python/altgraph/altgraph_tests/test_dot.py b/python/altgraph/altgraph_tests/test_dot.py
new file mode 100644
index 000000000..83993dad5
--- /dev/null
+++ b/python/altgraph/altgraph_tests/test_dot.py
@@ -0,0 +1,370 @@
+import unittest
+import os
+
+from altgraph import Dot
+from altgraph import Graph
+from altgraph import GraphError
+
+
+class TestDot (unittest.TestCase):
+
+ def test_constructor(self):
+ g = Graph.Graph([
+ (1,2),
+ (1,3),
+ (1,4),
+ (2,4),
+ (2,6),
+ (2,7),
+ (7,4),
+ (6,1),
+ ]
+ )
+
+ dot = Dot.Dot(g)
+
+ self.assertEqual(dot.name, 'G')
+ self.assertEqual(dot.attr, {})
+ self.assertEqual(dot.temp_dot, 'tmp_dot.dot')
+ self.assertEqual(dot.temp_neo, 'tmp_neo.dot')
+ self.assertEqual(dot.dot, 'dot')
+ self.assertEqual(dot.dotty, 'dotty')
+ self.assertEqual(dot.neato, 'neato')
+ self.assertEqual(dot.type, 'digraph')
+
+ self.assertEqual(dot.nodes, dict([(x, {}) for x in g]))
+
+ edges = {}
+ for head in g:
+ edges[head] = {}
+ for tail in g.out_nbrs(head):
+ edges[head][tail] = {}
+
+ self.assertEqual(dot.edges[1], edges[1])
+ self.assertEqual(dot.edges, edges)
+
+
+ dot = Dot.Dot(g, nodes=[1,2],
+ edgefn=lambda node: list(sorted(g.out_nbrs(node)))[:-1],
+ nodevisitor=lambda node: {'label': node},
+ edgevisitor=lambda head, tail: {'label': (head, tail) },
+ name="testgraph",
+ dot='/usr/local/bin/dot',
+ dotty='/usr/local/bin/dotty',
+ neato='/usr/local/bin/neato',
+ graphtype="graph")
+
+ self.assertEqual(dot.name, 'testgraph')
+ self.assertEqual(dot.attr, {})
+ self.assertEqual(dot.temp_dot, 'tmp_dot.dot')
+ self.assertEqual(dot.temp_neo, 'tmp_neo.dot')
+ self.assertEqual(dot.dot, '/usr/local/bin/dot')
+ self.assertEqual(dot.dotty, '/usr/local/bin/dotty')
+ self.assertEqual(dot.neato, '/usr/local/bin/neato')
+ self.assertEqual(dot.type, 'graph')
+
+ self.assertEqual(dot.nodes, dict([(x, {'label': x}) for x in [1,2]]))
+
+ edges = {}
+ for head in [1,2]:
+ edges[head] = {}
+ for tail in list(sorted(g.out_nbrs(head)))[:-1]:
+ if tail not in [1,2]: continue
+ edges[head][tail] = {'label': (head, tail) }
+
+ self.assertEqual(dot.edges[1], edges[1])
+ self.assertEqual(dot.edges, edges)
+
+ self.assertRaises(GraphError, Dot.Dot, g, nodes=[1,2, 9])
+
+ def test_style(self):
+ g = Graph.Graph([])
+
+ dot = Dot.Dot(g)
+
+ self.assertEqual(dot.attr, {})
+
+ dot.style(key='value')
+ self.assertEqual(dot.attr, {'key': 'value'})
+
+ dot.style(key2='value2')
+ self.assertEqual(dot.attr, {'key2': 'value2'})
+
+ def test_node_style(self):
+ g = Graph.Graph([
+ (1,2),
+ (1,3),
+ (1,4),
+ (2,4),
+ (2,6),
+ (2,7),
+ (7,4),
+ (6,1),
+ ]
+ )
+
+ dot = Dot.Dot(g)
+
+ self.assertEqual(dot.nodes[1], {})
+
+ dot.node_style(1, key='value')
+ self.assertEqual(dot.nodes[1], {'key': 'value'})
+
+ dot.node_style(1, key2='value2')
+ self.assertEqual(dot.nodes[1], {'key2': 'value2'})
+ self.assertEqual(dot.nodes[2], {})
+
+ dot.all_node_style(key3='value3')
+ for n in g:
+ self.assertEqual(dot.nodes[n], {'key3': 'value3'})
+
+ self.assertTrue(9 not in dot.nodes)
+ dot.node_style(9, key='value')
+ self.assertEqual(dot.nodes[9], {'key': 'value'})
+
+ def test_edge_style(self):
+ g = Graph.Graph([
+ (1,2),
+ (1,3),
+ (1,4),
+ (2,4),
+ (2,6),
+ (2,7),
+ (7,4),
+ (6,1),
+ ]
+ )
+
+ dot = Dot.Dot(g)
+
+ self.assertEqual(dot.edges[1][2], {})
+ dot.edge_style(1,2, foo='bar')
+ self.assertEqual(dot.edges[1][2], {'foo': 'bar'})
+
+ dot.edge_style(1,2, foo2='2bar')
+ self.assertEqual(dot.edges[1][2], {'foo2': '2bar'})
+
+ self.assertEqual(dot.edges[1][3], {})
+
+ self.assertFalse(6 in dot.edges[1])
+ dot.edge_style(1,6, foo2='2bar')
+ self.assertEqual(dot.edges[1][6], {'foo2': '2bar'})
+
+ self.assertRaises(GraphError, dot.edge_style, 1, 9, a=1)
+ self.assertRaises(GraphError, dot.edge_style, 9, 1, a=1)
+
+
+ def test_iter(self):
+ g = Graph.Graph([
+ (1,2),
+ (1,3),
+ (1,4),
+ (2,4),
+ (2,6),
+ (2,7),
+ (7,4),
+ (6,1),
+ ]
+ )
+
+ dot = Dot.Dot(g)
+ dot.style(graph="foobar")
+ dot.node_style(1, key='value')
+ dot.node_style(2, key='another', key2='world')
+ dot.edge_style(1,4, key1='value1', key2='value2')
+ dot.edge_style(2,4, key1='valueA')
+
+ self.assertEqual(list(iter(dot)), list(dot.iterdot()))
+
+ for item in dot.iterdot():
+ self.assertTrue(isinstance(item, str))
+
+ first = list(dot.iterdot())[0]
+ self.assertEqual(first, "digraph %s {\n"%(dot.name,))
+
+ dot.type = 'graph'
+ first = list(dot.iterdot())[0]
+ self.assertEqual(first, "graph %s {\n"%(dot.name,))
+
+ dot.type = 'foo'
+ self.assertRaises(GraphError, list, dot.iterdot())
+ dot.type = 'digraph'
+
+ self.assertEqual(list(dot), [
+ 'digraph G {\n',
+ 'graph="foobar";',
+ '\n',
+
+ '\t"1" [',
+ 'key="value",',
+ '];\n',
+
+ '\t"2" [',
+ 'key="another",',
+ 'key2="world",',
+ '];\n',
+
+ '\t"3" [',
+ '];\n',
+
+ '\t"4" [',
+ '];\n',
+
+ '\t"6" [',
+ '];\n',
+
+ '\t"7" [',
+ '];\n',
+
+ '\t"1" -> "2" [',
+ '];\n',
+
+ '\t"1" -> "3" [',
+ '];\n',
+
+ '\t"1" -> "4" [',
+ 'key1="value1",',
+ 'key2="value2",',
+ '];\n',
+
+ '\t"2" -> "4" [',
+ 'key1="valueA",',
+ '];\n',
+
+ '\t"2" -> "6" [',
+ '];\n',
+
+ '\t"2" -> "7" [',
+ '];\n',
+
+ '\t"6" -> "1" [',
+ '];\n',
+
+ '\t"7" -> "4" [',
+ '];\n',
+ '}\n'])
+
+
+ def test_save(self):
+ g = Graph.Graph([
+ (1,2),
+ (1,3),
+ (1,4),
+ (2,4),
+ (2,6),
+ (2,7),
+ (7,4),
+ (6,1),
+ ]
+ )
+
+ dot = Dot.Dot(g)
+ dot.style(graph="foobar")
+ dot.node_style(1, key='value')
+ dot.node_style(2, key='another', key2='world')
+ dot.edge_style(1,4, key1='value1', key2='value2')
+ dot.edge_style(2,4, key1='valueA')
+
+ fn = 'test_dot.dot'
+ self.assertTrue(not os.path.exists(fn))
+
+ try:
+ dot.save_dot(fn)
+
+ fp = open(fn, 'r')
+ data = fp.read()
+ fp.close()
+ self.assertEqual(data, ''.join(dot))
+
+ finally:
+ if os.path.exists(fn):
+ os.unlink(fn)
+
+
+ def test_img(self):
+ g = Graph.Graph([
+ (1,2),
+ (1,3),
+ (1,4),
+ (2,4),
+ (2,6),
+ (2,7),
+ (7,4),
+ (6,1),
+ ]
+ )
+
+ dot = Dot.Dot(g, dot='/usr/local/bin/!!dot', dotty='/usr/local/bin/!!dotty', neato='/usr/local/bin/!!neato')
+ dot.style(size='10,10', rankdir='RL', page='5, 5' , ranksep=0.75)
+ dot.node_style(1, label='BASE_NODE',shape='box', color='blue')
+ dot.node_style(2, style='filled', fillcolor='red')
+ dot.edge_style(1,4, style='dotted')
+ dot.edge_style(2,4, arrowhead='dot', label='binds', labelangle='90')
+
+ system_cmds = []
+ def fake_system(cmd):
+ system_cmds.append(cmd)
+ return None
+
+ try:
+ real_system = os.system
+ os.system = fake_system
+
+ system_cmds = []
+ dot.save_img('foo')
+ self.assertEqual(system_cmds, ['/usr/local/bin/!!dot -Tgif tmp_dot.dot -o foo.gif'])
+
+ system_cmds = []
+ dot.save_img('foo', file_type='jpg')
+ self.assertEqual(system_cmds, ['/usr/local/bin/!!dot -Tjpg tmp_dot.dot -o foo.jpg'])
+
+ system_cmds = []
+ dot.save_img('bar', file_type='jpg', mode='neato')
+ self.assertEqual(system_cmds, [
+ '/usr/local/bin/!!neato -o tmp_dot.dot tmp_neo.dot',
+ '/usr/local/bin/!!dot -Tjpg tmp_dot.dot -o bar.jpg',
+ ])
+
+ system_cmds = []
+ dot.display()
+ self.assertEqual(system_cmds, [
+ '/usr/local/bin/!!dotty tmp_dot.dot'
+ ])
+
+ system_cmds = []
+ dot.display(mode='neato')
+ self.assertEqual(system_cmds, [
+ '/usr/local/bin/!!neato -o tmp_dot.dot tmp_neo.dot',
+ '/usr/local/bin/!!dotty tmp_dot.dot'
+ ])
+
+ finally:
+ if os.path.exists(dot.temp_dot):
+ os.unlink(dot.temp_dot)
+ if os.path.exists(dot.temp_neo):
+ os.unlink(dot.temp_neo)
+ os.system = real_system
+
+ if os.path.exists('/usr/local/bin/dot') and os.path.exists('/usr/local/bin/neato'):
+ try:
+ dot.dot='/usr/local/bin/dot'
+ dot.neato='/usr/local/bin/neato'
+ self.assertFalse(os.path.exists('foo.gif'))
+ dot.save_img('foo')
+ self.assertTrue(os.path.exists('foo.gif'))
+ os.unlink('foo.gif')
+
+ self.assertFalse(os.path.exists('foo.gif'))
+ dot.save_img('foo', mode='neato')
+ self.assertTrue(os.path.exists('foo.gif'))
+ os.unlink('foo.gif')
+
+ finally:
+ if os.path.exists(dot.temp_dot):
+ os.unlink(dot.temp_dot)
+ if os.path.exists(dot.temp_neo):
+ os.unlink(dot.temp_neo)
+
+
+if __name__ == "__main__": # pragma: no cover
+ unittest.main()
diff --git a/python/altgraph/altgraph_tests/test_graph.py b/python/altgraph/altgraph_tests/test_graph.py
new file mode 100644
index 000000000..553549f5a
--- /dev/null
+++ b/python/altgraph/altgraph_tests/test_graph.py
@@ -0,0 +1,644 @@
+import unittest
+
+from altgraph import GraphError
+from altgraph.Graph import Graph
+
+class TestGraph (unittest.TestCase):
+
+ def test_nodes(self):
+ graph = Graph()
+
+ self.assertEqual(graph.node_list(), [])
+
+ o1 = object()
+ o1b = object()
+ o2 = object()
+ graph.add_node(1, o1)
+ graph.add_node(1, o1b)
+ graph.add_node(2, o2)
+ graph.add_node(3)
+
+ self.assertRaises(TypeError, graph.add_node, [])
+
+ self.assertTrue(graph.node_data(1) is o1)
+ self.assertTrue(graph.node_data(2) is o2)
+ self.assertTrue(graph.node_data(3) is None)
+
+ self.assertTrue(1 in graph)
+ self.assertTrue(2 in graph)
+ self.assertTrue(3 in graph)
+
+ self.assertEqual(graph.number_of_nodes(), 3)
+ self.assertEqual(graph.number_of_hidden_nodes(), 0)
+ self.assertEqual(graph.hidden_node_list(), [])
+ self.assertEqual(list(sorted(graph)), [1, 2, 3])
+
+ graph.hide_node(1)
+ graph.hide_node(2)
+ graph.hide_node(3)
+
+
+ self.assertEqual(graph.number_of_nodes(), 0)
+ self.assertEqual(graph.number_of_hidden_nodes(), 3)
+ self.assertEqual(list(sorted(graph.hidden_node_list())), [1, 2, 3])
+
+ self.assertFalse(1 in graph)
+ self.assertFalse(2 in graph)
+ self.assertFalse(3 in graph)
+
+ graph.add_node(1)
+ self.assertFalse(1 in graph)
+
+ graph.restore_node(1)
+ self.assertTrue(1 in graph)
+ self.assertFalse(2 in graph)
+ self.assertFalse(3 in graph)
+
+ graph.restore_all_nodes()
+ self.assertTrue(1 in graph)
+ self.assertTrue(2 in graph)
+ self.assertTrue(3 in graph)
+
+ self.assertEqual(list(sorted(graph.node_list())), [1, 2, 3])
+
+ v = graph.describe_node(1)
+ self.assertEqual(v, (1, o1, [], []))
+
+ def test_edges(self):
+ graph = Graph()
+ graph.add_node(1)
+ graph.add_node(2)
+ graph.add_node(3)
+ graph.add_node(4)
+ graph.add_node(5)
+
+ self.assertTrue(isinstance(graph.edge_list(), list))
+
+ graph.add_edge(1, 2)
+ graph.add_edge(4, 5, 'a')
+
+ self.assertRaises(GraphError, graph.add_edge, 'a', 'b', create_nodes=False)
+
+ self.assertEqual(graph.number_of_hidden_edges(), 0)
+ self.assertEqual(graph.number_of_edges(), 2)
+ e = graph.edge_by_node(1, 2)
+ self.assertTrue(isinstance(e, int))
+ graph.hide_edge(e)
+ self.assertEqual(graph.number_of_hidden_edges(), 1)
+ self.assertEqual(graph.number_of_edges(), 1)
+ e2 = graph.edge_by_node(1, 2)
+ self.assertTrue(e2 is None)
+
+ graph.restore_edge(e)
+ e2 = graph.edge_by_node(1, 2)
+ self.assertEqual(e, e2)
+ self.assertEqual(graph.number_of_hidden_edges(), 0)
+
+ self.assertEqual(graph.number_of_edges(), 2)
+
+ e1 = graph.edge_by_node(1, 2)
+ e2 = graph.edge_by_node(4, 5)
+ graph.hide_edge(e1)
+ graph.hide_edge(e2)
+
+ self.assertEqual(graph.number_of_edges(), 0)
+ graph.restore_all_edges()
+ self.assertEqual(graph.number_of_edges(), 2)
+
+ self.assertEqual(graph.edge_by_id(e1), (1,2))
+ self.assertRaises(GraphError, graph.edge_by_id, (e1+1)*(e2+1)+1)
+
+ self.assertEqual(list(sorted(graph.edge_list())), [e1, e2])
+
+ self.assertEqual(graph.describe_edge(e1), (e1, 1, 1, 2))
+ self.assertEqual(graph.describe_edge(e2), (e2, 'a', 4, 5))
+
+ self.assertEqual(graph.edge_data(e1), 1)
+ self.assertEqual(graph.edge_data(e2), 'a')
+
+ self.assertEqual(graph.head(e2), 4)
+ self.assertEqual(graph.tail(e2), 5)
+
+ graph.add_edge(1, 3)
+ graph.add_edge(1, 5)
+ graph.add_edge(4, 1)
+
+ self.assertEqual(list(sorted(graph.out_nbrs(1))), [2, 3, 5])
+ self.assertEqual(list(sorted(graph.inc_nbrs(1))), [4])
+ self.assertEqual(list(sorted(graph.inc_nbrs(5))), [1, 4])
+ self.assertEqual(list(sorted(graph.all_nbrs(1))), [2, 3, 4, 5])
+
+ graph.add_edge(5, 1)
+ self.assertEqual(list(sorted(graph.all_nbrs(5))), [1, 4])
+
+ self.assertEqual(graph.out_degree(1), 3)
+ self.assertEqual(graph.inc_degree(2), 1)
+ self.assertEqual(graph.inc_degree(5), 2)
+ self.assertEqual(graph.all_degree(5), 3)
+
+ v = graph.out_edges(4)
+ self.assertTrue(isinstance(v, list))
+ self.assertEqual(graph.edge_by_id(v[0]), (4, 5))
+
+ v = graph.out_edges(1)
+ for e in v:
+ self.assertEqual(graph.edge_by_id(e)[0], 1)
+
+ v = graph.inc_edges(1)
+ self.assertTrue(isinstance(v, list))
+ self.assertEqual(graph.edge_by_id(v[0]), (4, 1))
+
+ v = graph.inc_edges(5)
+ for e in v:
+ self.assertEqual(graph.edge_by_id(e)[1], 5)
+
+ v = graph.all_edges(5)
+ for e in v:
+ self.assertTrue(graph.edge_by_id(e)[1] == 5 or graph.edge_by_id(e)[0] == 5)
+
+ e1 = graph.edge_by_node(1, 2)
+ self.assertTrue(isinstance(e1, int))
+ graph.hide_node(1)
+ self.assertRaises(GraphError, graph.edge_by_node, 1, 2)
+ graph.restore_node(1)
+ e2 = graph.edge_by_node(1, 2)
+ self.assertEqual(e1, e2)
+
+
+
+ def test_toposort(self):
+ graph = Graph()
+ graph.add_node(1)
+ graph.add_node(2)
+ graph.add_node(3)
+ graph.add_node(4)
+ graph.add_node(5)
+
+ graph.add_edge(1, 2)
+ graph.add_edge(1, 3)
+ graph.add_edge(2, 4)
+ graph.add_edge(3, 5)
+
+ ok, result = graph.forw_topo_sort()
+ self.assertTrue(ok)
+ for idx in range(1, 6):
+ self.assertTrue(idx in result)
+
+ self.assertTrue(result.index(1) < result.index(2))
+ self.assertTrue(result.index(1) < result.index(3))
+ self.assertTrue(result.index(2) < result.index(4))
+ self.assertTrue(result.index(3) < result.index(5))
+
+ ok, result = graph.back_topo_sort()
+ self.assertTrue(ok)
+ for idx in range(1, 6):
+ self.assertTrue(idx in result)
+ self.assertTrue(result.index(2) < result.index(1))
+ self.assertTrue(result.index(3) < result.index(1))
+ self.assertTrue(result.index(4) < result.index(2))
+ self.assertTrue(result.index(5) < result.index(3))
+
+
+ # Same graph as before, but with edges
+ # reversed, which means we should get
+ # the same results as before if using
+ # back_topo_sort rather than forw_topo_sort
+ # (and v.v.)
+
+ graph = Graph()
+ graph.add_node(1)
+ graph.add_node(2)
+ graph.add_node(3)
+ graph.add_node(4)
+ graph.add_node(5)
+
+ graph.add_edge(2, 1)
+ graph.add_edge(3, 1)
+ graph.add_edge(4, 2)
+ graph.add_edge(5, 3)
+
+ ok, result = graph.back_topo_sort()
+ self.assertTrue(ok)
+ for idx in range(1, 6):
+ self.assertTrue(idx in result)
+
+ self.assertTrue(result.index(1) < result.index(2))
+ self.assertTrue(result.index(1) < result.index(3))
+ self.assertTrue(result.index(2) < result.index(4))
+ self.assertTrue(result.index(3) < result.index(5))
+
+ ok, result = graph.forw_topo_sort()
+ self.assertTrue(ok)
+ for idx in range(1, 6):
+ self.assertTrue(idx in result)
+ self.assertTrue(result.index(2) < result.index(1))
+ self.assertTrue(result.index(3) < result.index(1))
+ self.assertTrue(result.index(4) < result.index(2))
+ self.assertTrue(result.index(5) < result.index(3))
+
+
+ # Create a cycle
+ graph.add_edge(1, 5)
+ ok, result = graph.forw_topo_sort()
+ self.assertFalse(ok)
+ ok, result = graph.back_topo_sort()
+ self.assertFalse(ok)
+
+ def test_bfs_subgraph(self):
+ graph = Graph()
+ graph.add_edge(1, 2)
+ graph.add_edge(1, 4)
+ graph.add_edge(2, 4)
+ graph.add_edge(4, 8)
+ graph.add_edge(4, 9)
+ graph.add_edge(4, 10)
+ graph.add_edge(8, 10)
+
+ subgraph = graph.forw_bfs_subgraph(10)
+ self.assertTrue(isinstance(subgraph, Graph))
+ self.assertEqual(subgraph.number_of_nodes(), 1)
+ self.assertTrue(10 in subgraph)
+ self.assertEqual(subgraph.number_of_edges(), 0)
+
+ subgraph = graph.forw_bfs_subgraph(4)
+ self.assertTrue(isinstance(subgraph, Graph))
+ self.assertEqual(subgraph.number_of_nodes(), 4)
+ self.assertTrue(4 in subgraph)
+ self.assertTrue(8 in subgraph)
+ self.assertTrue(9 in subgraph)
+ self.assertTrue(10 in subgraph)
+ self.assertEqual(subgraph.number_of_edges(), 4)
+ e = subgraph.edge_by_node(4, 8)
+ e = subgraph.edge_by_node(4, 9)
+ e = subgraph.edge_by_node(4, 10)
+ e = subgraph.edge_by_node(8, 10)
+
+ # same graph as before, but switch around
+ # edges. This results in the same test results
+ # but now for back_bfs_subgraph rather than
+ # forw_bfs_subgraph
+
+ graph = Graph()
+ graph.add_edge(2, 1)
+ graph.add_edge(4, 1)
+ graph.add_edge(4, 2)
+ graph.add_edge(8, 4)
+ graph.add_edge(9, 4)
+ graph.add_edge(10, 4)
+ graph.add_edge(10, 8)
+
+ subgraph = graph.back_bfs_subgraph(10)
+ self.assertTrue(isinstance(subgraph, Graph))
+ self.assertEqual(subgraph.number_of_nodes(), 1)
+ self.assertTrue(10 in subgraph)
+ self.assertEqual(subgraph.number_of_edges(), 0)
+
+ subgraph = graph.back_bfs_subgraph(4)
+ self.assertTrue(isinstance(subgraph, Graph))
+ self.assertEqual(subgraph.number_of_nodes(), 4)
+ self.assertTrue(4 in subgraph)
+ self.assertTrue(8 in subgraph)
+ self.assertTrue(9 in subgraph)
+ self.assertTrue(10 in subgraph)
+ self.assertEqual(subgraph.number_of_edges(), 4)
+ e = subgraph.edge_by_node(4, 8)
+ e = subgraph.edge_by_node(4, 9)
+ e = subgraph.edge_by_node(4, 10)
+ e = subgraph.edge_by_node(8, 10)
+
+ def test_iterdfs(self):
+ graph = Graph()
+ graph.add_edge("1", "1.1")
+ graph.add_edge("1", "1.2")
+ graph.add_edge("1", "1.3")
+ graph.add_edge("1.1", "1.1.1")
+ graph.add_edge("1.1", "1.1.2")
+ graph.add_edge("1.2", "1.2.1")
+ graph.add_edge("1.2", "1.2.2")
+ graph.add_edge("1.2.2", "1.2.2.1")
+ graph.add_edge("1.2.2", "1.2.2.2")
+ graph.add_edge("1.2.2", "1.2.2.3")
+
+ result = list(graph.iterdfs("1"))
+ self.assertEqual(result, [
+ '1', '1.3', '1.2', '1.2.2', '1.2.2.3', '1.2.2.2',
+ '1.2.2.1', '1.2.1', '1.1', '1.1.2', '1.1.1'
+ ])
+ result = list(graph.iterdfs("1", "1.2.1"))
+ self.assertEqual(result, [
+ '1', '1.3', '1.2', '1.2.2', '1.2.2.3', '1.2.2.2',
+ '1.2.2.1', '1.2.1'
+ ])
+
+ result = graph.forw_dfs("1")
+ self.assertEqual(result, [
+ '1', '1.3', '1.2', '1.2.2', '1.2.2.3', '1.2.2.2',
+ '1.2.2.1', '1.2.1', '1.1', '1.1.2', '1.1.1'
+ ])
+ result = graph.forw_dfs("1", "1.2.1")
+ self.assertEqual(result, [
+ '1', '1.3', '1.2', '1.2.2', '1.2.2.3', '1.2.2.2',
+ '1.2.2.1', '1.2.1'
+ ])
+
+ graph = Graph()
+ graph.add_edge("1.1", "1")
+ graph.add_edge("1.2", "1")
+ graph.add_edge("1.3", "1")
+ graph.add_edge("1.1.1", "1.1")
+ graph.add_edge("1.1.2", "1.1")
+ graph.add_edge("1.2.1", "1.2")
+ graph.add_edge("1.2.2", "1.2")
+ graph.add_edge("1.2.2.1", "1.2.2")
+ graph.add_edge("1.2.2.2", "1.2.2")
+ graph.add_edge("1.2.2.3", "1.2.2")
+
+ result = list(graph.iterdfs("1", forward=False))
+ self.assertEqual(result, [
+ '1', '1.3', '1.2', '1.2.2', '1.2.2.3', '1.2.2.2',
+ '1.2.2.1', '1.2.1', '1.1', '1.1.2', '1.1.1'
+ ])
+ result = list(graph.iterdfs("1", "1.2.1", forward=False))
+ self.assertEqual(result, [
+ '1', '1.3', '1.2', '1.2.2', '1.2.2.3', '1.2.2.2',
+ '1.2.2.1', '1.2.1'
+ ])
+ result = graph.back_dfs("1")
+ self.assertEqual(result, [
+ '1', '1.3', '1.2', '1.2.2', '1.2.2.3', '1.2.2.2',
+ '1.2.2.1', '1.2.1', '1.1', '1.1.2', '1.1.1'
+ ])
+ result = graph.back_dfs("1", "1.2.1")
+ self.assertEqual(result, [
+ '1', '1.3', '1.2', '1.2.2', '1.2.2.3', '1.2.2.2',
+ '1.2.2.1', '1.2.1'
+ ])
+
+
+ # Introduce cyle:
+ graph.add_edge("1", "1.2")
+ result = list(graph.iterdfs("1", forward=False))
+ self.assertEqual(result, [
+ '1', '1.3', '1.2', '1.2.2', '1.2.2.3', '1.2.2.2',
+ '1.2.2.1', '1.2.1', '1.1', '1.1.2', '1.1.1'
+ ])
+
+ result = graph.back_dfs("1")
+ self.assertEqual(result, [
+ '1', '1.3', '1.2', '1.2.2', '1.2.2.3', '1.2.2.2',
+ '1.2.2.1', '1.2.1', '1.1', '1.1.2', '1.1.1'
+ ])
+
+
+ def test_iterdata(self):
+ graph = Graph()
+ graph.add_node("1", "I")
+ graph.add_node("1.1", "I.I")
+ graph.add_node("1.2", "I.II")
+ graph.add_node("1.3", "I.III")
+ graph.add_node("1.1.1", "I.I.I")
+ graph.add_node("1.1.2", "I.I.II")
+ graph.add_node("1.2.1", "I.II.I")
+ graph.add_node("1.2.2", "I.II.II")
+ graph.add_node("1.2.2.1", "I.II.II.I")
+ graph.add_node("1.2.2.2", "I.II.II.II")
+ graph.add_node("1.2.2.3", "I.II.II.III")
+
+ graph.add_edge("1", "1.1")
+ graph.add_edge("1", "1.2")
+ graph.add_edge("1", "1.3")
+ graph.add_edge("1.1", "1.1.1")
+ graph.add_edge("1.1", "1.1.2")
+ graph.add_edge("1.2", "1.2.1")
+ graph.add_edge("1.2", "1.2.2")
+ graph.add_edge("1.2.2", "1.2.2.1")
+ graph.add_edge("1.2.2", "1.2.2.2")
+ graph.add_edge("1.2.2", "1.2.2.3")
+
+ result = list(graph.iterdata("1", forward=True))
+ self.assertEqual(result, [
+ 'I', 'I.III', 'I.II', 'I.II.II', 'I.II.II.III', 'I.II.II.II',
+ 'I.II.II.I', 'I.II.I', 'I.I', 'I.I.II', 'I.I.I'
+ ])
+
+ result = list(graph.iterdata("1", end="1.2.1", forward=True))
+ self.assertEqual(result, [
+ 'I', 'I.III', 'I.II', 'I.II.II', 'I.II.II.III', 'I.II.II.II',
+ 'I.II.II.I', 'I.II.I'
+ ])
+
+ result = list(graph.iterdata("1", condition=lambda n: len(n) < 6, forward=True))
+ self.assertEqual(result, [
+ 'I', 'I.III', 'I.II',
+ 'I.I', 'I.I.I'
+ ])
+
+
+ # And the revese option:
+ graph = Graph()
+ graph.add_node("1", "I")
+ graph.add_node("1.1", "I.I")
+ graph.add_node("1.2", "I.II")
+ graph.add_node("1.3", "I.III")
+ graph.add_node("1.1.1", "I.I.I")
+ graph.add_node("1.1.2", "I.I.II")
+ graph.add_node("1.2.1", "I.II.I")
+ graph.add_node("1.2.2", "I.II.II")
+ graph.add_node("1.2.2.1", "I.II.II.I")
+ graph.add_node("1.2.2.2", "I.II.II.II")
+ graph.add_node("1.2.2.3", "I.II.II.III")
+
+ graph.add_edge("1.1", "1")
+ graph.add_edge("1.2", "1")
+ graph.add_edge("1.3", "1")
+ graph.add_edge("1.1.1", "1.1")
+ graph.add_edge("1.1.2", "1.1")
+ graph.add_edge("1.2.1", "1.2")
+ graph.add_edge("1.2.2", "1.2")
+ graph.add_edge("1.2.2.1", "1.2.2")
+ graph.add_edge("1.2.2.2", "1.2.2")
+ graph.add_edge("1.2.2.3", "1.2.2")
+
+ result = list(graph.iterdata("1", forward=False))
+ self.assertEqual(result, [
+ 'I', 'I.III', 'I.II', 'I.II.II', 'I.II.II.III', 'I.II.II.II',
+ 'I.II.II.I', 'I.II.I', 'I.I', 'I.I.II', 'I.I.I'
+ ])
+
+ result = list(graph.iterdata("1", end="1.2.1", forward=False))
+ self.assertEqual(result, [
+ 'I', 'I.III', 'I.II', 'I.II.II', 'I.II.II.III', 'I.II.II.II',
+ 'I.II.II.I', 'I.II.I'
+ ])
+
+ result = list(graph.iterdata("1", condition=lambda n: len(n) < 6, forward=False))
+ self.assertEqual(result, [
+ 'I', 'I.III', 'I.II',
+ 'I.I', 'I.I.I'
+ ])
+
+ def test_bfs(self):
+ graph = Graph()
+ graph.add_edge("1", "1.1")
+ graph.add_edge("1.1", "1.1.1")
+ graph.add_edge("1.1", "1.1.2")
+ graph.add_edge("1.1.2", "1.1.2.1")
+ graph.add_edge("1.1.2", "1.1.2.2")
+ graph.add_edge("1", "1.2")
+ graph.add_edge("1", "1.3")
+ graph.add_edge("1.2", "1.2.1")
+
+ self.assertEqual(graph.forw_bfs("1"),
+ ['1', '1.1', '1.2', '1.3', '1.1.1', '1.1.2', '1.2.1', '1.1.2.1', '1.1.2.2'])
+ self.assertEqual(graph.forw_bfs("1", "1.1.1"),
+ ['1', '1.1', '1.2', '1.3', '1.1.1'])
+
+
+ # And the "reverse" graph
+ graph = Graph()
+ graph.add_edge("1.1", "1")
+ graph.add_edge("1.1.1", "1.1")
+ graph.add_edge("1.1.2", "1.1")
+ graph.add_edge("1.1.2.1", "1.1.2")
+ graph.add_edge("1.1.2.2", "1.1.2")
+ graph.add_edge("1.2", "1")
+ graph.add_edge("1.3", "1")
+ graph.add_edge("1.2.1", "1.2")
+
+ self.assertEqual(graph.back_bfs("1"),
+ ['1', '1.1', '1.2', '1.3', '1.1.1', '1.1.2', '1.2.1', '1.1.2.1', '1.1.2.2'])
+ self.assertEqual(graph.back_bfs("1", "1.1.1"),
+ ['1', '1.1', '1.2', '1.3', '1.1.1'])
+
+
+
+ # check cycle handling
+ graph.add_edge("1", "1.2.1")
+ self.assertEqual(graph.back_bfs("1"),
+ ['1', '1.1', '1.2', '1.3', '1.1.1', '1.1.2', '1.2.1', '1.1.2.1', '1.1.2.2'])
+
+
+ def test_connected(self):
+ graph = Graph()
+ graph.add_node(1)
+ graph.add_node(2)
+ graph.add_node(3)
+ graph.add_node(4)
+
+ self.assertFalse(graph.connected())
+
+ graph.add_edge(1, 2)
+ graph.add_edge(3, 4)
+ self.assertFalse(graph.connected())
+
+ graph.add_edge(2, 3)
+ graph.add_edge(4, 1)
+ self.assertTrue(graph.connected())
+
+ def test_edges_complex(self):
+ g = Graph()
+ g.add_edge(1, 2)
+ e = g.edge_by_node(1,2)
+ g.hide_edge(e)
+ g.hide_node(2)
+ self.assertRaises(GraphError, g.restore_edge, e)
+
+ g.restore_all_edges()
+ self.assertRaises(GraphError, g.edge_by_id, e)
+
+ def test_clust_coef(self):
+ g = Graph()
+ g.add_edge(1, 2)
+ g.add_edge(1, 3)
+ g.add_edge(1, 4)
+ self.assertEqual(g.clust_coef(1), 0)
+
+ g.add_edge(2, 5)
+ g.add_edge(3, 5)
+ g.add_edge(4, 5)
+ self.assertEqual(g.clust_coef(1), 0)
+
+ g.add_edge(2, 3)
+ self.assertEqual(g.clust_coef(1), 1./6)
+ g.add_edge(2, 4)
+ self.assertEqual(g.clust_coef(1), 2./6)
+ g.add_edge(4, 2)
+ self.assertEqual(g.clust_coef(1), 3./6)
+
+ g.add_edge(2, 3)
+ g.add_edge(2, 4)
+ g.add_edge(3, 4)
+ g.add_edge(3, 2)
+ g.add_edge(4, 2)
+ g.add_edge(4, 3)
+ self.assertEqual(g.clust_coef(1), 1)
+
+
+ def test_get_hops(self):
+ graph = Graph()
+ graph.add_edge(1, 2)
+ graph.add_edge(1, 3)
+ graph.add_edge(2, 4)
+ graph.add_edge(4, 5)
+ graph.add_edge(5, 7)
+ graph.add_edge(7, 8)
+
+ self.assertEqual(graph.get_hops(1),
+ [(1, 0), (2, 1), (3, 1), (4, 2), (5, 3), (7, 4), (8, 5)])
+
+ self.assertEqual(graph.get_hops(1, 5),
+ [(1, 0), (2, 1), (3, 1), (4, 2), (5, 3)])
+
+ graph.add_edge(5, 1)
+ graph.add_edge(7, 1)
+ graph.add_edge(7, 4)
+
+ self.assertEqual(graph.get_hops(1),
+ [(1, 0), (2, 1), (3, 1), (4, 2), (5, 3), (7, 4), (8, 5)])
+
+ # And the reverse graph
+ graph = Graph()
+ graph.add_edge(2, 1)
+ graph.add_edge(3, 1)
+ graph.add_edge(4, 2)
+ graph.add_edge(5, 4)
+ graph.add_edge(7, 5)
+ graph.add_edge(8, 7)
+
+ self.assertEqual(graph.get_hops(1, forward=False),
+ [(1, 0), (2, 1), (3, 1), (4, 2), (5, 3), (7, 4), (8, 5)])
+
+ self.assertEqual(graph.get_hops(1, 5, forward=False),
+ [(1, 0), (2, 1), (3, 1), (4, 2), (5, 3)])
+
+ graph.add_edge(1, 5)
+ graph.add_edge(1, 7)
+ graph.add_edge(4, 7)
+
+ self.assertEqual(graph.get_hops(1, forward=False),
+ [(1, 0), (2, 1), (3, 1), (4, 2), (5, 3), (7, 4), (8, 5)])
+
+
+ def test_constructor(self):
+ graph = Graph(iter([
+ (1, 2),
+ (2, 3, 'a'),
+ (1, 3),
+ (3, 4),
+ ]))
+ self.assertEqual(graph.number_of_nodes(), 4)
+ self.assertEqual(graph.number_of_edges(), 4)
+ try:
+ graph.edge_by_node(1,2)
+ graph.edge_by_node(2,3)
+ graph.edge_by_node(1,3)
+ graph.edge_by_node(3,4)
+ except GraphError:
+ self.fail("Incorrect graph")
+
+ self.assertEqual(graph.edge_data(graph.edge_by_node(2, 3)), 'a')
+
+ self.assertRaises(GraphError, Graph, [(1,2,3,4)])
+
+if __name__ == "__main__": # pragma: no cover
+ unittest.main()
diff --git a/python/altgraph/altgraph_tests/test_graphstat.py b/python/altgraph/altgraph_tests/test_graphstat.py
new file mode 100644
index 000000000..b628b6f24
--- /dev/null
+++ b/python/altgraph/altgraph_tests/test_graphstat.py
@@ -0,0 +1,70 @@
+import unittest
+
+from altgraph import GraphStat
+from altgraph import Graph
+import sys
+
+class TestDegreesDist (unittest.TestCase):
+
+ def test_simple(self):
+ a = Graph.Graph()
+ self.assertEqual(GraphStat.degree_dist(a), [])
+
+ a.add_node(1)
+ a.add_node(2)
+ a.add_node(3)
+
+ self.assertEqual(GraphStat.degree_dist(a), GraphStat._binning([0, 0, 0]))
+
+ for x in range(100):
+ a.add_node(x)
+
+ for x in range(1, 100):
+ for y in range(1, 50):
+ if x % y == 0:
+ a.add_edge(x, y)
+
+ counts_inc = []
+ counts_out = []
+ for n in a:
+ counts_inc.append(a.inc_degree(n))
+ counts_out.append(a.out_degree(n))
+
+ self.assertEqual(GraphStat.degree_dist(a), GraphStat._binning(counts_out))
+ self.assertEqual(GraphStat.degree_dist(a, mode='inc'), GraphStat._binning(counts_inc))
+
+class TestBinning (unittest.TestCase):
+ def test_simple(self):
+
+ # Binning [0, 100) into 10 bins
+ a = list(range(100))
+ out = GraphStat._binning(a, limits=(0, 100), bin_num=10)
+
+ self.assertEqual(out,
+ [ (x*1.0, 10) for x in range(5, 100, 10) ])
+
+
+ # Check that outliers are ignored.
+ a = list(range(100))
+ out = GraphStat._binning(a, limits=(0, 90), bin_num=9)
+
+ self.assertEqual(out,
+ [ (x*1.0, 10) for x in range(5, 90, 10) ])
+
+
+ out = GraphStat._binning(a, limits=(0, 100), bin_num=15)
+ binSize = 100 / 15.0
+ result = [0]*15
+ for i in range(100):
+ bin = int(i/binSize)
+ try:
+ result[bin] += 1
+ except IndexError:
+ pass
+
+ result = [ (i * binSize + binSize/2, result[i]) for i in range(len(result))]
+
+ self.assertEqual(result, out)
+
+if __name__ == "__main__": # pragma: no cover
+ unittest.main()
diff --git a/python/altgraph/altgraph_tests/test_graphutil.py b/python/altgraph/altgraph_tests/test_graphutil.py
new file mode 100644
index 000000000..c1166237c
--- /dev/null
+++ b/python/altgraph/altgraph_tests/test_graphutil.py
@@ -0,0 +1,140 @@
+import unittest
+from altgraph import GraphUtil
+from altgraph import Graph, GraphError
+
+class TestGraphUtil (unittest.TestCase):
+
+ def test_generate_random(self):
+ g = GraphUtil.generate_random_graph(10, 50)
+ self.assertEqual(g.number_of_nodes(), 10)
+ self.assertEqual(g.number_of_edges(), 50)
+
+ seen = set()
+
+ for e in g.edge_list():
+ h, t = g.edge_by_id(e)
+ self.assertFalse(h == t)
+ self.assertTrue((h, t) not in seen)
+ seen.add((h, t))
+
+ g = GraphUtil.generate_random_graph(5, 30, multi_edges=True)
+ self.assertEqual(g.number_of_nodes(), 5)
+ self.assertEqual(g.number_of_edges(), 30)
+
+ seen = set()
+
+ for e in g.edge_list():
+ h, t = g.edge_by_id(e)
+ self.assertFalse(h == t)
+ if (h, t) in seen:
+ break
+ seen.add((h, t))
+
+ else:
+ self.fail("no duplicates?")
+
+ g = GraphUtil.generate_random_graph(5, 21, self_loops=True)
+ self.assertEqual(g.number_of_nodes(), 5)
+ self.assertEqual(g.number_of_edges(), 21)
+
+ seen = set()
+
+ for e in g.edge_list():
+ h, t = g.edge_by_id(e)
+ self.assertFalse((h, t) in seen)
+ if h == t:
+ break
+ seen.add((h, t))
+
+ else:
+ self.fail("no self loops?")
+
+ self.assertRaises(GraphError, GraphUtil.generate_random_graph, 5, 21)
+ g = GraphUtil.generate_random_graph(5, 21, True)
+ self.assertRaises(GraphError, GraphUtil.generate_random_graph, 5, 26, True)
+
+ def test_generate_scale_free(self):
+ graph = GraphUtil.generate_scale_free_graph(50, 10)
+ self.assertEqual(graph.number_of_nodes(), 500)
+
+ counts = {}
+ for node in graph:
+ degree = graph.inc_degree(node)
+ try:
+ counts[degree] += 1
+ except KeyError:
+ counts[degree] = 1
+
+ total_counts = sum(counts.values())
+ P = {}
+ for degree, count in counts.items():
+ P[degree] = count * 1.0 / total_counts
+
+ # XXX: use algoritm <http://stackoverflow.com/questions/3433486/how-to-do-exponential-and-logarithmic-curve-fitting-in-python-i-found-only-polyn>
+ # to check if P[degree] ~ degree ** G (for some G)
+
+ #print sorted(P.items())
+
+ #print sorted([(count, degree) for degree, count in counts.items()])
+
+ #self.fail("missing tests for GraphUtil.generate_scale_free_graph")
+
+ def test_filter_stack(self):
+ g = Graph.Graph()
+ g.add_node("1", "N.1")
+ g.add_node("1.1", "N.1.1")
+ g.add_node("1.1.1", "N.1.1.1")
+ g.add_node("1.1.2", "N.1.1.2")
+ g.add_node("1.1.3", "N.1.1.3")
+ g.add_node("1.1.1.1", "N.1.1.1.1")
+ g.add_node("1.1.1.2", "N.1.1.1.2")
+ g.add_node("1.1.2.1", "N.1.1.2.1")
+ g.add_node("1.1.2.2", "N.1.1.2.2")
+ g.add_node("1.1.2.3", "N.1.1.2.3")
+ g.add_node("2", "N.2")
+
+ g.add_edge("1", "1.1")
+ g.add_edge("1.1", "1.1.1")
+ g.add_edge("1.1", "1.1.2")
+ g.add_edge("1.1", "1.1.3")
+ g.add_edge("1.1.1", "1.1.1.1")
+ g.add_edge("1.1.1", "1.1.1.2")
+ g.add_edge("1.1.2", "1.1.2.1")
+ g.add_edge("1.1.2", "1.1.2.2")
+ g.add_edge("1.1.2", "1.1.2.3")
+
+ v, r, o = GraphUtil.filter_stack(g, "1", [
+ lambda n: n != "N.1.1.1", lambda n: n != "N.1.1.2.3" ])
+
+ self.assertEqual(v,
+ set(["1", "1.1", "1.1.1", "1.1.2", "1.1.3",
+ "1.1.1.1", "1.1.1.2", "1.1.2.1", "1.1.2.2",
+ "1.1.2.3"]))
+ self.assertEqual(r, set([
+ "1.1.1", "1.1.2.3"]))
+
+ o.sort()
+ self.assertEqual(o,
+ [
+ ("1.1", "1.1.1.1"),
+ ("1.1", "1.1.1.2")
+ ])
+
+ v, r, o = GraphUtil.filter_stack(g, "1", [
+ lambda n: n != "N.1.1.1", lambda n: n != "N.1.1.1.2" ])
+
+ self.assertEqual(v,
+ set(["1", "1.1", "1.1.1", "1.1.2", "1.1.3",
+ "1.1.1.1", "1.1.1.2", "1.1.2.1", "1.1.2.2",
+ "1.1.2.3"]))
+ self.assertEqual(r, set([
+ "1.1.1", "1.1.1.2"]))
+
+ self.assertEqual(o,
+ [
+ ("1.1", "1.1.1.1"),
+ ])
+
+
+if __name__ == "__main__": # pragma: no cover
+ unittest.main()
diff --git a/python/altgraph/altgraph_tests/test_object_graph.py b/python/altgraph/altgraph_tests/test_object_graph.py
new file mode 100644
index 000000000..9035607e7
--- /dev/null
+++ b/python/altgraph/altgraph_tests/test_object_graph.py
@@ -0,0 +1,349 @@
+import unittest
+import sys
+from altgraph.ObjectGraph import ObjectGraph
+from altgraph.Graph import Graph
+
+try:
+ from StringIO import StringIO
+except ImportError:
+ from io import StringIO
+
+
+class Node (object):
+ def __init__(self, graphident):
+ self.graphident = graphident
+
+class SubNode (Node):
+ pass
+
+class ArgNode (object):
+ def __init__(self, graphident, *args, **kwds):
+ self.graphident = graphident
+ self.args = args
+ self.kwds = kwds
+
+ def __repr__(self):
+ return '<ArgNode %s>'%(self.graphident,)
+
+class TestObjectGraph (unittest.TestCase):
+
+ def test_constructor(self):
+ graph = ObjectGraph()
+ self.assertTrue(isinstance(graph, ObjectGraph))
+
+ g = Graph()
+ graph = ObjectGraph(g)
+ self.assertTrue(graph.graph is g)
+ self.assertEqual(graph.debug, 0)
+ self.assertEqual(graph.indent, 0)
+
+ graph = ObjectGraph(debug=5)
+ self.assertEqual(graph.debug, 5)
+
+ def test_repr(self):
+ graph = ObjectGraph()
+ self.assertEqual(repr(graph), '<ObjectGraph>')
+
+
+ def testNodes(self):
+ graph = ObjectGraph()
+ n1 = Node("n1")
+ n2 = Node("n2")
+ n3 = Node("n3")
+ n4 = Node("n4")
+
+ n1b = Node("n1")
+
+ self.assertTrue(graph.getIdent(graph) is graph)
+ self.assertTrue(graph.getRawIdent(graph) is graph)
+
+ graph.addNode(n1)
+ graph.addNode(n2)
+ graph.addNode(n3)
+
+ self.assertTrue(n1 in graph)
+ self.assertFalse(n4 in graph)
+ self.assertTrue("n1" in graph)
+ self.assertFalse("n4" in graph)
+
+ self.assertTrue(graph.findNode(n1) is n1)
+ self.assertTrue(graph.findNode(n1b) is n1)
+ self.assertTrue(graph.findNode(n2) is n2)
+ self.assertTrue(graph.findNode(n4) is None)
+ self.assertTrue(graph.findNode("n1") is n1)
+ self.assertTrue(graph.findNode("n2") is n2)
+ self.assertTrue(graph.findNode("n4") is None)
+
+ self.assertEqual(graph.getRawIdent(n1), "n1")
+ self.assertEqual(graph.getRawIdent(n1b), "n1")
+ self.assertEqual(graph.getRawIdent(n4), "n4")
+ self.assertEqual(graph.getRawIdent("n1"), None)
+
+ self.assertEqual(graph.getIdent(n1), "n1")
+ self.assertEqual(graph.getIdent(n1b), "n1")
+ self.assertEqual(graph.getIdent(n4), "n4")
+ self.assertEqual(graph.getIdent("n1"), "n1")
+
+ self.assertTrue(n3 in graph)
+ graph.removeNode(n3)
+ self.assertTrue(n3 not in graph)
+ graph.addNode(n3)
+ self.assertTrue(n3 in graph)
+
+ n = graph.createNode(SubNode, "n1")
+ self.assertTrue(n is n1)
+
+ n = graph.createNode(SubNode, "n8")
+ self.assertTrue(isinstance(n, SubNode))
+ self.assertTrue(n in graph)
+ self.assertTrue(graph.findNode("n8") is n)
+
+ n = graph.createNode(ArgNode, "args", 1, 2, 3, a='a', b='b')
+ self.assertTrue(isinstance(n, ArgNode))
+ self.assertTrue(n in graph)
+ self.assertTrue(graph.findNode("args") is n)
+ self.assertEqual(n.args, (1, 2, 3))
+ self.assertEqual(n.kwds, {'a':'a', 'b':'b'})
+
+ def testEdges(self):
+ graph = ObjectGraph()
+ n1 = graph.createNode(ArgNode, "n1", 1)
+ n2 = graph.createNode(ArgNode, "n2", 1)
+ n3 = graph.createNode(ArgNode, "n3", 1)
+ n4 = graph.createNode(ArgNode, "n4", 1)
+
+ graph.createReference(n1, n2, "n1-n2")
+ graph.createReference("n1", "n3", "n1-n3")
+ graph.createReference("n2", n3)
+
+ g = graph.graph
+ e = g.edge_by_node("n1", "n2")
+ self.assertTrue(e is not None)
+ self.assertEqual(g.edge_data(e), "n1-n2")
+
+ e = g.edge_by_node("n1", "n3")
+ self.assertTrue(e is not None)
+ self.assertEqual(g.edge_data(e), "n1-n3")
+
+ e = g.edge_by_node("n2", "n3")
+ self.assertTrue(e is not None)
+ self.assertEqual(g.edge_data(e), None)
+
+ e = g.edge_by_node("n1", "n4")
+ self.assertTrue(e is None)
+
+ graph.removeReference(n1, n2)
+ e = g.edge_by_node("n1", "n2")
+ self.assertTrue(e is None)
+
+ graph.removeReference("n1", "n3")
+ e = g.edge_by_node("n1", "n3")
+ self.assertTrue(e is None)
+
+ graph.createReference(n1, n2, "foo")
+ e = g.edge_by_node("n1", "n2")
+ self.assertTrue(e is not None)
+ self.assertEqual(g.edge_data(e), "foo")
+
+
+ def test_flatten(self):
+ graph = ObjectGraph()
+ n1 = graph.createNode(ArgNode, "n1", 1)
+ n2 = graph.createNode(ArgNode, "n2", 2)
+ n3 = graph.createNode(ArgNode, "n3", 3)
+ n4 = graph.createNode(ArgNode, "n4", 4)
+ n5 = graph.createNode(ArgNode, "n5", 5)
+ n6 = graph.createNode(ArgNode, "n6", 6)
+ n7 = graph.createNode(ArgNode, "n7", 7)
+ n8 = graph.createNode(ArgNode, "n8", 8)
+
+ graph.createReference(graph, n1)
+ graph.createReference(graph, n7)
+ graph.createReference(n1, n2)
+ graph.createReference(n1, n4)
+ graph.createReference(n2, n3)
+ graph.createReference(n2, n5)
+ graph.createReference(n5, n6)
+ graph.createReference(n4, n6)
+ graph.createReference(n4, n2)
+
+ self.assertFalse(isinstance(graph.flatten(), list))
+
+ fl = list(graph.flatten())
+ self.assertTrue(n1 in fl)
+ self.assertTrue(n2 in fl)
+ self.assertTrue(n3 in fl)
+ self.assertTrue(n4 in fl)
+ self.assertTrue(n5 in fl)
+ self.assertTrue(n6 in fl)
+ self.assertTrue(n7 in fl)
+ self.assertFalse(n8 in fl)
+
+ fl = list(graph.flatten(start=n2))
+ self.assertFalse(n1 in fl)
+ self.assertTrue(n2 in fl)
+ self.assertTrue(n3 in fl)
+ self.assertFalse(n4 in fl)
+ self.assertTrue(n5 in fl)
+ self.assertTrue(n6 in fl)
+ self.assertFalse(n7 in fl)
+ self.assertFalse(n8 in fl)
+
+ graph.createReference(n1, n5)
+ fl = list(graph.flatten(lambda n: n.args[0] % 2 != 0))
+ self.assertTrue(n1 in fl)
+ self.assertFalse(n2 in fl)
+ self.assertFalse(n3 in fl)
+ self.assertFalse(n4 in fl)
+ self.assertTrue(n5 in fl)
+ self.assertFalse(n6 in fl)
+ self.assertTrue(n7 in fl)
+ self.assertFalse(n8 in fl)
+
+ def test_iter_nodes(self):
+ graph = ObjectGraph()
+ n1 = graph.createNode(ArgNode, "n1", 1)
+ n2 = graph.createNode(ArgNode, "n2", 2)
+ n3 = graph.createNode(ArgNode, "n3", 3)
+ n4 = graph.createNode(ArgNode, "n4", 4)
+ n5 = graph.createNode(ArgNode, "n5", 5)
+ n6 = graph.createNode(ArgNode, "n6", 5)
+
+ nodes = graph.nodes()
+ if sys.version[0] == '2':
+ self.assertTrue(hasattr(nodes, 'next'))
+ else:
+ self.assertTrue(hasattr(nodes, '__next__'))
+ self.assertTrue(hasattr(nodes, '__iter__'))
+
+ nodes = list(nodes)
+ self.assertEqual(len(nodes), 6)
+ self.assertTrue(n1 in nodes)
+ self.assertTrue(n2 in nodes)
+ self.assertTrue(n3 in nodes)
+ self.assertTrue(n4 in nodes)
+ self.assertTrue(n5 in nodes)
+ self.assertTrue(n6 in nodes)
+
+ def test_get_edges(self):
+ graph = ObjectGraph()
+ n1 = graph.createNode(ArgNode, "n1", 1)
+ n2 = graph.createNode(ArgNode, "n2", 2)
+ n3 = graph.createNode(ArgNode, "n3", 3)
+ n4 = graph.createNode(ArgNode, "n4", 4)
+ n5 = graph.createNode(ArgNode, "n5", 5)
+ n6 = graph.createNode(ArgNode, "n6", 5)
+
+ graph.createReference(n1, n2)
+ graph.createReference(n1, n3)
+ graph.createReference(n3, n1)
+ graph.createReference(n5, n1)
+ graph.createReference(n2, n4)
+ graph.createReference(n2, n5)
+ graph.createReference(n6, n2)
+
+ outs, ins = graph.get_edges(n1)
+
+ self.assertFalse(isinstance(outs, list))
+ self.assertFalse(isinstance(ins, list))
+
+ ins = list(ins)
+ outs = list(outs)
+
+
+ self.assertTrue(n1 not in outs)
+ self.assertTrue(n2 in outs)
+ self.assertTrue(n3 in outs)
+ self.assertTrue(n4 not in outs)
+ self.assertTrue(n5 not in outs)
+ self.assertTrue(n6 not in outs)
+
+ self.assertTrue(n1 not in ins)
+ self.assertTrue(n2 not in ins)
+ self.assertTrue(n3 in ins)
+ self.assertTrue(n4 not in ins)
+ self.assertTrue(n5 in ins)
+ self.assertTrue(n6 not in ins)
+
+ def test_filterStack(self):
+ graph = ObjectGraph()
+ n1 = graph.createNode(ArgNode, "n1", 0)
+ n11 = graph.createNode(ArgNode, "n1.1", 1)
+ n12 = graph.createNode(ArgNode, "n1.2", 0)
+ n111 = graph.createNode(ArgNode, "n1.1.1", 0)
+ n112 = graph.createNode(ArgNode, "n1.1.2",2)
+ n2 = graph.createNode(ArgNode, "n2", 0)
+ n3 = graph.createNode(ArgNode, "n2", 0)
+
+ graph.createReference(None, n1)
+ graph.createReference(None, n2)
+ graph.createReference(n1, n11)
+ graph.createReference(n1, n12)
+ graph.createReference(n11, n111)
+ graph.createReference(n11, n112)
+
+ self.assertTrue(n1 in graph)
+ self.assertTrue(n2 in graph)
+ self.assertTrue(n11 in graph)
+ self.assertTrue(n12 in graph)
+ self.assertTrue(n111 in graph)
+ self.assertTrue(n112 in graph)
+ self.assertTrue(n2 in graph)
+ self.assertTrue(n3 in graph)
+
+ visited, removes, orphans = graph.filterStack(
+ [lambda n: n.args[0] != 1, lambda n: n.args[0] != 2])
+
+ self.assertEqual(visited, 6)
+ self.assertEqual(removes, 2)
+ self.assertEqual(orphans, 1)
+
+ e = graph.graph.edge_by_node(n1.graphident, n111.graphident)
+ self.assertEqual(graph.graph.edge_data(e), "orphan")
+
+ self.assertTrue(n1 in graph)
+ self.assertTrue(n2 in graph)
+ self.assertTrue(n11 not in graph)
+ self.assertTrue(n12 in graph)
+ self.assertTrue(n111 in graph)
+ self.assertTrue(n112 not in graph)
+ self.assertTrue(n2 in graph)
+ self.assertTrue(n3 in graph)
+
+
+class TestObjectGraphIO (unittest.TestCase):
+ def setUp(self):
+ self._stdout = sys.stdout
+
+ def tearDown(self):
+ sys.stdout = self._stdout
+
+ def test_msg(self):
+ graph = ObjectGraph()
+
+ sys.stdout = fp = StringIO()
+ graph.msg(0, "foo")
+ self.assertEqual(fp.getvalue(), "foo \n")
+
+ sys.stdout = fp = StringIO()
+ graph.msg(5, "foo")
+ self.assertEqual(fp.getvalue(), "")
+
+ sys.stdout = fp = StringIO()
+ graph.debug = 10
+ graph.msg(5, "foo")
+ self.assertEqual(fp.getvalue(), "foo \n")
+
+ sys.stdout = fp = StringIO()
+ graph.msg(0, "foo", 1, "a")
+ self.assertEqual(fp.getvalue(), "foo 1 'a'\n")
+
+ sys.stdout = fp = StringIO()
+ graph.msgin(0, "hello", "world")
+ graph.msg(0, "test me")
+ graph.msgout(0, "bye bye")
+ self.assertEqual(fp.getvalue(), "hello 'world'\n test me \nbye bye \n")
+
+
+if __name__ == "__main__": # pragma: no cover
+ unittest.main()