Skip to content

Commit

Permalink
Update: multi-process init
Browse files Browse the repository at this point in the history
  • Loading branch information
LucasBoTang committed Mar 8, 2023
1 parent e45f52f commit 18f6882
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 49 deletions.
32 changes: 19 additions & 13 deletions pkg/pyepo/func/blackbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,17 @@ def __init__(self, optmodel, lambd=10, processes=1, solve_ratio=1, dataset=None)
if lambd <= 0:
raise ValueError("lambda is not positive.")
self.lambd = lambd
# num of processors
# number of processes
if processes not in range(mp.cpu_count()+1):
raise ValueError("Invalid processors number {}, only {} cores.".
format(processes, mp.cpu_count()))
self.processes = processes
self.processes = mp.cpu_count() if not processes else processes
# single-core
if processes == 1:
self.pool = None
# multi-core
else:
self.pool = ProcessingPool(processes)
print("Num of cores: {}".format(self.processes))
# solution pool
self.solve_ratio = solve_ratio
Expand All @@ -73,7 +79,7 @@ def forward(self, pred_cost):
Forward pass
"""
sols = self.dbb.apply(pred_cost, self.lambd, self.optmodel,
self.processes, self.solve_ratio, self)
self.processes, self.pool, self.solve_ratio, self)
return sols


Expand All @@ -83,7 +89,7 @@ class blackboxOptFunc(Function):
"""

@staticmethod
def forward(ctx, pred_cost, lambd, optmodel, processes, solve_ratio, module):
def forward(ctx, pred_cost, lambd, optmodel, processes, pool, solve_ratio, module):
"""
Forward pass for DBB
Expand All @@ -92,6 +98,7 @@ def forward(ctx, pred_cost, lambd, optmodel, processes, solve_ratio, module):
lambd (float): a hyperparameter for differentiable block-box to contral interpolation degree
optmodel (optModel): an PyEPO optimization model
processes (int): number of processors, 1 for single-core, 0 for all of cores
pool (ProcessPool): process pool object
solve_ratio (float): the ratio of new solutions computed during training
module (nn.Module): blackboxOpt module
Expand All @@ -105,7 +112,7 @@ def forward(ctx, pred_cost, lambd, optmodel, processes, solve_ratio, module):
# solve
rand_sigma = np.random.uniform()
if rand_sigma <= solve_ratio:
sol = _solve_in_pass(cp, optmodel, processes)
sol = _solve_in_pass(cp, optmodel, processes, pool)
if solve_ratio < 1:
module.solpool = np.concatenate((module.solpool, sol))
else:
Expand All @@ -119,6 +126,7 @@ def forward(ctx, pred_cost, lambd, optmodel, processes, solve_ratio, module):
ctx.lambd = lambd
ctx.optmodel = optmodel
ctx.processes = processes
ctx.pool = pool
ctx.solve_ratio = solve_ratio
if solve_ratio < 1:
ctx.module = module
Expand All @@ -134,6 +142,7 @@ def backward(ctx, grad_output):
lambd = ctx.lambd
optmodel = ctx.optmodel
processes = ctx.processes
pool = ctx.pool
solve_ratio = ctx.solve_ratio
rand_sigma = ctx.rand_sigma
if solve_ratio < 1:
Expand All @@ -149,7 +158,7 @@ def backward(ctx, grad_output):
cq = cp + lambd * dl
# solve
if rand_sigma <= solve_ratio:
sol = _solve_in_pass(cq, optmodel, processes)
sol = _solve_in_pass(cq, optmodel, processes, pool)
if solve_ratio < 1:
module.solpool = np.concatenate((module.solpool, sol))
else:
Expand All @@ -161,10 +170,10 @@ def backward(ctx, grad_output):
# convert to tensor
grad = np.array(grad)
grad = torch.FloatTensor(grad).to(device)
return grad, None, None, None, None, None
return grad, None, None, None, None, None, None


def _solve_in_pass(cp, optmodel, processes):
def _solve_in_pass(cp, optmodel, processes, pool):
"""
A function to solve optimization in the forward/backward pass
"""
Expand All @@ -180,16 +189,13 @@ def _solve_in_pass(cp, optmodel, processes):
sol.append(solp)
# multi-core
else:
# number of processes
processes = mp.cpu_count() if not processes else processes
# get class
model_type = type(optmodel)
# get args
args = getArgs(optmodel)
# parallel computing
with ProcessingPool(processes) as pool:
sol = pool.amap(_solveWithObj4Par, cp, [args] * ins_num,
[model_type] * ins_num).get()
sol = pool.amap(_solveWithObj4Par, cp, [args] * ins_num,
[model_type] * ins_num).get()
return sol


Expand Down
47 changes: 29 additions & 18 deletions pkg/pyepo/func/perturbed.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,17 @@ def __init__(self, optmodel, n_samples=10, sigma=1.0, processes=1,
self.n_samples = n_samples
# perturbation amplitude
self.sigma = sigma
# num of processors
# number of processes
if processes not in range(mp.cpu_count()+1):
raise ValueError("Invalid processors number {}, only {} cores.".
format(processes, mp.cpu_count()))
self.processes = processes
self.processes = mp.cpu_count() if not processes else processes
# single-core
if processes == 1:
self.pool = None
# multi-core
else:
self.pool = ProcessingPool(processes)
print("Num of cores: {}".format(self.processes))
# random state
self.rnd = np.random.RandomState(seed)
Expand All @@ -76,7 +82,7 @@ def forward(self, pred_cost):
Forward pass
"""
sols = self.ptb.apply(pred_cost, self.optmodel, self.n_samples,
self.sigma, self.processes, self.rnd,
self.sigma, self.processes, self.pool, self.rnd,
self.solve_ratio, self)
return sols

