# tracegraph.py
import networkx as nx
from itertools import count, chain
from collections import defaultdict
import time
import json
import logging
import timetools as tt
_attrs = dict(id='id', source='source', target='target', key='key', name='name', src_name='src_name', tgt_name='tgt_name')
SURE, LIKELY, NEG = 2, 1, 0
def path_to_graph(paths, probe, g):
"""give a series of paths attached to a given probe, add them to graph g
Args:
paths (list of list of hops): it contains a list of path, which is a list of hops from source to dest
probe: (int or string): the name of the probe from which the above path measurements are performed
g: (nx.Graph): the graph object to which new nodes and edges are added
"""
for p in paths:
for e in zip(p[:-1], p[1:]):
            # has_edge() also matches the reversed orientation of an undirected edge;
            # a membership test against g.edges() would miss it and reset 'probe'
            if not g.has_edge(*e):
                g.add_edge(e[0], e[1], probe=set())
g[e[0]][e[1]]['probe'].add(probe)
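

# A minimal usage sketch for path_to_graph(); the probe id 'pb1' and the hop
# names below are hypothetical, purely for illustration.
def _example_path_to_graph():
    g = nx.Graph()
    # two traceroute paths from the same probe, diverging after hop 'a'
    paths = [['src', 'a', 'b', 'dst'],
             ['src', 'a', 'c', 'dst']]
    path_to_graph(paths, 'pb1', g)
    # each traversed edge now carries the set of probes that observed it,
    # e.g. g['src']['a']['probe'] == {'pb1'}
    return g
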
def node_link_data_modify(G, attrs=_attrs):
"""dumps a nx.Graph to json compatible with visualization with d3
It is a modified version of node_link_data() function provided in nx library.
It fixes a potential issue where the the ids of nodes is different from thoses used in edges, which leads to error
when visulaizing the graph.
Args:
G (nx.Graph or nx.MultiGraph): the graph to be dumpped
attrs (dict): attributes need when dumpping the graph
Returns:
data (dict)
"""
multigraph = G.is_multigraph()
id_ = attrs['id']
name = attrs['name']
source = attrs['source']
target = attrs['target']
src_name = attrs['src_name']
tgt_name = attrs['tgt_name']
# Allow 'key' to be omitted from attrs if the graph is not a multigraph.
key = None if not multigraph else attrs['key']
if len(set([source, target, key])) < 3:
raise nx.NetworkXError('Attribute names are not unique.')
mapping = dict(zip(G, count()))
data = {}
data['directed'] = G.is_directed()
data['multigraph'] = multigraph
data['graph'] = G.graph
data['nodes'] = [dict(chain(G.node[n].items(), [(id_, mapping[n]), (name, n)])) for n in G]
    # in the original version the line above uses (id_, n), which can cause node ids to differ from those used in links
if multigraph:
data['links'] = [
dict(chain(d.items(),
[(source, mapping[u]), (target, mapping[v]), (key, k)]))
for u, v, k, d in G.edges_iter(keys=True, data=True)]
else:
data['links'] = [
dict(chain(d.items(),
[(source, mapping[u]), (src_name, u), (target, mapping[v]), (tgt_name, v)]))
for u, v, d in G.edges_iter(data=True)]
return data
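

# Sketch of serializing a graph for a d3 front end with the dumper above;
# default=list is passed because 'probe' and 'tag' attributes are sets,
# which the json module cannot encode natively.
def _example_dump_for_d3(g):
    data = node_link_data_modify(g)
    # node 'id' and link 'source'/'target' fields now refer to the same
    # integer mapping, which is what d3's force layout expects
    return json.dumps(data, default=list)
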
def compose_modify(G, H):
"""combine two given graphs that might have overlapped edges and nodes.
It is a modified version of compose() function in nx lib.
Args:
G (nx.Graph): multigraph class is current not supported
H (nx.Graph): multigraph class is current not supported
Returns:
R (nx.Graph): the combined graph
"""
    if G.is_multigraph() or H.is_multigraph():
        raise nx.NetworkXError('Doesn\'t handle multi-graph.')
R = nx.Graph()
for n, d in chain(G.nodes_iter(data=True), H.nodes_iter(data=True)):
        # more complex logic for merging node attributes can be added here
        if n in R:
dd = dict()
d1 = R.node[n]
dd['tag'] = d1['tag'] | d['tag']
if 'hosting' in d or 'hosting' in d1:
dd['hosting'] = set()
for di in [d, d1]:
dd['hosting'].update(di.get('hosting', iter([])))
else:
dd = d
R.add_node(n, dd)
for src, tgt, d in chain(H.edges_iter(data=True), G.edges_iter(data=True)):
        # if the edge is already present in the graph, merge the value set of each of its attributes (keys);
        # has_edge() matches both orientations of an undirected edge
        if R.has_edge(src, tgt):
dd = defaultdict(set)
d1 = R[src][tgt]
for di in (d1, d):
                for k, v in di.items():
dd[k].update(v)
else:
dd = d
R.add_edge(src, tgt, dd)
return R
def graph_update(original, delta):
"""update the originial graph with the delta graph
Args:
original (nx.Graph)
delta (nx.Graph)
Note:
modification is made to original graph
"""
    if original.is_multigraph() or delta.is_multigraph():
        raise nx.NetworkXError('Doesn\'t handle multi-graph.')
for n, d in delta.nodes_iter(data=True):
        if n in original:
# there should be always a tag for each node
original.node[n]['tag'].update(d['tag'])
else:
original.add_node(n, d)
    for src, tgt, d in delta.edges_iter(data=True):
        # has_edge() matches both orientations of an undirected edge
        if original.has_edge(src, tgt):
            for k, v in d.items():
                original[src][tgt][k].update(v)
else:
original.add_edge(src, tgt, d)
def graph_union(graphs):
"""combine a list of graphs in a much more efficient way
Args:
graphs (list of nx.Graph)
Returns:
C (nx.graph): combined graph
"""
comb = nx.Graph()
for g in iter(graphs):
graph_update(comb, g)
return comb
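

# Usage sketch for graph_union(); graph_update() expects every node to carry
# a 'tag' set and edge attributes to be sets, so each per-probe graph is
# annotated first. The 'hop' tag used here is purely illustrative.
def _example_graph_union(per_probe_graphs):
    for g in per_probe_graphs:
        for n in g.nodes_iter():
            g.node[n].setdefault('tag', set(['hop']))
    return graph_union(per_probe_graphs)
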
def compose_all_modify(graphs):
"""combine a list of graphs
Args:
graphs (list of nx.Graph)
Returns:
C (nx.graph): combined graph
"""
graphs = iter(graphs)
C = next(graphs)
for H in graphs:
C = compose_modify(C, H)
return C
def change_binsum(fn, method, g, pb2links, pb2nodes, bin_size, begin, stop):
"""calculate binned sum of RTT changes for each link and node in a given topo
Args:
fn (string): path to the RTT file
method (string): field in the file to be extracted as the result of change detection
g (nx.Graph): network topology learnt from traceroute; link is annotated with probes traverse it
pb2links (dict): {probe id : [link in g (n1, n2),...]}
pb2nodes (dict): {probe id: [nodes in g...]}
bin_size (int): the size of bin in seconds
begin (int): sec since epoch from which records in fn is considered
stop (int): sec since epoch till which records in fn is considered
Notes:
no return will be provided. update is directly applied to g.
g has to be initialized for each of its link and node a dictionary "score", default to int type.
"""
t1 = time.time()
try:
with open(fn, 'r') as fp:
data = json.load(fp)
except IOError as e:
logging.critical(e)
return
    if data:
for pb in data:
pb_rec = data[pb]
if pb_rec:
for t, v in zip(pb_rec.get("epoch", []), pb_rec.get(method, [])):
if begin <= t <= stop:
t = (t // bin_size) * bin_size
for l in pb2links.get(pb, []):
g[l[0]][l[1]]['score'][t] += v
for n in pb2nodes.get(pb, []):
g.node[n]['score'][t] += v
t2 = time.time()
logging.debug("%s handled in %.2f sec" % (fn, t2 - t1))
def change_inference_node(g, node_threshold, bin_size, begin, stop):
"""perform node change location inference
Args:
g (nx.Graph): network topology learnt from traceroute; link is annotated with probes traverse it
node_threshold (float): parameter for node inference; minimum portion of trace traversing that node experience change
bin_size (int): the size of bin in seconds
begin (int): sec since epoch from which records in fn is considered
stop (int): sec since epoch till which records in fn is considered
Notes:
no return will be provided. update is directly applied to g.
g has to be initialized for each of its node a dictionary "inference", default to int type.
2 (SURE) for inferred (pretty sure) as cause
1 (LIKELY) for susceptible (not so sure) as cause
"""
t1 = time.time()
for t in range((begin // bin_size) * bin_size, ((stop // bin_size) + 1) * bin_size, bin_size):
for n in g.nodes_iter():
if len(g.node[n]['probe']) > 1 and g.node[n]['score'][t] > node_threshold:
g.node[n]['inference'][t] = SURE
t2 = time.time()
logging.debug("Node congestion inference in %.2f sec" % (t2 - t1))
def change_inference_link(graph, link_threshold, bin_size, begin, stop):
"""perform link change location inference
Args:
graph (nx.Graph): network topology learnt from traceroute; link is annotated with probes traverse it
link_threshold (float): parameter for link inference; minimum portion of trace on that link experience change
bin_size (int): the size of bin in seconds
begin (int): sec since epoch from which records in fn is considered
stop (int): sec since epoch till which records in fn is considered
Notes:
no return will be provided. update is directly applied to g.
g has to be initialized for each of its link a dictionary "inference", default to int type.
2 (SURE) for inferred (pretty sure) as cause
1 (LIKELY) for susceptible (not so sure) as cause
"""
call_depth = []
def helper(g, l, t, from_link=None):
""" the actual inference in done here
Args:
g (nx.Graph): the graph operated on
l (tuple of nodes): the link currently being investigated
t (int): the time stamp (second since epoch) of inference
from_link (tuple of nodes): the function can be called recursively, from_link indicates the outerlayer link
Returns:
SURE, LIKELY, NEG
"""
call_depth.append(0)
if len(call_depth) > 2:
logging.info("%d level deep Call at %s: %r" % (len(call_depth), tt.epoch_to_string(t), l))
# skip if already inferred
if t in g[l[0]][l[1]]['inference']:
call_depth.pop()
return g[l[0]][l[1]]['inference'][t]
        # a sanity check: if a link doesn't even meet the threshold, it cannot be the cause
if g[l[0]][l[1]]["score"][t] <= link_threshold:
call_depth.pop()
return NEG
        # if a connecting node is the cause, the link cannot be the cause, per the single-cause assumption
l0_res = g.node[l[0]]["inference"].get(t, NEG)
l1_res = g.node[l[1]]["inference"].get(t, NEG)
caused_by_node = bool(l0_res == SURE or l1_res == SURE)
if caused_by_node:
g[l[0]][l[1]]['inference'][t] = NEG
call_depth.pop()
return NEG
# verifies if the link itself is the cause
branches = find_branches(g, l[0], l[1])
ext = {k: [i for i in v if i[-1] > 0] for k, v in branches.items()}
ext_con_count_abs = {
k: sum([1 if g[i[0]][k]['score'][t] > link_threshold else 0 for i in v]) for
k, v in ext.items()}
ext_con_count_prop = {
k: sum([1 if g[i[0]][k]['score'][t] > float(i[2]) / i[1] * link_threshold else 0 for i in v])
for
k, v in ext.items()}
        # 1/ l has extension branches on both sides and multiple branches undergo the same change
        # NOTE: an extension branch can carry probes not on the current link, hence the proportional threshold
if ext_con_count_prop[l[0]] > 1 and ext_con_count_prop[l[1]] > 1:
            # verify whether the concerned extension branches are ALL load-balancing (LB) branches; if so, only return LIKELY
pb_hash = defaultdict(set)
for n in l:
for ext_n, a, b in ext[n]:
if g[n][ext_n]['score'][t] > float(b)/a * link_threshold:
pb_hash[n].add(hash(frozenset(set(g[n][ext_n]["probe"]) & set(g[l[0]][l[1]]["probe"]))))
if all([len(i[1]) > 1 for i in pb_hash.items()]):
g[l[0]][l[1]]['inference'][t] = SURE
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
        # 2/ one extension branch on one side and multiple on the other; the other side has multiple branches undergoing the same change
elif len(ext[l[0]]) == 1 and ext_con_count_prop[l[1]] > 1:
if ext_con_count_abs[l[0]] < 1: # the single extension branch not being the cause
# again verify for LB
pb_hash = set()
for ext_n, a, b in ext[l[1]]:
if g[l[1]][ext_n]['score'][t] > float(b)/a * link_threshold:
pb_hash.add(hash(frozenset(set(g[l[1]][ext_n]["probe"]) & set(g[l[0]][l[1]]["probe"]))))
if len(pb_hash) > 1:
g[l[0]][l[1]]['inference'][t] = SURE
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
            # the result depends on the result of the single extension branch
else:
trunk = (l[0], ext[l[0]][0][0]) # the only extension branch on l[0]
                # the single extension branch itself depends on the current link
if from_link and (trunk == from_link or trunk == from_link[::-1]):
logging.debug("Dependence loop: %s, %r <-> %r" % (tt.epoch_to_string(t), l, trunk))
g[l[0]][l[1]]['inference'][t] = LIKELY
else:
logging.debug("Dependence chain: %s, %r -> %r" % (tt.epoch_to_string(t), l, trunk))
trunk_res = helper(g, trunk, t, l)
                    # if trunk_res == NEG:
                    # 1/ possibly l causes the change
                    # 2/ possibly an upstream link of trunk causes a change,
                    #    and that change could be irrelevant to the change on l
                    # therefore all cases other than SURE yield LIKELY
if trunk_res == SURE:
g[l[0]][l[1]]['inference'][t] = NEG
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
elif len(ext[l[1]]) == 1 and ext_con_count_prop[l[0]] > 1:
if ext_con_count_abs[l[1]] < 1: # the extension branch not being the cause
pb_hash = set()
for ext_n, a, b in ext[l[0]]:
if g[l[0]][ext_n]['score'][t] > float(b) / a * link_threshold:
pb_hash.add(hash(frozenset(set(g[l[0]][ext_n]["probe"]) & set(g[l[0]][l[1]]["probe"]))))
if len(pb_hash) > 1:
g[l[0]][l[1]]['inference'][t] = SURE
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
else:
                trunk = (l[1], ext[l[1]][0][0])  # the only extension branch on l[1]
if from_link and (trunk == from_link or trunk == from_link[::-1]):
logging.debug("Dependence loop: %s, %r <-> %r" % (tt.epoch_to_string(t), l, trunk))
g[l[0]][l[1]]['inference'][t] = LIKELY
else:
logging.debug("Dependence chain: %s, %r -> %r" % (tt.epoch_to_string(t), l, trunk))
trunk_res = helper(g, trunk, t, l)
if trunk_res == SURE:
g[l[0]][l[1]]['inference'][t] = NEG
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
        # 3/ both sides have only one extension branch
        elif len(ext[l[0]]) == 1 and len(ext[l[1]]) == 1:
            # if neither of the two extension branches could be the cause, the current link must be
if ext_con_count_abs[l[0]] < 1 and ext_con_count_abs[l[1]] < 1:
g[l[0]][l[1]]['inference'][t] = SURE
else:
                # otherwise, the result of the current link depends on the results of the two extension branches
                trunk_l0 = (l[0], ext[l[0]][0][0])
                trunk_l1 = (l[1], ext[l[1]][0][0])
                # if the ext branch attached to l[0] depends on the current link,
                # then the result of the current link depends on the ext branch attached to l[1]
if from_link and (trunk_l0 == from_link or trunk_l0 == from_link[::-1]):
logging.debug("Dependence loop: %s, %r <-> %r" % (tt.epoch_to_string(t), l, trunk_l0))
# it now depends on the result of trunk_l1 which must be different from l
trunk_l1_res = helper(g, trunk_l1, t, l)
if trunk_l1_res == SURE:
g[l[0]][l[1]]['inference'][t] = NEG
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
elif from_link and (trunk_l1 == from_link or trunk_l1 == from_link[::-1]):
logging.debug("Dependence loop: %s, %r <-> %r" % (tt.epoch_to_string(t), l, trunk_l1))
# it now depends on the result of trunk_l0 which must be different from l
trunk_l0_res = helper(g, trunk_l0, t, l)
if trunk_l0_res == SURE:
g[l[0]][l[1]]['inference'][t] = NEG
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
else:
logging.debug("Dependence chain: %s, %r -> (%r, %r) \n%r\n%r\n%r" %
(tt.epoch_to_string(t), l, trunk_l0, trunk_l1,
g[l[0]][l[1]]['probe'],
g[trunk_l0[0]][trunk_l0[1]]['probe'], g[trunk_l1[0]][trunk_l1[1]]['probe']))
trunk_l0_res = helper(g, trunk_l0, t, l)
trunk_l1_res = helper(g, trunk_l1, t, l)
if trunk_l1_res == SURE or trunk_l0_res == SURE:
g[l[0]][l[1]]['inference'][t] = NEG
elif trunk_l0_res == LIKELY or trunk_l1_res == LIKELY:
g[l[0]][l[1]]['inference'][t] = LIKELY
else:
g[l[0]][l[1]]['inference'][t] = SURE
        # 5/ neither side has an extension branch, i.e. a standalone link
elif len(ext[l[1]]) == 0 and len(ext[l[0]]) == 0:
g[l[0]][l[1]]['inference'][t] = SURE
# 4/ one side has no extension branch
elif len(ext[l[0]]) == 0:
if ext_con_count_prop[l[1]] > 1:
pb_hash = set()
for ext_n, a, b in ext[l[1]]:
if g[l[1]][ext_n]['score'][t] > float(b) / a * link_threshold:
pb_hash.add(hash(frozenset(set(g[l[1]][ext_n]["probe"]) & set(g[l[0]][l[1]]["probe"]))))
if len(pb_hash) > 1:
g[l[0]][l[1]]['inference'][t] = SURE
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
elif len(ext[l[1]]) == 1:
if ext_con_count_abs[l[1]] < 1:
g[l[0]][l[1]]['inference'][t] = SURE
else:
trunk = (l[1], ext[l[1]][0][0]) # the only extension branch on l[1]
if from_link and (trunk == from_link or trunk == from_link[::-1]):
logging.debug("Dependence loop: %s, %r <-> %r" % (tt.epoch_to_string(t), l, trunk))
g[l[0]][l[1]]['inference'][t] = LIKELY
else:
logging.debug("Dependence chain: %s, %r -> %r" % (tt.epoch_to_string(t), l, trunk))
trunk_res = helper(g, trunk, t, l)
if trunk_res == SURE:
g[l[0]][l[1]]['inference'][t] = NEG
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
else:
g[l[0]][l[1]]['inference'][t] = NEG
elif len(ext[l[1]]) == 0:
if ext_con_count_prop[l[0]] > 1:
pb_hash = set()
for ext_n, a, b in ext[l[0]]:
if g[l[0]][ext_n]['score'][t] > float(b) / a * link_threshold:
pb_hash.add(hash(frozenset(set(g[l[0]][ext_n]["probe"]) & set(g[l[0]][l[1]]["probe"]))))
if len(pb_hash) > 1:
g[l[0]][l[1]]['inference'][t] = SURE
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
elif len(ext[l[0]]) == 1:
if ext_con_count_abs[l[0]] < 1:
g[l[0]][l[1]]['inference'][t] = SURE
else:
                    trunk = (l[0], ext[l[0]][0][0])  # the only extension branch on l[0]
if from_link and (trunk == from_link or trunk == from_link[::-1]):
logging.debug("Dependence loop: %s, %r <-> %r" % (tt.epoch_to_string(t), l, trunk))
g[l[0]][l[1]]['inference'][t] = LIKELY
else:
logging.debug("Dependence chain: %s, %r -> %r" % (tt.epoch_to_string(t), l, trunk))
trunk_res = helper(g, trunk, t, l)
if trunk_res == SURE:
g[l[0]][l[1]]['inference'][t] = NEG
else:
g[l[0]][l[1]]['inference'][t] = LIKELY
else:
g[l[0]][l[1]]['inference'][t] = NEG
else:
g[l[0]][l[1]]['inference'][t] = NEG
call_depth.pop()
return g[l[0]][l[1]]['inference'][t]
t1 = time.time()
for ts in range((begin // bin_size) * bin_size, ((stop // bin_size) + 1) * bin_size, bin_size):
for link in graph.edges_iter():
if graph[link[0]][link[1]]['score'][ts] > link_threshold and ts not in graph[link[0]][link[1]]['inference']:
_ = helper(graph, link, ts)
t2 = time.time()
logging.debug("Link congestion inference in %.2f sec" % (t2 - t1))
def find_branches(graph, n1, n2):
""" find all the links sharing nodes with the given link (n1, n2)
Args:
graph (nx.Graph)
n1 (int): one node of the link
n2 (int): the other node of the link
Returns:
dict{n1: [(x, probe # on (n1,x), common pb # with (n1, n2))...], n2: []}
empty dict in the case (n1, n2) is not an edge in graph
"""
try:
pbs = set(graph.edge[n1][n2]['probe'])
except KeyError:
return {n1: [], n2: []}
res = {n1: [], n2: []}
for tup in [(n1, n2), (n2, n1)]:
n, other = tup
for neighbour in graph.neighbors(n):
if neighbour != other:
n_pbs = set(graph.edge[n][neighbour]['probe'])
common = n_pbs & pbs
res[n].append((neighbour, len(n_pbs), len(common)))
return res
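

# Illustrative example of find_branches() on a tiny hand-made topology with
# hypothetical probe ids: for the link (1, 2), node 1 has one extension
# branch towards node 0 and node 2 has one towards node 3.
def _example_find_branches():
    g = nx.Graph()
    g.add_edge(0, 1, probe={'pb1'})
    g.add_edge(1, 2, probe={'pb1', 'pb2'})
    g.add_edge(2, 3, probe={'pb2'})
    # expected result: {1: [(0, 1, 1)], 2: [(3, 1, 1)]}
    return find_branches(g, 1, 2)
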
def divergent_set(l, crosspoints):
""" find largest subsets of l so that only common part among any elements in the subset is those in the crosspoints
Args:
l (dict): {element: set(attributes),...}
crosspoints (set): set of attributes allowed for being in common
Return:
tuple (the size of subset, {'member':[keys of l], 'attr': union of member attributes})
"""
    def ok(o, target):
        """test whether the intersection between o and target equals crosspoints"""
        return set.intersection(target, o) == set(crosspoints)
# source https://stackoverflow.com/questions/10823227/how-to-get-all-the-maximums-max-function
def maxes(a, key=None):
"""return all the i with the maximum of key(i) for i in a"""
if key is None:
key = lambda x: x
m, max_list = key(a[0]), []
for s in a:
k = key(s)
if k > m:
m, max_list = k, [s]
elif k == m:
max_list.append(s)
return m, max_list
candidate = []
    # TODO: not all possible combinations are tested; this is in fact a maximum-clique problem (NP-complete),
    # so only a single greedy pass over the elements is performed here
for e in l:
for c in candidate:
if ok(l[e], c['attr']):
c['member'].append(e)
c['attr'] |= l[e]
candidate.append({"member": [e], "attr": set(l[e])})
return maxes(candidate, key=lambda a: len(a['member']))
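

# A small worked example of divergent_set(); elements and attributes are made
# up. 'a' and 'b' only share the allowed crosspoint 'x', so they can sit in
# the same divergent subset, while 'c' additionally overlaps on 'y'.
def _example_divergent_set():
    l = {'a': {'x', 'p'}, 'b': {'x', 'y'}, 'c': {'x', 'y', 'q'}}
    size, subsets = divergent_set(l, {'x'})
    # size == 2; a maximal subset pairs 'a' with 'b' (or 'a' with 'c',
    # depending on dict iteration order)
    return size, subsets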