-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnode.py
203 lines (166 loc) · 7.66 KB
/
node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import random
import os
import base64
from database import Database
from scoring import get_ranking, calculate_ranking_similarity
class Node:
NAT_TIMEOUT = 60000 # Time before the NAT will close a hole
NAT_TIMEOUT_WITH_MARGIN = 57500 # Maximum Time after puncturing a NAT at which we assume
# a message can still reach the target before the hole closes (ms)
WALK_STEP_TIME = 5000 # Interval at which we do walk steps (ms)
RANK_STEP_TIME = 60000 # Interval at which we recalculate the scores of our peers (ms)
def __init__(self, public_key, simulation, persistent_walking=False, directed_walking=False, alpha=0.1,
teleport_probability=0.5):
self.public_key = public_key
self.simulation = simulation
self.live_edges = []
self.node_directory = "nodes/" + base64.encodestring(str(self.public_key))
if not os.path.exists(self.node_directory):
os.makedirs(self.node_directory)
self.block_database = Database(self.node_directory + "/multichain.db")
self.log = open(self.node_directory + "/log.txt", 'w')
self.directed_walking = directed_walking
self.persistent_walking = persistent_walking
if self.persistent_walking:
self.teleport_probability = teleport_probability
self.current_walk = None
if self.directed_walking:
self.walk_function = self.walk_statefull_directed
else:
self.walk_function = self.walk_statefull_undirected
else:
if self.directed_walking:
self.walk_function = self.walk_stateless_directed
else:
self.walk_function = self.walk_stateless_undirected
self.ranking = {}
self.number_of_requests_received = 0
self.alpha = alpha
def receive_message(self, sender, message):
message['function'](*message['arguments'])
def send_message(self, target, message):
self.simulation.send_message(self, target, message)
def add_live_edge(self, peer):
if peer not in self.live_edges:
self.live_edges.append(peer)
self.simulation.add_event(self.NAT_TIMEOUT_WITH_MARGIN, self.live_edge_timeout, [peer])
# Only used for bootstrapping
def send_identity(self, target):
message = dict()
message['function'] = target.receive_identity
message['arguments'] = [self]
self.send_message(target, message)
# Only used for bootstrapping
def receive_identity(self, sender):
self.live_edges.append(sender)
def add_blocks(self, blocks):
self.block_database.add_blocks(blocks)
self.log.write("Added " + str(len(blocks)) + " blocks to my database, I now have " +
str(self.block_database.get_number_of_blocks_in_db()) + " blocks\n")
def take_walk_step(self):
self.walk_function()
self.simulation.add_event(self.WALK_STEP_TIME, self.take_walk_step)
def walk_stateless_undirected(self):
if self.live_edges:
peer = random.choice(self.live_edges)
self.send_introduction_request(peer)
def walk_stateless_directed(self):
peer = self.select_best_live_edge()
self.send_introduction_request(peer)
def walk_statefull_undirected(self):
if self.current_walk:
if random.random() <= self.teleport_probability:
self.current_walk = random.choice(self.live_edges)
else:
self.current_walk = self.live_edges[-1]
else:
# Start walking
self.current_walk = random.choice(self.live_edges)
self.send_introduction_request(self.current_walk)
def walk_statefull_directed(self):
raise NotImplementedError
def select_best_live_edge(self, exclude_peer=None):
if self.live_edges:
index = 0
# Order the live edges:
if self.ranking:
ranked_live_edges = []
for live_edge in self.live_edges:
if live_edge is exclude_peer:
continue
try:
rank = self.ranking[str(live_edge.public_key)][1]
except KeyError:
continue
ranked_live_edges.append((live_edge, rank))
ranked_live_edges = sorted(ranked_live_edges, key=lambda x: x[1])
if ranked_live_edges:
# Select an edge from the ranked live edges:
while random.random() > self.alpha:
index = (index + 1) % len(ranked_live_edges)
return ranked_live_edges[index][0]
# It seems we can't rank our edges, so we just return a random one
return random.choice(self.live_edges)
else:
print "I have no live edges"
return None
def update_ranking(self):
self.ranking = get_ranking(self.block_database, self.public_key)
self.simulation.add_event(self.RANK_STEP_TIME, self.update_ranking)
def send_introduction_request(self, target):
message = dict()
message['function'] = target.receive_introduction_request
message['arguments'] = [self]
self.send_message(target, message)
def receive_introduction_request(self, sender):
self.add_live_edge(sender) # This is know in dispersy as a stumble candidate
self.send_introduction_response(sender)
def send_introduction_response(self, target):
if self.live_edges:
if len(self.live_edges) == 1 and self.live_edges[0] == target:
print "Can't introduce: I know only this peer"
else:
if self.directed_walking:
peer = self.select_best_live_edge(exclude_peer=target)
else:
peer = None
while peer is None or peer == target:
peer = random.choice(self.live_edges)
message = dict()
message['function'] = target.receive_introduction_response
message['arguments'] = [peer]
self.send_message(target, message)
else:
print "I have no live edges"
def receive_introduction_response(self, peer):
self.add_live_edge(peer)
self.send_crawl_request(peer)
def live_edge_timeout(self, peer):
self.live_edges.remove(peer)
def send_crawl_request(self, target):
message = dict()
message['function'] = target.receive_crawl_request
message['arguments'] = [self]
self.send_message(target, message)
def receive_crawl_request(self, sender):
self.number_of_requests_received += 1
if self.public_key is not 0:
self.send_crawl_response(sender)
def send_crawl_response(self, target):
blocks = self.block_database.get_blocks(self.public_key)
message = dict()
message['function'] = target.receive_crawl_response
message['arguments'] = [blocks]
self.send_message(target, message)
def receive_crawl_response(self, blocks):
self.add_blocks(blocks)
def log_blocks(self, datafile):
with open(datafile, 'a') as f:
f.write(str(self.block_database.get_number_of_blocks_in_db()) + " ")
def log_ranking(self, datafile):
with open(datafile, 'a') as f:
similarity = calculate_ranking_similarity(self.ranking, self.simulation.rankings[str(self.public_key)])
f.write(str(similarity) + " ")
def log_requests(self, datafile):
with open(datafile, 'a') as f:
f.write(str(self.number_of_requests_received) + "\n")