-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathplayer.py
123 lines (97 loc) · 3.7 KB
/
player.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from threading import Thread
import numpy as np
from time import sleep
from mcts_alpha import MCTS_Alpha, evaluate_rollout
from mcts_mine import MCTS_Mine
class Player:
    """Base class shared by every kind of player.

    It stores the bookkeeping fields read and written by ``play``/``step``
    and provides the thread-spawning ``play`` entry point. Subclasses
    customise behaviour by overriding ``step``.
    """

    def __init__(self):
        # Display name of the player.
        self.name = 'Player'
        # The next action; stays None until one has been computed.
        self.action = None
        # Flag telling whether play() may start a thread that computes the
        # next action.
        self.allow = True
        # Whether this player is still valid; used to leave a computation
        # loop early.
        self.valid = True
        # Quantity describing the progress of the move computation (only
        # meaningful for MCTS / AlphaGo style players).
        self.speed = None

    def play(self, game):
        """Start computing the next action in a background thread.

        Nothing happens when starting a thread is currently not allowed or
        when an action is already pending.

        :param game: game simulator object, forwarded unchanged to ``step``
        :return: None
        """
        if not self.allow or self.action is not None:
            return
        # Block further thread launches before the worker is started.
        self.allow = False
        # daemon=True makes all worker threads exit together with the main
        # thread, so clicking the quit button does not have to wait for a
        # running computation to finish.
        worker = Thread(target=self.step, args=(game,), daemon=True)
        worker.start()

    def step(self, game):
        """Obtain the action to execute for the current game state.

        The base implementation is a placeholder that only prints a
        greeting; subclasses override it.

        :param game: game simulator object
        :return: None
        """
        print('Hello!')
class HumanPlayer(Player):
    """Player variant labelled as a human participant.

    Only the display name differs from ``Player``; ``play`` and ``step``
    are inherited unchanged.
    NOTE(review): presumably ``action`` is filled in from outside this
    class (e.g. by the UI) - confirm against the caller.
    """

    def __init__(self):
        super().__init__()
        # Display name (Chinese for "human player").
        self.name = '人类玩家'
class RandomPlayer(Player):
    """Player that picks its move at random from the candidate moves."""

    def __init__(self):
        super().__init__()
        # Display name (Chinese for "random move").
        self.name = '随机落子'

    def step(self, game):
        """Wait one second, then store a randomly chosen action.

        :param game: game simulator object
        :return: None
        """
        sleep(1)
        chosen = self.get_action(game)
        self.action = chosen

    @staticmethod
    def get_action(game):
        """Draw a random entry from the game's candidate move indices.

        When more than one candidate exists, the last entry is left out of
        the draw.
        NOTE(review): the last index is presumably the "pass" move - confirm
        against ``advanced_valid_move_idcs``.

        :param game: object exposing ``game_state.advanced_valid_move_idcs()``
        :return: the selected move index
        """
        candidates = game.game_state.advanced_valid_move_idcs()
        # With a single candidate there is nothing to exclude.
        pool = candidates[:-1] if len(candidates) > 1 else candidates
        return np.random.choice(pool)
class MCTSPlayer_Alpha(Player):
    """Player whose moves are chosen by an ``MCTS_Alpha`` search.

    The search is given a policy/value function that returns uniform move
    probabilities together with a value produced by ``evaluate_rollout``
    using a random rollout policy.
    """

    def __init__(self, n_playout=20):
        """Create the player and its search object.

        :param n_playout: value forwarded to ``MCTS_Alpha`` and embedded in
            the display name
        """
        super().__init__()
        self.name = '蒙特卡洛zero{}'.format(n_playout)

        def rollout_policy_fn(game_state_simulator):
            # Choose a random action: each candidate move gets a random score.
            moves = game_state_simulator.advanced_valid_move_idcs()
            scores = np.random.rand(len(moves))
            return zip(moves, scores)

        def policy_value_fn(game_state_simulator):
            # Return uniform probabilities plus a node value obtained through
            # the random rollout policy above.
            moves = game_state_simulator.advanced_valid_move_idcs()
            count = len(moves)
            uniform = np.ones(count) / count
            priors = zip(moves, uniform)
            node_value = evaluate_rollout(game_state_simulator, rollout_policy_fn)
            return priors, node_value

        # alphagozero
        self.mcts = MCTS_Alpha(policy_value_fn, n_playout)

    def step(self, game):
        """Run the search and publish its result as the next action.

        A result of -1 is stored as "no action" and re-enables ``allow``.
        NOTE(review): the source's indentation was lost; the ``allow`` reset
        is assumed to belong to the -1 branch - confirm against the original
        file.

        :param game: game simulator object
        :return: None
        """
        move = self.get_action(game)
        if move == -1:
            self.allow = True
            self.action = None
        else:
            self.action = move
        # Clear the speed area once an action has been obtained.
        self.speed = (0, 1)

    def reset_player(self):
        """Call ``update_with_move(-1)`` on the search object."""
        self.mcts.update_with_move(-1)

    def get_action(self, game):
        """Ask the search for a move, then call ``update_with_move(-1)``.

        :param game: game simulator object
        :return: whatever ``get_move_by_visits`` returned
        """
        selected = self.mcts.get_move_by_visits(game, self)
        self.mcts.update_with_move(-1)
        return selected
class MCTSPlayer_Mine(Player):
    """Player whose moves are chosen by an ``MCTS_Mine`` search."""

    def __init__(self, n_playout=20):
        """Create the player and its search object.

        :param n_playout: value forwarded to ``MCTS_Mine`` and embedded in
            the display name
        """
        super().__init__()
        self.name = '蒙特卡洛new{}'.format(n_playout)
        # gomain
        self.mcts = MCTS_Mine(n_playout)

    def step(self, game):
        """Run the search and publish its result as the next action.

        A result of -1 is stored as "no action" and re-enables ``allow``.
        NOTE(review): the source's indentation was lost; the ``allow`` reset
        is assumed to belong to the -1 branch - confirm against the original
        file.

        :param game: game simulator object
        :return: None
        """
        move = self.get_action(game)
        if move == -1:
            self.allow = True
            self.action = None
        else:
            self.action = move
        # Clear the speed area once an action has been obtained.
        self.speed = (0, 1)

    def reset_player(self):
        """Do nothing; this player has no reset work."""
        return None

    def get_action(self, game):
        """Return the move selected by the search's ``get_move_by_visits``.

        :param game: game simulator object
        :return: whatever ``get_move_by_visits`` returned
        """
        # Disabled alternative kept from the original source:
        # self.mcts.get_move_by_probs(game, self)
        return self.mcts.get_move_by_visits(game, self)