-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcounter.py
141 lines (116 loc) · 3.97 KB
/
counter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import threading
import utils
from config import *
class Counter:
    """Lock-protected tally of blocks and transactions over a fixed span.

    ``duration`` is the length of the span the tally covers; ``to_json``
    divides by it, so a zero duration makes ``to_json`` raise
    ``ZeroDivisionError``.
    """

    def __init__(self, duration: float):
        self.duration = duration
        self.blocks = 0
        self.transactions = 0
        # Serializes updates made through add_block().
        self.mutex = threading.Lock()

    def add_block(self, txs: int):
        """Record one more block that carried ``txs`` transactions."""
        # The context manager releases the lock on every exit path, which is
        # what the manual acquire/try/finally/release sequence did by hand.
        with self.mutex:
            self.blocks += 1
            self.transactions += txs

    def to_json(self) -> dict:
        """Return the rates as a dict.

        ``"tps"`` is ``transactions / duration`` and ``"bpm"`` is
        ``blocks * 60 / duration``.
        """
        return {
            "tps": self.transactions / self.duration,
            "bpm": self.blocks * 60 / self.duration,
        }
class Stats:
    """Counters for a time window: one overall plus a list of 60-wide buckets.

    Timestamps and the window bounds share one unit (presumably seconds,
    given the 60.0 bucket width; confirm against callers).
    """

    def __init__(self, start_time: float, end_time: float):
        self.start_time = start_time
        self.end_time = end_time
        self.total = Counter(end_time - start_time)
        self.minutes = []

        # A bucket is only opened while more than 30 units of the window
        # remain, so a short trailing remainder gets no bucket of its own.
        cutoff = end_time - 30.0
        bucket_start = start_time
        while bucket_start < cutoff:
            bucket_end = min(end_time, bucket_start + 60.0)
            self.minutes.append(Counter(bucket_end - bucket_start))
            bucket_start = bucket_end

    def add_block(self, timestamp: float, txs: int):
        """Count a block with ``txs`` transactions if it lies in the window.

        Blocks outside ``[start_time, end_time]`` are ignored. Blocks inside
        always reach ``total``; they reach a per-minute counter only when a
        bucket exists for their offset.
        """
        too_early = timestamp < self.start_time
        too_late = self.end_time < timestamp
        if too_early or too_late:
            return

        self.total.add_block(txs)

        bucket = int((timestamp - self.start_time) / 60.0)
        if bucket < 0 or bucket >= len(self.minutes):
            return
        self.minutes[bucket].add_block(txs)

    def to_json(self) -> dict:
        """Return ``{"total": ..., "minutes": [...]}`` built from the counters."""
        overall = self.total.to_json()
        per_minute = [counter.to_json() for counter in self.minutes]
        return {"total": overall, "minutes": per_minute}
def get_block_header(block_id: str) -> tuple[int, str]:
    """Return ``(timestamp, previous_block_id)`` parsed from ``gethead`` output.

    The text returned by ``utils.lite_client("gethead " + block_id)`` is
    scanned line by line:

    * on a line starting with ``"block header of"``, the fifth
      whitespace-separated token from the end is parsed as the integer
      timestamp;
    * on a line starting with ``"previous block #1"``, the last token is
      taken as the previous block id.

    Raises:
        AssertionError: if either value is missing from the output (or the
            parsed timestamp is 0).
    """
    ts = 0
    prev = ""
    for line in utils.lite_client("gethead " + block_id).split("\n"):
        if line.startswith("block header of"):
            ts = int(line.split()[-5])
        if line.startswith("previous block #1"):
            prev = line.split()[-1]

    # Raised explicitly rather than via `assert`, so the validation is not
    # stripped under `python -O`. The exception type is kept as
    # AssertionError so existing handlers still match.
    if not ts:
        raise AssertionError("gethead output has no block timestamp for " + block_id)
    if not prev:
        raise AssertionError("gethead output has no previous block for " + block_id)
    return ts, prev
