%load_ext autoreload
%autoreload 2
%reload_ext autoreload
%run ../../notebooks/setup.py
%matplotlib inline
%config InlineBackend.figure_format = 'png'
import sys
sys.path.insert(0, '../../')
import torch
import pandas as pd
from time import perf_counter
from src.network import Network
from src.decode import decode_bump, decode_bump_torch
def convert_seconds(seconds):
h = seconds // 3600
m = (seconds % 3600) // 60
s = seconds % 60
return h, m, s
def get_theta(a, b, GM=0, IF_NORM=0):
if GM:
b = b - np.dot(b, a) / np.dot(a, a) * a
if IF_NORM:
u = a / np.linalg.norm(a)
v = b / np.linalg.norm(b)
else:
u=a
v=b
return np.arctan2(v, u)
def normalize(v):
return v / np.linalg.norm(v)
def project(x, u):
return x * u
# return np.dot(x, u) * u
def sort_by_angle(x, u, v):
u_hat = normalize(u)
v_hat = normalize(v)
x_proj_u = project(x, u_hat)
x_proj_v = project(x, v_hat)
# x_proj = x_proj_u + x_proj_v
theta = np.arctan2(x_proj_v, x_proj_u) + np.pi
# cos_theta = np.dot(x_proj, u_hat) / np.linalg.norm(x_proj) * u_hat
# sin_theta = np.dot(x_proj, v_hat) / np.linalg.norm(x_proj) * v_hat
# theta = np.arctan2(sin_theta, cos_theta)
# Pair up each element of x with the corresponding angle
# x_angle_pairs = list(zip(x, theta))
# Sort based on the angle
# x_angle_pairs.sort(key=lambda pair: pair[1])
# Extract the sorted elements
# sorted_x = [pair[0] for pair in x_angle_pairs]
return theta
def get_idx(model):
ksi = model.PHI0.cpu().detach().numpy()
print(ksi.shape)
idx = np.arange(0, len(ksi[0]))
theta = get_theta(ksi[0], ksi[2], GM=0, IF_NORM=0)
return theta.argsort()
def get_overlap(model, rates):
ksi = model.PHI0.cpu().detach().numpy()
return rates @ ksi.T / rates.shape[-1]
def get_fourier_moments(signal, axis=-1):
# Perform the FFT
fft_coeffs = np.fft.fft(signal, axis=axis)
# Calculate the zero, first, and second Fourier moments
zero_moment = fft_coeffs[..., 0]
first_moment = fft_coeffs[..., 1]
# Calculate magnitude m0, m1, and m2
m0 = np.abs(zero_moment) / signal.shape[axis] # Normalize m0 by the signal length
m1 = 2.0 * np.abs(first_moment) / signal.shape[axis]
# Calculate the phase of the signal
phases = (np.angle(first_moment) + np.pi) % (2.0 * np.pi) - np.pi
return m0, m1, phases
def compute_fourier_moments(signal, dim=-1):
# Perform the FFT
fft_coeffs = torch.fft.fft(signal, dim=dim)
# Calculate the zero, first, and second Fourier moments
zero_moment = fft_coeffs[..., 0]
first_moment = fft_coeffs[..., 1]
# second_moment = fft_coeffs[..., 2]
# Calculate magnitude m0, m1, and m2
m0 = torch.abs(zero_moment) / signal.size(dim) # Normalize m0 by the signal length
m1 = 2.0 * torch.abs(first_moment) / signal.size(dim)
# m2 = 2.0 * torch.abs(second_moment) / signal.size(dim)
# Calculate the phase of the signal
phases = torch.angle(first_moment) % (2.0 * torch.pi)
return m0, m1, phases
from src.stimuli import Stimuli
ff_input = Stimuli(task='odr', size=(10, 1000))(1, 1, np.pi/2, rnd_phase=0).cpu().numpy()
print(ff_input.shape)
plt.plot(ff_input.T[:, :5])
plt.xticks(np.linspace(0, 1000, 5), np.linspace(0, 360, 5).astype(int))
plt.xlabel('Neuron #')
plt.ylabel('Input Strength')
plt.title('ODR')
plt.show()
m0, m1, phase = decode_bump(ff_input)
print(phase * 180 / np.pi)
# print((360 - phase * 180 / np.pi))
xi = torch.randn((2, 1000), device='cuda')
ff_input = Stimuli(task='dual', size=(10, 1000))(-1, 1, xi[1]).cpu().detach().numpy()
print(ff_input.shape)
theta = get_theta(xi[0].cpu().numpy(), xi[1].cpu().numpy(), GM=0, IF_NORM=0)
theta = np.arctan2(xi[1].cpu().numpy(), xi[0].cpu().numpy())
index_order = theta.argsort()
ff_input = ff_input[index_order]
plt.plot(ff_input)
plt.xlabel('Neuron #')
plt.ylabel('Input Strength')
plt.title('Dual Task')
plt.show()
m0, m1, phase = decode_bump(ff_input)
print(phase * 180 / np.pi)
REPO_ROOT = "/home/leon/models/NeuroFlame"
model = Network('train_odr.yml', REPO_ROOT, VERBOSE=0, DEVICE='cuda', TASK='odr', N_BATCH=10, seed=0)
Ne = model.Na[0].cpu().numpy()
N = model.N_NEURON
print(model.PHI0.shape)
model.PHI0 = torch.randint(0, 360, (model.N_BATCH, 2, 1)).to('cuda')
ff_input = model.init_ff_input()
print('ff', ff_input.shape)
rates = model(ff_input=ff_input)
print('rates', rates.shape)
# m0, m1, phase = decode_bump(ff_input[..., model.slices[0]].cpu().numpy())
# F0, F1, phi = decode_bump(rates.cpu().detach().numpy())
m0, m1, phase = get_fourier_moments(ff_input[..., model.slices[0]].cpu().numpy())
F0, F1, phi = get_fourier_moments(rates.cpu().detach().numpy())
m0, m1, phase = decode_bump_torch(ff_input[..., model.slices[0]])
F0, F1, phi = decode_bump_torch(rates)
try:
phase = phase.cpu().numpy()
phi = phi.cpu().detach().numpy()
m1 = m1.cpu().numpy()
F1 = F1.cpu().detach().numpy()
except:
pass
print(m0.shape, F0.shape)
fig, ax = plt.subplots(1, 3, figsize=(2.5*width, height))
xtime = np.linspace(-model.T_STEADY, model.DURATION, ff_input.shape[1])
idx = np.random.randint(model.N_BATCH)
ax[0].imshow(ff_input[idx].T.cpu().numpy(),
cmap='jet', aspect='auto',
extent=[-model.T_STEADY, model.DURATION, 0, 1000])
ax[0].set_xlabel('Step')
ax[0].set_ylabel('Neuron #')
ax[0].set_ylim([0, Ne])
ax[1].plot(xtime, m1[idx].T)
ax[1].set_xlabel('Step')
ax[1].set_ylabel('$\mathcal{F}_1$')
ax[2].plot(xtime, phase[idx].T * 180 / np.pi)
ax[2].set_xlabel('Step')
ax[2].set_ylabel('Phase (°)')
ax[2].axhline(model.PHI0.cpu().numpy()[idx, 0]*180/np.pi, color='k', ls='--', label='Stim 1')
ax[2].axhline(model.PHI0.cpu().numpy()[idx, 1]*180/np.pi, color='r', ls='--', label='Stim 2')
# plt.legend(fontsize=12)
plt.show()
fig, ax = plt.subplots(1, 3, figsize=(2.5*width, height))
xtime = np.linspace(0, model.DURATION, rates.shape[1])
idx = np.random.randint(model.N_BATCH)
ax[0].imshow(rates[idx].T.cpu().detach().numpy(),
cmap='jet', aspect='auto',
extent=[0, model.DURATION, 0, 1000])
ax[0].set_xlabel('Step')
ax[0].set_ylabel('Neuron #')
ax[0].set_ylim([0, Ne])
ax[1].plot(xtime, F1[idx].T)
ax[1].set_xlabel('Step')
ax[1].set_ylabel('$\mathcal{F}_1$')
ax[2].plot(xtime, phi[idx].T * 180 / np.pi)
ax[2].set_xlabel('Step')
ax[2].set_ylabel('Phase (°)')
ax[2].axhline(model.PHI0.cpu().numpy()[idx, 0]*180/np.pi, color='k', ls='--', label='Stim 1')
ax[2].axhline(model.PHI0.cpu().numpy()[idx, 1]*180/np.pi, color='r', ls='--', label='Stim 2')
# plt.legend(fontsize=12)
plt.show()
REPO_ROOT = "/home/leon/models/NeuroFlame"
model = Network('config_odr.yml', REPO_ROOT, VERBOSE=0, DEVICE='cuda', TASK='odr_rand', N_BATCH=10)
Ne = model.Na[0].cpu().numpy()
N = model.N_NEURON
ff_input = model.init_ff_input().cpu().numpy()
print(ff_input.shape)
m0, m1, phase = decode_bump(ff_input[..., model.slices[0]])
print(m0.shape)
fig, ax = plt.subplots(1, 3, figsize=(2.25*width, height))
idx = np.random.randint(model.N_BATCH)
ax[0].imshow(ff_input[idx].T, cmap='jet', aspect='auto')
ax[0].set_xlabel('Step')
ax[0].set_ylabel('Neuron #')
ax[0].set_ylim([0, Ne])
ax[1].plot(m1[idx].T)
ax[1].set_xlabel('Step')
ax[1].set_ylabel('$\mathcal{F}_1$')
ax[2].plot(phase[idx].T * 180 / np.pi)
ax[2].set_xlabel('Step')
ax[2].set_ylabel('Phase (°)')
ax[2].axhline(model.phase.cpu().numpy()[idx]*180/np.pi, color='k', ls='--')
print(model.phase[idx].item()*180/np.pi)
plt.show()
plt.hist(model.phase.cpu().numpy() * 180 / np.pi, bins=20)
plt.hist(phase[:, model.N_STIM_ON[0]]* 180 / np.pi, bins=20, histtype='step')
plt.show()
REPO_ROOT = "/home/leon/models/NeuroFlame"
model = Network('config_EI.yml', REPO_ROOT, VERBOSE=0, DEVICE='cuda', TASK='dual_rand', LIVE_FF_UPDATE=0, N_BATCH=10)
Ne = model.Na[0].cpu().numpy()
N = model.N_NEURON
ff_input = model.init_ff_input().cpu().numpy()
print(ff_input.shape)
ksi = model.PHI0.cpu().numpy()
theta = get_theta(ksi[0], ksi[2], GM=0, IF_NORM=0)
index_order = theta.argsort()
ff_ordered = ff_input[..., index_order]
m0, m1, phase = decode_bump(ff_ordered)
print(m0.shape)
fig, ax = plt.subplots(1, 3, figsize=(2.25*width, height))
ax[0].plot(ff_input[0, :, :5])
ax[0].set_xlabel('Step')
ax[0].set_ylabel('FF Input')
ax[1].imshow(ff_input[0].T, cmap='jet', vmin=0, vmax= 400, aspect='auto')
ax[1].set_xlabel('Step')
ax[1].set_ylabel('Neuron #')
ax[1].set_ylim([0, Ne])
ax[2].imshow(ff_ordered[0].T, cmap='jet', vmin=0, aspect='auto')
ax[2].set_xlabel('Step')
ax[2].set_ylabel('Pref Loc. (°)')
ax[2].set_yticks(np.linspace(0, 2000, 5), np.linspace(0, 360, 5).astype(int))
plt.show()
fig, ax = plt.subplots(1, 3, figsize=(2.25*width, height))
ax[0].plot(m0.T)
ax[0].set_xlabel('Step')
ax[0].set_ylabel('$\mathcal{F}_0$')
ax[1].plot(m1.T)
ax[1].set_xlabel('Step')
ax[1].set_ylabel('$\mathcal{F}_1$')
ax[2].plot(phase.T * 180 / np.pi)
ax[2].set_xlabel('Step')
ax[2].set_ylabel('$\Phi$ (°)')
plt.show()
plt.hist(model.phase.cpu().numpy() * 180 / np.pi, bins=10, histtype='step')
plt.hist(360-phase[:, model.N_STIM_ON[0]]* 180 / np.pi, bins=10, histtype='step')
plt.show()
REPO_ROOT = "/home/leon/models/NeuroFlame"
model = Network('config_odr.yml', REPO_ROOT, VERBOSE=0, DEVICE='cuda', TASK='odr_rand', LIVE_FF_UPDATE=1, N_BATCH=10)
rates = model(RET_FF=1)
Ne = model.Na[0].cpu().numpy()
N = model.N_NEURON
ff_input = model.ff_input.cpu().numpy()
print(ff_input.shape)
m0, m1, phase = decode_bump(ff_input[..., model.slices[0]])
print(m0.shape)
fig, ax = plt.subplots(1, 3, figsize=(2.25*width, height))
ax[0].plot(ff_input[0, :, :5])
ax[0].set_xlabel('Step')
ax[0].set_ylabel('FF Input')
ax[1].imshow(ff_input[0].T, cmap='jet', vmin=0, vmax= 400, aspect='auto')
ax[1].set_xlabel('Step')
ax[1].set_ylabel('Neuron #')
ax[1].set_ylim([0, Ne])
# ax[2].imshow(ff_ordered[0].T, cmap='jet', vmin=0, aspect='auto')
# ax[2].set_xlabel('Step')
# ax[2].set_ylabel('Pref Loc. (°)')
# ax[2].set_yticks(np.linspace(0, 2000, 5), np.linspace(0, 360, 5).astype(int))
plt.show()
fig, ax = plt.subplots(1, 3, figsize=(2.25*width, height))
ax[0].plot(m0.T)
ax[0].set_xlabel('Step')
ax[0].set_ylabel('$\mathcal{F}_0$')
ax[1].plot(m1.T)
ax[1].set_xlabel('Step')
ax[1].set_ylabel('$\mathcal{F}_1$')
ax[2].plot(phase.T * 180 / np.pi)
ax[2].set_xlabel('Step')
ax[2].set_ylabel('$\Phi$ (°)')
plt.show()
plt.hist(model.phase.cpu().numpy() * 180 / np.pi, bins='auto')
plt.hist(360 - phase[:, model.N_STIM_ON[0] // model.N_WINDOW]* 180 / np.pi, bins='auto')
plt.show()
REPO_ROOT = "/home/leon/models/NeuroFlame"
model = Network('config_EI.yml', REPO_ROOT, VERBOSE=0, DEVICE='cuda', TASK='dual_rand', LIVE_FF_UPDATE=1, N_BATCH=10)
rates = model(RET_FF=1)
Ne = model.Na[0].cpu().numpy()
N = model.N_NEURON
ff_input = model.ff_input.cpu().numpy()
print(ff_input.shape)
ksi = model.PHI0.cpu().numpy()
theta = get_theta(ksi[0], ksi[2], GM=0, IF_NORM=0)
index_order = theta.argsort()
ff_ordered = ff_input[..., index_order]
m0, m1, phase = decode_bump(ff_ordered)
print(m0.shape)
fig, ax = plt.subplots(1, 3, figsize=(2.25*width, height))
ax[0].plot(ff_input[0, :, :5])
ax[0].set_xlabel('Step')
ax[0].set_ylabel('FF Input')
ax[1].imshow(ff_input[0].T, cmap='jet', vmin=0, vmax= 400, aspect='auto')
ax[1].set_xlabel('Step')
ax[1].set_ylabel('Neuron #')
ax[1].set_ylim([0, Ne])
ax[2].imshow(ff_ordered[0].T, cmap='jet', vmin=0, aspect='auto')
ax[2].set_xlabel('Step')
ax[2].set_ylabel('Pref Loc. (°)')
ax[2].set_yticks(np.linspace(0, 2000, 5), np.linspace(0, 360, 5).astype(int))
plt.show()
fig, ax = plt.subplots(1, 3, figsize=(2.25*width, height))
ax[0].plot(m0.T)
ax[0].set_xlabel('Step')
ax[0].set_ylabel('$\mathcal{F}_0$')
ax[1].plot(m1.T)
ax[1].set_xlabel('Step')
ax[1].set_ylabel('$\mathcal{F}_1$')
ax[2].plot(phase.T * 180 / np.pi)
ax[2].set_xlabel('Step')
ax[2].set_ylabel('$\Phi$ (°)')
plt.show()
plt.hist(model.phase.cpu().numpy() * 180 / np.pi, bins='auto')
plt.hist(phase[:, model.N_STIM_ON[0] // model.N_WINDOW] * 180 / np.pi, bins='auto')
plt.show()
REPO_ROOT = "/home/leon/models/NeuroFlame"
model = Network('config_odr.yml', REPO_ROOT, VERBOSE=0, DEVICE='cuda', TASK='odr_rand', LIVE_FF_UPDATE=0, N_BATCH=10, seed=0)
Ne = model.Na[0].cpu().numpy()
N = model.N_NEURON
ff_input = model.init_ff_input().cpu().numpy()
print(ff_input.shape)
print(model.random_shifts)
m0, m1, phase = decode_bump(ff_input[..., model.slices[0]])
print(m0.shape)
fig, ax = plt.subplots(1, 3, figsize=(2.25*width, height))
idx = np.random.randint(model.N_BATCH, size=(1,))
ax[0].imshow(ff_input[idx].T, cmap='jet', aspect='auto')
ax[0].set_xlabel('Step')
ax[0].set_ylabel('Neuron #')
ax[0].set_ylim([0, Ne])
ax[1].plot(m1[idx].T)
ax[1].set_xlabel('Step')
ax[1].set_ylabel('$\mathcal{F}_1$')
ax[2].plot(phase[idx].T * 180 / np.pi)
ax[2].set_xlabel('Step')
ax[2].set_ylabel('Phase (°)')
plt.show()
plt.hist(model.phase.cpu().numpy() * 180 / np.pi, bins='auto')
plt.hist(360 - phase[:, model.N_STIM_ON[0]]* 180 / np.pi, bins='auto')
plt.show()