forked from CLSFramework/sample-playmaker-server-python-grpc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
soccer_2d_env.py
446 lines (369 loc) · 18.8 KB
/
soccer_2d_env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
from abc import abstractmethod
import datetime
import logging
import os
import signal
import subprocess
import threading
import time
from multiprocessing import Lock, Manager, Process, Queue
from queue import Empty
import gym
import numpy as np
from gym import spaces
import service_pb2 as pb2
from server import serve
from utils.logger_utils import setup_logger
# Default log directory: <cwd>/logs/<YYYY-mm-dd_HH-MM-SS>, timestamped once at import time.
log_dir = os.path.join(
    os.getcwd(),
    'logs',
    datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'),
)
class Soccer2DEnv(gym.Env):
    """
    Soccer2DEnv is a custom environment for a 2D soccer game using OpenAI Gym.
    It integrates with gRPC, RCSSServer, and trainer/player processes.

    The four multiprocessing queues created in ``__init__`` are handed to
    ``serve`` (see ``_run_grpc``). This class reads ``pb2.State`` messages
    from ``player_state_queue`` / ``trainer_state_queue`` and writes actions
    to ``player_action_queue`` / ``trainer_action_queue``.

    Subclasses provide the task-specific pieces: ``action_to_rpc_actions``,
    ``state_to_observation``, ``check_trainer_observation`` and
    ``trainer_reset_actions``.
    """
    metadata = {"render.modes": ["human"]}

    # Last (cycle, stoped_cycle) pair observed for each agent by
    # _wait_for_agents; (-1, -1) means "unknown". These are class attributes,
    # so they are shared by every instance of the class.
    latest_player_cycle = (-1, -1)
    latest_trainer_cycle = (-1, -1)

    def __init__(self, render_mode: str = None,
                 run_grpc_server: bool = True,
                 run_rcssserver: bool = True,
                 run_trainer_player: bool = True,
                 logger: logging.Logger = None,
                 log_dir: str = log_dir):
        """
        Initialize the Soccer2DEnv environment.

        Starts the requested helper processes and then blocks until the
        player and trainer report synchronized cycles.

        :param render_mode: Mode for rendering the environment.
        :param run_grpc_server: Flag to run gRPC server.
        :param run_rcssserver: Flag to run RCSSServer.
        :param run_trainer_player: Flag to run trainer and player processes.
        :param logger: Logger instance for logging; created under ``log_dir`` when omitted.
        :param log_dir: Directory for this env's, rcssserver's, gRPC's and the agents' logs.
        """
        super(Soccer2DEnv, self).__init__()
        self.log_dir = log_dir
        self.logger = logger
        if self.logger is None:
            self.logger = setup_logger('Soccer2DEnv', log_dir, console_level=logging.DEBUG, file_level=logging.DEBUG)
        self.logger.info("Initializing Soccer2DEnv...")
        self.render_mode = render_mode
        self.run_grpc_server = run_grpc_server
        self.run_rcssserver = run_rcssserver
        self.run_trainer_player = run_trainer_player
        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)

        # Queues shared with the gRPC server process (see _run_grpc).
        self.player_state_queue = Queue()
        self.trainer_state_queue = Queue()
        self.trainer_action_queue = Queue()
        self.player_action_queue = Queue()
        self._latest_player_state = None
        self._latest_trainer_state = None

        # Run grpc server
        if self.run_grpc_server:
            self.logger.info("Starting gRPC server...")
            self.grpc_process = self._run_server()
            self.logger.info(f"gRPC server started in pid: {self.grpc_process.pid}")

        # Run rcssserver
        if self.run_rcssserver:
            self.logger.info("Starting RCSSServer...")
            self.rcssserver_process = self._run_rcssserver()
            self.logger.info(f"RCSSServer started in pid: {self.rcssserver_process.pid}")
            self.rcssserver_thread = threading.Thread(target=self._stream_output_to_logger, args=(self.rcssserver_process, 'server'))
            self.rcssserver_thread.start()
            self.logger.info(f"RCSSServer thread started in pid: {self.rcssserver_thread.ident}")
            # Presumably gives rcssserver time to come up before agents are launched.
            time.sleep(5)

        # Run player and trainer
        if self.run_trainer_player:
            self.logger.info("Starting Trainer and Player...")
            self.agents_process = self._run_trainer_player()
            self.logger.info(f"Trainer and Player started in pid: {self.agents_process.pid}")
            self.agents_thread = threading.Thread(target=self._stream_output_to_logger, args=(self.agents_process, 'Agents'))
            self.agents_thread.start()
            self.logger.info(f"Trainer and Player thread started in pid: {self.agents_thread.ident}")

        # Wait even when the agents are started externally: env_reset/step
        # block on the state queues, so the env cannot work without them.
        self._wait_for_agents()

    @staticmethod
    def _default_player_action() -> pb2.PlayerAction:
        """Return a fresh hold-ball player action (used when no real action is available)."""
        return pb2.PlayerAction(body_hold_ball=pb2.Body_HoldBall())

    @staticmethod
    def _default_trainer_action() -> pb2.TrainerAction:
        """Return a fresh trainer action that sets the game mode to PlayOn for the LEFT side."""
        return pb2.TrainerAction(do_change_mode=pb2.DoChangeMode(game_mode_type=pb2.GameModeType.PlayOn, side=pb2.Side.LEFT))

    def _fake_player(self, action: pb2.PlayerAction = None) -> tuple:
        """
        Poll the player state queue once (non-blocking) and answer with ``action``.

        :param action: Player action to be sent; a fresh hold-ball action when omitted.
        :return: ``(cycle, stoped_cycle)`` of the received state, or ``(-1, -1)``
                 when no state was available or an error occurred.
        """
        # A fresh message per call: a protobuf default argument would be one
        # mutable object shared by every call.
        if action is None:
            action = self._default_player_action()
        try:
            state: pb2.State = self.player_state_queue.get(block=False)
            if state is None:
                return (-1, -1)
            self.logger.debug(f"Player state cycle: {state.world_model.cycle}")
            self._latest_player_state = state
            self._send_action_to_player(action)
            return (state.world_model.cycle, state.world_model.stoped_cycle)
        except Empty:
            return (-1, -1)
        except Exception as e:
            self.logger.exception(f"Error: {e}")
            return (-1, -1)

    def _fake_trainer(self, action: pb2.TrainerAction = None) -> tuple:
        """
        Poll the trainer state queue once (non-blocking) and answer with ``action``.

        :param action: Trainer action to be sent; a fresh PlayOn/LEFT change-mode action when omitted.
        :return: ``(cycle, stoped_cycle)`` of the received state, or ``(-1, -1)``
                 when no state was available or an error occurred.
        """
        if action is None:
            action = self._default_trainer_action()
        try:
            state: pb2.State = self.trainer_state_queue.get(block=False)
            if state is None:
                return (-1, -1)
            self.logger.debug(f"Trainer state cycle: {state.world_model.cycle}")
            self._latest_trainer_state = state
            self._send_action_to_trainer(action)
            return (state.world_model.cycle, state.world_model.stoped_cycle)
        except Empty:
            return (-1, -1)
        except Exception as e:
            self.logger.exception(f"Error: {e}")
            return (-1, -1)

    @staticmethod
    def _cycles_in_sync(player_cycle: tuple, trainer_cycle: tuple) -> bool:
        """Return True when both pairs are known, share a cycle, and their stopped cycles differ by less than 2."""
        if player_cycle is None or trainer_cycle is None:
            return False
        if player_cycle[0] == -1 or trainer_cycle[0] == -1:
            return False
        return player_cycle[0] == trainer_cycle[0] and abs(player_cycle[1] - trainer_cycle[1]) < 2

    def _wait_for_agents(self, current_player_cycle: tuple = (-1, -1), current_trainer_cycle: tuple = (-1, -1)):
        """
        Wait for the player and trainer agents to synchronize their cycles.

        Each agent's progress is a ``(cycle, stoped_cycle)`` pair, ``(-1, -1)``
        meaning unknown. While either pair is unknown, both agents are polled.
        Once both are known, only the agent that is behind is polled (via
        ``_fake_player`` / ``_fake_trainer``, which also answer it with a
        default action). The wait ends when ``_cycles_in_sync`` holds, or on
        ``KeyboardInterrupt`` during the short sleep between polls.

        :param current_player_cycle: Last known player ``(cycle, stoped_cycle)``.
        :param current_trainer_cycle: Last known trainer ``(cycle, stoped_cycle)``.
        """
        unknown = (-1, -1)
        if current_player_cycle is None:
            current_player_cycle = unknown
        if current_trainer_cycle is None:
            current_trainer_cycle = unknown
        self.logger.debug(f"Waiting for agents to synchronize... Player cycle: {current_player_cycle}, Trainer cycle: {current_trainer_cycle}")
        Soccer2DEnv.latest_player_cycle = current_player_cycle
        Soccer2DEnv.latest_trainer_cycle = current_trainer_cycle
        while True:
            player_cycle = Soccer2DEnv.latest_player_cycle
            trainer_cycle = Soccer2DEnv.latest_trainer_cycle
            self.logger.debug(f"Player cycle: {player_cycle}, Trainer cycle: {trainer_cycle}")
            if self._cycles_in_sync(player_cycle, trainer_cycle):
                self.logger.info("Player and Trainer cycles are in sync.")
                break
            if player_cycle[0] == -1 or trainer_cycle[0] == -1:
                # At least one position is unknown: poll both agents.
                polled_player = self._fake_player()
                polled_trainer = self._fake_trainer()
            elif tuple(player_cycle) < tuple(trainer_cycle):
                # Player is behind (by cycle, or by stopped cycle on an equal
                # cycle): advance only the player.
                polled_player = self._fake_player()
                polled_trainer = unknown
            else:
                # Trainer is behind: advance only the trainer.
                polled_player = unknown
                polled_trainer = self._fake_trainer()
            if polled_player[0] != -1:
                Soccer2DEnv.latest_player_cycle = polled_player
            if polled_trainer[0] != -1:
                Soccer2DEnv.latest_trainer_cycle = polled_trainer
            try:
                time.sleep(0.01)
            except KeyboardInterrupt:
                break

    def env_reset(self) -> tuple:
        """
        Reset the environment by sending reset actions to the trainer and player.

        If the received player and trainer states disagree on the cycle, the
        agents are resynchronized and the synchronized states are used.

        :return: Tuple containing player observation and trainer state.
        """
        # Send reset action to trainer
        self._send_action_to_trainer(self.trainer_reset_actions())
        # Send fake action to player
        self._send_action_to_player(self._default_player_action())
        # Wait for player observation
        player_state = self.player_state_queue.get()
        self._latest_player_state = player_state
        # Wait for trainer observation
        trainer_state = self.trainer_state_queue.get()
        self._latest_trainer_state = trainer_state
        if player_state.world_model.cycle != trainer_state.world_model.cycle:
            self.logger.error(f"SyncError: Player cycle: {player_state.world_model.cycle}, Trainer cycle: {trainer_state.world_model.cycle} are not in sync.")
            self._wait_for_agents()
            # _wait_for_agents consumed newer states; report those rather than
            # the stale pre-sync ones so the observation matches _latest_player_state.
            player_state = self._latest_player_state
            trainer_state = self._latest_trainer_state
        self.logger.debug(f"Reset Environment at cycle: ##{player_state.world_model.cycle}##")
        # Return player observation
        return self.state_to_observation(player_state), trainer_state

    @abstractmethod
    def abs_reset(self) -> np.ndarray:
        """
        Perform an absolute reset of the environment.

        :return: Player observation after reset.
        """
        player_observation, _ = self.env_reset()
        return player_observation

    def reset(self) -> np.ndarray:
        """
        Reset the environment.

        :return: Player observation after reset.
        """
        return self.abs_reset()

    def step(self, action: int) -> tuple:
        """
        Take a step in the environment by sending actions to the player and trainer.

        Requires a prior ``reset()`` (it reads ``self._latest_player_state``).

        :param action: Action to be taken by the player.
        :return: Tuple containing player observation, reward, done flag, and info.
        """
        self.logger.debug("#" * 50)
        self.logger.debug(f"Taking step with action: {action} on player state cycle: #{self._latest_player_state.world_model.cycle}# stamina: {self._latest_player_state.world_model.self.stamina}")
        # Send action to player
        self.logger.debug("Sending action to player...")
        self._send_action_to_player(self.action_to_rpc_actions(action, self._latest_player_state))
        # Send fake action to trainer
        self.logger.debug("Sending fake action to trainer...")
        self._send_action_to_trainer(self._default_trainer_action())
        # Wait for player observation
        self._latest_player_state = self.player_state_queue.get()
        self.logger.debug(f"Received new Player state cycle: #{self._latest_player_state.world_model.cycle}# stamina: {self._latest_player_state.world_model.self.stamina}")
        # Wait for trainer observation
        self._latest_trainer_state = self.trainer_state_queue.get()
        self.logger.debug(f"Received new Trainer state cycle: #{self._latest_trainer_state.world_model.cycle}#")
        player_world = self._latest_player_state.world_model
        trainer_world = self._latest_trainer_state.world_model
        if player_world.cycle != trainer_world.cycle:
            self.logger.error(f"Player cycle: {player_world.cycle}, Trainer cycle: {trainer_world.cycle} are not in sync.")
            self._wait_for_agents(current_player_cycle=(player_world.cycle, player_world.stoped_cycle),
                                  current_trainer_cycle=(trainer_world.cycle, trainer_world.stoped_cycle))
        self.logger.debug(f"Calculating Reward by trainer observation, player target cycle #{self._latest_player_state.world_model.cycle}# trainer state cycle #{self._latest_trainer_state.world_model.cycle}#")
        # Build the observation only after any resync so that it describes the
        # same state as self._latest_player_state (which the next step acts on)
        # and the same cycle as the trainer state used for the reward.
        player_observation = self.state_to_observation(self._latest_player_state)
        done, reward, info = self.check_trainer_observation(self._latest_trainer_state)
        # Return player observation, reward, done, info
        return player_observation, reward, done, info

    @abstractmethod
    def render(self, mode="human"):
        """
        Render the environment.

        :param mode: Mode for rendering.
        """
        pass

    def close(self):
        """
        Close the environment and terminate all processes.

        Each helper is sent SIGTERM, then waited on so it is not left as a
        zombie. Subprocess groups that ignore SIGTERM are escalated to SIGKILL.
        The log-streaming threads are joined with a bounded timeout.
        """
        self.logger.info("Closing RoboEnv...")
        # Kill all processes
        if self.run_grpc_server:
            self.logger.info("Terminating gRPC server...")
            self.grpc_process.terminate()
            self.grpc_process.join(timeout=5)
            if self.grpc_process.is_alive():
                self.grpc_process.kill()
                self.grpc_process.join()
        if self.run_rcssserver:
            self.logger.info("Terminating RCSSServer...")
            self._kill_process_group(self.rcssserver_process)
            self.rcssserver_process.terminate()
            self._reap_process(self.rcssserver_process)
            self._join_output_thread(self.rcssserver_thread)
        if self.run_trainer_player:
            self.logger.info("Terminating Trainer and Player by killing process group...")
            self._kill_process_group(self.agents_process)
            self.logger.info("Terminating Trainer and Player by terminating process...")
            self.agents_process.terminate()
            self._reap_process(self.agents_process)
            self.logger.info("Terminating Trainer and Player by joining thread...")
            self._join_output_thread(self.agents_thread)

    def _reap_process(self, process: subprocess.Popen, timeout: float = 5.0):
        """
        Wait for ``process`` to exit so it is not left as a zombie.

        :param process: Subprocess that has already been asked to terminate.
        :param timeout: Seconds to wait before SIGKILL-ing its process group.
        """
        try:
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            self.logger.warning(f"Process {process.pid} did not exit after SIGTERM; sending SIGKILL to its process group.")
            self._kill_process_group(process, signal.SIGKILL)
            process.wait()

    def _join_output_thread(self, thread: threading.Thread, timeout: float = 5.0):
        """
        Join a log-streaming thread without letting ``close()`` hang forever.

        :param thread: Thread running ``_stream_output_to_logger``.
        :param timeout: Seconds to wait for the thread to finish.
        """
        thread.join(timeout=timeout)
        if thread.is_alive():
            self.logger.warning(f"Output thread {thread.name} still running after {timeout}s; its pipe may still be open.")

    def _send_action_to_player(self, action: pb2.PlayerAction):
        """
        Send an action to the player.

        :param action: Player action to be sent.
        """
        self.player_action_queue.put(action)

    def _send_action_to_trainer(self, action: pb2.TrainerAction):
        """
        Send an action to the trainer.

        :param action: Trainer action to be sent.
        """
        self.trainer_action_queue.put(action)

    @abstractmethod
    def action_to_rpc_actions(self, action: int, player_state: pb2.State) -> list[pb2.PlayerAction]:
        """
        Convert an action to RPC actions.

        NOTE(review): step() passes the return value straight to
        _send_action_to_player, which is typed for a single pb2.PlayerAction;
        confirm whether a list or a single action is expected here.

        :param action: Action to be converted.
        :param player_state: Current state of the player.
        """
        pass

    @abstractmethod
    def state_to_observation(self, state: pb2.State) -> np.ndarray:
        """
        Convert a state to an observation.

        :param state: State to be converted.
        :return: Observation as a numpy array.
        """
        pass

    @abstractmethod
    def check_trainer_observation(self, observation: pb2.State) -> tuple:
        """
        Check the trainer's observation and calculate reward.

        :param observation: Trainer's observation.
        :return: Tuple containing done flag, reward, and info.
        """
        pass

    @abstractmethod
    def trainer_reset_actions(self) -> list[pb2.TrainerAction]:
        """
        Get the reset actions for the trainer.

        NOTE(review): env_reset() passes the return value straight to
        _send_action_to_trainer, which is typed for a single pb2.TrainerAction;
        confirm the intended return type.

        :return: Trainer reset actions.
        """
        pass

    def _run_rcssserver(self) -> subprocess.Popen:
        """
        Run the RCSSServer process.

        :return: Subprocess running the RCSSServer.
        :raises FileNotFoundError: If the rcssserver binary is missing.
        :raises PermissionError: If the rcssserver binary is not executable.
        """
        rcssserver_path = 'scripts/rcssserver/rcssserver'
        # The server runs with cwd='scripts/rcssserver', so a relative log_dir
        # must be resolved against our cwd before it is handed over.
        server_log_dir = os.path.abspath(self.log_dir)
        rcssserver_params = ['--server::synch_mode=true',
                             '--server::auto_mode=true',
                             '--server::fullstate_l=true',
                             '--server::coach=true',
                             f'--server::game_log_dir={server_log_dir}',
                             f'--server::text_log_dir={server_log_dir}'
                             ]
        if not os.path.exists(rcssserver_path):
            raise FileNotFoundError(f"{rcssserver_path} does not exist.")
        if not os.access(rcssserver_path, os.X_OK):
            raise PermissionError(f"{rcssserver_path} is not executable. Check permissions.")
        process = subprocess.Popen(
            ['./rcssserver'] + rcssserver_params,
            cwd='scripts/rcssserver',  # Directory containing the rcssserver binary checked above
            preexec_fn=os.setsid,  # Create a new session and set the process group ID
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT  # Capture stderr and redirect it to stdout
        )
        return process

    def _run_trainer_player(self) -> subprocess.Popen:
        """
        Run the trainer and player processes.

        :return: Subprocess running the trainer and player.
        """
        process = subprocess.Popen(
            ['bash', 'train.sh', '--rpc-type', 'grpc', '--rpc-timeout', '6'],
            cwd='scripts/proxy',  # Directory containing train.sh
            preexec_fn=os.setsid,  # Create a new session and set the process group ID
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT  # Capture stderr and redirect it to stdout
        )
        return process

    def _stream_output_to_logger(self, process: subprocess.Popen, prefix: str):
        """
        Stream output from a process to the logger.

        :param process: Process whose output is to be streamed.
        :param prefix: Prefix for the log messages (also the logger name).
        """
        # Use the instance's log_dir, not the module default, so a custom
        # log_dir passed to __init__ is honored.
        logger = setup_logger(prefix, self.log_dir, console_level=None, file_level=logging.DEBUG)
        try:
            for line in iter(process.stdout.readline, b''):
                # errors='replace': one non-UTF-8 byte must not kill this
                # thread, otherwise nobody drains the pipe and the child could
                # eventually block writing to it.
                logger.info(line.decode(errors='replace').strip())
        finally:
            process.stdout.close()

    def _run_grpc(self):
        """
        Run the gRPC server (blocking; executed inside the process started by ``_run_server``).
        """
        manager = Manager()
        shared_lock = Lock()  # Create a Lock for synchronization
        shared_number_of_connections = manager.Value('i', 0)
        # Run the gRPC server in the current process
        serve(50051, shared_lock, shared_number_of_connections,
              self.player_state_queue, self.trainer_state_queue,
              self.trainer_action_queue, self.player_action_queue,
              self.log_dir)

    def _run_server(self) -> Process:
        """
        Start the gRPC server process.

        :return: Process running the gRPC server.
        """
        grpc_process = Process(target=self._run_grpc, args=())
        grpc_process.start()
        return grpc_process

    def _kill_process_group(self, process: subprocess.Popen, sig: int = signal.SIGTERM):
        """
        Send a signal to a process's whole process group.

        :param process: Process whose group is to be signalled.
        :param sig: Signal to send (SIGTERM by default).
        """
        try:
            os.killpg(os.getpgid(process.pid), sig)  # Signal every process in the group
        except ProcessLookupError:
            pass  # The process might have already exited