Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes to linearize from our xcsp3 branch #516

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 89 additions & 17 deletions cpmpy/transformations/linearize.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@
"""
import copy
import numpy as np
from cpmpy.transformations.reification import only_implies, only_bv_reifies

from cpmpy.transformations.normalize import toplevel_list
from .decompose_global import decompose_in_tree

from .flatten_model import flatten_constraint, get_or_make_var
from .get_variables import get_variables
from .. import Abs
from ..exceptions import TransformationNotImplementedError

from ..expressions.core import Comparison, Operator, BoolVal
from ..expressions.globalconstraints import GlobalConstraint, DirectConstraint
from ..expressions.utils import is_any_list, is_num, eval_comparison, is_bool
from ..expressions.utils import is_num, eval_comparison, get_bounds

from ..expressions.variables import _BoolVarImpl, boolvar, NegBoolView, _NumVarImpl, intvar

from ..expressions.variables import _BoolVarImpl, boolvar, NegBoolView, _NumVarImpl

def linearize_constraint(lst_of_expr, supported={"sum","wsum"}, reified=False):
"""
Expand All @@ -61,8 +65,17 @@ def linearize_constraint(lst_of_expr, supported={"sum","wsum"}, reified=False):
Any other unsupported global constraint should be decomposed using `cpmpy.transformations.decompose_global.decompose_global()`

