-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproject_ddpg.py
146 lines (125 loc) · 6.41 KB
/
project_ddpg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import torch
import numpy as np
import numpy.random as rd
from elegantrl.agents.AgentBase import AgentBase
from elegantrl.agents.net import Actor, Critic
class project_DDPG(AgentBase):
    """
    Deep Deterministic Policy Gradient (DDPG) agent using Ornstein-Uhlenbeck
    exploration noise.

    :param net_dim[int]: the dimension of networks (the width of neural networks)
    :param state_dim[int]: the dimension of state (the number of state vector)
    :param action_dim[int]: the dimension of action (the number of discrete action)
    :param learning_rate[float]: learning rate of optimizer
    :param if_per_or_gae[bool]: PER (off-policy) or GAE (on-policy) for sparse reward
    :param env_num[int]: the env number of VectorEnv. env_num == 1 means don't use VectorEnv
    :param agent_id[int]: if the visible_gpu is '1,9,3,4', agent_id=1 means (1,9,4,3)[agent_id] == 9
    """
    def __init__(self):
        AgentBase.__init__(self)
        # Network classes picked up by AgentBase.init() when building networks.
        self.ClassCri = Critic
        self.ClassAct = Actor
        # Keep Polyak-averaged target copies of both networks.
        self.if_use_act_target = True
        self.if_use_cri_target = True
        self.explore_noise = 0.2  # sigma of the Ornstein-Uhlenbeck exploration noise
        self.ou_noise = None  # built lazily in init(), once action_dim is known
    def init(self, net_dim=512, state_dim=8, action_dim=2, reward_scale=1.0, gamma=0.99,
             learning_rate=1e-4, if_per_or_gae=False, env_num=1, gpu_id=0):
        """
        Explict call ``self.init()`` to overwrite the ``self.object`` in ``__init__()`` for multiprocessing.
        """
        AgentBase.init(self, net_dim=net_dim, state_dim=state_dim, action_dim=action_dim,
                       reward_scale=reward_scale, gamma=gamma,
                       learning_rate=learning_rate, if_per_or_gae=if_per_or_gae,
                       env_num=env_num, gpu_id=gpu_id, )
        self.ou_noise = OrnsteinUhlenbeckNoise(size=action_dim, sigma=self.explore_noise)
        # PER needs per-sample losses (reduction='none'); uniform replay takes the mean.
        self.criterion = torch.nn.SmoothL1Loss(reduction='none' if if_per_or_gae else 'mean')
        self.get_obj_critic = self.get_obj_critic_per if if_per_or_gae else self.get_obj_critic_raw
    def select_actions(self, states: torch.Tensor) -> torch.Tensor:
        """
        Select actions given an array of states.
        :param states: an array of states in a shape (batch_size, state_dim, ).
        :return: an array of actions in a shape (batch_size, action_dim, ) where each action is clipped into range(-1, 1).
        """
        raw_actions = self.act(states.to(self.device))
        if rd.rand() >= self.explore_rate:  # epsilon-greedy: usually act greedily
            return raw_actions.detach().cpu()
        # Exploration branch: perturb the policy output with OU noise.
        ou_sample = torch.as_tensor(self.ou_noise(), dtype=torch.float32, device=self.device)
        noisy_actions = (raw_actions + ou_sample.unsqueeze(0)).clamp(-1, 1)
        return noisy_actions.detach().cpu()
    def update_net(self, buffer, batch_size, repeat_times, soft_update_tau) -> (float, float):
        """
        Update the neural networks by sampling batch data from ``ReplayBuffer``.
        :param buffer: the ReplayBuffer instance that stores the trajectories.
        :param batch_size: the size of batch data for Stochastic Gradient Descent (SGD).
        :param repeat_times: the re-using times of each trajectory.
        :param soft_update_tau: the soft update parameter.
        :return: a tuple of the log information.
        """
        buffer.update_now_len()
        obj_critic = obj_actor = None
        update_steps = int(buffer.now_len / batch_size * repeat_times)
        for _ in range(update_steps):
            # --- critic step ---
            obj_critic, state = self.get_obj_critic(buffer, batch_size)
            self.optim_update(self.cri_optim, obj_critic)
            if self.if_use_cri_target:
                self.soft_update(self.cri_target, self.cri, soft_update_tau)
            # --- actor step (deterministic policy gradient) ---
            policy_action = self.act(state)
            obj_actor = -self.cri(state, policy_action).mean()
            self.optim_update(self.act_optim, obj_actor)
            if self.if_use_act_target:
                self.soft_update(self.act_target, self.act, soft_update_tau)
        return obj_critic.item(), obj_actor.item()
    def get_obj_critic_raw(self, buffer, batch_size):
        """
        Calculate the loss of networks with **uniform sampling**.
        :param buffer: the ReplayBuffer instance that stores the trajectories.
        :param batch_size: the size of batch data for Stochastic Gradient Descent (SGD).
        :return: the loss of the network and states.
        """
        with torch.no_grad():
            reward, mask, action, state, next_s = buffer.sample_batch(batch_size)
            next_action = self.act_target(next_s)
            q_label = reward + mask * self.cri_target(next_s, next_action)
        q_value = self.cri(state, action)
        return self.criterion(q_value, q_label), state
    def get_obj_critic_per(self, buffer, batch_size):
        """
        Calculate the loss of the network with **Prioritized Experience Replay (PER)**.
        :param buffer: the ReplayBuffer instance that stores the trajectories.
        :param batch_size: the size of batch data for Stochastic Gradient Descent (SGD).
        :return: the loss of the network and states.
        """
        with torch.no_grad():
            reward, mask, action, state, next_s, is_weights = buffer.sample_batch(batch_size)
            next_action = self.act_target(next_s)
            q_label = reward + mask * self.cri_target(next_s, next_action)
        q_value = self.cri(state, action)
        td_error = self.criterion(q_value, q_label)  # or td_error = (q_value - q_label).abs()
        # Importance-sampling weights correct the PER sampling bias.
        obj_critic = (td_error * is_weights).mean()
        buffer.td_error_update(td_error.detach())
        return obj_critic, state
class OrnsteinUhlenbeckNoise:
def __init__(self, size, theta=0.15, sigma=0.3, ou_noise=0.0, dt=1e-2):
"""
The noise of Ornstein-Uhlenbeck Process
:param size int: the size of noise, noise.shape==(-1, action_dim)
:param theta float: related to the not independent of OU-noise
:param sigma float: related to action noise std
:param ou_noise float: initialize OU-noise
:param dt float: derivative
"""
self.theta = theta
self.sigma = sigma
self.ou_noise = ou_noise
self.dt = dt
self.size = size
def __call__(self) -> float:
"""
output a OU-noise
:return array ou_noise: a noise generated by Ornstein-Uhlenbeck Process
"""
noise = self.sigma * np.sqrt(self.dt) * rd.normal(size=self.size)
self.ou_noise -= self.theta * self.ou_noise * self.dt + noise
return self.ou_noise