Expand All @@ -88,7 +94,7 @@ class perturbedOptFunc(Function):

@staticmethod
def forward(ctx, pred_cost, optmodel, n_samples, sigma,
processes, rnd, solve_ratio, module):
processes, pool, rnd, solve_ratio, module):
"""
Forward pass for perturbed
Expand All @@ -98,6 +104,7 @@ def forward(ctx, pred_cost, optmodel, n_samples, sigma,
n_samples (int): number of Monte-Carlo samples
sigma (float): the amplitude of the perturbation
processes (int): number of processors, 1 for single-core, 0 for all of cores
pool (ProcessPool): process pool object
rnd (RondomState): numpy random state
solve_ratio (float): the ratio of new solutions computed during training
module (nn.Module): perturbedOpt module
Expand All @@ -115,7 +122,7 @@ def forward(ctx, pred_cost, optmodel, n_samples, sigma,
# solve with perturbation
rand_sigma = np.random.uniform()
if rand_sigma <= solve_ratio:
ptb_sols = _solve_in_forward(ptb_c, optmodel, processes)
ptb_sols = _solve_in_forward(ptb_c, optmodel, processes, pool)
if solve_ratio < 1:
sols = ptb_sols.reshape(-1, cp.shape[1])
module.solpool = np.concatenate((module.solpool, sols))
Expand Down Expand Up @@ -148,7 +155,7 @@ def backward(ctx, grad_output):
noises,
torch.einsum("bnd,bd->bn", ptb_sols, grad_output))
grad /= n_samples * sigma
return grad, None, None, None, None, None, None, None
return grad, None, None, None, None, None, None, None, None


class perturbedFenchelYoung(nn.Module):
Expand Down Expand Up @@ -186,11 +193,17 @@ def __init__(self, optmodel, n_samples=10, sigma=1.0, processes=1,
self.n_samples = n_samples
# perturbation amplitude
self.sigma = sigma
# num of processors
# number of processes
if processes not in range(mp.cpu_count()+1):
raise ValueError("Invalid processors number {}, only {} cores.".
format(processes, mp.cpu_count()))
self.processes = processes
self.processes = mp.cpu_count() if not processes else processes
# single-core
if processes == 1:
self.pool = None
# multi-core
else:
self.pool = ProcessingPool(processes)
print("Num of cores: {}".format(self.processes))
# random state
self.rnd = np.random.RandomState(seed)
Expand All @@ -212,7 +225,7 @@ def forward(self, pred_cost, true_sol):
Forward pass
"""
loss = self.pfy.apply(pred_cost, true_sol, self.optmodel, self.n_samples,
self.sigma, self.processes, self.rnd,
self.sigma, self.processes, self.pool, self.rnd,
self.solve_ratio, self)
return loss

Expand All @@ -224,7 +237,7 @@ class perturbedFenchelYoungFunc(Function):

@staticmethod
def forward(ctx, pred_cost, true_sol, optmodel, n_samples, sigma,
processes, rnd, solve_ratio, module):
processes, pool, rnd, solve_ratio, module):
"""
Forward pass for perturbed Fenchel-Young loss
Expand All @@ -235,6 +248,7 @@ def forward(ctx, pred_cost, true_sol, optmodel, n_samples, sigma,
n_samples (int): number of Monte-Carlo samples
sigma (float): the amplitude of the perturbation
processes (int): number of processors, 1 for single-core, 0 for all of cores
pool (ProcessPool): process pool object
rnd (RondomState): numpy random state
solve_ratio (float): the ratio of new solutions computed during training
module (nn.Module): perturbedFenchelYoung module
Expand All @@ -253,7 +267,7 @@ def forward(ctx, pred_cost, true_sol, optmodel, n_samples, sigma,
# solve with perturbation
rand_sigma = np.random.uniform()
if rand_sigma <= solve_ratio:
ptb_sols = _solve_in_forward(ptb_c, optmodel, processes)
ptb_sols = _solve_in_forward(ptb_c, optmodel, processes, pool)
if solve_ratio < 1:
sols = ptb_sols.reshape(-1, cp.shape[1])
module.solpool = np.concatenate((module.solpool, sols))
Expand Down Expand Up @@ -282,10 +296,10 @@ def backward(ctx, grad_output):
"""
grad, = ctx.saved_tensors
grad_output = torch.unsqueeze(grad_output, dim=-1)
return grad * grad_output, None, None, None, None, None, None, None, None
return grad * grad_output, None, None, None, None, None, None, None, None, None


