1from itertools import product
2
3import pytest
4
5import networkx as nx
6from networkx.algorithms.community import lukes_partitioning
7
# Attribute keys under which the tests in this module store edge weights (EWL)
# and node weights (NWL); the same keys are passed to lukes_partitioning.
EWL = "e_weight"
NWL = "n_weight"
10
11
# First example from Lukes' original paper
def paper_1_case(float_edge_wt=False, explicit_node_wt=True, directed=False):
    """Build the first example tree and return its Lukes partition.

    Parameters
    ----------
    float_edge_wt : bool
        If true, every edge weight is shifted by 0.001 so that it becomes
        a float; otherwise the integer weights are used unchanged.
    explicit_node_wt : bool
        If true, every node gets a weight of 1 stored under ``NWL`` and that
        key is passed as ``node_weight``; otherwise ``node_weight=None``.
    directed : bool
        If true, the tree is built as a ``DiGraph``, else as a ``Graph``.

    Returns
    -------
    set of frozenset
        The clusters returned by ``lukes_partitioning``, each frozen so the
        whole partition can be compared with ``==`` regardless of order.
    """
    # problem-specific size limit handed to lukes_partitioning
    max_size = 3

    # configuration
    offset = 0.001 if float_edge_wt else 0
    tree = nx.DiGraph() if directed else nx.Graph()

    # graph creation: (source, target, base edge weight)
    base_edges = (
        (1, 2, 3),
        (1, 4, 2),
        (2, 3, 4),
        (2, 5, 6),
    )
    for source, target, weight in base_edges:
        tree.add_edge(source, target, **{EWL: weight + offset})

    # node weights
    if explicit_node_wt:
        nx.set_node_attributes(tree, 1, NWL)
        node_key = NWL
    else:
        node_key = None

    # partitioning
    partition = lukes_partitioning(
        tree, max_size, node_weight=node_key, edge_weight=EWL
    )
    return set(map(frozenset, partition))
49
50
# Second example from Lukes' original paper
def paper_2_case(explicit_edge_wt=True, directed=False):
    """Build the second example tree and return its Lukes partition.

    Parameters
    ----------
    explicit_edge_wt : bool
        If true, every edge stores a weight of 1 under ``EWL`` and that key
        is passed as ``edge_weight``; otherwise edges carry no attribute and
        ``edge_weight=None`` is passed.
    directed : bool
        If true, the tree is built as a ``DiGraph``, else as a ``Graph``.

    Returns
    -------
    set of frozenset
        The clusters returned by ``lukes_partitioning``, each frozen so the
        whole partition can be compared with ``==`` regardless of order.
    """
    # problem-specific size limit handed to lukes_partitioning
    block_size = 32

    # configuration
    tree = nx.DiGraph() if directed else nx.Graph()

    if explicit_edge_wt:
        edge_attrs, edge_key = {EWL: 1}, EWL
    else:
        edge_attrs, edge_key = {}, None

    # graph creation: (parent, child) pairs in insertion order
    links = (
        ("name", "home_address"),
        ("name", "education"),
        ("education", "bs"),
        ("education", "ms"),
        ("education", "phd"),
        ("name", "telephone"),
        ("telephone", "home"),
        ("telephone", "office"),
        ("office", "no1"),
        ("office", "no2"),
    )
    for parent, child in links:
        tree.add_edge(parent, child, **edge_attrs)

    # node weights, stored under NWL
    sizes = {
        "name": 20,
        "education": 10,
        "bs": 1,
        "ms": 1,
        "phd": 1,
        "home_address": 8,
        "telephone": 8,
        "home": 8,
        "office": 4,
        "no1": 1,
        "no2": 1,
    }
    for node, size in sizes.items():
        tree.nodes[node][NWL] = size

    # partitioning
    partition = lukes_partitioning(
        tree, block_size, node_weight=NWL, edge_weight=edge_key
    )
    return set(map(frozenset, partition))
103
104
def test_paper_1_case():
    """Check the first example for every combination of its three flags."""
    expected = {frozenset([1, 4]), frozenset([2, 3, 5])}

    # All (float_edge_wt, explicit_node_wt, directed) combinations must
    # produce the same partition.
    for flags in product((True, False), repeat=3):
        assert paper_1_case(*flags) == expected
112
113
def test_paper_2_case():
    """Check the second example for every combination of its two flags."""
    expected = {
        frozenset(["education", "bs", "ms", "phd"]),
        frozenset(["name", "home_address"]),
        frozenset(["telephone", "home", "office", "no1", "no2"]),
    }

    # All (explicit_edge_wt, directed) combinations must produce the same
    # partition.
    for flags in product((True, False), repeat=2):
        assert paper_2_case(*flags) == expected
125
126
def test_mandatory_tree():
    """A graph that is not a tree must be rejected with ``nx.NotATree``."""
    # The complete graph on 4 nodes contains cycles, so it is not a tree.
    cyclic_graph = nx.complete_graph(4)

    with pytest.raises(nx.NotATree):
        lukes_partitioning(cyclic_graph, 5)
132
133
def test_mandatory_integrality():
    """A tree with a non-integer node weight must raise ``TypeError``."""
    block_size = 32

    tree = nx.DiGraph()

    # (source, target, edge weight)
    for source, target, weight in (
        (1, 2, 3.2),
        (1, 4, 2.4),
        (2, 3, 4.0),
        (2, 5, 6.3),
    ):
        tree.add_edge(source, target, **{EWL: weight})

    # Node 1 carries the deliberately broken (non-integer) weight.
    node_sizes = {1: 1.2, 2: 1, 3: 1, 4: 1, 5: 2}
    for node, size in node_sizes.items():
        tree.nodes[node][NWL] = size

    with pytest.raises(TypeError):
        lukes_partitioning(tree, block_size, node_weight=NWL, edge_weight=EWL)
155