-
Notifications
You must be signed in to change notification settings - Fork 14
/
engine_driver.py
240 lines (187 loc) · 8.15 KB
/
engine_driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
from ui.minimal_ui import ChessUI
import ai.ai as ai
from Game import ChessGame
import chess
import chess.pgn
from concurrent.futures import ProcessPoolExecutor
from Agents import *
import chess.svg
import threading
import queue
import time
import argparse
import Teach
# Whether main() starts the UI update thread and tears the UI down afterwards.
# Overwritten from the --no-ui flag when this file is run as a script.
USE_UI = True
# Whether main() calls game.print_state() after every move.
# Overwritten from the --no-logs flag when this file is run as a script.
LOG = False
from concurrent.futures import ProcessPoolExecutor
import time
def run_single_game(white_agent, black_agent):
    """Play one complete game between two agents, timing every move.

    Args:
        white_agent: Agent passed to ``ChessGame`` as the white player.
        black_agent: Agent passed to ``ChessGame`` as the black player.

    Returns:
        A ``(winner, w_agent_times, b_agent_times)`` tuple. ``winner`` is a
        one-hot ``(white_wins, black_wins, draws)`` triple; the two lists hold
        the seconds spent inside ``get_move`` for each move of that side.
    """
    game = ChessGame(white_agent=white_agent, black_agent=black_agent)
    board = game.get_board()
    w_agent_times, b_agent_times = [], []

    while not board.is_game_over():
        mover = game.get_next_player()
        started = time.perf_counter()
        move = mover.get_move(board)
        elapsed = time.perf_counter() - started

        # Any player that does not compare equal to the white agent is timed
        # as black.
        bucket = w_agent_times if mover == white_agent else b_agent_times
        bucket.append(elapsed)
        board.push(move)

    outcome = board.result()
    # Every result string other than a decisive one is tallied as a draw.
    tally_by_outcome = {"1-0": (1, 0, 0), "0-1": (0, 1, 0)}
    winner = tally_by_outcome.get(outcome, (0, 0, 1))
    print("finished game!", outcome)
    return winner, w_agent_times, b_agent_times
def stockfish_ladder(ui, agent):
    """Play ``agent`` against Stockfish at rising skill levels until it loses.

    Skill levels 1 through 20 are tried in order. The ladder stops at the
    first level where Stockfish wins; wins and draws move on to the next level.

    Args:
        ui: Unused. Kept in the signature so existing callers that pass the
            UI object keep working.
        agent: Agent under test; it plays white in every game.
    """
    sf_agent = StockfishAgent()
    agent_name = agent.__class__.__name__
    # The agent is handed to run_single_game as white, Stockfish as black.
    print(agent_name, "as white", "stockfish as black")
    for level in range(1, 21):
        sf_agent.set_skill(level)
        # run_single_game accepts only the two agents; passing the UI as an
        # extra first argument raised TypeError.
        (white_won, black_won, _), _, _ = run_single_game(agent, sf_agent)
        if white_won == 1:
            print("level", level, "outcome:", "w wins as " + agent_name)
        elif black_won == 1:
            print("level", level, "outcome:", "b wins as sf")
            break
        else:
            print("level", level, "outcome:", "draw")