def _solve_in_forward(ptb_c, optmodel, processes):
def _solve_in_forward(ptb_c, optmodel, processes, pool):
"""
A function to solve optimization in the forward pass
"""
Expand All @@ -305,16 +319,13 @@ def _solve_in_forward(ptb_c, optmodel, processes):
ptb_sols.append(sols)
# multi-core
else:
# number of processes
processes = mp.cpu_count() if not processes else processes
# get class
model_type = type(optmodel)
# get args
args = getArgs(optmodel)
# parallel computing
with ProcessingPool(processes) as pool:
ptb_sols = pool.amap(_solveWithObj4Par, ptb_c.transpose(1,0,2),
[args] * ins_num, [model_type] * ins_num).get()
ptb_sols = pool.amap(_solveWithObj4Par, ptb_c.transpose(1,0,2),
[args] * ins_num, [model_type] * ins_num).get()
return np.array(ptb_sols)


Expand Down
36 changes: 19 additions & 17 deletions pkg/pyepo/func/spoplus.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,17 @@ def __init__(self, optmodel, processes=1, solve_ratio=1, dataset=None):
if not isinstance(optmodel, optModel):
raise TypeError("arg model is not an optModel")
self.optmodel = optmodel
# num of processors
# number of processes
if processes not in range(mp.cpu_count()+1):
raise ValueError("Invalid processors number {}, only {} cores.".
format(processes, mp.cpu_count()))
self.processes = processes
self.processes = mp.cpu_count() if not processes else processes
# single-core
if processes == 1:
self.pool = None
# multi-core
else:
self.pool = ProcessingPool(processes)
print("Num of cores: {}".format(self.processes))
# solution pool
self.solve_ratio = solve_ratio
Expand All @@ -68,8 +74,8 @@ def forward(self, pred_cost, true_cost, true_sol, true_obj):
Forward pass
"""
loss = self.spop.apply(pred_cost, true_cost, true_sol, true_obj,
self.optmodel, self.processes, self.solve_ratio,
self)
self.optmodel, self.processes, self.pool,
self.solve_ratio, self)
return loss


Expand All @@ -81,7 +87,7 @@ class SPOPlusFunc(Function):

@staticmethod
def forward(ctx, pred_cost, true_cost, true_sol, true_obj,
optmodel, processes, solve_ratio, module):
optmodel, processes, pool, solve_ratio, module):
"""
Forward pass for SPO+
Expand All @@ -92,6 +98,7 @@ def forward(ctx, pred_cost, true_cost, true_sol, true_obj,
true_obj (torch.tensor): a batch of true optimal objective values
optmodel (optModel): an PyEPO optimization model
processes (int): number of processors, 1 for single-core, 0 for all of cores
pool (ProcessPool): process pool object
solve_ratio (float): the ratio of new solutions computed during training
module (nn.Module): SPOPlus modeul
Expand All @@ -109,7 +116,7 @@ def forward(ctx, pred_cost, true_cost, true_sol, true_obj,
#_check_sol(c, w, z)
# solve
if np.random.uniform() <= solve_ratio:
sol, loss = _solve_in_forward(cp, c, w, z, optmodel, processes)
sol, loss = _solve_in_forward(cp, c, w, z, optmodel, processes, pool)
if solve_ratio < 1:
module.solpool = np.concatenate((module.solpool, sol))
else:
Expand Down Expand Up @@ -143,7 +150,7 @@ def backward(ctx, grad_output):
return grad_output * grad, None, None, None, None, None, None, None


def _solve_in_forward(cp, c, w, z, optmodel, processes):
def _solve_in_forward(cp, c, w, z, optmodel, processes, pool):
"""
A function to solve optimization in the forward pass
"""
Expand All @@ -167,16 +174,11 @@ def _solve_in_forward(cp, c, w, z, optmodel, processes):
model_type = type(optmodel)
# get args
args = getArgs(optmodel)
# number of processes
processes = mp.cpu_count() if not processes else processes
# parallel computing
with ProcessingPool(processes) as pool:
res = pool.amap(
_solveWithObj4Par,
2 * cp - c,
[args] * ins_num,
[model_type] * ins_num,
).get()
res = pool.amap(
_solveWithObj4Par,
2 * cp - c,
[args] * ins_num,
[model_type] * ins_num).get()
# get res
sol = np.array(list(map(lambda x: x[0], res)))
obj = np.array(list(map(lambda x: x[1], res)))
Expand Down
2 changes: 1 addition & 1 deletion pkg/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# description
description = "PyTorch-based End-to-End Predict-then-Optimize Tool",
# version
version = "0.2.3",
version = "0.2.4",
# Github repo
url = "https://github.com/khalil-research/PyEPO",
# author name
Expand Down

0 comments on commit 18f6882

Please sign in to comment.