-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFPGAEnv.py
72 lines (56 loc) · 2.41 KB
/
FPGAEnv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import os
import time
import numpy as np
import gymnasium as gym
class FPGAEnv(gym.Env):
    """Base class for Gymnasium environments that talk to an FPGA over XDMA.

    Two character devices are used: ``/dev/xdma0_h2c_0`` is opened write-only
    and stored as ``pc2fpga``; ``/dev/xdma0_c2h_0`` is opened read-only and
    stored as ``fpga2pc``.

    ``__init__`` reads ``ENV_NUM``, ``STA_SPACE_N``, ``OBS_SPACE_N``,
    ``DATA_WL``, ``ACT_WL`` and ``RWD_WL`` from ``self``, but this class never
    defines them. They must already exist when ``__init__`` runs (presumably
    as class attributes of a concrete subclass -- TODO confirm); otherwise
    construction raises ``AttributeError``.

    ``fpga_step``, ``step``, ``reset`` and ``_get_obs`` are placeholders that
    raise ``NotImplementedError``.
    """

    def __init__(self):
        """Open the XDMA devices, derive the transfer layout, clear the FPGA.

        If a device cannot be opened the error message is printed and
        construction continues with the missing descriptor left as ``None``;
        the initial clear is skipped when the write channel is unavailable.
        """
        self.obs = None
        self.states = None
        self.first_step = None

        # Defined before the open attempt so that close() and fpga_clear()
        # can tell "never opened" apart from a valid descriptor.
        self.pc2fpga = None
        self.fpga2pc = None
        try:
            self.pc2fpga = os.open('/dev/xdma0_h2c_0', os.O_WRONLY)
            self.fpga2pc = os.open('/dev/xdma0_c2h_0', os.O_RDONLY)
        except OSError:
            # Best effort: report and keep going, but only for I/O failures
            # (a bare ``except`` would also swallow KeyboardInterrupt).
            print("Error: Can't open FPGA device.")

        # for write FPGA
        self.STA_WD_NUM = int(self.ENV_NUM * self.STA_SPACE_N)
        # Size in bytes of one DATA_WL-bit word.
        word_bytes = int(self.DATA_WL / 8)
        # Flags are pre-encoded as one little-endian word each.
        self.START_FLAG = (0x00000001).to_bytes(word_bytes, byteorder='little')
        self.CLEAR_FLAG = (0x00000002).to_bytes(word_bytes, byteorder='little')
        # Byte offset immediately after ENV_NUM * STA_SPACE_N words.
        self.ACTION_OFFSET = int(
            self.ENV_NUM * self.STA_SPACE_N * self.DATA_WL / 8
        )

        # for read FPGA
        self.READ_OFFSET = int(
            self.ACTION_OFFSET
            + (self.ENV_NUM * self.ACT_WL / self.DATA_WL + 1) * self.DATA_WL / 8
        )
        self.READ_BYTE_NUM = int(
            self.ENV_NUM
            * (
                self.OBS_SPACE_N
                + self.RWD_WL / self.DATA_WL
                + 1 / self.DATA_WL
            )
            * self.DATA_WL / 8
        )
        # Compared against time.perf_counter_ns() deltas, hence nanoseconds.
        self.SLEEP_TIME_NS = int(
            self.ENV_NUM
            * (
                self.ACT_WL / self.DATA_WL
                + self.OBS_SPACE_N
                + self.RWD_WL / self.DATA_WL
                + 1 / self.DATA_WL
            )
            * 10
        )

        # for data unpack
        self.OBS_WD_NUM = int(self.ENV_NUM * self.OBS_SPACE_N)
        self.RWD_WD_NUM = int(self.ENV_NUM * self.RWD_WL / self.DATA_WL)
        # NOTE(review): int() truncates, so this is 0 whenever
        # ENV_NUM < DATA_WL -- confirm that is intended.
        self.DONE_WD_NUM = int(self.ENV_NUM / self.DATA_WL)
        self.OBS_OFFSET = 0
        self.RWD_OFFSET = self.OBS_WD_NUM * word_bytes
        self.DONE_OFFSET = self.RWD_OFFSET + self.RWD_WD_NUM * word_bytes

        if self.pc2fpga is not None:
            self.fpga_clear()

    @staticmethod
    def precise_short_sleep(duration):
        """Busy-wait until at least ``duration`` nanoseconds have elapsed.

        Declared static because it takes no ``self``; it can be called on the
        class or on an instance.
        """
        deadline = time.perf_counter_ns() + duration
        while time.perf_counter_ns() < deadline:
            pass

    def fpga_clear(self):
        """Write 65536 zero-valued float32 words to the FPGA at offset 0.

        Raises:
            RuntimeError: if the write device is not open.
        """
        if getattr(self, "pc2fpga", None) is None:
            raise RuntimeError("FPGA write device is not open.")
        all_zero = np.zeros(65536, dtype=np.float32)
        os.pwrite(self.pc2fpga, all_zero, 0x0000)

    def close(self):
        """Close whichever device descriptors are open; safe to call twice."""
        for attr in ("pc2fpga", "fpga2pc"):
            fd = getattr(self, attr, None)
            if fd is not None:
                # Forget the descriptor first so a failing os.close() cannot
                # lead to a second close of the same number later.
                setattr(self, attr, None)
                os.close(fd)

    def fpga_step(self):
        """Placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def step(self):
        """Placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def reset(self):
        """Placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def _get_obs(self):
        """Placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError
if __name__ == "__main__":
    # Manual smoke test: build the environment and print its spaces.
    # NOTE(review): FPGAEnv reads ENV_NUM, DATA_WL, etc. in __init__ without
    # defining them, so this presumably needs a concrete subclass -- confirm.
    env = FPGAEnv()
    try:
        print(env.action_space)
        # The original read ``env.observation``, which nothing in this file
        # defines; ``observation_space`` is the Gymnasium counterpart of
        # ``action_space``.
        print(env.observation_space)
    finally:
        # Release the device descriptors even if a print above fails.
        env.close()