QLearnerESR.py (forked from rradules/equilibria_monfg)
import random

import numpy as np
from scipy.optimize import minimize


# TODO: Make the code less redundant by implementing a class hierarchy
#  with a generic Q-Learner as the parent of the ESR and SER QLearner classes.
def scalarize(agent, rewards):
    scalar = None
    if agent == 0:
        scalar = (rewards[0] * rewards[0]) + (rewards[1] * rewards[1])
    elif agent == 1:
        scalar = rewards[0] * rewards[1]
    return scalar


# Comment out the scalarize function above and uncomment the one below before running SONFGs.
# def scalarize(agent, rewards):
#     scalar = None
#     if agent == 0:
#         scalar = rewards[0]
#     elif agent == 1:
#         scalar = rewards[1]
#     return scalar
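
# Example (illustrative): with the nonlinear utilities above, a reward vector [3, 1]
# scalarises to 3*3 + 1*1 = 10 for agent 0 and to 3*1 = 3 for agent 1, so the two
# agents generally rank joint outcomes differently.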


# Turn the Q-values of the current state into a Gibbs-Boltzmann (softmax) distribution
# with temperature T.
def gibs_boltzman(q_table, curr_state, T):
    transposed_q_values = q_table[curr_state].T[0]
    a = np.exp(transposed_q_values / T)
    b = a.sum(axis=0)
    return a / b
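
# Example (illustrative): for q_table[curr_state] = [[0.0], [10.0]] and T = 1000, the
# exponentiated values are roughly [1.0, 1.01], giving action probabilities of about
# [0.4975, 0.5025]; lowering T makes the distribution greedier.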


class QLearnerESR:

    def __init__(self, agent_id, alpha, gamma, epsilon, num_states, num_actions, opt=False, rand_prob=False):
        self.agent_id = agent_id
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.num_states = num_states
        self.num_actions = num_actions
        self.num_objectives = 1
        # Optimistic initialization of the Q-table.
        if opt:
            self.q_table = np.ones((num_states, num_actions, self.num_objectives)) * 20
        else:
            self.q_table = np.zeros((num_states, num_actions, self.num_objectives))
        self.current_state = -1
        self.rand_prob = rand_prob
        self.sc = 0
        self.T = 1000

    # Standard Q-learning update applied to the scalarised (ESR) reward:
    # Q(s, a) <- Q(s, a) + alpha * (u(r) + gamma * max_a' Q(s', a') - Q(s, a)).
    def update_q_table(self, prev_state, action, curr_state, reward):
        scalarized_reward = scalarize(self.agent_id, reward)
        old_q = self.q_table[prev_state][action]
        next_q = self.q_table[curr_state, :]
        new_q = np.zeros(self.num_objectives)
        for o in range(self.num_objectives):
            new_q[o] = old_q[o] + self.alpha * (scalarized_reward + self.gamma * max(next_q[:, o]) - old_q[o])
            self.q_table[prev_state][action][o] = new_q[o]
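
    # Worked example (hypothetical numbers): starting from Q = 0 with alpha = 0.1 and
    # gamma = 0, a scalarised reward of 10 moves the Q-value to 0 + 0.1 * (10 - 0) = 1.0.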

    # Epsilon greedy based on nonlinear optimiser mixed strategy search.
    def select_action_mixed_nonlinear(self, state):
        self.current_state = state
        if random.uniform(0.0, 1.0) < self.epsilon:
            return self.select_random_action()
        else:
            return self.select_action_greedy_mixed_nonlinear(state)

    # Random action selection.
    def select_random_action(self):
        random_action = np.random.randint(self.num_actions)
        return random_action

    # Softmax action selection.
    def select_action_softmax(self, state):
        self.current_state = state
        q_table_boltz_dist = gibs_boltzman(self.q_table, self.current_state, self.T)
        scale = []
        trigger = random.random()
        total = 0
        for i in q_table_boltz_dist:
            total = total + i
            scale.append(total)
        action_selection = None
        for point in range(len(scale)):
            if trigger <= scale[point]:
                action_selection = point
                break
        return action_selection

    # Greedy action selection based on nonlinear optimiser mixed strategy search.
    def select_action_greedy_mixed_nonlinear(self, state):
        strategy = self.calc_mixed_strategy_nonlinear(state)
        if isinstance(strategy, int) or isinstance(strategy, np.int64):
            return strategy
        else:
            if np.sum(strategy) != 1:
                strategy = strategy / np.sum(strategy)
            return np.random.choice(range(self.num_actions), p=strategy)

    def calc_mixed_strategy_nonlinear(self, state):
        if self.rand_prob:
            s0 = np.random.random(self.num_actions)
            s0 /= np.sum(s0)
        else:
            s0 = np.full(self.num_actions, 1.0 / self.num_actions)  # initial guess set to equal prob over all actions
        b = (0.0, 1.0)
        bnds = (b,) * self.num_actions
        con1 = {'type': 'eq', 'fun': lambda x: np.sum(x) - 1}
        cons = [con1]
        solution = minimize(self.objective, s0, bounds=bnds, constraints=cons)
        strategy = solution.x
        return strategy
    # This is the objective function to be minimised by the nonlinear optimiser
    # (therefore it returns the negative of the ESR).
    def objective(self, strategy):
        return -self.calc_esr_from_strategy(strategy)

    # Calculates the ESR for a given strategy using the agent's own Q-values.
    def calc_esr_from_strategy(self, strategy):
        esr = self.calc_expected_scalar(self.current_state, strategy)
        return esr

    def calc_expected_scalar(self, state, strategy):
        return np.dot(self.q_table[state, :, 0], np.array(strategy))  # expected scalarised return
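

# Minimal usage sketch: assumes a single-state, two-action MONFG with a hypothetical
# vector-valued payoff table; the payoffs, hyperparameters, and loop structure below
# are illustrative only, not taken from the original experiments.
if __name__ == '__main__':
    # payoffs[a0][a1] is the reward vector both agents receive when agent 0 plays a0
    # and agent 1 plays a1 (hypothetical values).
    payoffs = np.array([[[4, 0], [3, 1]],
                        [[3, 1], [2, 2]]])
    agents = [QLearnerESR(agent_id=i, alpha=0.05, gamma=0.0, epsilon=0.1,
                          num_states=1, num_actions=2) for i in range(2)]
    for episode in range(1000):
        # Both agents pick an action in the single state 0, observe the joint reward,
        # and update their Q-tables with their own scalarised utility.
        actions = [agent.select_action_mixed_nonlinear(0) for agent in agents]
        reward = payoffs[actions[0]][actions[1]]
        for agent, action in zip(agents, actions):
            agent.update_q_table(0, action, 0, reward)
    print('Learned Q-tables:', [agent.q_table.tolist() for agent in agents])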