-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_multiprocess.py
45 lines (37 loc) · 1.4 KB
/
test_multiprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from multiprocessing import Process, Manager, Value
import time
class Motor:
    """Encoder tick counter that can be shared between processes.

    The count lives in a ``multiprocessing.Value`` so that a writer process
    can increment it while another process reads ``ticks.value``.
    """

    def __init__(self) -> None:
        # 'q' is a signed 64-bit integer. The previous 'i' typecode is a
        # 32-bit C int, which wraps silently to a negative number once the
        # count passes 2**31 - 1 (ctypes performs no overflow check).
        self.ticks = Value('q', 0)

    def update(self) -> None:
        """Increment the tick count by one.

        The read-modify-write is done while holding the Value's own lock so
        that concurrent increments from several processes are not lost.
        """
        with self.ticks.get_lock():
            self.ticks.value += 1
def update_encoder(left_motor, right_motor):
    """Increment both motors' tick counters forever.

    Each pass calls ``update()`` on the left motor and then on the right
    motor. The function never returns on its own; it only stops if an
    ``update()`` call raises or the hosting process is ended from outside.
    """
    motor_pair = (left_motor, right_motor)
    while True:
        for motor in motor_pair:
            motor.update()
if __name__ == "__main__":
    # Measure how fast a separate process can increment two shared tick
    # counters: take `counts` readings spaced `time_diff` seconds apart and
    # report the average increase per second.
    counts = 6
    time_diff = 0.5

    # Value-backed counters live in shared memory, so no Manager server
    # process is needed; the previous unused `with Manager()` was dropped.
    left_motor = Motor()
    right_motor = Motor()

    # update_encoder() never returns. A non-daemonic child would be joined
    # by multiprocessing's exit handler and keep this script from ever
    # exiting, so the writer is marked daemonic and also stopped explicitly
    # in the `finally` below.
    writer_process = Process(
        target=update_encoder,
        args=(left_motor, right_motor),
        daemon=True,
    )
    writer_process.start()
    try:
        print("Waiting for process to initialise...")
        time.sleep(1)

        # Each reading is the mean of the left and right counters.
        tick_sum = 0
        old_ticks = (left_motor.ticks.value + right_motor.ticks.value) / 2
        for index in range(counts):
            time.sleep(time_diff)
            new_ticks = (left_motor.ticks.value + right_motor.ticks.value) / 2
            tick_sum += new_ticks - old_ticks
            old_ticks = new_ticks
            print(index)

        # Average increase per interval, divided by the nominal interval
        # length, gives ticks per second (assuming sleep() took time_diff).
        tick_diff_average = tick_sum / counts / time_diff
        print("Tick diff:", tick_diff_average)
        # Half of the measured rate, reduced by a further 10%.
        # NOTE(review): presumably a Nyquist-style bound with margin - confirm.
        print("Safe from aliasing:", (0.9 * tick_diff_average) / 2)
    finally:
        # All counter reads are finished by now, so it is safe to kill the
        # writer even if it is holding a counter lock at that moment.
        writer_process.terminate()
        writer_process.join()