Coach.py
import logging
import os
import sys
from collections import deque
from pickle import Pickler, Unpickler
from random import shuffle
import cv2
import numpy as np
from tqdm import tqdm
from Arena import Arena
from mcts_pure import MCTS, MCTSPlayer
from watermelon_chess.common import create_directory, draw_chessmen, BACKGROUND, \
    write_msg, PROCEDURE_PATH
from watermelon_chess.tensor_board_tool import my_summary

log = logging.getLogger(__name__)


class Coach:
"""
This class executes the self-play + learning. It uses the functions defined
in Game and NeuralNet. args are specified in main.py.
"""
    def __init__(self, game, nnet, args):
        self.game = game
        self.nnet = nnet
        self.pnet = self.nnet.__class__(self.game)  # the competitor network
        self.args = args
        self.mcts_player = MCTSPlayer(self.pnet.predict,
                                      c_puct=self.args.cpuct,
                                      n_playout=self.args.numMCTSSims,
                                      is_selfplay=1)
        # history of examples from the args.numItersForTrainExamplesHistory latest iterations
        self.trainExamplesHistory = []
        # can be overridden in loadTrainExamples()
        self.skipFirstSelfPlay = False
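
    # Fields read from `args` elsewhere in this class: cpuct, numMCTSSims,
    # tempThreshold, numEps, numIters, maxlenOfQueue,
    # numItersForTrainExamplesHistory, arenaCompare, updateThreshold,
    # checkpoint and load_folder_file.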

    def write_file(self, epoch_idx, step, self_play_idx, board, key):
        epoch_directory = PROCEDURE_PATH / (key + "_epoch_" + str(epoch_idx))
        self.create_procedure_directory(epoch_directory)
        self_play_directory = epoch_directory / (key + "_self_play_" + str(self_play_idx))
        self.create_procedure_directory(self_play_directory)
        step_directory = self_play_directory / (key + "_step_" + str(step))
        self.create_procedure_directory(step_directory)
        name = step_directory / "chess_board"
        image = cv2.imread(str(BACKGROUND))
        draw_chessmen(board, image, True, name)

    def create_procedure_directory(self, directory):
        if not os.path.exists(directory):
            create_directory(directory)

    def write_result(self, directory, is_peace, r):
        path = directory / "result.txt"
        if r != 0 or is_peace:
            msg = f'{"Has result" if r != 0 else "No result"}{", is draw" if is_peace else ""}'
            write_msg(msg, path)

    def executeEpisode(self, epoch_idx, self_play_idx, is_write):
        """
        This function executes one episode of self-play, starting with player 1.
        As the game is played, each turn is added as a training example to
        trainExamples. The game is played until it ends. After the game ends,
        the outcome of the game is used to assign values to each example in
        trainExamples.

        It uses temp=1 if episodeStep < tempThreshold, and thereafter
        uses temp=0.

        Returns:
            trainExamples: a list of examples of the form (canonicalBoard, currPlayer, pi, v)
                           pi is the MCTS informed policy vector, v is +1 if
                           the player eventually won the game, else -1.
        """
        trainExamples = []
        board = self.game.getInitBoard()
        self.curPlayer = 1
        episodeStep = 0

        while True:
            episodeStep += 1
            canonicalBoard = self.game.getCanonicalForm(board, self.curPlayer)
            # note: temp is computed here but not passed to the MCTS player below
            temp = int(episodeStep < self.args.tempThreshold)
            pi = self.mcts_player.get_action(canonicalBoard)
            sym = self.game.getSymmetries(canonicalBoard, pi)
            for b, p in sym:
                trainExamples.append([b, self.curPlayer, p, None])
            action = np.random.choice(len(pi), p=pi)
            board, self.curPlayer = self.game.getNextState(board, self.curPlayer, action)
            r = self.game.getGameEnded(board, self.curPlayer)
            if is_write:
                self.write_file(epoch_idx, episodeStep, self_play_idx, board, "in_episode")
            if r != 0:
                my_summary.add_float(x=epoch_idx * self.args.numEps + self_play_idx, y=episodeStep,
                                     title="Steps of one episode in Play(Training stage)")
                return [(x[0], x[2], r * ((-1) ** (x[1] != self.curPlayer))) for x in trainExamples]
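
    # Worked example of the value assignment above (illustrative only): if the
    # episode ends right after curPlayer becomes -1 and
    # getGameEnded(board, -1) returns r = -1 (player -1 lost), then every
    # example recorded while player -1 was to move gets r * (-1)**0 = -1, and
    # every example recorded for player 1 gets r * (-1)**1 = +1, matching the
    # "+1 if the player eventually won" convention in the docstring.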

    def _is_write(self):
        # uniform(0, 1) can never be < -1, so board snapshots are effectively
        # disabled; raise the threshold (e.g. to 0.01) to re-enable them.
        if np.random.uniform(0, 1, 1).item() < -1:
            return True
        return False

    def learn(self):
        """
        Performs numIters iterations with numEps episodes of self-play in each
        iteration. After every iteration, it retrains the neural network with
        the examples in trainExamplesHistory (each iteration's deque is capped
        at maxlenOfQueue). It then pits the new neural network against the old
        one and accepts it only if it wins >= updateThreshold fraction of the
        decided games.
        """
        for i in range(1, self.args.numIters + 1):
            # bookkeeping
            log.info(f'Starting Iter #{i} ...')
            # examples of the iteration
            if not self.skipFirstSelfPlay or i > 1:
                iterationTrainExamples = deque([], maxlen=self.args.maxlenOfQueue)
                for idx in tqdm(range(self.args.numEps), desc="Self Play"):
                    self.mcts_player.reset_player()  # reset search tree
                    iterationTrainExamples += self.executeEpisode(i, idx, self._is_write())
                # save the iteration examples to the history
                self.trainExamplesHistory.append(iterationTrainExamples)

            if len(self.trainExamplesHistory) > self.args.numItersForTrainExamplesHistory:
                log.warning(
                    f"Removing the oldest entry in trainExamples. len(trainExamplesHistory) = {len(self.trainExamplesHistory)}")
                self.trainExamplesHistory.pop(0)
            # backup history to a file
            # NB! the examples were collected using the model from the previous iteration, so (i-1)
            self.saveTrainExamples(i - 1)

            # shuffle examples before training
            trainExamples = []
            for e in self.trainExamplesHistory:
                trainExamples.extend(e)
            shuffle(trainExamples)

            # training new network, keeping a copy of the old one
            self.nnet.save_checkpoint(folder=self.args.checkpoint, filename='temp.pth.tar')
            self.pnet.load_checkpoint(folder=self.args.checkpoint, filename='temp.pth.tar')
            pmcts = MCTSPlayer(self.pnet.predict,
                               c_puct=self.args.cpuct,
                               n_playout=self.args.numMCTSSims,
                               is_selfplay=0)
            self.nnet.train(trainExamples, i)
            nmcts = MCTSPlayer(self.nnet.predict,
                               c_puct=self.args.cpuct,
                               n_playout=self.args.numMCTSSims,
                               is_selfplay=0)

            first_player = pmcts.get_action
            second_player = nmcts.get_action
            log.info('PITTING AGAINST PREVIOUS VERSION')
            arena = Arena(first_player, second_player, self.game)
            pwins, nwins, draws = arena.playGames(self.args.arenaCompare, iter=i)

            my_summary.add_float(x=i, y=i, title="Training Epoch")
            my_summary.add_float(x=i, y=nwins, title="New Player Winning times")
            my_summary.add_float(x=i, y=pwins, title="Old Player Winning times")
            my_summary.add_float(x=i, y=draws, title="Draw times")

            log.info('NEW/PREV WINS : %d / %d ; DRAWS : %d' % (nwins, pwins, draws))
            if pwins + nwins == 0 or float(nwins) / (pwins + nwins) < self.args.updateThreshold:
                log.info('REJECTING NEW MODEL')
            else:
                log.info('ACCEPTING NEW MODEL')
                self.nnet.save_checkpoint(folder=self.args.checkpoint, filename=self.getCheckpointFile(i))
                self.nnet.save_checkpoint(folder=self.args.checkpoint, filename='best.pth.tar')

            if pwins + nwins == 0:
                win_rate = -1
            else:
                win_rate = float(nwins) / (pwins + nwins)
            my_summary.add_float(x=i, y=win_rate, title="Winning Rate")
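
    # Acceptance-rule example (numbers are illustrative, not this project's
    # defaults): with arenaCompare=40 and updateThreshold=0.55, an arena score
    # of 18 new wins, 16 old wins and 6 draws gives 18 / (18 + 16) ≈ 0.53,
    # which is below 0.55, so the new model is rejected and no checkpoint or
    # best.pth.tar is written for that iteration. Draws are excluded from the
    # denominator.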

    def getCheckpointFile(self, iteration):
        return 'checkpoint_' + str(iteration) + '.pth.tar'

    def saveTrainExamples(self, iteration):
        folder = self.args.checkpoint
        if not os.path.exists(folder):
            os.makedirs(folder)
        filename = os.path.join(folder, self.getCheckpointFile(iteration) + ".examples")
        with open(filename, "wb+") as f:
            # the with-block closes the file on exit
            Pickler(f).dump(self.trainExamplesHistory)

    def loadTrainExamples(self):
        modelFile = os.path.join(self.args.load_folder_file[0], self.args.load_folder_file[1])
        examplesFile = modelFile + ".examples"
        if not os.path.isfile(examplesFile):
            log.warning(f'File "{examplesFile}" with trainExamples not found!')
            r = input("Continue? [y|n]")
            if r != "y":
                sys.exit()
        else:
            log.info("File with trainExamples found. Loading it...")
            with open(examplesFile, "rb") as f:
                self.trainExamplesHistory = Unpickler(f).load()
            log.info('Loading done!')
            # examples based on the model were already collected (loaded)
            self.skipFirstSelfPlay = True
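

# -----------------------------------------------------------------------------
# Illustrative driver sketch (not part of this module). The real entry point is
# main.py, which builds the Game and NeuralNet objects and the `args` container
# used above. The field values and the `WMGame` / `NNetWrapper` names below are
# assumptions for illustration only; substitute the project's actual classes
# and settings.
# -----------------------------------------------------------------------------
# if __name__ == "__main__":
#     from types import SimpleNamespace
#
#     args = SimpleNamespace(
#         numIters=100, numEps=25, tempThreshold=15, updateThreshold=0.55,
#         maxlenOfQueue=200000, numMCTSSims=100, arenaCompare=40, cpuct=1.0,
#         checkpoint='./temp/', load_folder_file=('./temp/', 'best.pth.tar'),
#         numItersForTrainExamplesHistory=20,
#     )
#
#     game = WMGame()            # assumed Game implementation
#     nnet = NNetWrapper(game)   # assumed NeuralNet wrapper
#     coach = Coach(game, nnet, args)
#     # coach.loadTrainExamples()  # optional: resume from saved examples
#     coach.learn()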