-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsetup.py
99 lines (70 loc) · 3.05 KB
/
setup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
'''
setup file
- create the two different bayesian models to be analyzed
- create the rwmh algorithm code
'''
import numpy as np
import numpy.random as rnd
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal as mvnorm
class bayes_model:
m0 = np.array([0.,0.])
Sigma0 = np.array([[10.,0.], [0., 10.]])
def __init__(self, data, noise_level):
self.data = data
self.q = data['Q']
self.Gamma = np.diag(noise_level * np.log(self.q)) #Gamma depends on noise level and is set from task to task
def prior(self, x):
return mvnorm(mean=self.m0, cov=self.Sigma0).pdf(x)
def likelihood(self, x):
temp = np.log(self.q) - self.A @ x
return np.exp(-1/2 * temp.T @ self.Gamma @ temp) # TODO check matrix norm
def posterior(self, x):
return self.prior(x) * self.likelihood(x)
def formula_post_mean_var(self):
mean = self.m0 + self.Sigma0 @ self.A.T @ np.linalg.inv(self.Gamma + self.A @ self.Sigma0 @ self.A.T) @ (self.q- self.A @ self.m0)
cov = self.Sigma0 - self.Sigma0 @ self.A.T @ np.linalg.inv(self.Gamma + self.A @ self.Sigma0 @ self.A.T) @ self.A @ self.Sigma0
return (mean, cov)
class model1(bayes_model):
def __init__(self, data, noise_level):
super().__init__(data, noise_level)
self.A = np.column_stack((np.ones(len(data)), np.log(data['H'].values)))
class model2(bayes_model):
def __init__(self, data, noise_level):
super().__init__(data, noise_level)
self.A = 0 # TODO
class rwmh:
n = 2 # dimension of the samples
def __init__(self, model, prop_var, x0, N):
self.sigma2 = prop_var
self.model = model
self.x0 = x0
self.N = N
def proposal(self):
#return rnd.multivariate_normal(mean=np.zeros(self.n), cov=self.sigma2*np.eye(self.n))
return mvnorm(mean=np.zeros(self.n), cov=self.sigma2*np.eye(self.n)).rvs()
def execute(self):
samples = np.empty((self.N, self.n)) # storage for generated samples
samples[0, :] = self.x0 # frist element in the chain is x0
x = self.x0 # initialization
dens_x = self.model.posterior(x)
accptd = 0 # number of accepted proposals
for j in range(1, self.N):
xp = x + self.proposal() # x proposed sample
dens_xp = self.model.posterior(xp)
acc_prob = np.min([1, dens_xp / dens_x])
if acc_prob >= rnd.random(): # accept xp
x = xp
dens_x = dens_xp
accptd += 1
samples[j, :] = x
if j % 100 == 0:
# Print acceptance rate every 100th step
print("Acceptance rate: %f" % (accptd/j))
np.savetxt('samples.txt', samples)
plt.figure()
plt.plot(range(1, self.N+1), samples[:, 0]) # plot first component of all chain elements
plt.plot(range(1, self.N+1), samples[:, 1]) # plot first component of all chain elements
plt.tight_layout()
plt.savefig('plot.png')
plt.show()