From 7d04bfa08253a1e6acb82a3d0ed3364f9b2edd41 Mon Sep 17 00:00:00 2001 From: Joshua Spear Date: Wed, 29 May 2024 14:29:12 +0100 Subject: [PATCH] reworked weighting approach. Removed discounting option and included cumulative option --- src/offline_rl_ope/OPEEstimators/IS.py | 6 +- src/offline_rl_ope/OPEEstimators/utils.py | 74 +- tests/Metrics/test_EffectiveSampleSize.py | 45 +- tests/Metrics/test_ValidWeightsProp.py | 40 +- tests/OPEEstimators/test_DoublyRobust.py | 326 +++++---- tests/OPEEstimators/test_IS.py | 123 ++-- tests/OPEEstimators/test_utils.py | 754 ++++++++++++++------- tests/base.py | 187 +++-- tests/components/test_ImportanceSampler.py | 363 +++++----- tests/components/test_Policy.py | 375 +++++----- 10 files changed, 1298 insertions(+), 995 deletions(-) diff --git a/src/offline_rl_ope/OPEEstimators/IS.py b/src/offline_rl_ope/OPEEstimators/IS.py index dc508de..28acd43 100644 --- a/src/offline_rl_ope/OPEEstimators/IS.py +++ b/src/offline_rl_ope/OPEEstimators/IS.py @@ -1,5 +1,5 @@ import torch -from typing import Any, Dict, List +from typing import Any, Dict, List, Union from jaxtyping import jaxtyped, Float from typeguard import typechecked as typechecker @@ -21,7 +21,7 @@ def __init__( clip_weights:bool=False, cache_traj_rewards:bool=False, clip:float=0.0, - norm_kwargs:Dict[str,Any] = {} + norm_kwargs:Dict[str,Union[str,bool]] = {} ) -> None: super().__init__(cache_traj_rewards) assert isinstance(norm_weights,bool) @@ -75,7 +75,7 @@ def __init__( clip_weights:bool=False, clip: float = 0.0, cache_traj_rewards:bool=False, - norm_kwargs:Dict[str,Any] = {} + norm_kwargs:Dict[str,Union[str,bool]] = {} ) -> None: super().__init__(norm_weights=norm_weights, clip_weights=clip_weights, clip=clip, cache_traj_rewards=cache_traj_rewards, diff --git a/src/offline_rl_ope/OPEEstimators/utils.py b/src/offline_rl_ope/OPEEstimators/utils.py index 01adde5..76d04fb 100644 --- a/src/offline_rl_ope/OPEEstimators/utils.py +++ b/src/offline_rl_ope/OPEEstimators/utils.py @@ -28,17 +28,17 @@ def __init__( self, smooth_eps:float=0.0, avg_denom:bool=False, - discount:float=1.0, + cumulative:bool=False, *args, **kwargs ) -> None: assert isinstance(smooth_eps,float) assert isinstance(avg_denom,bool) - assert isinstance(discount,float) + assert isinstance(cumulative,bool) self.smooth_eps = smooth_eps self.avg_denom = avg_denom - self.discount = discount - + self.cumulative = cumulative + @jaxtyped(typechecker=typechecker) def calc_norm( self, @@ -49,22 +49,40 @@ def calc_norm( smooth_eps prevents nan values occuring in instances where there exists valid time t importance ratios however, these are all 0. This should be set as small as possible. - avg_denom: defines the denominator as the average weight for time t - as per http://proceedings.mlr.press/v48/jiang16.pdf + avg_denom defines the denominator as the average importance weight + rather than the sum of importance weights i.e.: + - http://proceedings.mlr.press/v48/jiang16.pdf and; + - https://arxiv.org/pdf/2005.01643 Note: - - If traj_is_weights represents vanilla IS samples then: - - The denominator will be w_{t} = sum_{i=1}^{n} p_{1:H} for all - samples. + vanilla IS samples => traj_is_weights has entries: + $w_{i,H} = \prod_{t=0}^{H_{i}}w_{i,t}$ + - If traj_is_weights represents vanilla IS samples: + - The denominator will be: + $sum_{i=1}^{n} w_{i,H}$ for all samples. 
+            - If cumulative is True, the denominator will be:
+            $\sum_{i=1}^{n} w_{i,H}$ for all samples, i.e., there is no
+            difference, since the cumulative sum of the weights is the
+            same at every timestep
             - If avg_denom is set to true, the denominator will be 
-            w_{t} = 1/n_{t} sum_{i=1}^{n} p_{1:H} where n_{t} is the number of 
-            trajectories of at least length, t.
+            $\frac{1}{n}\sum_{i=1}^{n} w_{i,H}$
+        
+        PD samples => traj_is_weights has entries:
+        $w_{i,t'} = \prod_{t=0}^{t'}w_{i,t}$
         - If traj_is_weights represents PD IS samples then:
-            - The denominator will be w_{t} = sum_{i=1}^{n} p_{1:t}.
+            - The denominator will be:
+            $\sum_{i=1}^{n} w_{i,H}$ for all samples, i.e., the same as for
+            vanilla IS
             - If avg_denom is set to true, the denominator will be 
-            w_{t} = 1/n_{t} sum_{i=1}^{n} p_{1:t} where n_{t} is the number of 
-            trajectories of at least length, t. This definition aligns with 
-            http://proceedings.mlr.press/v48/jiang16.pdf
+            $\frac{1}{n}\sum_{i=1}^{n} w_{i,H}$
+            - If cumulative is True, the [i,t'] entry of the denominator
+            will be $\sum_{i=1}^{n} w_{i,t'}$, i.e., the value is the same
+            across all trajectories at a given timestep but differs
+            across timesteps
+                - If avg_denom is set to true, the [i,t'] entry of the
+                denominator will be
+                $\frac{1}{n}\sum_{i=1}^{n} w_{i,t'}$
+        
         Args:
             traj_is_weights (torch.Tensor): (# trajectories, max(traj_length)) 
             Tensor. traj_is_weights[i,j] defines the jth timestep propensity 
@@ -74,7 +92,7 @@ def calc_norm(
             ith trajectory was observed
         
         Returns:
-            torch.Tensor: Tensor of dimension (# trajectories, 1) 
+            torch.Tensor: Tensor of dimension (1, max(traj_length))
             defining the normalisation value for each timestep
         """
         # assert isinstance(traj_is_weights,torch.Tensor)
@@ -82,17 +100,21 @@ def calc_norm(
         # assert traj_is_weights.shape == is_msk.shape
         # check_array_dim(traj_is_weights,2)
         # check_array_dim(is_msk,2)
-        discnt_tens = torch.full(traj_is_weights.shape, self.discount)
-        discnt_pows = torch.arange(0, traj_is_weights.shape[1])[None,:].repeat(
-            traj_is_weights.shape[0],1)
-        discnt_tens = torch.pow(discnt_tens,discnt_pows)
-        traj_is_weights = torch.mul(traj_is_weights,discnt_tens)
-        denom = (
-            traj_is_weights.sum(dim=0, keepdim=True) + self.smooth_eps
-            )
+        if self.cumulative:
+            # For each timepoint, sum across the trajectories
+            denom = (
+                traj_is_weights.sum(dim=0, keepdim=True) + self.smooth_eps
+                )
+        else:
+            # Find the index of the final step for each trajectory
+            _final_idx = is_msk.cumsum(dim=1).argmax(dim=1)
+            # Find the associated weight of each trajectory and sum
+            denom = traj_is_weights[
+                torch.arange(traj_is_weights.shape[0]), _final_idx].sum()
+            denom = denom.repeat((1,traj_is_weights.shape[1])) + self.smooth_eps
+        
         if self.avg_denom:
-            denom = denom/(
-                is_msk.sum(dim=0, keepdim=True)+self.smooth_eps)
+            denom = denom/traj_is_weights.shape[0]
         return denom
     
     @jaxtyped(typechecker=typechecker)
diff --git a/tests/Metrics/test_EffectiveSampleSize.py b/tests/Metrics/test_EffectiveSampleSize.py
index ba3a33f..1c39218 100644
--- a/tests/Metrics/test_EffectiveSampleSize.py
+++ b/tests/Metrics/test_EffectiveSampleSize.py
@@ -3,32 +3,23 @@ import numpy as np
 from offline_rl_ope.Metrics import EffectiveSampleSize
 from offline_rl_ope import logger
-# from ..base import weight_test_res
-from ..base import (
-    single_discrete_action_test as sdat,
-    duel_discrete_action_test as ddat,
-    bin_discrete_action_test as bdat
-    )
+from parameterized import parameterized_class
+from ..base import test_configs_fmt_class, TestConfig
 
-for test_conf in [sdat,ddat,bdat]:
-    class 
TestImportanceSampler: - - def __init__(self) -> None: - self.is_weight_calc = None - self.traj_is_weights = test_conf.weight_test_res - +@parameterized_class(test_configs_fmt_class) +class EffectiveSampleSizeTest(unittest.TestCase): - class EffectiveSampleSizeTest(unittest.TestCase): - - def test_call(self): - num = 2 - weights = test_conf.weight_test_res.sum(dim=1) - assert len(weights) == 2 - denum = 1 + torch.var(weights) - act_res = (num/denum).item() - metric = EffectiveSampleSize(nan_if_all_0=True) - pred_res = metric( - weights=test_conf.weight_test_res - ) - tol = act_res/1000 - np.testing.assert_allclose(pred_res, act_res, atol=tol) \ No newline at end of file + test_conf:TestConfig + + def test_call(self): + num = 2 + weights = self.test_conf.weight_test_res.sum(dim=1) + assert len(weights) == 2 + denum = 1 + torch.var(weights) + act_res = (num/denum).item() + metric = EffectiveSampleSize(nan_if_all_0=True) + pred_res = metric( + weights=self.test_conf.weight_test_res + ) + tol = act_res/1000 + np.testing.assert_allclose(pred_res, act_res, atol=tol) \ No newline at end of file diff --git a/tests/Metrics/test_ValidWeightsProp.py b/tests/Metrics/test_ValidWeightsProp.py index 85ebb12..a14b76f 100644 --- a/tests/Metrics/test_ValidWeightsProp.py +++ b/tests/Metrics/test_ValidWeightsProp.py @@ -4,26 +4,24 @@ import copy from offline_rl_ope.Metrics import ValidWeightsProp from offline_rl_ope import logger -# from ..base import weight_test_res, msk_test_res -from ..base import ( - single_discrete_action_test as sdat, - duel_discrete_action_test as ddat, - bin_discrete_action_test as bdat - ) +from parameterized import parameterized_class +from ..base import test_configs_fmt_class, TestConfig -for test_conf in [sdat,ddat,bdat]: - class TestValidWeightsProp(unittest.TestCase): +@parameterized_class(test_configs_fmt_class) +class TestValidWeightsProp(unittest.TestCase): - def test_call(self): - max_val=10000 - min_val=0.000001 - num = (test_conf.weight_test_res > min_val) & (test_conf.weight_test_res < max_val) - num = torch.sum(num, axis=1) - denum = torch.sum(test_conf.msk_test_res, axis=1) - act_res = torch.mean(num/denum).item() - metric = ValidWeightsProp( - max_w=max_val, - min_w=min_val - ) - pred_res = metric(weights=test_conf.weight_test_res, weight_msk=test_conf.msk_test_res) - self.assertEqual(act_res,pred_res) \ No newline at end of file + test_conf:TestConfig + + def test_call(self): + max_val=10000 + min_val=0.000001 + num = (self.test_conf.weight_test_res > min_val) & (self.test_conf.weight_test_res < max_val) + num = torch.sum(num, axis=1) + denum = torch.sum(self.test_conf.msk_test_res, axis=1) + act_res = torch.mean(num/denum).item() + metric = ValidWeightsProp( + max_w=max_val, + min_w=min_val + ) + pred_res = metric(weights=self.test_conf.weight_test_res, weight_msk=self.test_conf.msk_test_res) + self.assertEqual(act_res,pred_res) \ No newline at end of file diff --git a/tests/OPEEstimators/test_DoublyRobust.py b/tests/OPEEstimators/test_DoublyRobust.py index da23855..de3e82e 100644 --- a/tests/OPEEstimators/test_DoublyRobust.py +++ b/tests/OPEEstimators/test_DoublyRobust.py @@ -5,182 +5,166 @@ from offline_rl_ope.OPEEstimators.DoublyRobust import DREstimator from offline_rl_ope.OPEEstimators.DirectMethod import DirectMethodBase from offline_rl_ope.RuntimeChecks import check_array_dim -# from ..base import (test_reward_values, weight_test_res, test_dm_s_values, -# test_dm_sa_values, test_state_vals, test_action_vals, -# msk_test_res) -from ..base import ( - 
single_discrete_action_test as sdat, - duel_discrete_action_test as ddat, - bin_discrete_action_test as bdat - ) +from parameterized import parameterized +from ..base import test_configs_fmt + gamma = 0.99 -for test_conf in [sdat,ddat,bdat]: +dm_model = MagicMock(spec=DirectMethodBase) - # class MockDMModel: - - # def __init__(self) -> None: - # pass - - # def get_v(self, *args, **kwargs): - # pass +class DREstimatorTest(unittest.TestCase): + + @parameterized.expand(test_configs_fmt) + def test_update_step_ignore(self, name, test_conf): - # def get_q(self, *args, **kwargs): - # pass + # is_est = DREstimator(dm_model=MockDMModel(), norm_weights=False, + # clip=None, ignore_nan=True) + is_est = DREstimator(dm_model=dm_model, norm_weights=False, + clip=0.0, ignore_nan=True) + v_dr_t = torch.tensor([0.0]) + v_t = torch.tensor(test_conf.test_dm_s_values[0][-1]) + p_t = test_conf.weight_test_res[0,-1].reshape(-1) + r_t = torch.tensor(test_conf.test_reward_values[0][-1]).float() + q_t = torch.tensor(test_conf.test_dm_sa_values[0][-1]) + assert len(v_dr_t.shape) == 1, "Test input dim not correct" + assert len(v_t.shape) == 1, "Test input dim not correct" + assert len(p_t.shape) == 1, "Test input dim not correct" + assert len(r_t.shape) == 1, "Test input dim not correct" + assert len(q_t.shape) == 1, "Test input dim not correct" + pred_res:torch.Tensor = is_est._DREstimator__update_step( + v_t, p_t, r_t, v_dr_t, torch.tensor([gamma]), q_t + ) + test_res:torch.Tensor = v_t + p_t*(r_t+torch.tensor(gamma)*v_dr_t-q_t) + tol = test_res/1000 + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=tol.numpy().item()) - dm_model = MagicMock(spec=DirectMethodBase) + @parameterized.expand(test_configs_fmt) + def test_get_traj_discnt_reward(self, name, test_conf): + # dm_model = MockDMModel() + def q_side_effect(state:torch.Tensor, action:torch.Tensor): + lkp = { + "_".join([str(torch.Tensor(s)), str(torch.Tensor(a))]): q + for s,a,q in zip(test_conf.test_state_vals, test_conf.test_action_vals, + test_conf.test_dm_sa_values) + } + res = lkp["_".join([str(state), str(action)])] + return torch.Tensor(res) + def v_side_effect(state:torch.Tensor): + lkp = { + str(torch.Tensor(s)): v + for s,v in zip(test_conf.test_state_vals, test_conf.test_dm_s_values) + } + res = lkp[str(state)] + return torch.Tensor(res) + dm_model.get_q = MagicMock(side_effect=q_side_effect) + dm_model.get_v = MagicMock(side_effect=v_side_effect) + # dm_model.get_q.return_value = q_side_effect + # dm_model.get_v.return_value = v_side_effect + is_est = DREstimator(dm_model=dm_model, norm_weights=False, clip=0.0, + ignore_nan=True) + pred_res = [] + test_res = [] + for idx, traj in enumerate(zip( + test_conf.test_state_vals, test_conf.weight_test_res, test_conf.test_reward_values, + test_conf.test_action_vals, test_conf.test_dm_sa_values, + test_conf.test_dm_s_values, test_conf.msk_test_res + )): + s_t = torch.Tensor(traj[0]) + p_t = torch.masked_select(traj[1], traj[6]>0).reshape(-1,1) + r_t = torch.Tensor(traj[2]).float() + a_t = torch.Tensor(traj[3]) + q_t = torch.Tensor(traj[4]) + v_t = torch.Tensor(traj[5]) + assert len(s_t.shape) == 2, "Test input dim not correct" + assert len(p_t.shape) == 2, "Test input dim not correct" + assert len(r_t.shape) == 2, "Test input dim not correct" + assert len(a_t.shape) == 2, "Test input dim not correct" + assert len(q_t.shape) == 2, "Test input dim not correct" + assert len(v_t.shape) == 2, "Test input dim not correct" + __pred_res = is_est.get_traj_discnt_reward( + reward_array=r_t, 
discount=gamma, + state_array=s_t, action_array=a_t, weight_array=p_t) + pred_res.append(__pred_res.numpy()) + __test_res_v = torch.tensor([0.0]) + assert len(__test_res_v.shape) == 1, "Test input dim not correct" + for i in np.arange(s_t.shape[0]-1, 0-1, -1): + _v_t_i = v_t[i] + _q_t_i = q_t[i] + _p_t_i = p_t[i] + _r_t_i = r_t[i] + _gamma = torch.tensor([gamma]) + assert len(_v_t_i.shape) == 1, "Test input dim not correct" + assert len(_p_t_i.shape) == 1, "Test input dim not correct" + assert len(_r_t_i.shape) == 1, "Test input dim not correct" + assert len(_q_t_i.shape) == 1, "Test input dim not correct" + assert len(_gamma.shape) == 1, "Test input dim not correct" + __test_res_v = is_est._DREstimator__update_step( + v_t=v_t[i], q_t=q_t[i], p_t=p_t[i], r_t=r_t[i], + gamma=_gamma, v_dr_t=__test_res_v) + test_res.append(__test_res_v.numpy()) + pred_res = np.concatenate(pred_res) + test_res = np.concatenate(test_res) + tol = (test_res.mean()/1000).item() + np.testing.assert_allclose(pred_res, test_res, atol=tol) - - class DREstimatorTest(unittest.TestCase): - - def test_update_step_ignore(self): - - # is_est = DREstimator(dm_model=MockDMModel(), norm_weights=False, - # clip=None, ignore_nan=True) - is_est = DREstimator(dm_model=dm_model, norm_weights=False, - clip=0.0, ignore_nan=True) - v_dr_t = torch.tensor([0.0]) - v_t = torch.tensor(test_conf.test_dm_s_values[0][-1]) - p_t = test_conf.weight_test_res[0,-1].reshape(-1) - r_t = torch.tensor(test_conf.test_reward_values[0][-1]).float() - q_t = torch.tensor(test_conf.test_dm_sa_values[0][-1]) - assert len(v_dr_t.shape) == 1, "Test input dim not correct" - assert len(v_t.shape) == 1, "Test input dim not correct" - assert len(p_t.shape) == 1, "Test input dim not correct" - assert len(r_t.shape) == 1, "Test input dim not correct" - assert len(q_t.shape) == 1, "Test input dim not correct" - pred_res:torch.Tensor = is_est._DREstimator__update_step( - v_t, p_t, r_t, v_dr_t, torch.tensor([gamma]), q_t - ) - test_res:torch.Tensor = v_t + p_t*(r_t+torch.tensor(gamma)*v_dr_t-q_t) - tol = test_res/1000 - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=tol.numpy().item()) - - def test_get_traj_discnt_reward(self): - # dm_model = MockDMModel() - def q_side_effect(state:torch.Tensor, action:torch.Tensor): - lkp = { - "_".join([str(torch.Tensor(s)), str(torch.Tensor(a))]): q - for s,a,q in zip(test_conf.test_state_vals, test_conf.test_action_vals, - test_conf.test_dm_sa_values) - } - res = lkp["_".join([str(state), str(action)])] - return torch.Tensor(res) - def v_side_effect(state:torch.Tensor): - lkp = { - str(torch.Tensor(s)): v - for s,v in zip(test_conf.test_state_vals, test_conf.test_dm_s_values) - } - res = lkp[str(state)] - return torch.Tensor(res) - dm_model.get_q = MagicMock(side_effect=q_side_effect) - dm_model.get_v = MagicMock(side_effect=v_side_effect) - # dm_model.get_q.return_value = q_side_effect - # dm_model.get_v.return_value = v_side_effect - is_est = DREstimator(dm_model=dm_model, norm_weights=False, clip=0.0, - ignore_nan=True) - pred_res = [] - test_res = [] - for idx, traj in enumerate(zip( - test_conf.test_state_vals, test_conf.weight_test_res, test_conf.test_reward_values, - test_conf.test_action_vals, test_conf.test_dm_sa_values, - test_conf.test_dm_s_values, test_conf.msk_test_res - )): - s_t = torch.Tensor(traj[0]) - p_t = torch.masked_select(traj[1], traj[6]>0).reshape(-1,1) - r_t = torch.Tensor(traj[2]).float() - a_t = torch.Tensor(traj[3]) - q_t = torch.Tensor(traj[4]) - v_t = torch.Tensor(traj[5]) - 
assert len(s_t.shape) == 2, "Test input dim not correct" - assert len(p_t.shape) == 2, "Test input dim not correct" - assert len(r_t.shape) == 2, "Test input dim not correct" - assert len(a_t.shape) == 2, "Test input dim not correct" - assert len(q_t.shape) == 2, "Test input dim not correct" - assert len(v_t.shape) == 2, "Test input dim not correct" - __pred_res = is_est.get_traj_discnt_reward( - reward_array=r_t, discount=gamma, - state_array=s_t, action_array=a_t, weight_array=p_t) - pred_res.append(__pred_res.numpy()) - __test_res_v = torch.tensor([0.0]) - assert len(__test_res_v.shape) == 1, "Test input dim not correct" - for i in np.arange(s_t.shape[0]-1, 0-1, -1): - _v_t_i = v_t[i] - _q_t_i = q_t[i] - _p_t_i = p_t[i] - _r_t_i = r_t[i] - _gamma = torch.tensor([gamma]) - assert len(_v_t_i.shape) == 1, "Test input dim not correct" - assert len(_p_t_i.shape) == 1, "Test input dim not correct" - assert len(_r_t_i.shape) == 1, "Test input dim not correct" - assert len(_q_t_i.shape) == 1, "Test input dim not correct" - assert len(_gamma.shape) == 1, "Test input dim not correct" - __test_res_v = is_est._DREstimator__update_step( - v_t=v_t[i], q_t=q_t[i], p_t=p_t[i], r_t=r_t[i], - gamma=_gamma, v_dr_t=__test_res_v) - test_res.append(__test_res_v.numpy()) - pred_res = np.concatenate(pred_res) - test_res = np.concatenate(test_res) - tol = (test_res.mean()/1000).item() - np.testing.assert_allclose(pred_res, test_res, atol=tol) + @parameterized.expand(test_configs_fmt) + def test_predict_traj_rewards(self, name, test_conf): + #dm_model = MockDMModel() + def q_side_effect(state:torch.Tensor, action:torch.Tensor): + lkp = { + "_".join([str(torch.Tensor(s)), str(torch.Tensor(a))]): q + for s,a,q in zip(test_conf.test_state_vals, test_conf.test_action_vals, + test_conf.test_dm_sa_values) + } + res = lkp["_".join([str(state), str(action)])] + return torch.Tensor(res) + def v_side_effect(state:torch.Tensor): + lkp = { + str(torch.Tensor(s)): v + for s,v in zip(test_conf.test_state_vals, test_conf.test_dm_s_values) + } + res = lkp[str(state)] + return torch.Tensor(res) + dm_model.get_q = MagicMock(side_effect=q_side_effect) + dm_model.get_v = MagicMock(side_effect=v_side_effect) + # dm_model.get_q.return_value = q_side_effect + # dm_model.get_v.return_value = v_side_effect + is_est = DREstimator(dm_model=dm_model, norm_weights=False, clip=0.0, + ignore_nan=True) + rewards = [ + torch.Tensor(x).float() for x in test_conf.test_reward_values + ] + states = [torch.Tensor(x) for x in test_conf.test_state_vals] + actions = [torch.Tensor(x) for x in test_conf.test_action_vals] + test_res = [] + pred_res = is_est.predict_traj_rewards( + rewards=rewards, states=states, actions=actions, + weights=test_conf.weight_test_res, discount=gamma, + is_msk=test_conf.msk_test_res + ) + #weight_test_res = weight_test_res/weight_test_res.shape[0] + denom = test_conf.weight_test_res.shape[0] + for idx, (r,s,a,w,msk) in enumerate(zip( + rewards, states, actions, test_conf.weight_test_res, test_conf.msk_test_res + )): + w = w/denom + p = torch.masked_select(w, msk>0).reshape(-1,1) + assert len(r.shape) == 2, "Test input dim not correct" + assert len(s.shape) == 2, "Test input dim not correct" + assert len(a.shape) == 2, "Test input dim not correct" + assert len(p.shape) == 2, "Test input dim not correct" + assert isinstance(gamma, float), "Test input dim not correct" + __test_res = is_est.get_traj_discnt_reward( + reward_array=r, discount=gamma, state_array=s, + action_array=a, + weight_array=p) + 
test_res.append(__test_res.numpy()) + #test_res = np.concatenate(test_res).mean() + test_res = np.concatenate(test_res) + tol = (np.abs(test_res.mean()/100)).item() + self.assertEqual(pred_res.shape, torch.Size((len(rewards),))) + np.testing.assert_allclose(pred_res.numpy(),test_res, atol=tol) - def test_predict_traj_rewards(self): - #dm_model = MockDMModel() - def q_side_effect(state:torch.Tensor, action:torch.Tensor): - lkp = { - "_".join([str(torch.Tensor(s)), str(torch.Tensor(a))]): q - for s,a,q in zip(test_conf.test_state_vals, test_conf.test_action_vals, - test_conf.test_dm_sa_values) - } - res = lkp["_".join([str(state), str(action)])] - return torch.Tensor(res) - def v_side_effect(state:torch.Tensor): - lkp = { - str(torch.Tensor(s)): v - for s,v in zip(test_conf.test_state_vals, test_conf.test_dm_s_values) - } - res = lkp[str(state)] - return torch.Tensor(res) - dm_model.get_q = MagicMock(side_effect=q_side_effect) - dm_model.get_v = MagicMock(side_effect=v_side_effect) - # dm_model.get_q.return_value = q_side_effect - # dm_model.get_v.return_value = v_side_effect - is_est = DREstimator(dm_model=dm_model, norm_weights=False, clip=0.0, - ignore_nan=True) - rewards = [ - torch.Tensor(x).float() for x in test_conf.test_reward_values - ] - states = [torch.Tensor(x) for x in test_conf.test_state_vals] - actions = [torch.Tensor(x) for x in test_conf.test_action_vals] - test_res = [] - pred_res = is_est.predict_traj_rewards( - rewards=rewards, states=states, actions=actions, - weights=test_conf.weight_test_res, discount=gamma, - is_msk=test_conf.msk_test_res - ) - #weight_test_res = weight_test_res/weight_test_res.shape[0] - denom = test_conf.weight_test_res.shape[0] - for idx, (r,s,a,w,msk) in enumerate(zip( - rewards, states, actions, test_conf.weight_test_res, test_conf.msk_test_res - )): - w = w/denom - p = torch.masked_select(w, msk>0).reshape(-1,1) - assert len(r.shape) == 2, "Test input dim not correct" - assert len(s.shape) == 2, "Test input dim not correct" - assert len(a.shape) == 2, "Test input dim not correct" - assert len(p.shape) == 2, "Test input dim not correct" - assert isinstance(gamma, float), "Test input dim not correct" - __test_res = is_est.get_traj_discnt_reward( - reward_array=r, discount=gamma, state_array=s, - action_array=a, - weight_array=p) - test_res.append(__test_res.numpy()) - #test_res = np.concatenate(test_res).mean() - test_res = np.concatenate(test_res) - tol = (np.abs(test_res.mean()/100)).item() - self.assertEqual(pred_res.shape, torch.Size((len(rewards),))) - np.testing.assert_allclose(pred_res.numpy(),test_res, atol=tol) - - \ No newline at end of file + \ No newline at end of file diff --git a/tests/OPEEstimators/test_IS.py b/tests/OPEEstimators/test_IS.py index fb6c34f..35f3996 100644 --- a/tests/OPEEstimators/test_IS.py +++ b/tests/OPEEstimators/test_IS.py @@ -3,74 +3,71 @@ import torch import numpy as np from offline_rl_ope.OPEEstimators.IS import ISEstimator -# from ..base import (test_reward_values, reward_test_res, weight_test_res, -# msk_test_res) -from ..base import ( - single_discrete_action_test as sdat, - duel_discrete_action_test as ddat, - bin_discrete_action_test as bdat - ) +from parameterized import parameterized +from ..base import test_configs_fmt gamma = 0.99 -for test_conf in [sdat,ddat,bdat]: - class ISEstimatorTest(unittest.TestCase): - - def setUp(self) -> None: - self.is_estimator = ISEstimator(norm_weights=False) + +class ISEstimatorTest(unittest.TestCase): + + def setUp(self) -> None: + self.is_estimator = 
ISEstimator(norm_weights=False) + + @parameterized.expand(test_configs_fmt) + def test_get_traj_discnt_reward(self, name, test_conf): + for r in test_conf.test_reward_values: + disc_vals = torch.full(size=(len(r),1), fill_value=gamma) + power_vals = torch.Tensor(list(range(0,len(r)))).view(-1,1) + disc_vals = torch.pow(disc_vals,power_vals).squeeze() + r = torch.Tensor(r).view(-1,1) + test_res = r.squeeze()*disc_vals + tol = np.abs(test_res.mean().numpy().item()) + res = self.is_estimator.get_traj_discnt_reward( + reward_array=r, discount=gamma) + self.assertEqual(res.shape,torch.Size((len(r),))) + np.testing.assert_allclose(res, test_res, atol=tol) + + @parameterized.expand(test_configs_fmt) + def test_get_dataset_discnt_reward(self, name, test_conf): + def __mock_return(reward_array, discount): + lkp = { + "_".join([str(torch.Tensor(r)), str(gamma)]): w for r,w in zip( + test_conf.test_reward_values, test_conf.reward_test_res + ) + } + return lkp["_".join([str(reward_array), str(discount)])] + self.is_estimator.get_traj_discnt_reward = MagicMock( + side_effect=__mock_return) - def test_get_traj_discnt_reward(self): - for r in test_conf.test_reward_values: - disc_vals = torch.full(size=(len(r),1), fill_value=gamma) - power_vals = torch.Tensor(list(range(0,len(r)))).view(-1,1) - disc_vals = torch.pow(disc_vals,power_vals).squeeze() - r = torch.Tensor(r).view(-1,1) - test_res = r.squeeze()*disc_vals - tol = np.abs(test_res.mean().numpy().item()) - res = self.is_estimator.get_traj_discnt_reward( - reward_array=r, discount=gamma) - self.assertEqual(res.shape,torch.Size((len(r),))) - np.testing.assert_allclose(res, test_res, atol=tol) + rewards = [torch.Tensor(r) for r in test_conf.test_reward_values] + pred_res = self.is_estimator.get_dataset_discnt_reward( + rewards=rewards, discount=gamma, + h=test_conf.reward_test_res.shape[1] + ) + self.assertTrue(pred_res.shape, test_conf.reward_test_res.shape) + np.testing.assert_allclose(pred_res.numpy(),test_conf.reward_test_res.numpy(), + np.abs(test_conf.reward_test_res.mean().numpy())) - def test_get_dataset_discnt_reward(self): - def __mock_return(reward_array, discount): - lkp = { - "_".join([str(torch.Tensor(r)), str(gamma)]): w for r,w in zip( - test_conf.test_reward_values, test_conf.reward_test_res - ) - } - return lkp["_".join([str(reward_array), str(discount)])] - - self.is_estimator.get_traj_discnt_reward = MagicMock( - side_effect=__mock_return) - - rewards = [torch.Tensor(r) for r in test_conf.test_reward_values] - pred_res = self.is_estimator.get_dataset_discnt_reward( - rewards=rewards, discount=gamma, h=test_conf.reward_test_res.shape[1] - ) - self.assertTrue(pred_res.shape, test_conf.reward_test_res.shape) - np.testing.assert_allclose(pred_res.numpy(),test_conf.reward_test_res.numpy(), - np.abs(test_conf.reward_test_res.mean().numpy())) - - - def test_predict_traj_rewards(self): - def __mock_return(rewards, discount, h): - return test_conf.reward_test_res - self.is_estimator.get_dataset_discnt_reward = MagicMock( - side_effect=__mock_return) - rewards = [torch.Tensor(r) for r in test_conf.test_reward_values] - pred_res = self.is_estimator.predict_traj_rewards( - rewards=rewards, actions=[], states=[], weights=test_conf.weight_test_res, - discount=gamma, is_msk=test_conf.msk_test_res) - test_res = np.multiply( - test_conf.reward_test_res.numpy(), - test_conf.weight_test_res.numpy()/test_conf.weight_test_res.shape[0] - ) - test_res=test_res.sum(axis=1) - #test_res = test_res.sum(axis=1).mean() - tol = test_res.mean()/1000 - 
self.assertEqual(pred_res.shape, torch.Size((len(rewards),))) - np.testing.assert_allclose(pred_res.numpy(), test_res, atol=tol) + @parameterized.expand(test_configs_fmt) + def test_predict_traj_rewards(self, name, test_conf): + def __mock_return(rewards, discount, h): + return test_conf.reward_test_res + self.is_estimator.get_dataset_discnt_reward = MagicMock( + side_effect=__mock_return) + rewards = [torch.Tensor(r) for r in test_conf.test_reward_values] + pred_res = self.is_estimator.predict_traj_rewards( + rewards=rewards, actions=[], states=[], weights=test_conf.weight_test_res, + discount=gamma, is_msk=test_conf.msk_test_res) + test_res = np.multiply( + test_conf.reward_test_res.numpy(), + test_conf.weight_test_res.numpy()/test_conf.weight_test_res.shape[0] + ) + test_res=test_res.sum(axis=1) + #test_res = test_res.sum(axis=1).mean() + tol = test_res.mean()/1000 + self.assertEqual(pred_res.shape, torch.Size((len(rewards),))) + np.testing.assert_allclose(pred_res.numpy(), test_res, atol=tol) diff --git a/tests/OPEEstimators/test_utils.py b/tests/OPEEstimators/test_utils.py index fc33c92..3438f63 100644 --- a/tests/OPEEstimators/test_utils.py +++ b/tests/OPEEstimators/test_utils.py @@ -4,273 +4,527 @@ import unittest from offline_rl_ope.OPEEstimators.utils import ( clip_weights, clip_weights_pass, VanillaNormWeights, WISWeightNorm) -# from ..base import (weight_test_res, msk_test_res) -from ..base import ( - single_discrete_action_test as sdat, - duel_discrete_action_test as ddat, - bin_discrete_action_test as bdat - ) +from parameterized import parameterized_class +from ..base import test_configs_fmt_class, TestConfig +@parameterized_class(test_configs_fmt_class) +class UtilsTestVanillaIS(unittest.TestCase): + + test_conf:TestConfig + + def setUp(self) -> None: + self.clip_toll = self.test_conf.weight_test_res.numpy().mean()/1000 - -for test_conf in [sdat,ddat,bdat]: - - weight_test_res_alter = copy.deepcopy(test_conf.weight_test_res) - weight_test_res_alter[0] = torch.zeros(len(weight_test_res_alter[0])) - - class UtilsTest(unittest.TestCase): + def test_clip_weights(self): + clip = 1.2 + test_res = self.test_conf.weight_test_res.clamp(max=1.2, min=1/1.2) + assert len(self.test_conf.weight_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = clip_weights(self.test_conf.weight_test_res, clip=clip) + self.assertEqual(pred_res.shape,self.test_conf.weight_test_res.shape) + np.testing.assert_allclose(pred_res, test_res, atol=self.clip_toll) + + def test_clip_weights_pass(self): + clip = 1.2 + test_res = copy.deepcopy(self.test_conf.weight_test_res) + assert len(self.test_conf.weight_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = clip_weights_pass(self.test_conf.weight_test_res, clip=clip) + self.assertEqual(pred_res.shape,self.test_conf.weight_test_res.shape) + np.testing.assert_allclose(pred_res, test_res, atol=self.clip_toll) - def setUp(self) -> None: - self.clip_toll = test_conf.weight_test_res.numpy().mean()/1000 + # def test_norm_weights_pass(self): + # test_res = weight_test_res/msk_test_res.sum(axis=0) + # toll = test_res.mean()/1000 + # pred_res = norm_weights_pass(traj_is_weights=weight_test_res, + # is_msk=msk_test_res) + # self.assertEqual(pred_res.shape,weight_test_res.shape) + # np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + # atol=toll.numpy()) + + def test_norm_weights_vanilla(self): + """Vanilla IS with non-bias averaging: + $w_{H,i}=\prod_{t=0}^{H}w_{t,i}$ - def test_clip_weights(self): - clip = 1.2 - test_res = 
test_conf.weight_test_res.clamp(max=1.2, min=1/1.2) - assert len(test_conf.weight_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = clip_weights(test_conf.weight_test_res, clip=clip) - self.assertEqual(pred_res.shape,test_conf.weight_test_res.shape) - np.testing.assert_allclose(pred_res, test_res, atol=self.clip_toll) - - def test_clip_weights_pass(self): - clip = 1.2 - test_res = copy.deepcopy(test_conf.weight_test_res) - assert len(test_conf.weight_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = clip_weights_pass(test_conf.weight_test_res, clip=clip) - self.assertEqual(pred_res.shape,test_conf.weight_test_res.shape) - np.testing.assert_allclose(pred_res, test_res, atol=self.clip_toll) - - # def test_norm_weights_pass(self): - # test_res = weight_test_res/msk_test_res.sum(axis=0) - # toll = test_res.mean()/1000 - # pred_res = norm_weights_pass(traj_is_weights=weight_test_res, - # is_msk=msk_test_res) - # self.assertEqual(pred_res.shape,weight_test_res.shape) - # np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - # atol=toll.numpy()) + $\frac{1}/{n}\sum_{i=1}^{n}\sum_{t=0}^{H}r_{t}\gamma^{t}w_{H,i}$ - def test_norm_weights_vanilla(self): - denom = test_conf.weight_test_res.shape[0] - test_res = test_conf.weight_test_res/denom - toll = test_res.mean()/1000 - calculator = VanillaNormWeights() - assert len(test_conf.weight_test_res.shape) == 2, "Incorrect test input dimensions" - assert len(test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = calculator( - traj_is_weights=test_conf.weight_test_res, - is_msk=test_conf.msk_test_res - ) - self.assertEqual(pred_res.shape,test_conf.weight_test_res.shape) - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=toll.numpy()) + => The output should be of the form: + \frac{1}/{n}w_{H,i} + """ + denom = self.test_conf.traj_is_weights_is.shape[0] + test_res = self.test_conf.traj_is_weights_is/denom + toll = test_res.mean()/1000 + calculator = VanillaNormWeights() + assert len(self.test_conf.traj_is_weights_is.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_is, + is_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_is.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) + + def test_norm_weights_wis(self): + """Vanilla IS with WIS averaging: + w_{H,i}=\prod_{t=0}^{H}w_{t,i} + w_{H} = \sum_{i=1}^{n} w_{H,i} + $\frac{1}/{w_{H}}\sum_{i=1}^{n}\sum_{t=0}^{H}r_{t,i}\gamma^{t}w_{H,i}$ + => The output should be of the form: + \frac{1}/{w_{H}}w_{H,i} + """ + # test_conf.traj_is_weights_is defines the Vanilla IS one step weights + # i.e., w_{H,i} + # Summing to define \sum_{i=1}^{n}\prod_{t=0}^{H}w_{t,i} + # The input weights are the same for all steps in a trajectory, + # therefore, sum across the trajectories + + # Find the final weight for each trajectory + term_idx = [len(i) for i in self.test_conf.test_act_indiv_weights] + term_weights = [] + for idx, traj in zip(term_idx, self.test_conf.traj_is_weights_is): + term_weights.append(traj[idx-1]) + term_weights = torch.tensor(term_weights) + # Sum over the weights as we are not doing cumulative + denom = term_weights.sum().repeat((1,max(term_idx))) + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_is/denom + toll = 
test_res.mean()/1000 + calculator = WISWeightNorm() + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_is, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll + ) + assert len(self.test_conf.traj_is_weights_is.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_is, + is_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_is.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) - def test_norm_weights_wis(self): - denom = test_conf.weight_test_res.sum(dim=0) - test_res = test_conf.weight_test_res/denom - toll = test_res.mean()/1000 - calculator = WISWeightNorm() - assert len(test_conf.weight_test_res.shape) == 2, "Incorrect test input dimensions" - assert len(test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = calculator( - traj_is_weights=test_conf.weight_test_res, - is_msk=test_conf.msk_test_res - ) - self.assertEqual(pred_res.shape,test_conf.weight_test_res.shape) - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=toll.numpy()) + def test_norm_weights_wis_cum(self): + """Vanilla IS with WIS cumulative averaging: + $w_{H,i}=\prod_{t=0}^{H}w_{t,i}$ + $w_{H,t} = \sum_{i=1}^{n} w_{H,i}\mathbb{1}_{m_{i,t}\neq=0}$ + $\sum_{i=1}^{n}\sum_{t=0}^{H}r_{t,i}\gamma^{t}\frac{1}/{w_{H,t}}w_{H,i}$ + => The output should be of the form: + $\frac{1}/{w_{H,t}}w_{H,i}$ + """ + # Sum across the trajectories to get the time t cumulative weight + # Note, the weight is already cumulative due to PD input + denom = self.test_conf.traj_is_weights_is.sum(dim=0, keepdim=True) + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_is/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm(cumulative=True) + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_is, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll + ) + assert len(self.test_conf.traj_is_weights_is.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_is, + is_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_is.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) - def test_norm_weights_wis_smooth(self): - smooth_eps = 0.00000001 - denom = weight_test_res_alter.sum(dim=0)+smooth_eps - test_res = weight_test_res_alter/denom - toll = test_res.nanmean()/1000 - calculator = WISWeightNorm(smooth_eps=smooth_eps) - assert len(weight_test_res_alter.shape) == 2, "Incorrect test input dimensions" - assert len(test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = calculator( - traj_is_weights=weight_test_res_alter, - is_msk=test_conf.msk_test_res - ) - self.assertEqual(pred_res.shape,weight_test_res_alter.shape) - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=toll.numpy()) - - def test_norm_weights_wis_no_smooth(self): - denom = weight_test_res_alter.sum(dim=0) - test_res = weight_test_res_alter/denom - toll = test_res.nanmean()/1000 - 
calculator = WISWeightNorm() - assert len(weight_test_res_alter.shape) == 2, "Incorrect test input dimensions" - assert len(test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = calculator(traj_is_weights=weight_test_res_alter, - is_msk=test_conf.msk_test_res) - self.assertEqual(pred_res.shape,weight_test_res_alter.shape) - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=toll.numpy(), equal_nan=True) - - def test_norm_weights_wis_smooth_discount(self): - smooth_eps = 0.00000001 - discount=0.99 - discnt_tens = torch.full( - weight_test_res_alter.shape, - discount - ) - discnt_pows = torch.arange( - 0, weight_test_res_alter.shape[1])[None,:].repeat( - weight_test_res_alter.shape[0],1 - ) - discnt_tens = torch.pow(discnt_tens,discnt_pows) - denom = torch.mul( - weight_test_res_alter, - discnt_tens + + def test_norm_weights_wis_smooth(self): + smooth_eps = 0.00000001 + term_idx = [ + len(i) for i in self.test_conf.test_act_indiv_weights_alter + ] + term_weights = [] + for idx, traj in zip( + term_idx, + self.test_conf.traj_is_weights_is_alter + ): + term_weights.append(traj[idx-1]) + term_weights = torch.tensor(term_weights) + # Sum over the weights as we are not doing cumulative + denom = term_weights.sum().repeat((1,max(term_idx))) + smooth_eps + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_is_alter/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm(smooth_eps=smooth_eps) + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_is_alter, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll ) - denom = denom.sum(dim=0)+smooth_eps - test_res = weight_test_res_alter/denom - toll = test_res.nanmean()/1000 - calculator = WISWeightNorm( - smooth_eps=smooth_eps, - discount=discount - ) - pred_res = calculator(traj_is_weights=weight_test_res_alter, - is_msk=test_conf.msk_test_res) - self.assertEqual(pred_res.shape,weight_test_res_alter.shape) - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=toll.numpy()) + assert len(self.test_conf.traj_is_weights_is_alter.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_is_alter, + is_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_is_alter.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) + + + def test_norm_weights_wis_no_smooth(self): + term_idx = [ + len(i) for i in self.test_conf.test_act_indiv_weights_alter + ] + term_weights = [] + for idx, traj in zip( + term_idx, + self.test_conf.traj_is_weights_is_alter + ): + term_weights.append(traj[idx-1]) + term_weights = torch.tensor(term_weights) + # Sum over the weights as we are not doing cumulative + denom = term_weights.sum().repeat((1,max(term_idx))) + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_is_alter/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm() + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_is_alter, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll + ) + assert len(self.test_conf.traj_is_weights_is_alter.shape) == 2, "Incorrect test input dimensions" + assert 
len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_is_alter, + is_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_is_alter.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) - def test_norm_weights_wis_no_smooth_discount(self): - discount=0.99 - discnt_tens = torch.full( - weight_test_res_alter.shape, - discount - ) - discnt_pows = torch.arange( - 0, weight_test_res_alter.shape[1])[None,:].repeat( - weight_test_res_alter.shape[0],1 - ) - discnt_tens = torch.pow(discnt_tens,discnt_pows) - denom = torch.mul( - weight_test_res_alter, - discnt_tens + def test_norm_weights_wis_smooth_avg(self): + smooth_eps = 0.00000001 + term_idx = [ + len(i) for i in self.test_conf.test_act_indiv_weights_alter + ] + term_weights = [] + for idx, traj in zip( + term_idx, + self.test_conf.traj_is_weights_is_alter + ): + term_weights.append(traj[idx-1]) + term_weights = torch.tensor(term_weights) + # Sum over the weights as we are not doing cumulative + denom = term_weights.sum().repeat((1,max(term_idx))) + denom = (denom/len(term_idx)) + smooth_eps + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_is_alter/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm(smooth_eps=smooth_eps, avg_denom=True) + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_is_alter, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll + ) + assert len(self.test_conf.traj_is_weights_is_alter.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_is_alter, + is_msk=self.test_conf.msk_test_res ) - denom = denom.sum(dim=0) - test_res = weight_test_res_alter/denom - toll = test_res.nanmean()/1000 - calculator = WISWeightNorm( - discount=discount - ) - assert len(weight_test_res_alter.shape) == 2, "Incorrect test input dimensions" - assert len(test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = calculator(traj_is_weights=weight_test_res_alter, - is_msk=test_conf.msk_test_res) - self.assertEqual(pred_res.shape,weight_test_res_alter.shape) - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=toll.numpy()) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_is_alter.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) - def test_norm_weights_wis_smooth_avg(self): - smooth_eps = 0.00000001 - time_t_freq = test_conf.msk_test_res.sum(dim=0, keepdim=True).repeat( - test_conf.msk_test_res.shape[0],1 + def test_norm_weights_wis_no_smooth_avg(self): + term_idx = [ + len(i) for i in self.test_conf.test_act_indiv_weights_alter + ] + term_weights = [] + for idx, traj in zip( + term_idx, + self.test_conf.traj_is_weights_is_alter + ): + term_weights.append(traj[idx-1]) + term_weights = torch.tensor(term_weights) + # Sum over the weights as we are not doing cumulative + denom = term_weights.sum().repeat((1,max(term_idx))) + denom = (denom/len(term_idx)) + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_is_alter/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm(avg_denom=True) + norm = calculator.calc_norm( 
+ traj_is_weights=self.test_conf.traj_is_weights_is_alter, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll ) - denom = weight_test_res_alter/time_t_freq - denom = denom.sum(dim=0)+smooth_eps - test_res = weight_test_res_alter/denom - toll = test_res.nanmean()/1000 - calculator = WISWeightNorm( - smooth_eps=smooth_eps, - avg_denom=True - ) - assert len(weight_test_res_alter.shape) == 2, "Incorrect test input dimensions" - assert len(test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = calculator(traj_is_weights=weight_test_res_alter, - is_msk=test_conf.msk_test_res) - self.assertEqual(pred_res.shape,weight_test_res_alter.shape) - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=toll.numpy()) + assert len(self.test_conf.traj_is_weights_is_alter.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_is_alter, + is_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_is_alter.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) + +@parameterized_class(test_configs_fmt_class) +class UtilsTestPD(unittest.TestCase): + + test_conf:TestConfig + + def test_norm_weights_vanilla(self): + """PD with non-bias averaging: + $\frac{1}/{n}\sum_{i=1}^{n}\sum_{t=0}^{H}r_{t}\gamma^{t}\prod_{t=0}^{t'}w_{t,i}$ + + => The output should be of the form: + $\frac{1}/{n}\prod_{t=0}^{t'}w_{t,i}$ + """ + denom = self.test_conf.traj_is_weights_pd.shape[0] + test_res = self.test_conf.traj_is_weights_pd/denom + toll = test_res.mean()/1000 + calculator = VanillaNormWeights() + assert len(self.test_conf.traj_is_weights_pd.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_pd, + is_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_pd.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) + + def test_norm_weights_wpd(self): + """WPD: + w_{H,i}=\prod_{t=0}^{H}w_{t,i} + w_{H} = \sum_{i=1}^{n} w_{H,i} + $\frac{1}/{w_{H}}\sum_{i=1}^{n}\sum_{t=0}^{H}r_{t,i}\gamma^{t}\prod_{t=0}^{t'}w_{t,i}$ + => The output should be of the form: + \frac{1}/{w_{H}}\prod_{t=0}^{t'}w_{t,i} + """ + term_idx = [len(i) for i in self.test_conf.test_act_indiv_weights] + term_weights = [] + for idx, traj in zip(term_idx, self.test_conf.traj_is_weights_pd): + term_weights.append(traj[idx-1]) + term_weights = torch.tensor(term_weights) + # Sum over the weights as we are not doing cumulative + denom = term_weights.sum().repeat((1,max(term_idx))) + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_pd/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm() + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_pd, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll + ) + assert len(self.test_conf.traj_is_weights_pd.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + 
traj_is_weights=self.test_conf.traj_is_weights_pd, + is_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_pd.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) + + def test_norm_weights_wpd_cum(self): + """WPD: + w_{t',i}=\prod_{t=0}^{t'}w_{t,i} + w_{t'} = \sum_{i=1}^{n} w_{t',i} + $\frac{1}/{w_{t'}}\sum_{i=1}^{n}\sum_{t=0}^{H}r_{t,i}\gamma^{t}\prod_{t=0}^{t'}w_{t,i}$ + => The output should be of the form: + \frac{1}/{w_{t'}}\prod_{t=0}^{t'}w_{t,i} + """ + # Sum across the trajectories to get the time t cumulative weight + # Note, the weight is already cumulative due to PD input + denom = self.test_conf.traj_is_weights_pd.sum(dim=0, keepdim=True) + # No need to alter shape + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_pd/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm(cumulative=True) + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_pd, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll + ) + assert len(self.test_conf.traj_is_weights_pd.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_pd, + is_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_pd.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) - def test_norm_weights_wis_no_smooth_avg(self): - time_t_freq = test_conf.msk_test_res.sum(dim=0, keepdim=True).repeat( - test_conf.msk_test_res.shape[0],1 + + def test_norm_weights_wpd_smooth(self): + smooth_eps = 0.00000001 + term_idx = [ + len(i) for i in self.test_conf.test_act_indiv_weights_alter + ] + term_weights = [] + for idx, traj in zip( + term_idx, + self.test_conf.traj_is_weights_pd_alter + ): + term_weights.append(traj[idx-1]) + term_weights = torch.tensor(term_weights) + # Sum over the weights as we are not doing cumulative + denom = term_weights.sum().repeat((1,max(term_idx))) + smooth_eps + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_pd_alter/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm(smooth_eps=smooth_eps) + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_pd_alter, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll + ) + assert len(self.test_conf.traj_is_weights_pd_alter.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_pd_alter, + is_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_pd_alter.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) + + + def test_norm_weights_wpd_no_smooth(self): + term_idx = [ + len(i) for i in self.test_conf.test_act_indiv_weights_alter + ] + term_weights = [] + for idx, traj in zip( + term_idx, + self.test_conf.traj_is_weights_pd_alter + ): + term_weights.append(traj[idx-1]) + term_weights = torch.tensor(term_weights) + # Sum over the weights as we are not doing cumulative + denom = term_weights.sum().repeat((1,max(term_idx))) + 
denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_pd_alter/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm() + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_pd_alter, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll + ) + assert len(self.test_conf.traj_is_weights_pd_alter.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_pd_alter, + is_msk=self.test_conf.msk_test_res ) - denom = weight_test_res_alter/time_t_freq - denom = denom.sum(dim=0) - test_res = weight_test_res_alter/denom - toll = test_res.nanmean()/1000 - calculator = WISWeightNorm( - avg_denom=True - ) - assert len(weight_test_res_alter.shape) == 2, "Incorrect test input dimensions" - assert len(test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = calculator(traj_is_weights=weight_test_res_alter, - is_msk=test_conf.msk_test_res) - self.assertEqual(pred_res.shape,weight_test_res_alter.shape) - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=toll.numpy()) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_pd_alter.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) - def test_norm_weights_wis_smooth_discount_avg(self): - smooth_eps = 0.00000001 - discount=0.99 - discnt_tens = torch.full( - weight_test_res_alter.shape, - discount - ) - discnt_pows = torch.arange( - 0, weight_test_res_alter.shape[1])[None,:].repeat( - weight_test_res_alter.shape[0],1 - ) - discnt_tens = torch.pow(discnt_tens,discnt_pows) - denom = torch.mul( - weight_test_res_alter, - discnt_tens + def test_norm_weights_wpd_smooth_avg(self): + smooth_eps = 0.00000001 + term_idx = [ + len(i) for i in self.test_conf.test_act_indiv_weights_alter + ] + term_weights = [] + for idx, traj in zip( + term_idx, + self.test_conf.traj_is_weights_pd_alter + ): + term_weights.append(traj[idx-1]) + term_weights = torch.tensor(term_weights) + # Sum over the weights as we are not doing cumulative + denom = term_weights.sum().repeat((1,max(term_idx))) + denom = (denom/len(term_idx)) + smooth_eps + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_pd_alter/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm(smooth_eps=smooth_eps, avg_denom=True) + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_pd_alter, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll ) - time_t_freq = test_conf.msk_test_res.sum(dim=0, keepdim=True).repeat( - test_conf.msk_test_res.shape[0],1 + assert len(self.test_conf.traj_is_weights_pd_alter.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_pd_alter, + is_msk=self.test_conf.msk_test_res ) - denom = denom/time_t_freq - denom = denom.sum(dim=0)+smooth_eps - test_res = weight_test_res_alter/denom - toll = test_res.nanmean()/1000 - calculator = WISWeightNorm( - smooth_eps=smooth_eps, - discount=discount, - avg_denom=True - ) - assert len(weight_test_res_alter.shape) == 2, "Incorrect test input dimensions" - assert 
len(test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = calculator(traj_is_weights=weight_test_res_alter, - is_msk=test_conf.msk_test_res) - self.assertEqual(pred_res.shape,weight_test_res_alter.shape) - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=toll.numpy()) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_pd_alter.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) - def test_norm_weights_wis_no_smooth_discount_avg(self): - discount=0.99 - discnt_tens = torch.full( - weight_test_res_alter.shape, - discount - ) - discnt_pows = torch.arange( - 0, weight_test_res_alter.shape[1])[None,:].repeat( - weight_test_res_alter.shape[0],1 - ) - discnt_tens = torch.pow(discnt_tens,discnt_pows) - denom = torch.mul( - weight_test_res_alter, - discnt_tens + def test_norm_weights_wpd_no_smooth_avg(self): + term_idx = [ + len(i) for i in self.test_conf.test_act_indiv_weights_alter + ] + term_weights = [] + for idx, traj in zip( + term_idx, + self.test_conf.traj_is_weights_pd_alter + ): + term_weights.append(traj[idx-1]) + term_weights = torch.tensor(term_weights) + # Sum over the weights as we are not doing cumulative + denom = term_weights.sum().repeat((1,max(term_idx))) + denom = (denom/len(term_idx)) + denom_toll = denom.squeeze().mean().numpy()/1000 + test_res = self.test_conf.traj_is_weights_pd_alter/denom + toll = test_res.mean()/1000 + calculator = WISWeightNorm(avg_denom=True) + norm = calculator.calc_norm( + traj_is_weights=self.test_conf.traj_is_weights_pd_alter, + is_msk=self.test_conf.msk_test_res + ) + np.testing.assert_allclose( + norm.numpy(), denom.numpy(), + atol=denom_toll ) - time_t_freq = test_conf.msk_test_res.sum(dim=0, keepdim=True).repeat( - test_conf.msk_test_res.shape[0],1 + assert len(self.test_conf.traj_is_weights_pd_alter.shape) == 2, "Incorrect test input dimensions" + assert len(self.test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" + pred_res = calculator( + traj_is_weights=self.test_conf.traj_is_weights_pd_alter, + is_msk=self.test_conf.msk_test_res ) - denom = denom/time_t_freq - denom = denom.sum(dim=0) - test_res = weight_test_res_alter/denom - toll = test_res.nanmean()/1000 - calculator = WISWeightNorm( - discount=0.99, - avg_denom=True - ) - assert len(weight_test_res_alter.shape) == 2, "Incorrect test input dimensions" - assert len(test_conf.msk_test_res.shape) == 2, "Incorrect test input dimensions" - pred_res = calculator(traj_is_weights=weight_test_res_alter, - is_msk=test_conf.msk_test_res) - self.assertEqual(pred_res.shape,weight_test_res_alter.shape) - np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), - atol=toll.numpy()) + self.assertEqual(pred_res.shape,self.test_conf.traj_is_weights_pd_alter.shape) + np.testing.assert_allclose(pred_res.numpy(), test_res.numpy(), + atol=toll.numpy()) \ No newline at end of file diff --git a/tests/base.py b/tests/base.py index c1c4bfe..894e393 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1,7 +1,8 @@ from dataclasses import dataclass -from typing import Any, List +from typing import Any, List, Dict import numpy as np import torch +import copy @dataclass class TestConfig: @@ -16,9 +17,46 @@ class TestConfig: test_dm_sa_values:List[List[float]] test_act_indiv_weights:List[np.ndarray[float]] = None weight_test_res:torch.Tensor = None + traj_is_weights_is:torch.Tensor = None + traj_is_weights_pd:torch.Tensor = None + weight_test_res_alter:torch.Tensor = None + 
traj_is_weights_is_alter:torch.Tensor = None + traj_is_weights_pd_alter:torch.Tensor = None msk_test_res:torch.Tensor = None reward_test_res:torch.Tensor = None + @staticmethod + def __get_traj_weights( + weight_test_res:torch.Tensor, + msk_test_res:torch.Tensor + ): + # Taking product to define \prod_{t=0}^{H}w_{t,i} + _traj_is_weights_sub = weight_test_res.detach().clone() + _traj_is_weights_sub[msk_test_res == 0] = 1 + _traj_is_weights_is = _traj_is_weights_sub.prod(dim=1, keepdim=True) + traj_is_weights_is = _traj_is_weights_is.repeat( + (1,weight_test_res.shape[1]) + ) + traj_is_weights_pd = _traj_is_weights_sub.cumprod( + dim=1) + traj_is_weights_is[msk_test_res == 0] = 0 + traj_is_weights_pd[msk_test_res == 0] = 0 + return traj_is_weights_is, traj_is_weights_pd + + @staticmethod + def __get_weight_mask_matrix( + test_act_indiv_weights:List[np.array] + ): + max_len = max([len(i) for i in test_act_indiv_weights]) + weight_test_res = [] + msk_test_res = [] + for i in test_act_indiv_weights: + weight_test_res.append(np.pad(i,(0,max_len-len(i))).tolist()) + msk_test_res.append( + np.pad(i.astype(bool),(0,max_len-len(i))).tolist() + ) + return torch.Tensor(weight_test_res), torch.Tensor(msk_test_res).float() + def __post_init__(self): test_act_indiv_weights = [] for i,j in zip(self.test_eval_action_probs,self.test_action_probs): @@ -27,17 +65,39 @@ def __post_init__(self): ) self.test_act_indiv_weights = test_act_indiv_weights - max_len = max([len(i) for i in self.test_act_indiv_weights]) - weight_test_res = [] - msk_test_res = [] - for i in self.test_act_indiv_weights: - weight_test_res.append(np.pad(i,(0,max_len-len(i))).tolist()) - msk_test_res.append( - np.pad(i.astype(bool),(0,max_len-len(i))).tolist() + ( + self.weight_test_res, + self.msk_test_res + ) = self.__get_weight_mask_matrix( + self.test_act_indiv_weights + ) + + ( + self.traj_is_weights_is, + self.traj_is_weights_pd + ) = self.__get_traj_weights( + self.weight_test_res, self.msk_test_res ) - self.weight_test_res = torch.Tensor(weight_test_res) - self.msk_test_res = torch.Tensor(msk_test_res).float() + # Check for trivial weights i.e., all 0 + assert not (self.traj_is_weights_is == 0).all().item(), "Weights are trivial" + assert not (self.traj_is_weights_pd == 0).all().item(), "Weights are trivial" + + self.test_act_indiv_weights_alter = copy.deepcopy( + self.test_act_indiv_weights) + self.test_act_indiv_weights_alter[0] = np.zeros( + len(self.test_act_indiv_weights_alter[0]) + ) + self.weight_test_res_alter,_ = self.__get_weight_mask_matrix( + self.test_act_indiv_weights_alter + ) + + (self.traj_is_weights_is_alter, + self.traj_is_weights_pd_alter) = self.__get_traj_weights( + self.weight_test_res_alter, self.msk_test_res + ) + + max_len = max([len(i) for i in test_act_indiv_weights]) reward_test_res = [] for i in self.test_reward_values: reward_test_res.append( @@ -85,43 +145,21 @@ def __post_init__(self): [[-3],[-2], [-0.8]] ] -# test_act_indiv_weights = [ -# np.array([1/0.9, 0.07/0.7, 0.89/0.66, 1/0.7]), -# np.array([ 0.75/0.54, 0.9/0.9, 0.2/0.5]) -# ] - - -# weight_test_res = torch.Tensor( -# [ -# test_act_indiv_weights[0].tolist(), -# [*test_act_indiv_weights[1].tolist(),0] -# ] -# ) - -# msk_test_res = torch.Tensor( -# [ -# [1]*4, -# [*[1]*3,0] -# ] -# ) - -# reward_test_res = torch.Tensor( -# [ -# test_reward_values[0], -# [*test_reward_values[1],[0]] -# ] -# ).squeeze() - - -bin_discrete_action_test = TestConfig( - test_state_vals=test_state_vals, - test_action_vals=test_action_vals, - 
test_action_probs=test_action_probs, - test_eval_action_vals=test_eval_action_vals, - test_eval_action_probs=test_eval_action_probs, - test_reward_values=test_reward_values, - test_dm_s_values=test_dm_s_values, - test_dm_sa_values=test_dm_sa_values +test_configs:Dict[str,TestConfig] = {} + +test_configs.update( + { + "binary_action": TestConfig( + test_state_vals=test_state_vals, + test_action_vals=test_action_vals, + test_action_probs=test_action_probs, + test_eval_action_vals=test_eval_action_vals, + test_eval_action_probs=test_eval_action_probs, + test_reward_values=test_reward_values, + test_dm_s_values=test_dm_s_values, + test_dm_sa_values=test_dm_sa_values + ) + } ) test_action_vals = [ @@ -134,17 +172,21 @@ def __post_init__(self): [[0], [0], [1]] ] -single_discrete_action_test = TestConfig( - test_state_vals=test_state_vals, - test_action_vals=test_action_vals, - test_action_probs=test_action_probs, - test_eval_action_vals=test_eval_action_vals, - test_eval_action_probs=test_eval_action_probs, - test_reward_values=test_reward_values, - test_dm_s_values=test_dm_s_values, - test_dm_sa_values=test_dm_sa_values -) +test_configs.update( + { + "categorical_action": TestConfig( + test_state_vals=test_state_vals, + test_action_vals=test_action_vals, + test_action_probs=test_action_probs, + test_eval_action_vals=test_eval_action_vals, + test_eval_action_probs=test_eval_action_probs, + test_reward_values=test_reward_values, + test_dm_s_values=test_dm_s_values, + test_dm_sa_values=test_dm_sa_values + ) + } +) test_action_vals = [ [[1,1], [0,1], [0,1], [1,0]], @@ -156,17 +198,26 @@ def __post_init__(self): [[0,1], [0,0], [0,1]] ] -duel_discrete_action_test = TestConfig( - test_state_vals=test_state_vals, - test_action_vals=test_action_vals, - test_action_probs=test_action_probs, - test_eval_action_vals=test_eval_action_vals, - test_eval_action_probs=test_eval_action_probs, - test_reward_values=test_reward_values, - test_dm_s_values=test_dm_s_values, - test_dm_sa_values=test_dm_sa_values +test_configs.update( + { + "multi_binary_action": TestConfig( + test_state_vals=test_state_vals, + test_action_vals=test_action_vals, + test_action_probs=test_action_probs, + test_eval_action_vals=test_eval_action_vals, + test_eval_action_probs=test_eval_action_probs, + test_reward_values=test_reward_values, + test_dm_s_values=test_dm_s_values, + test_dm_sa_values=test_dm_sa_values + ) + } ) +test_configs_fmt = [[key,test_configs[key]] for key in test_configs.keys()] +test_configs_fmt_class = [ + {"test_conf":test_configs[key]} for key in test_configs.keys() + ] + def flatten_lst(input_lst:List[Any], recursive:bool=True)->List[Any]: """Function for flattening a list containing lists @@ -187,4 +238,6 @@ def flatten_lst(input_lst:List[Any], recursive:bool=True)->List[Any]: output_lst = output_lst + sub_lst else: output_lst.append(sub_lst) - return output_lst \ No newline at end of file + return output_lst + +tmp = test_configs["binary_action"] diff --git a/tests/components/test_ImportanceSampler.py b/tests/components/test_ImportanceSampler.py index 440e6c2..c0019be 100644 --- a/tests/components/test_ImportanceSampler.py +++ b/tests/components/test_ImportanceSampler.py @@ -8,14 +8,8 @@ VanillaIS, PerDecisionIS, ISWeightCalculator ) from offline_rl_ope import logger -# from ..base import (test_action_probs, test_action_vals, test_eval_action_probs, -# test_eval_action_vals, test_reward_values, test_state_vals, -# test_act_indiv_weights, weight_test_res, msk_test_res) -from ..base import ( - 
single_discrete_action_test as sdat, - duel_discrete_action_test as ddat, - bin_discrete_action_test as bdat - ) +from parameterized import parameterized_class +from ..base import test_configs_fmt_class, TestConfig test_act_inidiv_rew = [ @@ -23,190 +17,199 @@ np.array([-1, -1*0.99, -1*(np.power(0.99,2))]) ] -for test_conf in [sdat,ddat,bdat]: - test_act_norm_conts = [val.prod() for val in test_conf.test_act_indiv_weights] +class TestPolicy: + + def __init__(self, values) -> None: + self.idx=0 + self.values = values + + def __call__(self, state: torch.Tensor, action: torch.Tensor): + res = self.values[self.idx] + self.idx += 1 + return torch.Tensor(res) + + def reset(self): + self.idx = 0 - test_act_pd_weights = [val.cumprod() for val in test_conf.test_act_indiv_weights] +@parameterized_class(test_configs_fmt_class) +class ISWeightCalculatorTest(unittest.TestCase): + + test_conf:TestConfig + def setUp(self) -> None: - test_act_traj_rew = [val.sum() for val in test_act_inidiv_rew] - test_act_traj_weights = [val.prod() for val in test_conf.test_act_indiv_weights] - - test_act_traj_w_r = [] - for w,r in zip(test_act_traj_weights, test_act_traj_rew): - test_act_traj_w_r.append( - ( - torch.Tensor([w]).squeeze(), - torch.Tensor([r]).squeeze() - ) - ) + self.test_act_norm_conts = [val.prod() for val in self.test_conf.test_act_indiv_weights] + self.test_act_pd_weights = [val.cumprod() for val in self.test_conf.test_act_indiv_weights] - test_act_traj_w = [w for w,r in test_act_traj_w_r] - test_act_losses = [] - for i,(w,r) in enumerate(test_act_traj_w_r): - w = w/sum(test_act_traj_w) - test_act_losses.append(w*r) - test_act_loss = sum(test_act_losses).item() + test_act_traj_rew = [val.sum() for val in test_act_inidiv_rew] + test_act_traj_weights = [val.prod() for val in self.test_conf.test_act_indiv_weights] + + test_act_traj_w_r = [] + for w,r in zip(test_act_traj_weights, test_act_traj_rew): + test_act_traj_w_r.append( + ( + torch.Tensor([w]).squeeze(), + torch.Tensor([r]).squeeze() + ) + ) - # clip = 0.03 - # test_act_losses_clip = [] - # for i,(w,r) in enumerate(test_act_traj_w_r): - # w = w/sum(test_act_traj_w) - # if w > clip: - # test_act_losses_clip.append(clip*r) - # else: - # test_act_losses_clip.append(w*r) - # test_act_loss_clip = sum(test_act_losses_clip).item() - class TestPolicy: - - def __init__(self, values) -> None: - self.idx=0 - self.values = values - - def __call__(self, state: torch.Tensor, action: torch.Tensor): - res = self.values[self.idx] - self.idx += 1 - return torch.Tensor(res) - - def reset(self): - self.idx = 0 + test_act_traj_w = [w for w,r in test_act_traj_w_r] + + test_act_losses = [] + for i,(w,r) in enumerate(test_act_traj_w_r): + w = w/sum(test_act_traj_w) + test_act_losses.append(w*r) + test_act_loss = sum(test_act_losses).item() + + be_policy_mock = TestPolicy(self.test_conf.test_action_probs) + behav_policy = MagicMock( + spec=Policy, + side_effect=be_policy_mock + ) + #behav_policy.__call__ = MagicMock(side_effect=) + #behav_policy = TestPolicy(self.test_conf.test_action_probs) + self.is_sampler = ISWeightCalculator(behav_policy=behav_policy) + # def __return_func(weight_array): + # return weight_array - # class TestISWeightCalculator: + # self.is_sampler.get_traj_weight_array = MagicMock( + # side_effect=__return_func) + self.tollerance = [abs(val.mean())/1000 + for val in self.test_conf.test_act_indiv_weights] - # def __init__(self) -> None: - # self.is_weights = test_conf.weight_test_res - # self.is_msk = test_conf.msk_test_res + 
self.test_IS_weight_calculator = MagicMock(spec=ISWeightCalculator) + self.test_IS_weight_calculator.is_weights = MagicMock( + return_value=self.test_conf.weight_test_res) + self.test_IS_weight_calculator.is_msk = MagicMock( + return_value=self.test_conf.msk_test_res) + - test_IS_weight_calculator = MagicMock(spec=ISWeightCalculator) - test_IS_weight_calculator.is_weights = MagicMock( - return_value=test_conf.weight_test_res) - test_IS_weight_calculator.is_msk = MagicMock( - return_value=test_conf.msk_test_res) - - class ISWeightCalculatorTest(unittest.TestCase): - - def setUp(self) -> None: - be_policy_mock = TestPolicy(test_conf.test_action_probs) - behav_policy = MagicMock( - spec=Policy, - side_effect=be_policy_mock - ) - #behav_policy.__call__ = MagicMock(side_effect=) - #behav_policy = TestPolicy(test_conf.test_action_probs) - self.is_sampler = ISWeightCalculator(behav_policy=behav_policy) - # def __return_func(weight_array): - # return weight_array + def test_get_traj_w(self): + test_pred = [] + #eval_policy = TestPolicy(self.test_conf.test_eval_action_probs) + e_policy_mock = TestPolicy(self.test_conf.test_eval_action_probs) + eval_policy = MagicMock( + spec=Policy, + side_effect=e_policy_mock + ) + for s,a in zip(self.test_conf.test_state_vals, self.test_conf.test_action_vals): + s = torch.Tensor(s) + a = torch.Tensor(a) + pred = self.is_sampler.get_traj_w( + states=s, actions=a, eval_policy=eval_policy + ) + self.assertEqual(pred.shape, torch.Size([s.shape[0]])) + test_pred.append(pred.tolist()) + for p,t,toll in zip( + test_pred, + self.test_conf.test_act_indiv_weights, + self.tollerance): + np.testing.assert_allclose(p, t, atol=toll) + + def test_get_dataset_w(self): + input_states = [torch.Tensor(s) for s in self.test_conf.test_state_vals] + input_actions = [torch.Tensor(a) for a in self.test_conf.test_action_vals] + #eval_policy = TestPolicy(self.test_conf.test_eval_action_probs) + e_policy_mock = TestPolicy(self.test_conf.test_eval_action_probs) + eval_policy = MagicMock( + spec=Policy, + side_effect=e_policy_mock + ) + is_weights, weight_msk = self.is_sampler.get_dataset_w( + states=input_states, actions=input_actions, eval_policy=eval_policy) + self.assertEqual(is_weights.shape, self.test_conf.weight_test_res.shape) + self.assertEqual(weight_msk.shape, self.test_conf.weight_test_res.shape) + tol = torch.Tensor(self.tollerance).view(-1,1).expand( + size=(len(self.tollerance), is_weights.shape[1])).mean() + np.testing.assert_allclose( + is_weights.numpy(), self.test_conf.weight_test_res.numpy(), atol=tol.numpy() + ) + np.testing.assert_allclose( + weight_msk.numpy(), self.test_conf.msk_test_res.numpy(), atol=tol.numpy() + ) - # self.is_sampler.get_traj_weight_array = MagicMock( - # side_effect=__return_func) - self.tollerance = [abs(val.mean())/1000 - for val in test_conf.test_act_indiv_weights] + # def test_eval_traj_reward(self): - def test_get_traj_w(self): - test_pred = [] - #eval_policy = TestPolicy(test_conf.test_eval_action_probs) - e_policy_mock = TestPolicy(test_conf.test_eval_action_probs) - eval_policy = MagicMock( - spec=Policy, - side_effect=e_policy_mock - ) - for s,a in zip(test_conf.test_state_vals, test_conf.test_action_vals): - s = torch.Tensor(s) - a = torch.Tensor(a) - pred = self.is_sampler.get_traj_w( - states=s, actions=a, eval_policy=eval_policy - ) - self.assertEqual(pred.shape, torch.Size([s.shape[0]])) - test_pred.append(pred.tolist()) - for p,t,toll in zip( - test_pred, - test_conf.test_act_indiv_weights, - self.tollerance): - 
np.testing.assert_allclose(p, t, atol=toll) + # tollerance = abs(test_act_inidiv_rew.mean())/1000 + # test_pred = [] + # for r in test_reward_values: + # r = torch.Tensor(r) + # pred = self.is_sampler._ImportanceSampling__eval_traj_reward( + # reward_array=r + # ) + # self.assertEqual(pred.shape, torch.Size([3])) + # test_pred.append(pred.tolist()) + # test_pred = np.array(test_pred) + # res = test_pred==test_act_inidiv_rew + # if not res.all(): + # logger.debug(test_pred) + # logger.debug(test_act_inidiv_rew) + # diff_res = test_pred-test_act_inidiv_rew + # diff_res = (diff_res < tollerance).all() + # self.assertTrue(diff_res) + # else: + # self.assertTrue(res.all()) + +@parameterized_class(test_configs_fmt_class) +class VanillaISTest(unittest.TestCase): + + test_conf:TestConfig + + def setUp(self) -> None: + self.test_act_norm_conts = [val.prod() for val in self.test_conf.test_act_indiv_weights] + self.test_IS_weight_calculator = MagicMock(spec=ISWeightCalculator) + self.test_IS_weight_calculator.is_weights = MagicMock( + return_value=self.test_conf.weight_test_res) + self.test_IS_weight_calculator.is_msk = MagicMock( + return_value=self.test_conf.msk_test_res) + + self.is_sampler = VanillaIS(is_weight_calc=self.test_IS_weight_calculator) + + def test_get_traj_weight_array(self): + test_act_norm_conts_w_m = copy.deepcopy(self.test_conf.msk_test_res) + for i in range(len(self.test_act_norm_conts)): + test_act_norm_conts_w_m[i,:] = test_act_norm_conts_w_m[i,:]*self.test_act_norm_conts[i] + + tollerance_w_m = abs(test_act_norm_conts_w_m.numpy().mean())/1000 + test_act_norm_conts_w_m = torch.tensor(test_act_norm_conts_w_m) + pred = self.is_sampler.get_traj_weight_array( + is_weights=self.test_conf.weight_test_res, + weight_msk=self.test_conf.msk_test_res + ) - def test_get_dataset_w(self): - input_states = [torch.Tensor(s) for s in test_conf.test_state_vals] - input_actions = [torch.Tensor(a) for a in test_conf.test_action_vals] - #eval_policy = TestPolicy(test_conf.test_eval_action_probs) - e_policy_mock = TestPolicy(test_conf.test_eval_action_probs) - eval_policy = MagicMock( - spec=Policy, - side_effect=e_policy_mock - ) - is_weights, weight_msk = self.is_sampler.get_dataset_w( - states=input_states, actions=input_actions, eval_policy=eval_policy) - self.assertEqual(is_weights.shape, test_conf.weight_test_res.shape) - self.assertEqual(weight_msk.shape, test_conf.weight_test_res.shape) - tol = torch.Tensor(self.tollerance).view(-1,1).expand( - size=(len(self.tollerance), is_weights.shape[1])).mean() - np.testing.assert_allclose( - is_weights.numpy(), test_conf.weight_test_res.numpy(), atol=tol.numpy() - ) - np.testing.assert_allclose( - weight_msk.numpy(), test_conf.msk_test_res.numpy(), atol=tol.numpy() - ) - - # def test_eval_traj_reward(self): - - # tollerance = abs(test_act_inidiv_rew.mean())/1000 - # test_pred = [] - # for r in test_reward_values: - # r = torch.Tensor(r) - # pred = self.is_sampler._ImportanceSampling__eval_traj_reward( - # reward_array=r - # ) - # self.assertEqual(pred.shape, torch.Size([3])) - # test_pred.append(pred.tolist()) - # test_pred = np.array(test_pred) - # res = test_pred==test_act_inidiv_rew - # if not res.all(): - # logger.debug(test_pred) - # logger.debug(test_act_inidiv_rew) - # diff_res = test_pred-test_act_inidiv_rew - # diff_res = (diff_res < tollerance).all() - # self.assertTrue(diff_res) - # else: - # self.assertTrue(res.all()) - - class VanillaISTest(unittest.TestCase): - def setUp(self) -> None: - self.is_sampler = 
VanillaIS(is_weight_calc=test_IS_weight_calculator) - - def test_get_traj_weight_array(self): - test_act_norm_conts_w_m = copy.deepcopy(test_conf.msk_test_res) - for i in range(len(test_act_norm_conts)): - test_act_norm_conts_w_m[i,:] = test_act_norm_conts_w_m[i,:]*test_act_norm_conts[i] - - tollerance_w_m = abs(test_act_norm_conts_w_m.numpy().mean())/1000 - test_act_norm_conts_w_m = torch.tensor(test_act_norm_conts_w_m) - pred = self.is_sampler.get_traj_weight_array( - is_weights=test_conf.weight_test_res, - weight_msk=test_conf.msk_test_res - ) + self.assertEqual(pred.shape, test_act_norm_conts_w_m.shape) + np.testing.assert_allclose( + pred, test_act_norm_conts_w_m, atol=tollerance_w_m + ) + +@parameterized_class(test_configs_fmt_class) +class PerDecisionISTest(unittest.TestCase): + + test_conf:TestConfig + + def setUp(self) -> None: + self.test_act_pd_weights = [val.cumprod() for val in self.test_conf.test_act_indiv_weights] + self.test_IS_weight_calculator = MagicMock(spec=ISWeightCalculator) + self.test_IS_weight_calculator.is_weights = MagicMock( + return_value=self.test_conf.weight_test_res) + self.test_IS_weight_calculator.is_msk = MagicMock( + return_value=self.test_conf.msk_test_res) + self.is_sampler = PerDecisionIS(is_weight_calc=self.test_IS_weight_calculator) + + def test_get_traj_weight_array(self): + test_act_norm_conts_w_m = copy.deepcopy(self.test_conf.msk_test_res) + for i in range(len(self.test_act_pd_weights)): + test_act_norm_conts_w_m[i,0:len(self.test_act_pd_weights[i])] = torch.tensor(self.test_act_pd_weights[i]) - self.assertEqual(pred.shape, test_act_norm_conts_w_m.shape) - np.testing.assert_allclose( - pred, test_act_norm_conts_w_m, atol=tollerance_w_m - ) - - class PerDecisionISTest(unittest.TestCase): - def setUp(self) -> None: - self.is_sampler = PerDecisionIS(is_weight_calc=test_IS_weight_calculator) - - def test_get_traj_weight_array(self): - test_act_norm_conts_w_m = copy.deepcopy(test_conf.msk_test_res) - for i in range(len(test_act_pd_weights)): - test_act_norm_conts_w_m[i,0:len(test_act_pd_weights[i])] = torch.tensor(test_act_pd_weights[i]) - - tollerance_w_m = abs(test_act_norm_conts_w_m.numpy().mean())/1000 - pred = self.is_sampler.get_traj_weight_array( - is_weights=test_conf.weight_test_res, - weight_msk=test_conf.msk_test_res - ) - self.assertEqual(pred.shape, test_act_norm_conts_w_m.shape) - np.testing.assert_allclose( - pred, test_act_norm_conts_w_m, atol=tollerance_w_m - ) \ No newline at end of file + tollerance_w_m = abs(test_act_norm_conts_w_m.numpy().mean())/1000 + pred = self.is_sampler.get_traj_weight_array( + is_weights=self.test_conf.weight_test_res, + weight_msk=self.test_conf.msk_test_res + ) + self.assertEqual(pred.shape, test_act_norm_conts_w_m.shape) + np.testing.assert_allclose( + pred, test_act_norm_conts_w_m, atol=tollerance_w_m + ) \ No newline at end of file diff --git a/tests/components/test_Policy.py b/tests/components/test_Policy.py index e2d875a..b378dd1 100644 --- a/tests/components/test_Policy.py +++ b/tests/components/test_Policy.py @@ -5,201 +5,202 @@ from offline_rl_ope.components.Policy import ( GreedyDeterministic, BehavPolicy) from offline_rl_ope import logger -# from ..base import (test_action_probs, test_action_vals, test_eval_action_probs, -# test_eval_action_vals, test_reward_values, test_state_vals) -from ..base import ( - single_discrete_action_test as sdat, - duel_discrete_action_test as ddat, - bin_discrete_action_test as bdat - ) +from parameterized import parameterized_class +from ..base import 
test_configs_fmt_class, TestConfig + eps = 0.001 -for test_conf in [sdat,ddat,bdat]: - class GreedyDeterministicTest(unittest.TestCase): +@parameterized_class(test_configs_fmt_class) +class GreedyDeterministicTest(unittest.TestCase): + + test_conf:TestConfig - def setUp(self) -> None: - def __mock_return(x): - lkp = { - str(torch.Tensor(state)):torch.Tensor(act) - for state,act in zip( - test_conf.test_state_vals, - test_conf.test_eval_action_vals - ) - } - return lkp[str(x)] - policy_func = MagicMock(side_effect=__mock_return) - self.policy_0_eps = GreedyDeterministic(policy_func, gpu=False) - self.policy_001_eps = GreedyDeterministic( - policy_func, gpu=False, eps=eps) - - def __mock_return_multi_dim(x): - lkp = { - str(torch.Tensor(state)):torch.concat( - [torch.Tensor(act),torch.abs(1-torch.Tensor(act))], - dim=1 - ) - for state,act in zip( - test_conf.test_state_vals, - test_conf.test_eval_action_vals - ) - } - return lkp[str(x)] - policy_func_multi_dim = MagicMock(side_effect=__mock_return_multi_dim) - self.policy_0_eps_multi_dim = GreedyDeterministic( - policy_func_multi_dim, - gpu=False - ) - self.policy_001_eps_multi_dim = GreedyDeterministic( - policy_func_multi_dim, - gpu=False, - eps=eps - ) + def setUp(self) -> None: + def __mock_return(x): + lkp = { + str(torch.Tensor(state)):torch.Tensor(act) + for state,act in zip( + self.test_conf.test_state_vals, + self.test_conf.test_eval_action_vals + ) + } + return lkp[str(x)] + policy_func = MagicMock(side_effect=__mock_return) + self.policy_0_eps = GreedyDeterministic(policy_func, gpu=False) + self.policy_001_eps = GreedyDeterministic( + policy_func, gpu=False, eps=eps) - def test___call__0_eps(self): - test_pred = [] - __test_action_vals = [np.array(i) for i in test_conf.test_action_vals] - __test_eval_action_vals = [np.array(i) for i in test_conf.test_eval_action_vals] - test_res = [(x==y).astype(int) - for x,y in zip(__test_action_vals, __test_eval_action_vals)] - test_res = np.concatenate(test_res).squeeze() - tollerance = test_res.mean()/1000 - for s,a in zip(test_conf.test_state_vals, __test_action_vals): - s = torch.Tensor(s) - a = torch.Tensor(a) - assert len(s.shape) == 2, "Incorrect test input dimensions" - assert len(a.shape) == 2, "Incorrect test input dimensions" - pred = self.policy_0_eps(state=s, action=a) - self.assertEqual(pred.shape, torch.Size((s.shape[0],1))) - test_pred.append(pred.squeeze().numpy()) - test_pred = np.concatenate(test_pred) - np.testing.assert_allclose(test_pred, test_res, atol=tollerance) - - def test___call__0001_eps(self): - test_pred = [] - __test_action_vals = [np.array(i) for i in test_conf.test_action_vals] - __test_eval_action_vals = [np.array(i) for i in test_conf.test_eval_action_vals] - test_res = [(x==y).astype(int) - for x,y in zip(__test_action_vals, __test_eval_action_vals)] - test_res = np.concatenate(test_res).squeeze() - test_res = np.where( - test_res == 1, 1-eps, 0+eps + def __mock_return_multi_dim(x): + lkp = { + str(torch.Tensor(state)):torch.concat( + [torch.Tensor(act),torch.abs(1-torch.Tensor(act))], + dim=1 + ) + for state,act in zip( + self.test_conf.test_state_vals, + self.test_conf.test_eval_action_vals + ) + } + return lkp[str(x)] + policy_func_multi_dim = MagicMock(side_effect=__mock_return_multi_dim) + self.policy_0_eps_multi_dim = GreedyDeterministic( + policy_func_multi_dim, + gpu=False ) - tollerance = test_res.mean()/1000 - for s,a in zip(test_conf.test_state_vals, __test_action_vals): - s = torch.Tensor(s) - a = torch.Tensor(a) - assert len(s.shape) == 2, 
"Incorrect test input dimensions" - assert len(a.shape) == 2, "Incorrect test input dimensions" - pred = self.policy_001_eps(state=s, action=a) - self.assertEqual(pred.shape, torch.Size((s.shape[0],1))) - test_pred.append(pred.squeeze().numpy()) - test_pred = np.concatenate(test_pred) - np.testing.assert_allclose(test_pred, test_res, atol=tollerance) - - # def test___call__0_eps_multi_dim(self): - # test_pred = [] - # __test_action_vals = [ - # np.concatenate( - # [np.array(i),np.abs(1-np.array(i))], - # axis=1 - # ) for i in test_conf.test_action_vals - # ] - # __test_eval_action_vals = [ - # np.concatenate( - # [np.array(i),np.abs(1-np.array(i))], - # axis=1 - # ) for i in test_conf.test_eval_action_vals - # ] - # test_res = [(x==y).all(axis=1).astype(int) - # for x,y in zip(__test_action_vals, __test_eval_action_vals)] - # test_res = np.concatenate(test_res).squeeze() - # tollerance = test_res.mean()/1000 - # for s,a in zip(test_conf.test_state_vals, __test_action_vals): - # s = torch.Tensor(s) - # a = torch.Tensor(a) - # pred = self.policy_0_eps_multi_dim(state=s, action=a) - # self.assertEqual(pred.shape, torch.Size((s.shape[0],1))) - # test_pred.append(pred.squeeze().numpy()) - # test_pred = np.concatenate(test_pred) - # np.testing.assert_allclose(test_pred, test_res, atol=tollerance) - - # def test___call__0001_eps_multi_dim(self): - # test_pred = [] - # __test_action_vals = [ - # np.concatenate( - # [np.array(i),np.abs(1-np.array(i))], - # axis=1 - # ) for i in test_conf.test_action_vals - # ] - # __test_eval_action_vals = [ - # np.concatenate( - # [np.array(i),np.abs(1-np.array(i))], - # axis=1 - # ) for i in test_conf.test_eval_action_vals - # ] - # test_res = [(x==y).all(axis=1).astype(int) - # for x,y in zip(__test_action_vals, __test_eval_action_vals)] - # test_res = np.concatenate(test_res).squeeze() - # test_res = np.where( - # test_res == 1, 1-eps, 0+eps - # ) - # tollerance = test_res.mean()/1000 - # for s,a in zip(test_conf.test_state_vals, __test_action_vals): - # s = torch.Tensor(s) - # a = torch.Tensor(a) - # pred = self.policy_001_eps_multi_dim(state=s, action=a) - # self.assertEqual(pred.shape, torch.Size((s.shape[0],1))) - # test_pred.append(pred.squeeze().numpy()) - # test_pred = np.concatenate(test_pred) - # np.testing.assert_allclose(test_pred, test_res, atol=tollerance) - - class MockPolicyClass: + self.policy_001_eps_multi_dim = GreedyDeterministic( + policy_func_multi_dim, + gpu=False, + eps=eps + ) + + def test___call__0_eps(self): + test_pred = [] + __test_action_vals = [np.array(i) for i in self.test_conf.test_action_vals] + __test_eval_action_vals = [np.array(i) for i in self.test_conf.test_eval_action_vals] + test_res = [(x==y).all(axis=1).astype(int) + for x,y in zip(__test_action_vals, __test_eval_action_vals)] + test_res = np.concatenate(test_res).squeeze() + tollerance = test_res.mean()/1000 + for s,a in zip(self.test_conf.test_state_vals, __test_action_vals): + s = torch.Tensor(s) + a = torch.Tensor(a) + assert len(s.shape) == 2, "Incorrect test input dimensions" + assert len(a.shape) == 2, "Incorrect test input dimensions" + pred = self.policy_0_eps(state=s, action=a) + self.assertEqual(pred.shape, torch.Size((s.shape[0],1))) + test_pred.append(pred.squeeze().numpy()) + test_pred = np.concatenate(test_pred) + np.testing.assert_allclose(test_pred, test_res, atol=tollerance) - def __init__(self) -> None: - pass + def test___call__0001_eps(self): + test_pred = [] + __test_action_vals = [np.array(i) for i in self.test_conf.test_action_vals] + 
__test_eval_action_vals = [np.array(i) for i in self.test_conf.test_eval_action_vals] + test_res = [(x==y).all(axis=1).astype(int) + for x,y in zip(__test_action_vals, __test_eval_action_vals)] + test_res = np.concatenate(test_res).squeeze() + test_res = np.where( + test_res == 1, 1-eps, 0+eps + ) + tollerance = test_res.mean()/1000 + for s,a in zip(self.test_conf.test_state_vals, __test_action_vals): + s = torch.Tensor(s) + a = torch.Tensor(a) + assert len(s.shape) == 2, "Incorrect test input dimensions" + assert len(a.shape) == 2, "Incorrect test input dimensions" + pred = self.policy_001_eps(state=s, action=a) + self.assertEqual(pred.shape, torch.Size((s.shape[0],1))) + test_pred.append(pred.squeeze().numpy()) + test_pred = np.concatenate(test_pred) + np.testing.assert_allclose(test_pred, test_res, atol=tollerance) + + # def test___call__0_eps_multi_dim(self): + # test_pred = [] + # __test_action_vals = [ + # np.concatenate( + # [np.array(i),np.abs(1-np.array(i))], + # axis=1 + # ) for i in self.test_conf.test_action_vals + # ] + # __test_eval_action_vals = [ + # np.concatenate( + # [np.array(i),np.abs(1-np.array(i))], + # axis=1 + # ) for i in self.test_conf.test_eval_action_vals + # ] + # test_res = [(x==y).all(axis=1).astype(int) + # for x,y in zip(__test_action_vals, __test_eval_action_vals)] + # test_res = np.concatenate(test_res).squeeze() + # tollerance = test_res.mean()/1000 + # for s,a in zip(self.test_conf.test_state_vals, __test_action_vals): + # s = torch.Tensor(s) + # a = torch.Tensor(a) + # pred = self.policy_0_eps_multi_dim(state=s, action=a) + # self.assertEqual(pred.shape, torch.Size((s.shape[0],1))) + # test_pred.append(pred.squeeze().numpy()) + # test_pred = np.concatenate(test_pred) + # np.testing.assert_allclose(test_pred, test_res, atol=tollerance) - class BehavPolicyTest(unittest.TestCase): - - def setUp(self) -> None: - def __mock_return(y, x): - lkp = { - "_".join( - [ - str(torch.tensor(state).float()), - str(torch.tensor(act).float()) - ] - ): torch.tensor(probs) - for state,act,probs in zip( - test_conf.test_state_vals, test_conf.test_action_vals, - test_conf.test_action_probs) - } - print(f"x: {x}") - print(f"y: {y}") - print(f"lkp: {list(lkp.keys())[0]}") - print(f'id: {"_".join([str(x),str(y)])}') - return lkp["_".join([str(x),str(y)])] - #policy_func = MockPolicyClass() - #policy_func.__call__ = MagicMock(side_effect=__mock_return) - #self.policy = BehavPolicy(policy_func) - self.policy = BehavPolicy( - policy_func=MagicMock(side_effect=__mock_return)) + # def test___call__0001_eps_multi_dim(self): + # test_pred = [] + # __test_action_vals = [ + # np.concatenate( + # [np.array(i),np.abs(1-np.array(i))], + # axis=1 + # ) for i in self.test_conf.test_action_vals + # ] + # __test_eval_action_vals = [ + # np.concatenate( + # [np.array(i),np.abs(1-np.array(i))], + # axis=1 + # ) for i in self.test_conf.test_eval_action_vals + # ] + # test_res = [(x==y).all(axis=1).astype(int) + # for x,y in zip(__test_action_vals, __test_eval_action_vals)] + # test_res = np.concatenate(test_res).squeeze() + # test_res = np.where( + # test_res == 1, 1-eps, 0+eps + # ) + # tollerance = test_res.mean()/1000 + # for s,a in zip(self.test_conf.test_state_vals, __test_action_vals): + # s = torch.Tensor(s) + # a = torch.Tensor(a) + # pred = self.policy_001_eps_multi_dim(state=s, action=a) + # self.assertEqual(pred.shape, torch.Size((s.shape[0],1))) + # test_pred.append(pred.squeeze().numpy()) + # test_pred = np.concatenate(test_pred) + # np.testing.assert_allclose(test_pred, test_res, 
atol=tollerance) - - def test___call__(self): - test_pred = [] - test_res = [np.array(i) for i in test_conf.test_action_probs] - test_res = np.concatenate(test_res).squeeze() - tollerance = test_res.mean()/1000 - for s,a in zip(test_conf.test_state_vals, test_conf.test_action_vals): - s = torch.Tensor(s) - a = torch.Tensor(a) - assert len(s.shape) == 2, "Incorrect test input dimensions" - assert len(a.shape) == 2, "Incorrect test input dimensions" - pred = self.policy(state=s, action=a) - self.assertEqual(pred.shape, torch.Size((s.shape[0],1))) - test_pred.append(pred.squeeze().numpy()) - test_pred = np.concatenate(test_pred) - np.testing.assert_allclose(test_pred, test_res, atol=tollerance) +class MockPolicyClass: + + def __init__(self) -> None: + pass + +@parameterized_class(test_configs_fmt_class) +class BehavPolicyTest(unittest.TestCase): + + test_conf:TestConfig + + def setUp(self) -> None: + def __mock_return(y, x): + lkp = { + "_".join( + [ + str(torch.tensor(state).float()), + str(torch.tensor(act).float()) + ] + ): torch.tensor(probs) + for state,act,probs in zip( + self.test_conf.test_state_vals, self.test_conf.test_action_vals, + self.test_conf.test_action_probs) + } + print(f"x: {x}") + print(f"y: {y}") + print(f"lkp: {list(lkp.keys())[0]}") + print(f'id: {"_".join([str(x),str(y)])}') + return lkp["_".join([str(x),str(y)])] + #policy_func = MockPolicyClass() + #policy_func.__call__ = MagicMock(side_effect=__mock_return) + #self.policy = BehavPolicy(policy_func) + self.policy = BehavPolicy( + policy_func=MagicMock(side_effect=__mock_return)) + + + def test___call__(self): + test_pred = [] + test_res = [np.array(i) for i in self.test_conf.test_action_probs] + test_res = np.concatenate(test_res).squeeze() + tollerance = test_res.mean()/1000 + for s,a in zip(self.test_conf.test_state_vals, self.test_conf.test_action_vals): + s = torch.Tensor(s) + a = torch.Tensor(a) + assert len(s.shape) == 2, "Incorrect test input dimensions" + assert len(a.shape) == 2, "Incorrect test input dimensions" + pred = self.policy(state=s, action=a) + self.assertEqual(pred.shape, torch.Size((s.shape[0],1))) + test_pred.append(pred.squeeze().numpy()) + test_pred = np.concatenate(test_pred) + np.testing.assert_allclose(test_pred, test_res, atol=tollerance)
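
For reference, the expected-value arithmetic that the reworked `WISWeightNorm` tests above encode — terminal per-trajectory weights gathered via the mask, summed into a single per-timestep denominator, optionally averaged over the number of trajectories and smoothed — can be reproduced as a small standalone sketch. The weights, mask, and variable names below are toy illustrations (not the repository's test fixtures) and rely only on the torch operations already used in the tests:

    import torch

    # Toy per-step importance ratios w_{i,t} for two trajectories (lengths 3 and 2),
    # padded to the max length; the mask marks valid timesteps.
    weights = torch.tensor([[0.5, 2.0, 1.5],
                            [1.2, 0.8, 0.0]])
    mask = torch.tensor([[1.0, 1.0, 1.0],
                         [1.0, 1.0, 0.0]])

    # Per-decision weights: cumulative product along time, zeroed on padded steps.
    pd = weights.clone()
    pd[mask == 0] = 1.0
    pd = pd.cumprod(dim=1)
    pd[mask == 0] = 0.0

    # Terminal (whole-trajectory) weight: the per-decision weight at each
    # trajectory's last valid timestep.
    final_idx = mask.cumsum(dim=1).argmax(dim=1)
    term_weights = pd[torch.arange(pd.shape[0]), final_idx]

    # Non-cumulative WIS denominator: one value broadcast across all timesteps,
    # i.e. the sum of terminal weights (optionally averaged), plus smooth_eps.
    smooth_eps = 1e-8
    denom = term_weights.sum().repeat(1, weights.shape[1]) + smooth_eps
    denom_avg = (term_weights.sum() / weights.shape[0]).repeat(
        1, weights.shape[1]) + smooth_eps

    print(pd)         # expected per-decision weight matrix
    print(denom)      # expected denominator, avg_denom=False
    print(denom_avg)  # expected denominator, avg_denom=True

Dividing the padded per-decision weight matrix element-wise by either denominator reproduces the normalised weights that the test assertions compare against the output of a `WISWeightNorm` instance (with and without `avg_denom=True`).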