def test_games(white_agent=None, black_agent=None, game_count=1):
    """Run ``game_count`` games in parallel and print win/draw/timing stats.

    Each game is submitted to a ``ProcessPoolExecutor`` as a call to
    ``run_single_game``.

    Args:
        white_agent: Agent playing white. A fresh ``RandomAgent`` is created
            when omitted (the default is no longer built once at definition
            time and shared between calls).
        black_agent: Agent playing black. Defaults like ``white_agent``.
        game_count: Number of games to play. Nothing is played or reported
            when it is zero or negative.
    """
    if white_agent is None:
        white_agent = RandomAgent()
    if black_agent is None:
        black_agent = RandomAgent()
    if game_count <= 0:
        # Avoids dividing by zero in the percentages below.
        print("no games requested; nothing to report")
        return

    with ProcessPoolExecutor() as executor:
        future_games = [
            executor.submit(run_single_game, white_agent, black_agent)
            for _ in range(game_count)
        ]
        results = [future.result() for future in future_games]

    w_wins = sum(w for (w, _, _), _, _ in results)
    b_wins = sum(b for (_, b, _), _, _ in results)
    draws = sum(d for (_, _, d), _, _ in results)

    w_total_times = []
    b_total_times = []
    for _, w_times, b_times in results:
        w_total_times.extend(w_times)
        b_total_times.extend(b_times)

    def average(times):
        # A side that never moved has no meaningful average; report nan
        # instead of raising ZeroDivisionError.
        return sum(times) / len(times) if times else float("nan")

    white_name = white_agent.__class__.__name__
    black_name = black_agent.__class__.__name__
    print(f"\n{white_name} wins as WHITE: {w_wins} ({w_wins/game_count*100:.2f}%)")
    print(f"{black_name} wins as BLACK: {b_wins} ({b_wins/game_count*100:.2f}%)")
    print(f"Draws: {draws} ({draws/game_count*100:.2f}%)")
    print(f"\n{white_name} Average time per move: {average(w_total_times):.2f}s")
    print(f"{black_name} Average time per move: {average(b_total_times):.2f}s")
def moves_to_pgn(moves, result, white = "opponet", black = "opponet"):
    """Build a PGN game from UCI moves, annotating each move with an evaluation.

    The moves are replayed on a fresh board from the starting position. After
    each move the position's FEN is passed to ``ai.getPositionEval`` and the
    returned value is stored as that move's comment.

    Args:
        moves: Iterable of moves as UCI strings, in playing order.
        result: Value written to the PGN ``Result`` header.
        white: Value written to the PGN ``White`` header.
        black: Value written to the PGN ``Black`` header.

    Returns:
        The populated ``chess.pgn.Game``.
    """
    game = chess.pgn.Game()
    game.headers["Result"] = result
    game.headers["White"] = white
    game.headers["Black"] = black

    node = game
    board = chess.Board()
    for uci in moves:
        move = chess.Move.from_uci(uci)
        board.push(move)
        node = node.add_variation(move)
        # Kept in its own name so the ``result`` parameter is not rebound.
        position_eval = ai.getPositionEval(board.fen())
        node.comment = f"Evaluation: {position_eval}"
    return game
def save_pgn(game, filename):
    """Write ``game`` to ``filename`` as PGN, truncating any existing file.

    Args:
        game: Object whose ``accept`` method takes a ``chess.pgn.FileExporter``.
        filename: Path of the file to write.
    """
    with open(filename, "w") as pgn_file:
        game.accept(chess.pgn.FileExporter(pgn_file))
from queue import Queue
def main(ui, white, black):
    """Play one game on a worker thread, mirror it to the UI, then save it.

    The game loop runs on its own thread and pushes a copy of the board onto
    a queue after every move. When the module-level ``USE_UI`` flag is true a
    second thread forwards those snapshots to ``ui.update_board``. Once the
    game thread has finished, the result is printed, the moves are written to
    ``game.pgn`` and ``Teach.main`` is called with the side index.

    Args:
        ui: UI object providing ``update_board`` and ``destroy``. Only used
            when ``USE_UI`` is true.
        white: Agent playing white.
        black: Agent playing black.
    """
    game = ChessGame(white_agent=white, black_agent=black)
    board_state_queue = Queue()  # carries board snapshots from game loop to UI loop
    game_finished = object()  # sentinel telling the UI loop to stop

    # side is 0 when white is the MouseAgent (or when neither side is),
    # 1 when only black is.
    side = 0
    # NOTE(review): assigning to __class__.__name__ renames the agent class
    # itself, not just this instance; kept as-is because the printed and PGN
    # player names are read back from it below.
    if game.white.__class__.__name__ == "MouseAgent":
        side = 0
        game.white.__class__.__name__ = input("What is your name: ")
    elif game.black.__class__.__name__ == "MouseAgent":
        side = 1
        game.black.__class__.__name__ = input("What is your name: ")
    print(f"W: {game.white.__class__.__name__} B: {game.black.__class__.__name__}")

    board_state = game.get_board()
    movelist = []

    def game_loop():
        """Ask the player to move for a move until the game is over."""
        try:
            while not board_state.is_game_over():
                player = game.get_next_player()
                next_move = player.get_move(board_state)
                print(next_move)
                movelist.append(next_move.uci())
                board_state.push(next_move)
                if LOG:
                    game.print_state()
                # Hand the UI a copy so later moves do not mutate what it draws.
                board_state_queue.put(board_state.copy())
        finally:
            # Always release the UI loop, even if an agent raised or the game
            # was already over; otherwise it blocks forever on get().
            board_state_queue.put(game_finished)

    game_thread = threading.Thread(target=game_loop)
    game_thread.start()

    if USE_UI:
        def update_ui_loop():
            """Forward queued board snapshots to the UI until the sentinel."""
            while True:
                snapshot = board_state_queue.get()
                if snapshot is game_finished:
                    break
                ui.update_board(snapshot)

        ui_thread = threading.Thread(target=update_ui_loop)
        ui_thread.start()

    game_thread.join()
    if USE_UI:
        ui_thread.join()
        # Only tear the UI down when one exists; ui is None under --no-ui.
        ui.destroy()

    print("\nresult:", board_state.result())
    pgn = moves_to_pgn(
        movelist,
        board_state.result(),
        game.white.__class__.__name__,
        game.black.__class__.__name__,
    )
    save_pgn(pgn, "game.pgn")
    Teach.main(side)
