diff --git a/pkg/pyepo/func/blackbox.py b/pkg/pyepo/func/blackbox.py index 628644e0..99a50db2 100644 --- a/pkg/pyepo/func/blackbox.py +++ b/pkg/pyepo/func/blackbox.py @@ -49,11 +49,17 @@ def __init__(self, optmodel, lambd=10, processes=1, solve_ratio=1, dataset=None) if lambd <= 0: raise ValueError("lambda is not positive.") self.lambd = lambd - # num of processors + # number of processes if processes not in range(mp.cpu_count()+1): raise ValueError("Invalid processors number {}, only {} cores.". format(processes, mp.cpu_count())) - self.processes = processes + self.processes = mp.cpu_count() if not processes else processes + # single-core + if processes == 1: + self.pool = None + # multi-core + else: + self.pool = ProcessingPool(processes) print("Num of cores: {}".format(self.processes)) # solution pool self.solve_ratio = solve_ratio @@ -73,7 +79,7 @@ def forward(self, pred_cost): Forward pass """ sols = self.dbb.apply(pred_cost, self.lambd, self.optmodel, - self.processes, self.solve_ratio, self) + self.processes, self.pool, self.solve_ratio, self) return sols @@ -83,7 +89,7 @@ class blackboxOptFunc(Function): """ @staticmethod - def forward(ctx, pred_cost, lambd, optmodel, processes, solve_ratio, module): + def forward(ctx, pred_cost, lambd, optmodel, processes, pool, solve_ratio, module): """ Forward pass for DBB @@ -92,6 +98,7 @@ def forward(ctx, pred_cost, lambd, optmodel, processes, solve_ratio, module): lambd (float): a hyperparameter for differentiable block-box to contral interpolation degree optmodel (optModel): an PyEPO optimization model processes (int): number of processors, 1 for single-core, 0 for all of cores + pool (ProcessPool): process pool object solve_ratio (float): the ratio of new solutions computed during training module (nn.Module): blackboxOpt module @@ -105,7 +112,7 @@ def forward(ctx, pred_cost, lambd, optmodel, processes, solve_ratio, module): # solve rand_sigma = np.random.uniform() if rand_sigma <= solve_ratio: - sol = _solve_in_pass(cp, optmodel, processes) + sol = _solve_in_pass(cp, optmodel, processes, pool) if solve_ratio < 1: module.solpool = np.concatenate((module.solpool, sol)) else: @@ -119,6 +126,7 @@ def forward(ctx, pred_cost, lambd, optmodel, processes, solve_ratio, module): ctx.lambd = lambd ctx.optmodel = optmodel ctx.processes = processes + ctx.pool = pool ctx.solve_ratio = solve_ratio if solve_ratio < 1: ctx.module = module @@ -134,6 +142,7 @@ def backward(ctx, grad_output): lambd = ctx.lambd optmodel = ctx.optmodel processes = ctx.processes + pool = ctx.pool solve_ratio = ctx.solve_ratio rand_sigma = ctx.rand_sigma if solve_ratio < 1: @@ -149,7 +158,7 @@ def backward(ctx, grad_output): cq = cp + lambd * dl # solve if rand_sigma <= solve_ratio: - sol = _solve_in_pass(cq, optmodel, processes) + sol = _solve_in_pass(cq, optmodel, processes, pool) if solve_ratio < 1: module.solpool = np.concatenate((module.solpool, sol)) else: @@ -161,10 +170,10 @@ def backward(ctx, grad_output): # convert to tensor grad = np.array(grad) grad = torch.FloatTensor(grad).to(device) - return grad, None, None, None, None, None + return grad, None, None, None, None, None, None -def _solve_in_pass(cp, optmodel, processes): +def _solve_in_pass(cp, optmodel, processes, pool): """ A function to solve optimization in the forward/backward pass """ @@ -180,16 +189,13 @@ def _solve_in_pass(cp, optmodel, processes): sol.append(solp) # multi-core else: - # number of processes - processes = mp.cpu_count() if not processes else processes # get class model_type = type(optmodel) # get args args = getArgs(optmodel) # parallel computing - with ProcessingPool(processes) as pool: - sol = pool.amap(_solveWithObj4Par, cp, [args] * ins_num, - [model_type] * ins_num).get() + sol = pool.amap(_solveWithObj4Par, cp, [args] * ins_num, + [model_type] * ins_num).get() return sol diff --git a/pkg/pyepo/func/perturbed.py b/pkg/pyepo/func/perturbed.py index c49d05ba..d9993eac 100644 --- a/pkg/pyepo/func/perturbed.py +++ b/pkg/pyepo/func/perturbed.py @@ -50,11 +50,17 @@ def __init__(self, optmodel, n_samples=10, sigma=1.0, processes=1, self.n_samples = n_samples # perturbation amplitude self.sigma = sigma - # num of processors + # number of processes if processes not in range(mp.cpu_count()+1): raise ValueError("Invalid processors number {}, only {} cores.". format(processes, mp.cpu_count())) - self.processes = processes + self.processes = mp.cpu_count() if not processes else processes + # single-core + if processes == 1: + self.pool = None + # multi-core + else: + self.pool = ProcessingPool(processes) print("Num of cores: {}".format(self.processes)) # random state self.rnd = np.random.RandomState(seed) @@ -76,7 +82,7 @@ def forward(self, pred_cost): Forward pass """ sols = self.ptb.apply(pred_cost, self.optmodel, self.n_samples, - self.sigma, self.processes, self.rnd, + self.sigma, self.processes, self.pool, self.rnd, self.solve_ratio, self) return sols @@ -88,7 +94,7 @@ class perturbedOptFunc(Function): @staticmethod def forward(ctx, pred_cost, optmodel, n_samples, sigma, - processes, rnd, solve_ratio, module): + processes, pool, rnd, solve_ratio, module): """ Forward pass for perturbed @@ -98,6 +104,7 @@ def forward(ctx, pred_cost, optmodel, n_samples, sigma, n_samples (int): number of Monte-Carlo samples sigma (float): the amplitude of the perturbation processes (int): number of processors, 1 for single-core, 0 for all of cores + pool (ProcessPool): process pool object rnd (RondomState): numpy random state solve_ratio (float): the ratio of new solutions computed during training module (nn.Module): perturbedOpt module @@ -115,7 +122,7 @@ def forward(ctx, pred_cost, optmodel, n_samples, sigma, # solve with perturbation rand_sigma = np.random.uniform() if rand_sigma <= solve_ratio: - ptb_sols = _solve_in_forward(ptb_c, optmodel, processes) + ptb_sols = _solve_in_forward(ptb_c, optmodel, processes, pool) if solve_ratio < 1: sols = ptb_sols.reshape(-1, cp.shape[1]) module.solpool = np.concatenate((module.solpool, sols)) @@ -148,7 +155,7 @@ def backward(ctx, grad_output): noises, torch.einsum("bnd,bd->bn", ptb_sols, grad_output)) grad /= n_samples * sigma - return grad, None, None, None, None, None, None, None + return grad, None, None, None, None, None, None, None, None class perturbedFenchelYoung(nn.Module): @@ -186,11 +193,17 @@ def __init__(self, optmodel, n_samples=10, sigma=1.0, processes=1, self.n_samples = n_samples # perturbation amplitude self.sigma = sigma - # num of processors + # number of processes if processes not in range(mp.cpu_count()+1): raise ValueError("Invalid processors number {}, only {} cores.". format(processes, mp.cpu_count())) - self.processes = processes + self.processes = mp.cpu_count() if not processes else processes + # single-core + if processes == 1: + self.pool = None + # multi-core + else: + self.pool = ProcessingPool(processes) print("Num of cores: {}".format(self.processes)) # random state self.rnd = np.random.RandomState(seed) @@ -212,7 +225,7 @@ def forward(self, pred_cost, true_sol): Forward pass """ loss = self.pfy.apply(pred_cost, true_sol, self.optmodel, self.n_samples, - self.sigma, self.processes, self.rnd, + self.sigma, self.processes, self.pool, self.rnd, self.solve_ratio, self) return loss @@ -224,7 +237,7 @@ class perturbedFenchelYoungFunc(Function): @staticmethod def forward(ctx, pred_cost, true_sol, optmodel, n_samples, sigma, - processes, rnd, solve_ratio, module): + processes, pool, rnd, solve_ratio, module): """ Forward pass for perturbed Fenchel-Young loss @@ -235,6 +248,7 @@ def forward(ctx, pred_cost, true_sol, optmodel, n_samples, sigma, n_samples (int): number of Monte-Carlo samples sigma (float): the amplitude of the perturbation processes (int): number of processors, 1 for single-core, 0 for all of cores + pool (ProcessPool): process pool object rnd (RondomState): numpy random state solve_ratio (float): the ratio of new solutions computed during training module (nn.Module): perturbedFenchelYoung module @@ -253,7 +267,7 @@ def forward(ctx, pred_cost, true_sol, optmodel, n_samples, sigma, # solve with perturbation rand_sigma = np.random.uniform() if rand_sigma <= solve_ratio: - ptb_sols = _solve_in_forward(ptb_c, optmodel, processes) + ptb_sols = _solve_in_forward(ptb_c, optmodel, processes, pool) if solve_ratio < 1: sols = ptb_sols.reshape(-1, cp.shape[1]) module.solpool = np.concatenate((module.solpool, sols)) @@ -282,10 +296,10 @@ def backward(ctx, grad_output): """ grad, = ctx.saved_tensors grad_output = torch.unsqueeze(grad_output, dim=-1) - return grad * grad_output, None, None, None, None, None, None, None, None + return grad * grad_output, None, None, None, None, None, None, None, None, None -def _solve_in_forward(ptb_c, optmodel, processes): +def _solve_in_forward(ptb_c, optmodel, processes, pool): """ A function to solve optimization in the forward pass """ @@ -305,16 +319,13 @@ def _solve_in_forward(ptb_c, optmodel, processes): ptb_sols.append(sols) # multi-core else: - # number of processes - processes = mp.cpu_count() if not processes else processes # get class model_type = type(optmodel) # get args args = getArgs(optmodel) # parallel computing - with ProcessingPool(processes) as pool: - ptb_sols = pool.amap(_solveWithObj4Par, ptb_c.transpose(1,0,2), - [args] * ins_num, [model_type] * ins_num).get() + ptb_sols = pool.amap(_solveWithObj4Par, ptb_c.transpose(1,0,2), + [args] * ins_num, [model_type] * ins_num).get() return np.array(ptb_sols) diff --git a/pkg/pyepo/func/spoplus.py b/pkg/pyepo/func/spoplus.py index bee3a063..f2ccada9 100644 --- a/pkg/pyepo/func/spoplus.py +++ b/pkg/pyepo/func/spoplus.py @@ -44,11 +44,17 @@ def __init__(self, optmodel, processes=1, solve_ratio=1, dataset=None): if not isinstance(optmodel, optModel): raise TypeError("arg model is not an optModel") self.optmodel = optmodel - # num of processors + # number of processes if processes not in range(mp.cpu_count()+1): raise ValueError("Invalid processors number {}, only {} cores.". format(processes, mp.cpu_count())) - self.processes = processes + self.processes = mp.cpu_count() if not processes else processes + # single-core + if processes == 1: + self.pool = None + # multi-core + else: + self.pool = ProcessingPool(processes) print("Num of cores: {}".format(self.processes)) # solution pool self.solve_ratio = solve_ratio @@ -68,8 +74,8 @@ def forward(self, pred_cost, true_cost, true_sol, true_obj): Forward pass """ loss = self.spop.apply(pred_cost, true_cost, true_sol, true_obj, - self.optmodel, self.processes, self.solve_ratio, - self) + self.optmodel, self.processes, self.pool, + self.solve_ratio, self) return loss @@ -81,7 +87,7 @@ class SPOPlusFunc(Function): @staticmethod def forward(ctx, pred_cost, true_cost, true_sol, true_obj, - optmodel, processes, solve_ratio, module): + optmodel, processes, pool, solve_ratio, module): """ Forward pass for SPO+ @@ -92,6 +98,7 @@ def forward(ctx, pred_cost, true_cost, true_sol, true_obj, true_obj (torch.tensor): a batch of true optimal objective values optmodel (optModel): an PyEPO optimization model processes (int): number of processors, 1 for single-core, 0 for all of cores + pool (ProcessPool): process pool object solve_ratio (float): the ratio of new solutions computed during training module (nn.Module): SPOPlus modeul @@ -109,7 +116,7 @@ def forward(ctx, pred_cost, true_cost, true_sol, true_obj, #_check_sol(c, w, z) # solve if np.random.uniform() <= solve_ratio: - sol, loss = _solve_in_forward(cp, c, w, z, optmodel, processes) + sol, loss = _solve_in_forward(cp, c, w, z, optmodel, processes, pool) if solve_ratio < 1: module.solpool = np.concatenate((module.solpool, sol)) else: @@ -143,7 +150,7 @@ def backward(ctx, grad_output): return grad_output * grad, None, None, None, None, None, None, None -def _solve_in_forward(cp, c, w, z, optmodel, processes): +def _solve_in_forward(cp, c, w, z, optmodel, processes, pool): """ A function to solve optimization in the forward pass """ @@ -167,16 +174,11 @@ def _solve_in_forward(cp, c, w, z, optmodel, processes): model_type = type(optmodel) # get args args = getArgs(optmodel) - # number of processes - processes = mp.cpu_count() if not processes else processes - # parallel computing - with ProcessingPool(processes) as pool: - res = pool.amap( - _solveWithObj4Par, - 2 * cp - c, - [args] * ins_num, - [model_type] * ins_num, - ).get() + res = pool.amap( + _solveWithObj4Par, + 2 * cp - c, + [args] * ins_num, + [model_type] * ins_num).get() # get res sol = np.array(list(map(lambda x: x[0], res))) obj = np.array(list(map(lambda x: x[1], res))) diff --git a/pkg/setup.py b/pkg/setup.py index be622c1b..6f54de94 100644 --- a/pkg/setup.py +++ b/pkg/setup.py @@ -13,7 +13,7 @@ # description description = "PyTorch-based End-to-End Predict-then-Optimize Tool", # version - version = "0.2.3", + version = "0.2.4", # Github repo url = "https://github.com/khalil-research/PyEPO", # author name