
Commit

Merge pull request #6 from lajd/merge-to-master
Merge to master
lajd authored Sep 27, 2020
2 parents abc101b + 96ad6b1 commit 143c4c1
Showing 145 changed files with 4,849 additions and 2,088 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -4,4 +4,4 @@
**/ray_tunings/
**/visual_tunings/
**/environments/
**/unity-environment.log
**/unity-environment.log
2 changes: 1 addition & 1 deletion .idea/misc.xml

Some generated files are not rendered by default.

61 changes: 24 additions & 37 deletions agents/base.py
@@ -1,65 +1,52 @@
from abc import abstractmethod
import torch
import numpy as np
from typing import Tuple
from agents.policies.base_policy import Policy
from torch.optim.lr_scheduler import _LRScheduler
from tools.rl_constants import Experience, Action
from typing import Tuple, Union
from tools.rl_constants import Experience, ExperienceBatch, BrainSet, Action
from tools.parameter_capture import ParameterCapture


class Agent(torch.nn.Module):
class Agent:
""" An agent which received state & reward from, and interacts with, and environment"""
def __init__(self, state_shape: Tuple[int, ...], action_size: int, policy: Policy, optimizer: torch.optim.Optimizer, lr_scheduler: _LRScheduler):
super().__init__()
def __init__(self, state_shape: Union[Tuple[int, ...], int], action_size: int):
self.state_shape = state_shape
self.action_size = action_size

self.policy: Policy = policy
self.optimizer: optimizer = optimizer
self.lr_scheduler: _LRScheduler = lr_scheduler

self.warmup = False
self.t_step = 0
self.episode_counter = 0
self.param_capture = ParameterCapture()
self.training = True

def set_mode(self, mode: str):
""" Set the mode of the agent """
if mode == 'train':
self.train()
self.policy.train = True
elif mode.startswith('eval'):
self.eval()
self.policy.eval() # Make the policy greedy
else:
raise ValueError("only modes `train`, `evaluate` are supported")

def preprocess_state(self, state: torch.Tensor):
return state
def set_warmup(self, warmup: bool):
self.warmup = warmup

@abstractmethod
def save(self, *args, **kwargs) -> dict:
"""Save the agent model"""
def set_mode(self, mode: str):
pass

@abstractmethod
def load(self, *args, **kwargs):
""" Load the agent model """
pass
def preprocess_state(self, state):
""" Perform any state preprocessing """
return state

@abstractmethod
def get_action(self, state: np.array) -> Action:
def get_action(self, state: np.array, *args, **kwargs) -> Action:
"""Determine an action given an environment state"""
pass
raise NotImplementedError

@abstractmethod
def get_random_action(self, *args) -> Action:
pass
def get_random_action(self, *args, **kwargs) -> Action:
raise NotImplementedError

@abstractmethod
def step(self, experience: Experience, **kwargs) -> None:
"""Take a step in the environment, encompassing model learning and memory population"""
pass
raise NotImplementedError

@abstractmethod
def step_episode(self, episode: int) -> None:
def step_episode(self, episode: int, *args) -> None:
"""Perform any end-of-episode updates"""
raise NotImplementedError

@abstractmethod
def learn(self, experience_batch: ExperienceBatch):
pass
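
For orientation, a minimal sketch of a concrete agent built on the refactored base class is shown below. The RandomAgent class and the Action(value=...) constructor are illustrative assumptions, not part of this commit; only the Agent interface and the imported types come from the diff above.

import numpy as np

from agents.base import Agent
from tools.rl_constants import Action, Experience, ExperienceBatch


class RandomAgent(Agent):
    """Illustrative agent that samples uniform random actions and never learns."""

    def get_action(self, state: np.array, *args, **kwargs) -> Action:
        # Assumes Action simply wraps a raw numpy action under a `value` attribute
        return Action(value=np.random.uniform(-1, 1, self.action_size))

    def get_random_action(self, *args, **kwargs) -> Action:
        return self.get_action(state=None)

    def step(self, experience: Experience, **kwargs) -> None:
        self.t_step += 1  # nothing to store or learn in this sketch

    def step_episode(self, episode: int, *args) -> None:
        self.episode_counter += 1

    def learn(self, experience_batch: ExperienceBatch):
        pass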
169 changes: 92 additions & 77 deletions agents/ddpg_agent.py
@@ -1,20 +1,21 @@
import numpy as np
import random
from typing import Callable, Union, Tuple
from typing import Callable, Union, Tuple, Optional
from agents.memory.memory import Memory
from agents.memory.prioritized_memory import PrioritizedMemory
from tools.rl_constants import Experience, ExperienceBatch
from tools.rl_constants import Experience, ExperienceBatch, Action
from torch.optim.lr_scheduler import _LRScheduler
from torch.optim.optimizer import Optimizer
from tools.misc import soft_update
import torch
from tools.rl_constants import Action
from agents.base import Agent
from agents.policies.base_policy import Policy

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# torch.autograd.set_detect_anomaly(True)
torch.autograd.set_detect_anomaly(True)


class DDPGAgent:
class DDPGAgent(Agent):
"""Interacts with and learns from the environment."""
memory = None
online_actor = None
@@ -34,29 +35,32 @@ class DDPGAgent:

def __init__(
self,
state_size: int,
state_shape: int,
action_size: int,
random_seed: int,
memory_factory: Callable[[], Union[Memory, PrioritizedMemory]],
actor_model_factory: Callable[[], torch.nn.Module],
actor_optimizer_factory: Callable[[torch.nn.Module], Optimizer],
actor_optimizer_factory: Callable,
actor_optimizer_scheduler: Callable[[Optimizer], _LRScheduler],
critic_model_factory: Callable[[], torch.nn.Module],
critic_optimizer_factory: Callable[[torch.nn.Module], Optimizer],
critic_optimizer_factory: Callable,
critic_optimizer_scheduler: Callable[[Optimizer], _LRScheduler],
policy_factory: Callable,
policy_factory: Callable[[], Policy],
agent_id: Optional[str] = None,
update_frequency: int = 20,
n_learning_iterations: int = 10,
batch_size: int = 32,
batch_size: int = 512,
gamma: float = 0.99,
tau: float = 1e-3,
policy_update_frequency: int = 1,
tau: float = 1e-2,
policy_update_frequency: int = 2,
critic_grad_norm_clip: float = 1,
td3: bool = False,
shared_agent_brain: bool = False
):
"""Initialize an Agent object.
Params
======
state_size (int): dimension of each state
state_shape (int): dimension of each state
action_size (int): dimension of each action
random_seed (int): random seed
memory_factory: (Callable) Return a Memory or PrioritizedMemory object
@@ -74,111 +78,122 @@ def __init__(
tau: (float) Parameter used for soft copying; tau=1 -> a hard copy
policy_update_frequency (int, default=1): The number of time steps to wait before optimizing the policy &
updating the target networks. Introduced in TD3.
shared_agent_brain (bool): Use a shared brain/model/optimizer for all agents
"""
super().__init__(action_size=action_size, state_shape=state_shape)

# Shared Memory
if DDPGAgent.memory is None:
self.agent_id = agent_id
self.shared_agent_brain = shared_agent_brain
if not self.shared_agent_brain:

# Shared Memory
DDPGAgent.memory = memory_factory()
assert batch_size < DDPGAgent.memory.capacity, \
"Batch size {} must be less than memory capacity {}".format(batch_size, DDPGAgent.memory.capacity)

# Shared Actor network
if DDPGAgent.online_actor is None:
DDPGAgent.online_actor = actor_model_factory().to(device).train()
if DDPGAgent.target_actor is None:
DDPGAgent.target_actor = actor_model_factory().to(device).eval()
if DDPGAgent.actor_optimizer is None:
DDPGAgent.actor_optimizer = actor_optimizer_factory(DDPGAgent.online_actor)
if DDPGAgent.actor_optimizer_scheduler is None:
DDPGAgent.online_actor = actor_model_factory().to(device).float().train()
DDPGAgent.target_actor = actor_model_factory().to(device).float().eval()
DDPGAgent.target_actor.load_state_dict(DDPGAgent.online_actor.state_dict())

# Shared Critic network
DDPGAgent.online_critic = critic_model_factory().to(device).float().train()
DDPGAgent.target_critic = critic_model_factory().to(device).float().eval()
DDPGAgent.target_critic.load_state_dict(DDPGAgent.online_critic.state_dict())

DDPGAgent.actor_optimizer = actor_optimizer_factory(DDPGAgent.online_actor.parameters())
DDPGAgent.actor_optimizer_scheduler = actor_optimizer_scheduler(DDPGAgent.actor_optimizer)

# Shared Critic network
if DDPGAgent.online_critic is None:
DDPGAgent.online_critic = critic_model_factory().to(device).train()
if DDPGAgent.target_critic is None:
DDPGAgent.target_critic = critic_model_factory().to(device).eval()
if DDPGAgent.critic_optimizer is None:
DDPGAgent.critic_optimizer = critic_optimizer_factory(DDPGAgent.online_critic)
if DDPGAgent.critic_optimizer_scheduler is None:
DDPGAgent.critic_optimizer = critic_optimizer_factory(DDPGAgent.online_critic.parameters())
DDPGAgent.critic_optimizer_scheduler = critic_optimizer_scheduler(DDPGAgent.actor_optimizer)

# Shared Policy
if DDPGAgent.policy is None:
# Shared Policy
DDPGAgent.policy = policy_factory()
DDPGAgent.policy.step_episode(None)
else:
if DDPGAgent.memory is None:
# Shared Memory
DDPGAgent.memory = memory_factory()

# Shared Actor network
if DDPGAgent.online_actor is None:
DDPGAgent.online_actor = actor_model_factory().to(device).train()
if DDPGAgent.target_actor is None:
DDPGAgent.target_actor = actor_model_factory().to(device).eval()
if DDPGAgent.actor_optimizer is None:
DDPGAgent.actor_optimizer = actor_optimizer_factory(DDPGAgent.online_actor.parameters())
if DDPGAgent.actor_optimizer_scheduler is None:
DDPGAgent.actor_optimizer_scheduler = actor_optimizer_scheduler(DDPGAgent.actor_optimizer)

# Shared Critic network
if DDPGAgent.online_critic is None:
DDPGAgent.online_critic = critic_model_factory().to(device).train()
if DDPGAgent.target_critic is None:
DDPGAgent.target_critic = critic_model_factory().to(device).eval()
if DDPGAgent.critic_optimizer is None:
DDPGAgent.critic_optimizer = critic_optimizer_factory(DDPGAgent.online_critic.parameters())
if DDPGAgent.critic_optimizer_scheduler is None:
DDPGAgent.critic_optimizer_scheduler = critic_optimizer_scheduler(DDPGAgent.actor_optimizer)

self.policy = policy_factory()

assert batch_size < DDPGAgent.memory.capacity, \
"Batch size {} must be less than memory capacity {}".format(batch_size, DDPGAgent.memory.capacity)

# Parameters
self.state_size = state_size
self.action_size = action_size
self.seed = random.seed(random_seed)
self.t_step = 0
self.update_frequency = update_frequency
self.n_learning_iterations = n_learning_iterations
self.batch_size = batch_size
self.gamma = gamma
self.tau = tau
self.policy_update_frequency = policy_update_frequency
self.critic_grad_norm_clip = critic_grad_norm_clip
self.td3 = td3

def set_mode(self, mode: str):
if mode == 'train':
DDPGAgent.online_actor.train()
DDPGAgent.online_critic.train()
self.policy.train()
elif mode == 'eval':
DDPGAgent.online_actor.eval()
DDPGAgent.online_critic.eval()
self.policy.eval()
else:
raise ValueError('Invalid mode: {}'.format(mode))

@staticmethod
def preprocess_state(state):
""" Perform any state preprocessing """
return state

def step_episode(self, episode: int):
""" Perform any end-of-epsiode updates """
if episode > DDPGAgent.episode_counter:
# Only perform once (regardless of the number
# of agents) at end of episode
DDPGAgent.policy.step_episode(episode)
DDPGAgent.actor_optimizer.step()
DDPGAgent.critic_optimizer.step()
DDPGAgent.memory.step_episode(episode)
DDPGAgent.episode_counter += 1

def step(self, experience: Experience):
def step(self, experience: Experience, **kwargs):
"""Save experience in replay memory, and use random sample from buffer to learn."""

self.t_step += 1

DDPGAgent.memory.add(experience)

if self.t_step % self.update_frequency != 0:
if self.warmup:
return
else:
self.t_step += 1
# Learn, if enough samples are available in memory
if self.t_step % self.update_frequency == 0 and len(DDPGAgent.memory) > self.batch_size:
for i in range(self.n_learning_iterations):
experience_batch: ExperienceBatch = DDPGAgent.memory.sample(self.batch_size)
critic_loss, critic_errors, actor_loss, actor_errors = self.learn(experience_batch)

# Learn, if enough samples are available in memory
if len(DDPGAgent.memory) > self.batch_size:
for i in range(self.n_learning_iterations):
experience_batch: ExperienceBatch = DDPGAgent.memory.sample(self.batch_size)
critic_loss, critic_errors, actor_loss, actor_errors = self.learn(experience_batch)
# Update the priority replay buffer
with torch.no_grad():
if critic_errors.min() < 0:
raise RuntimeError("Errors must be > 0, found {}".format(critic_errors.min()))

# Update the priority replay buffer
with torch.no_grad():
if critic_errors.min() < 0:
raise RuntimeError("Errors must be > 0, found {}".format(critic_errors.min()))
priorities = critic_errors.detach().cpu().numpy()
DDPGAgent.memory.update(experience_batch.sample_idxs, priorities)

priorities = critic_errors.detach().cpu().numpy()
DDPGAgent.memory.update(experience_batch.sample_idxs, priorities)
def step_episode(self, episode: int, *args) -> None:
self.policy.step_episode(episode)

def get_action(self, state: torch.Tensor, add_noise=True) -> Action:
def get_action(self, state: torch.Tensor, *args, **kwargs) -> Action:
"""Returns actions for given state as per current policy."""
state = state.to(device)
action = DDPGAgent.policy.get_action(state, DDPGAgent.online_actor, add_noise)
action: Action = self.policy.get_action(state, DDPGAgent.online_actor)
return action

def get_random_action(self, *args):
    def get_random_action(self, *args, **kwargs) -> Action:
""" Get a random action, used for warmup"""
return self.policy.get_random_action()
action: Action = self.policy.get_random_action()
return action

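
For context, tools.misc.soft_update is imported at the top of this file and is presumably applied to the target networks inside learn() (the remainder of which is collapsed below). A standard DDPG-style implementation consistent with the tau docstring above (tau=1 -> a hard copy) would look roughly like the sketch here; the actual helper in tools.misc may differ.

import torch


def soft_update(online_model: torch.nn.Module, target_model: torch.nn.Module, tau: float) -> None:
    """Blend online weights into the target: theta_target <- tau * theta_online + (1 - tau) * theta_target."""
    for online_param, target_param in zip(online_model.parameters(), target_model.parameters()):
        target_param.data.copy_(tau * online_param.data + (1.0 - tau) * target_param.data)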
def learn(self, experience_batch: ExperienceBatch) -> tuple:
"""Update value parameters using given batch of experience tuples and return TD error
@@ -193,7 +208,7 @@ def learn(self, experience_batch: ExperienceBatch) -> tuple:
actor_errors
"""
experience_batch = experience_batch.to(device)
critic_loss, critic_errors = DDPGAgent.policy.compute_critic_errors(
critic_loss, critic_errors = self.policy.compute_critic_errors(
experience_batch,
online_actor=DDPGAgent.online_actor,
online_critic=DDPGAgent.online_critic,
Expand All @@ -208,7 +223,7 @@ def learn(self, experience_batch: ExperienceBatch) -> tuple:

if self.t_step % self.policy_update_frequency == 0:
# Delay the policy update as in TD3
actor_loss, actor_errors = DDPGAgent.policy.compute_actor_errors(
actor_loss, actor_errors = self.policy.compute_actor_errors(
experience_batch,
online_actor=DDPGAgent.online_actor,
online_critic=DDPGAgent.online_critic,

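To show how the factory-based constructor in this diff would be wired, here is a hypothetical construction of a DDPGAgent. Only the keyword arguments come from the signature shown above; make_memory and make_policy stand in for project-specific factories whose constructors are not part of this diff, and the network factories use plain torch modules purely for illustration.

import torch
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR

from agents.ddpg_agent import DDPGAgent

state_dim, action_dim = 33, 4

agent = DDPGAgent(
    state_shape=state_dim,
    action_size=action_dim,
    random_seed=0,
    memory_factory=make_memory,    # assumed: returns a Memory or PrioritizedMemory instance
    actor_model_factory=lambda: torch.nn.Sequential(
        torch.nn.Linear(state_dim, 128), torch.nn.ReLU(),
        torch.nn.Linear(128, action_dim), torch.nn.Tanh(),
    ),
    actor_optimizer_factory=lambda params: Adam(params, lr=1e-4),   # receives online_actor.parameters()
    actor_optimizer_scheduler=lambda opt: StepLR(opt, step_size=100, gamma=0.5),
    critic_model_factory=lambda: torch.nn.Linear(state_dim + action_dim, 1),
    critic_optimizer_factory=lambda params: Adam(params, lr=1e-3),
    critic_optimizer_scheduler=lambda opt: StepLR(opt, step_size=100, gamma=0.5),
    policy_factory=make_policy,    # assumed: returns a Policy implementing get_action / compute_*_errors
    batch_size=512,
    tau=1e-2,
    policy_update_frequency=2,
    td3=True,
)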