-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathonsite_env.py
323 lines (292 loc) · 12.3 KB
/
onsite_env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import copy
import queue
import re
import gym
import torch.distributed.optim
from selenium.webdriver import ActionChains
from utils import *
from const import *
import settings
class OnSiteEnv(gym.Env):
    """Gym environment that plays a game on the live web site through Selenium.

    The board is scraped from the page's ``<table id="m">`` and stored in
    ``self.map``, a ``[4, size, size]`` tensor whose channels are:

    * 0 -- troop count of the cell (``-1`` for mountains/obstacles)
    * 1 -- block type (``BlockType`` value)
    * 2 -- colour mark (see :meth:`_get_colormark`)
    * 3 -- whether the cell is currently visible (1) or not (0)

    ``self.observation`` is the concatenation of the frames held in
    ``self.map_history`` with a leading batch dimension.
    """

    # Number of frames / actions kept in the history queues.
    _HISTORY_LEN = 3

    def __init__(self):
        """Start the browser, log in and initialise the per-game state."""
        super().__init__()
        # Constants
        self.base_url = "https://kana.byha.top:444"
        self.room = settings.default_room
        # Captures (class attribute, cell text) for every board cell.
        self.block_finder = re.compile(r'<td.*?id="td-\d+".*?class="(.*?)".*?>([\d\s]*)</td>')
        # Browser initialisation
        self.driver = webdriver.Chrome(options=init_driver_options())
        login(self.driver)
        # Per-game state
        self.game_table = None
        self.map = None
        self.map_size = None
        self.self_color = None
        self.selected = (0, 0)
        self.map_history = queue.Queue()  # treat entries as read-only
        self.action_history = queue.Queue()  # treat entries as read-only
        self.view = False
        self.observation = None
        self.crown_ele = None
        self.shown_before = None
        # Scratch data shared between init_map() and update_map()
        self._map_data = None
        self._blocks = None

    @staticmethod
    def _clear_queue(q):
        """Remove every item from ``q`` in place (keeps the queue object)."""
        while not q.empty():
            q.get_nowait()

    def _trim_queue(self, q):
        """Drop the oldest items so that one more fits within _HISTORY_LEN."""
        while q.qsize() >= self._HISTORY_LEN:
            q.get()

    def reset(self):
        """
        Reset the environment and block until the next game starts.

        :return: observation (Tensor)
        """
        if self.driver.current_url != self.base_url + "/checkmate/room/" + self.room:
            # Not inside the room yet.
            self.enter_room(self.room)
        if self.view:
            # Currently spectating: click the "view" button.
            # NOTE(review): self.view is never set back to False afterwards --
            # confirm whether the site resets the spectator state by itself.
            self.driver.find_element(By.ID, "view").click()
        # Press "ready".
        ActionChains(self.driver).click(self.driver.find_element(By.ID, "ready")).perform()
        # Wait (up to a day) until the status text no longer says "preparing".
        WebDriverWait(self.driver, 86400).until_not(
            EC.text_to_be_present_in_element((By.ID, "game-status"), "准备中"))
        # Fetch the board table.
        self.game_table = self.driver.find_element(By.ID, "m")
        # Start the new game with empty histories; otherwise the queues keep
        # growing across games and the size-based trimming never fires again.
        self._clear_queue(self.action_history)
        # Initialise the map (also rebuilds map_history and the observation).
        c1, c2 = self.init_map()
        # Seed the action history with an "empty" action.
        self.action_history.put(torch.as_tensor([-1, -1, -1, -1, -1], dtype=torch.long))
        if c1 != 1 or c2 >= 100:
            # Unusual start (crown count is not 1, or the crown already holds
            # 100+ troops): switch to spectating.
            self.driver.find_element(By.ID, "view").click()
            self.view = True
        return self.observation

    def _terminal_result(self):
        """Return the final ``step`` tuple if the game is over, else ``None``."""
        state_now = self.win_check()
        if state_now == 0:
            return None
        reward = 300 if state_now == 2 else -300
        return self.observation, reward, True, {}

    def step(self, action: torch.Tensor):
        """
        Execute one step.

        The reward returned here is computed for the *previous* action, using
        the map as it looks after that action has been resolved.

        :param action: movement => tensor([[x1, y1, x2, y2, is_half]]);
            note that x, y are swapped relative to the i, j map indices
        :return: observation (Tensor), reward (float), done (bool), info (dict)
        """
        reward = 0
        # At this point the previous move is still pending: wait for the next round.
        wait_until(next_round(self.crown_ele.text), self.driver)
        # Check whether the game has already ended.
        finished = self._terminal_result()
        if finished is not None:
            return finished
        self.update_map()
        try:
            # The game may end while moving because move() is fairly slow.
            self.move(action)
        except Exception:
            # Deliberate best effort: a failed move is only fatal to the step
            # when it was caused by the game being over.
            finished = self._terminal_result()
            if finished is not None:
                return finished
        # Reward for the previous action.
        _dirx = [0, -1, 0, 1, 1, -1, 1, -1]
        _diry = [-1, 0, 1, 0, 1, -1, -1, 1]
        last_move = self.action_history.queue[-1].long().tolist()
        last_map = self.map_history.queue[-1]
        # Record the current action.
        # NOTE(review): this conversion treats action[0] as a single index
        # while move() unpacks four coordinates from it -- confirm the
        # expected action encoding.
        self._trim_queue(self.action_history)
        self.action_history.put(copy.copy(at.i_to_a(self.map_size, int(action[0].long()))[0]))
        # The previous action was empty: nothing to score.
        if last_move[0] < 0:
            return self.observation, reward, False, {}
        # Destination of the previous move as (row, col) map indices.
        row = last_move[3] - 1
        col = last_move[2] - 1
        target_type = self.map[1][row][col]
        # Small penalty for walking into a mountain.
        if target_type == BlockType.mountain:
            reward -= 10
        # Penalty for walking into a city that is not ours.
        if target_type == BlockType.city:
            if self.map[2][row][col] != self._get_colormark(self.self_color):
                reward -= 10
        # Bonus for newly revealed (not conquered) cells around the destination.
        grey_mark = self._get_colormark(PlayerColor.grey)
        for d_row, d_col in zip(_dirx, _diry):
            t_x = row + d_row
            t_y = col + d_col
            if not (0 <= t_x < self.map_size and 0 <= t_y < self.map_size):
                continue
            if self.map[3][t_x][t_y] - last_map[3][t_x][t_y] == 1:
                # Convert the stored float to int so it can be used as an index/key.
                reward += explore_reward[int(self.map[1][t_x][t_y])]
                # Extra 0.5 when the revealed cell belongs to a player, i.e.
                # its colour mark (channel 2) is not the neutral grey mark.
                if self.map[2][t_x][t_y] != grey_mark:
                    reward += 0.5
        # Check once more whether the game has ended.
        finished = self._terminal_result()
        if finished is not None:
            return finished
        return self.observation, reward, False, {}

    def render(self, mode="human"):
        """
        No-op: the game is rendered by the web site itself.

        :param mode: unused
        :return: None
        """
        pass

    def init_map(self):
        """
        Read the board for a new game and build the initial map/observation.

        :return: (number of own crowns found, troop count shown on the first crown)
        """
        time.sleep(0.2)
        # Determine the board size from the number of cells.
        self._map_data = self.game_table.get_attribute("innerHTML")
        self._blocks = self.block_finder.findall(self._map_data)
        self.map_size = int(math.sqrt(len(self._blocks)))
        # Locate our own crown.
        # NOTE(review): crown_s[0] raises IndexError when no ".own.crown"
        # element exists -- confirm that case cannot happen here.
        crown_s = self.driver.find_elements(By.CSS_SELECTOR, ".own.crown")
        self.crown_ele = crown_s[0]
        self.self_color = self._get_color(self.crown_ele.get_attribute("class"))
        cnt1 = len(crown_s)
        cnt2 = int(self.crown_ele.text)
        # Fresh map and "seen before" markers.
        self.map = torch.zeros([4, self.map_size, self.map_size])
        self.shown_before = torch.zeros([self.map_size, self.map_size])
        # Drop frames of any previous game, then pre-fill with blank frames.
        self._clear_queue(self.map_history)
        for _ in range(self._HISTORY_LEN):
            self.map_history.put(self.map.clone())
        self.update_map(True)
        return cnt1, cnt2

    def update_map(self, _init_flag=False):
        """
        Refresh ``self.map`` from the page and rebuild ``self.observation``.

        The map as it was *before* this refresh is pushed onto
        ``self.map_history`` first, so ``map_history.queue[-1]`` is always
        the previous frame.

        :param _init_flag: True when called from init_map(), which has
            already scraped the page into ``self._blocks``
        :return: None
        """
        # Pop the oldest frame and push a snapshot of the current one.
        # clone() guarantees an independent snapshot; a shallow copy of a
        # tensor can share storage with self.map, which is overwritten below.
        self._trim_queue(self.map_history)
        self.map_history.put(self.map.clone())
        if not _init_flag:
            # init_map() already did this for the initial call.
            self._map_data = self.game_table.get_attribute("innerHTML")
            # Each entry is (class attribute, cell text).
            self._blocks = self.block_finder.findall(self._map_data)
        size = self.map_size
        for i in range(size):
            for j in range(size):
                # Derive the flat index from (i, j) so that skipping a cell
                # below can never desynchronise map and scraped cells.
                b_attr, raw_value = self._blocks[i * size + j]
                try:
                    # Troop count on this cell.
                    b_value = int(raw_value)
                except ValueError:
                    # Empty cell text.
                    b_value = 0
                # Is the cell inside the current field of view?
                shown = "unshown" not in b_attr
                if int(self.shown_before[i][j]) == 1 and not shown:
                    # Seen earlier but hidden now: keep the remembered
                    # contents and only update the visibility flag.
                    self.map[3][i][j] = shown
                    continue
                if shown:
                    # Remember that this cell has been seen at least once.
                    self.shown_before[i][j] = 1
                # Troop count and block type.
                if "unshown" == b_attr:
                    self.map[0][i][j] = b_value
                    self.map[1][i][j] = BlockType.road
                    b_attr += " grey"
                elif "null" in b_attr:
                    self.map[0][i][j] = b_value
                    self.map[1][i][j] = BlockType.road
                elif "obstacle" in b_attr:
                    self.map[0][i][j] = -1
                    self.map[1][i][j] = BlockType.obstacle
                    b_attr += " grey"
                elif "mountain" in b_attr:
                    self.map[0][i][j] = -1
                    self.map[1][i][j] = BlockType.mountain
                    b_attr += " grey"
                elif "crown" in b_attr:
                    self.map[0][i][j] = b_value
                    self.map[1][i][j] = BlockType.crown
                elif "empty-city" in b_attr:
                    # Must be tested before "city", which is a substring of it.
                    self.map[0][i][j] = b_value
                    self.map[1][i][j] = BlockType.city
                    b_attr += " grey"
                elif "city" in b_attr:
                    self.map[0][i][j] = b_value
                    self.map[1][i][j] = BlockType.city
                # Colour mark.
                color = self._get_color(b_attr)
                self.map[2][i][j] = self._get_colormark(color)
                # Visibility flag.
                self.map[3][i][j] = shown
        # Stack the three history frames into one observation.
        frames = self.map_history.queue
        self.observation = torch.cat((frames[0], frames[1], frames[2]))
        self.observation = self.observation.unsqueeze(0)

    def move(self, mov):
        """
        Perform a move on the page.

        :param mov: tensor([[x1, y1, x2, y2, is_half]]); x, y are swapped
            relative to the i, j map indices. The tensor is not modified.
        :return: None
        """
        # Work on plain ints: swapping tensor elements with tuple assignment
        # operates on views and duplicates one value instead of swapping.
        src_x, src_y, dst_x, dst_y = mov[0].long().tolist()[:4]
        # Convert (x, y) to 1-based (i, j).
        src_i, src_j = src_y, src_x
        dst_i, dst_j = dst_y, dst_x
        if self.selected[0] != src_i - 1 or self.selected[1] != src_j - 1:
            # Source cell is not selected yet: click it first.
            self.driver.find_element(By.ID, f"td-{int((src_i - 1) * self.map_size + src_j)}").click()
        # Pick the key matching the movement direction.
        keys = ['W', 'A', 'S', 'D']
        difx = dst_i - src_i
        dify = dst_j - src_j
        for i in range(4):
            if difx == dx[i] and dify == dy[i]:
                ActionChains(self.driver).send_keys(keys[i]).perform()
        # The selection follows the move unless the destination is impassable.
        dst_type = self.map[1][dst_i - 1][dst_j - 1]
        if dst_type != BlockType.mountain and dst_type != BlockType.obstacle:
            self.selected = (dst_i - 1, dst_j - 1)

    def win_check(self) -> int:
        """
        Check whether the game-over dialog is present.

        :return: 0 -> still playing, 1 -> the bot lost, 2 -> the bot won
        """
        try:
            t = self.driver.find_element(By.ID, "swal2-content")
            # Only lookup matters here: it raises if the confirm button is absent.
            self.driver.find_element(By.CSS_SELECTOR, "div.swal2-actions > button.swal2-confirm.swal2-styled")
        except NoSuchElementException:
            return 0
        if t.text.strip() == settings.bot_name + "赢了":
            return 2
        return 1

    def enter_room(self, room_id: str):
        """
        Navigate to a room and remember it as the current room.

        :param room_id: room name
        :return: None
        """
        self.driver.get(self.base_url + "/checkmate/room/" + room_id)
        self.room = room_id
        self.game_table = self.driver.find_element(By.ID, "m")

    def _get_color(self, class_name: str) -> int:
        """
        Map a cell's class attribute to a ``PlayerColor`` value.

        Colour names are searched in a fixed order and the first one
        contained in ``class_name`` wins; grey is the fallback.

        :param class_name: class attribute text of the cell
        :return: the matching ``PlayerColor`` value
        """
        color_names = ("grey", "blue", "red", "green", "orange",
                       "pink", "purple", "chocolate", "maroon")
        for name in color_names:
            if name in class_name:
                return getattr(PlayerColor, name)
        return PlayerColor.grey

    def _get_colormark(self, color):
        """
        Convert a colour to the value stored in map channel 2.

        :param color: ``PlayerColor`` value
        :return: -40 for grey, 0 for our own colour, ``40 + 5 * color`` otherwise
        """
        cm = 0
        if color == PlayerColor.grey:
            cm = -40
        elif color != self.self_color:
            cm = 40 + 5 * color
        return cm

    def quit_signal(self):
        """Return whether the environment should quit; always ``False`` here."""
        return False