memory.py
import numpy as np
import torch


class ReplayBuffer(object):
    """Fixed-size FIFO replay buffer with optional storage for a second agent."""

    def __init__(self, state_dim, action_dim, agents, max_size=500000):
        self.max_size = max_size
        self.ptr = 0        # index of the next slot to overwrite
        self.size = 0       # number of valid transitions currently stored
        self.agents = agents

        # Core transition storage.
        self.state = np.zeros((max_size, state_dim))
        self.action = np.zeros((max_size, action_dim))
        self.next_state = np.zeros((max_size, state_dim))
        self.reward = np.zeros((max_size, 1))
        self.int_reward = np.zeros((max_size, 1))   # intrinsic/exploration reward
        self.n_step = np.zeros((max_size, 1))       # n-step return
        self.exp_n_step = np.zeros((max_size, 1))   # n-step intrinsic return
        self.not_done = np.zeros((max_size, 1))

        # Extra storage for the other agent's data in the two-agent case.
        if agents >= 2:
            self.o_state = np.zeros((max_size, state_dim))
            self.o_next_state = np.zeros((max_size, state_dim))
            self.o_int_reward = np.zeros((max_size, 1))

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    def add(self, state, action, next_state, reward, ex_rew, n_step, ex_n_step, done):
        """Store a single-agent transition at the current write pointer."""
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.next_state[self.ptr] = next_state
        self.reward[self.ptr] = reward
        self.int_reward[self.ptr] = ex_rew
        self.n_step[self.ptr] = n_step
        self.exp_n_step[self.ptr] = ex_n_step
        self.not_done[self.ptr] = 1. - done

        # Advance the circular pointer and grow the logical size up to max_size.
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)
    def add2(self, state, action, next_state, reward, ex_rew, n_step, ex_n_step, done,
             o_state, o_next_state, o_int_reward):
        """Store a transition together with the other agent's data (two-agent case)."""
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.next_state[self.ptr] = next_state
        self.reward[self.ptr] = reward
        self.int_reward[self.ptr] = ex_rew
        self.n_step[self.ptr] = n_step
        self.exp_n_step[self.ptr] = ex_n_step
        self.not_done[self.ptr] = 1. - done
        self.o_state[self.ptr] = o_state
        self.o_next_state[self.ptr] = o_next_state
        self.o_int_reward[self.ptr] = o_int_reward

        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)
    def sample(self, batch_size):
        """Sample a random minibatch of single-agent transitions as torch tensors."""
        ind = np.random.randint(0, self.size, size=batch_size)
        return (
            torch.FloatTensor(self.state[ind]).to(self.device),
            torch.FloatTensor(self.action[ind]).to(self.device),
            torch.FloatTensor(self.next_state[ind]).to(self.device),
            torch.FloatTensor(self.reward[ind]).to(self.device),
            torch.FloatTensor(self.int_reward[ind]).to(self.device),
            torch.FloatTensor(self.n_step[ind]).to(self.device),
            torch.FloatTensor(self.exp_n_step[ind]).to(self.device),
            torch.FloatTensor(self.not_done[ind]).to(self.device)
        )

    def sample2(self, batch_size):
        """Sample a random minibatch that also includes the other agent's data."""
        ind = np.random.randint(0, self.size, size=batch_size)
        return (
            torch.FloatTensor(self.state[ind]).to(self.device),
            torch.FloatTensor(self.action[ind]).to(self.device),
            torch.FloatTensor(self.next_state[ind]).to(self.device),
            torch.FloatTensor(self.reward[ind]).to(self.device),
            torch.FloatTensor(self.int_reward[ind]).to(self.device),
            torch.FloatTensor(self.n_step[ind]).to(self.device),
            torch.FloatTensor(self.exp_n_step[ind]).to(self.device),
            torch.FloatTensor(self.not_done[ind]).to(self.device),
            torch.FloatTensor(self.o_state[ind]).to(self.device),
            torch.FloatTensor(self.o_next_state[ind]).to(self.device),
            torch.FloatTensor(self.o_int_reward[ind]).to(self.device)
        )
    def sample_range(self, size):
        """Return the first `size` stored transitions in insertion order (no shuffling)."""
        ind = np.arange(0, size)
        return (
            torch.FloatTensor(self.state[ind]).to(self.device),
            torch.FloatTensor(self.action[ind]).to(self.device),
            torch.FloatTensor(self.next_state[ind]).to(self.device),
            torch.FloatTensor(self.reward[ind]).to(self.device),
            torch.FloatTensor(self.int_reward[ind]).to(self.device),
            torch.FloatTensor(self.n_step[ind]).to(self.device),
            torch.FloatTensor(self.exp_n_step[ind]).to(self.device),
            torch.FloatTensor(self.not_done[ind]).to(self.device)
        )
    def save(self, folder, episode_num):
        """Persist the buffer to disk. Note that only the core arrays are written;
        n_step, exp_n_step and the o_* arrays are not saved."""
        with open(folder + '/params_{}'.format(episode_num), 'w') as f:
            f.write(str(self.max_size) + ',' + str(self.ptr) + ',' + str(self.size))
        np.save(folder + '/state_{}'.format(episode_num), self.state)
        np.save(folder + '/action_{}'.format(episode_num), self.action)
        np.save(folder + '/next_state_{}'.format(episode_num), self.next_state)
        np.save(folder + '/reward_{}'.format(episode_num), self.reward)
        np.save(folder + '/int_reward_{}'.format(episode_num), self.int_reward)
        np.save(folder + '/not_done_{}'.format(episode_num), self.not_done)

    def load(self, folder, episode_num):
        """Restore a buffer previously written by save()."""
        with open(folder + '/params_{}'.format(episode_num), 'r') as f:
            max_size, ptr, size = f.read().split(',')
        self.max_size, self.ptr, self.size = int(max_size), int(ptr), int(size)
        self.state = np.load(folder + '/state_{}.npy'.format(episode_num))
        self.action = np.load(folder + '/action_{}.npy'.format(episode_num))
        self.next_state = np.load(folder + '/next_state_{}.npy'.format(episode_num))
        self.reward = np.load(folder + '/reward_{}.npy'.format(episode_num))
        self.int_reward = np.load(folder + '/int_reward_{}.npy'.format(episode_num))
        self.not_done = np.load(folder + '/not_done_{}.npy'.format(episode_num))
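

# --- Usage sketch (illustrative, not part of the original file) ---
# A minimal example of how this buffer might be driven from a training loop.
# The dimensions, reward values and n-step returns below are assumptions made
# purely for demonstration; they do not come from this repository.
if __name__ == "__main__":
    buf = ReplayBuffer(state_dim=4, action_dim=2, agents=1)

    # Store one placeholder single-agent transition.
    buf.add(
        state=np.zeros(4),
        action=np.zeros(2),
        next_state=np.ones(4),
        reward=1.0,        # extrinsic environment reward
        ex_rew=0.1,        # intrinsic/exploration reward
        n_step=1.5,        # n-step return (assumed to be precomputed by the caller)
        ex_n_step=0.2,     # n-step intrinsic return
        done=0.0,          # episode-termination flag
    )

    # Draw a (trivially small) random minibatch of torch tensors.
    batch = buf.sample(batch_size=1)
    print([t.shape for t in batch])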