"""
return _linearize_constraint_helper(lst_of_expr, supported, reified)[0]

newlist = []

def _linearize_constraint_helper(lst_of_expr, supported={"sum","wsum"}, reified=False):
"""
Helper function for linearize_constraint.
Besides a list of linearised expressions, also keeps track for the newly introduced variables as to prevent unwanted symmetries.
"""

newlist = [] # to collect the linearized constraints
newvars = [] # to collect the newly introduced variables
for cpm_expr in lst_of_expr:

# boolvar
Expand Down Expand Up @@ -95,12 +108,12 @@ def linearize_constraint(lst_of_expr, supported={"sum","wsum"}, reified=False):

# BV -> LinExpr
elif isinstance(cond, _BoolVarImpl):
lin_sub = linearize_constraint([sub_expr], supported=supported, reified=True)
lin_sub, new_vars = _linearize_constraint_helper([sub_expr], supported=supported, reified=True)
newlist += [cond.implies(lin) for lin in lin_sub]
# ensure no new solutions are created
new_vars = set(get_variables(lin_sub)) - set(get_variables(sub_expr))
newlist += linearize_constraint([(~cond).implies(nv == nv.lb) for nv in new_vars], reified=reified)

a, b = _linearize_constraint_helper([(~cond).implies(nv == nv.lb) for nv in new_vars], reified=reified)
newlist += a
newvars += b

# comparisons
elif isinstance(cpm_expr, Comparison):
Expand All @@ -117,6 +130,50 @@ def linearize_constraint(lst_of_expr, supported={"sum","wsum"}, reified=False):
if lhs.name == "mul" and is_num(lhs.args[0]):
lhs = Operator("wsum",[[lhs.args[0]], [lhs.args[1]]])
cpm_expr = eval_comparison(cpm_expr.name, lhs, rhs)

elif lhs.name == "mod" and "mod" not in supported:
if "mul" not in supported:
raise NotImplementedError("Cannot linearize modulo without multiplication")

if cpm_expr.name != "==":
new_rhs, newcons = get_or_make_var(lhs)
newlist.append(eval_comparison(cpm_expr.name, new_rhs, rhs))
newlist += linearize_constraint(newcons, supported=supported, reified=reified)
continue
else:
# "mod" != remainder after division: https://marcelkliemannel.com/articles/2021/dont-confuse-integer-division-with-floor-division/
# -> acts differently for negative numbers
# "mod" is a partial function
# -> x % 0 = x (unless x == 0, then undefined)
x, y = lhs.args
lby, uby = get_bounds(y)
if lby <= 0 <= uby:
raise NotImplementedError("Modulo with a divisor domain containing 0 is not supported. Please safen the expression first.")
k = intvar(*get_bounds((x - rhs) // y))
mult_res, newcons = get_or_make_var(k * y)
# (abs of) modulo rhs is smaller than (abs of) the divisor y, but also needs to be of same sign as x.
newlist += linearize_constraint(only_implies(only_bv_reifies(flatten_constraint([Abs(rhs) < Abs(y), (x > 0).implies(rhs >= 0), (x < 0).implies(rhs <= 0)]))) + newcons, supported, reified=reified)

cpm_expr = (mult_res + rhs) == x

elif lhs.name == 'div' and 'div' not in supported:
if "mul" not in supported:
raise NotImplementedError("Cannot linearize modulo without multiplication")
a, b = lhs.args
# if division is total, b is either strictly negative or strictly positive!
lb, ub = get_bounds(b)
if not ((lb < 0 and ub < 0) or (lb > 0 and ub > 0)):
raise TypeError(
f"Can't divide by a domain containing 0, safen the expression first")
r = intvar(-(max(abs(lb) - 1, abs(ub) - 1)), max(abs(lb) - 1, abs(ub) - 1)) # remainder can be both positive and negative (round towards 0, so negative r if a and b are both negative)
cpm_expr = [eval_comparison(cpm_expr.name, a, b * rhs + r)]
cond = [Abs(r) < Abs(b), Abs(b * rhs) < Abs(a)]
decomp = toplevel_list(decompose_in_tree(cond)) # decompose abs
cpm_exprs = toplevel_list(decomp + cpm_expr)
exprs = linearize_constraint(flatten_constraint(cpm_exprs), supported=supported)
newlist.extend(exprs)
continue

else:
raise TransformationNotImplementedError(f"lhs of constraint {cpm_expr} cannot be linearized, should be any of {supported | set(['sub'])} but is {lhs}. Please report on github")

Expand All @@ -130,11 +187,19 @@ def linearize_constraint(lst_of_expr, supported={"sum","wsum"}, reified=False):
if cpm_expr.name == "<":
new_rhs, cons = get_or_make_var(rhs - 1) # if rhs is constant, will return new constant
newlist.append(lhs <= new_rhs)
newlist += linearize_constraint(cons)
if isinstance(new_rhs, _NumVarImpl):
newvars.append(new_rhs)
a,b = _linearize_constraint_helper(cons)
newlist += a
newvars += b
elif cpm_expr.name == ">":
new_rhs, cons = get_or_make_var(rhs + 1) # if rhs is constant, will return new constant
newlist.append(lhs >= new_rhs)
newlist += linearize_constraint(cons)
if isinstance(new_rhs, _NumVarImpl):
newvars.append(new_rhs)
a,b = _linearize_constraint_helper(cons)
newlist += a
newvars += b
elif cpm_expr.name == "!=":
# Special case: BV != BV
if isinstance(lhs, _BoolVarImpl) and isinstance(rhs, _BoolVarImpl):
Expand All @@ -154,13 +219,16 @@ def linearize_constraint(lst_of_expr, supported={"sum","wsum"}, reified=False):
_, M1 = (lhs - rhs + 1).get_bounds()
_, M2 = (rhs - lhs + 1).get_bounds()
cons = [lhs + -M1*z <= rhs-1, lhs + -M2*z >= rhs-M2+1]
newlist += linearize_constraint(flatten_constraint(cons), supported=supported, reified=reified)

a,b = _linearize_constraint_helper(flatten_constraint(cons), supported=supported, reified=reified)
newlist += a
newvars += b + [z]
else:
# introduce new indicator constraints
z = boolvar()
constraints = [z.implies(lhs < rhs), (~z).implies(lhs > rhs)]
newlist += linearize_constraint(constraints, supported=supported, reified=reified)
a,b = _linearize_constraint_helper(constraints, supported=supported, reified=reified)
newlist += a
newvars += b + [z]
else:
# supported comparison
newlist.append(eval_comparison(cpm_expr.name, lhs, rhs))
Expand All @@ -187,14 +255,15 @@ def linearize_constraint(lst_of_expr, supported={"sum","wsum"}, reified=False):
constraints += [sum(np.arange(lb, ub + 1) * row) + -1*arg == 0]

newlist += constraints
newvars += [sigma]

elif isinstance(cpm_expr, (DirectConstraint, BoolVal)):
newlist.append(cpm_expr)

elif isinstance(cpm_expr, GlobalConstraint) and cpm_expr.name not in supported:
raise ValueError(f"Linearization of global constraint {cpm_expr} not supported, run `cpmpy.transformations.decompose_global.decompose_global() first")

return newlist
return (newlist, newvars)


def only_positive_bv(lst_of_expr):
Expand Down Expand Up @@ -277,7 +346,8 @@ def canonical_comparison(lst_of_expr):
if isinstance(lhs, Comparison) and cpm_expr.name == "==": # reification of comparison
lhs = canonical_comparison(lhs)[0]
elif is_num(lhs) or isinstance(lhs, _NumVarImpl) or (isinstance(lhs, Operator) and lhs.name in {"sum", "wsum"}):
# bring all vars to lhs
# Bring all vars from rhs to lhs
# 1) collect the variables to bring over
lhs2 = []
if isinstance(rhs, _NumVarImpl):
lhs2, rhs = [-1 * rhs], 0
Expand All @@ -290,10 +360,12 @@ def canonical_comparison(lst_of_expr):
if isinstance(b, _NumVarImpl)], \
sum(-a * b for a, b in zip(rhs.args[0], rhs.args[1])
if not isinstance(b, _NumVarImpl))
# 2) add collected variables to lhs
if isinstance(lhs, Operator) and lhs.name == "sum":
lhs, rhs = sum([1 * a for a in lhs.args] + lhs2), rhs
elif isinstance(lhs, _NumVarImpl) or (isinstance(lhs, Operator) and lhs.name == "wsum"):
lhs, rhs = lhs + lhs2, rhs
if len(lhs2) != 0:
lhs, rhs = lhs + lhs2, rhs
else:
raise ValueError(
f"unexpected expression on lhs of expression, should be sum,wsum or intvar but got {lhs}")
Expand Down Expand Up @@ -330,4 +402,4 @@ def canonical_comparison(lst_of_expr):
else: # rest of expressions
newlist.append(cpm_expr)

return newlist
return newlist
45 changes: 45 additions & 0 deletions tests/test_trans_linearize.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,51 @@ def test_neq(self):
# self.assertEqual(str(linearize_constraint(cons)), "[(a) -> (sum([1, -1, -6] * [x, y, BV4]) <= -1), (a) -> (sum([1, -1, -6] * [x, y, BV4]) >= -5)]")


def test_linearize_modulo(self):
x, y, z = cp.intvar(0, 5, shape=3, name=['x', 'y', 'z'])
a, b, c = cp.intvar(-5, 0, shape=3, name=['a', 'b', 'c'])
g, h, i = cp.intvar(-5, 5, shape=3, name=['g', 'h', 'i'])
s_pos = cp.intvar(1, 5, name='s_pos')
s_neg = cp.intvar(-5, -1, name='s_neg')

constraint = [g % s_pos == i]
lin = linearize_constraint(constraint, supported={'sum', 'wsum', 'mul'})

all_sols = set()
lin_all_sols = set()
cons_models = cp.Model(constraint).solveAll(display=lambda: all_sols.add(tuple([x.value() for x in [g, s_pos, i]])))
lin_models = cp.Model(lin).solveAll(display=lambda: lin_all_sols.add(tuple([x.value() for x in [g, s_pos, i]])))
self.assertEqual(cons_models,lin_models)

# ortools only accepts strictly positive modulo argument.

def test_linearize_division(self):
x, y, z = cp.intvar(0, 5, shape=3, name=['x', 'y', 'z'])
a, b, c = cp.intvar(-5, 0, shape=3, name=['a', 'b', 'c'])
g, h, i = cp.intvar(-5, 5, shape=3, name=['g', 'h', 'i'])
s_pos = cp.intvar(1, 5, name='s_pos')
s_neg = cp.intvar(-5, -1, name='s_neg')

constraint = [g / s_pos == i]
lin = linearize_constraint(constraint, supported={'sum', 'wsum', 'mul'})

all_sols = set()
lin_all_sols = set()
cons_models = cp.Model(constraint).solveAll(display=lambda: all_sols.add(tuple([x.value() for x in [g, s_pos, i]])))
lin_models = cp.Model(lin).solveAll(display=lambda: lin_all_sols.add(tuple([x.value() for x in [g, s_pos, i]])))
self.assertEqual(cons_models,lin_models)

# Duplicate test with s_neg instead of s_pos
constraint_neg = [g / s_neg == i]
lin_neg = linearize_constraint(constraint_neg, supported={'sum', 'wsum', 'mul'})

all_sols_neg = set()
lin_all_sols_neg = set()
cons_models_neg = cp.Model(constraint_neg).solveAll(display=lambda: all_sols_neg.add(tuple([x.value() for x in [g, s_neg, i]])))
lin_models_neg = cp.Model(lin_neg).solveAll(display=lambda: lin_all_sols_neg.add(tuple([x.value() for x in [g, s_neg, i]])))
self.assertEqual(cons_models_neg,lin_models_neg)


class TestConstRhs(unittest.TestCase):

def test_numvar(self):
Expand Down
Loading