def count_transactions(block_id: str) -> int:
    """Count the transactions listed for ``block_id`` by ``listblocktrans``.

    The listing is requested in pages of up to 1000 entries. Each line that
    starts with ``"transaction #"`` is counted, and its 4th and 6th tokens
    are remembered as the cursor (account, lt) passed to the next request.
    Counting stops at the first line containing
    ``"end of block transaction list"``.

    Raises:
        Exception: if a page contains neither a transaction line nor the end
            marker. The cursor could not advance in that case, so the same
            request would otherwise be repeated indefinitely.
    """
    count = 0
    acc = ""
    lt = ""
    while True:
        output = utils.lite_client(
            "listblocktrans %s 1000 %s %s" % (block_id, acc, lt)
        )
        progressed = False
        for line in output.split("\n"):
            if line.startswith("transaction #"):
                count += 1
                fields = line.split()
                acc = fields[3]
                lt = fields[5]
                progressed = True
            elif "end of block transaction list" in line:
                return count
        if not progressed:
            # No new cursor and no end marker: bail out instead of looping.
            raise Exception("Unexpected listblocktrans output: " + output)
def count_stats_shard(stats: Stats, block_id: str):
    """Walk back from ``block_id`` via previous-block links, feeding ``stats``.

    Each visited block's header is fetched with ``get_block_header``. Blocks
    newer than ``stats.end_time`` are skipped without counting their
    transactions. The walk ends at the first block older than
    ``stats.start_time``.
    """
    current = block_id
    while True:
        ts, parent = get_block_header(current)
        if ts < stats.start_time:
            return
        too_new = ts > stats.end_time
        if not too_new:
            stats.add_block(ts, count_transactions(current))
        current = parent
# Upper bound on the number of worker threads count_stats() starts; shard
# blocks are dealt out to the workers in stripes of this stride.
MAX_THREADS = 4
def count_stats_thread(stats: Stats, shard_blocks: list[str]):
    """Thread target: run ``count_stats_shard`` on each given block id in order."""
    for head_block_id in shard_blocks:
        count_stats_shard(stats, head_block_id)
def count_stats(result_json: dict, start_time: float, end_time: float):
    """Collect block/transaction stats for the window and store them.

    The ``allshards`` output is parsed for the latest masterchain block id
    (8th token of the ``"latest masterchain block known to server is"``
    line) and for one block id per ``"shard #"`` line (4th token). Shard
    blocks are split into up to ``MAX_THREADS`` stripes, each processed by
    its own thread into a shared ``Stats``. The masterchain block is
    processed on the calling thread in the meantime.

    Results are written to ``result_json["stats_base"]`` (shards) and
    ``result_json["stats_mc"]`` (masterchain).

    Raises:
        AssertionError: if the ``allshards`` output names no masterchain block.
    """
    mc_block = ""
    shard_blocks = []
    for line in utils.lite_client("allshards").split("\n"):
        if line.startswith("latest masterchain block known to server is"):
            mc_block = line.split()[7]
        elif line.startswith("shard #"):
            shard_blocks.append(line.split()[3])

    # Raised explicitly rather than via `assert`, so the check is not
    # stripped under `python -O`. The exception type is unchanged.
    if not mc_block:
        raise AssertionError("allshards output has no latest masterchain block")

    master_stats = Stats(start_time, end_time)
    base_stats = Stats(start_time, end_time)

    threads = []
    for i in range(MAX_THREADS):
        # Stripe i takes every MAX_THREADS-th shard block starting at index i.
        cur_shard_blocks = shard_blocks[i::MAX_THREADS]
        if cur_shard_blocks:
            t = threading.Thread(
                target=count_stats_thread, args=[base_stats, cur_shard_blocks]
            )
            threads.append(t)
            t.start()

    # The masterchain walk runs here while the workers handle the shards.
    count_stats_shard(master_stats, mc_block)

    # NOTE(review): an exception raised inside a worker is not re-raised by
    # join(), so base_stats may be incomplete after a worker failure.
    for t in threads:
        t.join()

    result_json["stats_base"] = base_stats.to_json()
    result_json["stats_mc"] = master_stats.to_json()
def get_total_queues_size() -> int:
    """Sum the queue sizes reported by ``msgqueuesizes`` for ``"(0,"`` lines.

    Only the text from the ``"Outbound message queue sizes:"`` header onward
    is considered. For every line starting with ``"(0,"``, the second
    whitespace-separated token is parsed as an integer and added up.

    Raises:
        Exception: if the header is not present in the output.
    """
    output = utils.lite_client("msgqueuesizes")
    header_pos = output.find("Outbound message queue sizes:")
    if header_pos == -1:
        raise Exception("Unexpected msgqueuesizes output: " + output)

    return sum(
        int(line.split()[1])
        for line in output[header_pos:].split("\n")
        if line.startswith("(0,")
    )