if __name__ == "__main__":
    # Script entry point: parse the command line, build the two agents, then
    # either run batch self-play tests or one game (optionally with the UI).

    # Every direct subclass of Agent is selectable by its class name.
    agents = {cls.__name__: cls for cls in Agent.__subclasses__()}
    agent_names = ', '.join(agents.keys())

    parser = argparse.ArgumentParser(description='robot arm that plays chess')
    # The help text lists the names that are actually accepted below.
    parser.add_argument('-w', '--white', type=str, default='RandomAgent', metavar="PlayerAgent",
                        help='agent class that plays white (one of: ' + agent_names + ')')
    parser.add_argument('-b', '--black', type=str, default='RandomAgent', metavar="PlayerAgent",
                        help='agent class that plays black (one of: ' + agent_names + ')')
    # NOTE(review): --time-control is parsed but not read anywhere in this block.
    parser.add_argument('-tc', '--time-control', type=str, default='NONE', metavar="10/2",
                        help='time control for the game in the format <minutes>/<increment> (e.g. 10/2 for 10 minutes with a 2 second increment). If not specified, the game will have no time control.')
    parser.add_argument('--no-ui', dest='ui', action='store_false')
    parser.add_argument('--no-logs', dest='logs', action='store_false')
    parser.add_argument('--tests-per-side', type=int, default=0, metavar="N")
    parser.set_defaults(ui=True, logs=True)
    args = parser.parse_args()

    USE_UI = args.ui
    LOG = args.logs

    # Validate the agent names before creating the UI so a typo does not
    # construct a ChessUI first.
    if args.white not in agents:
        raise ValueError('white must be in ' + agent_names)
    if args.black not in agents:
        raise ValueError('black must be in ' + agent_names)

    app = ChessUI() if args.ui else None

    # MouseAgent is the only agent constructed with the UI object.
    white = agents[args.white]() if args.white != 'MouseAgent' else agents[args.white](app)
    black = agents[args.black]() if args.black != 'MouseAgent' else agents[args.black](app)

    if args.tests_per_side > 0:
        print("\ntesting with", args.tests_per_side * 2, "games...")
        test_games(white, black, args.tests_per_side)
        print("\nswitching sides...\n")
        test_games(black, white, args.tests_per_side)
        # SystemExit is a builtin; exit() is only injected by the site module.
        raise SystemExit

    # stockfish_ladder(agent=white)
    # thread = threading.Thread(target=stockfish_ladder, args=(app, white,))
    # thread.start()

    # The game runs on a worker thread; app.mainloop() is called from this
    # thread.
    thread = threading.Thread(target=main, args=(app, white, black))
    thread.start()
    if USE_UI:
        print("starting UI thread...")
        app.mainloop()