diff --git a/ams/routines/dcopf.py b/ams/routines/dcopf.py index 825f8ce7..1fb0ed5f 100644 --- a/ams/routines/dcopf.py +++ b/ams/routines/dcopf.py @@ -5,17 +5,17 @@ import numpy as np from ams.core.param import RParam -from ams.core.service import NumOp, NumOpDual, VarSelect +from ams.core.service import NumOp, NumOpDual -from ams.routines.routine import RoutineBase +from ams.routines.dcpf import DCPFBase -from ams.opt import Var, Constraint, Objective, ExpressionCalc, Expression +from ams.opt import Constraint, Objective, ExpressionCalc, Expression logger = logging.getLogger(__name__) -class DCOPF(RoutineBase): +class DCOPF(DCPFBase): """ DC optimal power flow (DCOPF). @@ -29,7 +29,7 @@ class DCOPF(RoutineBase): """ def __init__(self, system, config): - RoutineBase.__init__(self, system, config) + DCPFBase.__init__(self, system, config) self.info = 'DC Optimal Power Flow' self.type = 'DCED' @@ -62,10 +62,6 @@ def __init__(self, system, config): indexer='gen', imodel='StaticGen', no_parse=True) # --- generator --- - self.ug = RParam(info='Gen connection status', - name='ug', tex_name=r'u_{g}', - model='StaticGen', src='u', - no_parse=True) self.ctrl = RParam(info='Gen controllability', name='ctrl', tex_name=r'c_{trl}', model='StaticGen', src='ctrl', @@ -90,20 +86,7 @@ def __init__(self, system, config): name='pmin', tex_name=r'p_{g, min}', unit='p.u.', model='StaticGen', no_parse=False,) - self.pg0 = RParam(info='Gen initial active power', - name='pg0', tex_name=r'p_{g, 0}', - unit='p.u.', model='StaticGen', - src='p0', no_parse=False,) - # --- bus --- - self.buss = RParam(info='Bus slack', - name='buss', tex_name=r'B_{us,s}', - model='Slack', src='bus', - no_parse=True,) - # --- load --- - self.pd = RParam(info='active demand', - name='pd', tex_name=r'p_{d}', - model='StaticLoad', src='p0', - unit='p.u.',) + # --- line --- self.ul = RParam(info='Line connection status', name='ul', tex_name=r'u_{l}', @@ -121,53 +104,8 @@ def __init__(self, system, config): name='amin', tex_name=r'\theta_{bus, min}', info='min line angle difference', no_parse=True,) - # --- shunt --- - self.gsh = RParam(info='shunt conductance', - name='gsh', tex_name=r'g_{sh}', - model='Shunt', src='g', - no_parse=True,) - # --- connection matrix --- - self.Cg = RParam(info='Gen connection matrix', - name='Cg', tex_name=r'C_{g}', - model='mats', src='Cg', - no_parse=True, sparse=True,) - self.Cl = RParam(info='Load connection matrix', - name='Cl', tex_name=r'C_{l}', - model='mats', src='Cl', - no_parse=True, sparse=True,) - self.CftT = RParam(info='Transpose of line connection matrix', - name='CftT', tex_name=r'C_{ft}^T', - model='mats', src='CftT', - no_parse=True, sparse=True,) - self.Csh = RParam(info='Shunt connection matrix', - name='Csh', tex_name=r'C_{sh}', - model='mats', src='Csh', - no_parse=True, sparse=True,) - # --- system matrix --- - self.Bbus = RParam(info='Bus admittance matrix', - name='Bbus', tex_name=r'B_{bus}', - model='mats', src='Bbus', - no_parse=True, sparse=True,) - self.Bf = RParam(info='Bf matrix', - name='Bf', tex_name=r'B_{f}', - model='mats', src='Bf', - no_parse=True, sparse=True,) - self.Pbusinj = RParam(info='Bus power injection vector', - name='Pbusinj', tex_name=r'P_{bus}^{inj}', - model='mats', src='Pbusinj', - no_parse=True,) - self.Pfinj = RParam(info='Line power injection vector', - name='Pfinj', tex_name=r'P_{f}^{inj}', - model='mats', src='Pfinj', - no_parse=True,) # --- Model Section --- - # --- generation --- - self.pg = Var(info='Gen active power', - unit='p.u.', - name='pg', tex_name=r'p_g', - model='StaticGen', src='p', - v0=self.pg0) self.pmaxe = Expression(info='Effective pmax', name='pmaxe', tex_name=r'p_{g, max, e}', e_str='mul(nctrle, pg0) + mul(ctrle, pmax)', @@ -180,28 +118,8 @@ def __init__(self, system, config): e_str='-pg + pmine', is_eq=False,) self.pgub = Constraint(name='pgub', info='pg max', e_str='pg - pmaxe', is_eq=False,) - # --- bus --- - self.aBus = Var(info='Bus voltage angle', - unit='rad', - name='aBus', tex_name=r'\theta_{bus}', - model='Bus', src='a',) - self.csb = VarSelect(info='select slack bus', - name='csb', tex_name=r'c_{sb}', - u=self.aBus, indexer='buss', - no_parse=True,) - self.sba = Constraint(info='align slack bus angle', - name='sbus', is_eq=True, - e_str='csb@aBus',) - # --- power balance --- - pb = 'Bbus@aBus + Pbusinj + Cl@pd + Csh@gsh - Cg@pg' - self.pb = Constraint(name='pb', info='power balance', - e_str=pb, is_eq=True,) + # --- line flow --- - self.plf = Expression(info='Line flow', - name='plf', tex_name=r'p_{lf}', - unit='p.u.', - e_str='Bf@aBus + Pfinj', - model='Line', src=None,) self.plflb = Constraint(info='line flow lower bound', name='plflb', is_eq=False, e_str='-plf - mul(ul, rate_a)',) @@ -228,13 +146,6 @@ def __init__(self, system, config): info='total cost', unit='$', sense='min', e_str=obj,) - def solve(self, **kwargs): - """ - Solve the routine optimization model. - args and kwargs go to `self.om.prob.solve()` (`cvxpy.Problem.solve()`). - """ - return self.om.prob.solve(**kwargs) - def run(self, **kwargs): """ Run the routine. @@ -282,65 +193,6 @@ def run(self, **kwargs): """ return super().run(**kwargs) - def _post_solve(self): - """ - Post-solve calculations. - """ - # NOTE: unpack Expressions if owner and arc are available - for expr in self.exprs.values(): - if expr.owner and expr.src: - expr.owner.set(src=expr.src, attr='v', - idx=expr.get_idx(), value=expr.v) - return True - - def unpack(self, **kwargs): - """ - Unpack the results from CVXPY model. - """ - # --- solver Var results to routine algeb --- - for _, var in self.vars.items(): - # --- copy results from routine algeb into system algeb --- - if var.model is None: # if no owner - continue - if var.src is None: # if no source - continue - else: - try: - idx = var.owner.get_idx() - except AttributeError: - idx = var.owner.idx.v - else: - pass - # NOTE: only unpack the variables that are in the model or group - try: - var.owner.set(src=var.src, idx=idx, attr='v', value=var.v) - except (KeyError, TypeError): - logger.error(f'Failed to unpack <{var}> to <{var.owner.class_name}>.') - pass - - # --- solver ExpressionCalc results to routine algeb --- - for _, exprc in self.exprcs.items(): - if exprc.model is None: - continue - if exprc.src is None: - continue - else: - try: - idx = exprc.owner.get_idx() - except AttributeError: - idx = exprc.owner.idx.v - else: - pass - try: - exprc.owner.set(src=exprc.src, idx=idx, attr='v', value=exprc.v) - except (KeyError, TypeError): - logger.error(f'Failed to unpack <{exprc}> to <{exprc.owner.class_name}>.') - pass - - # label the most recent solved routine - self.system.recent = self.system.routines[self.class_name] - return True - def dc2ac(self, kloss=1.0, **kwargs): """ Convert the DCOPF results with ACOPF. diff --git a/ams/routines/dcpf.py b/ams/routines/dcpf.py index cd7abeb4..e99a59ca 100644 --- a/ams/routines/dcpf.py +++ b/ams/routines/dcpf.py @@ -12,32 +12,33 @@ logger = logging.getLogger(__name__) -class DCPF(RoutineBase): +class DCPFBase(RoutineBase): """ - DC power flow. + Base class for DC power flow. """ def __init__(self, system, config): RoutineBase.__init__(self, system, config) - self.info = 'DC Power Flow' - self.type = 'PF' self.ug = RParam(info='Gen connection status', name='ug', tex_name=r'u_{g}', model='StaticGen', src='u', no_parse=True) + self.pg0 = RParam(info='Gen initial active power', + name='pg0', tex_name=r'p_{g, 0}', + unit='p.u.', model='StaticGen', + src='p0', no_parse=False,) + # --- shunt --- self.gsh = RParam(info='shunt conductance', name='gsh', tex_name=r'g_{sh}', model='Shunt', src='g', no_parse=True,) + self.buss = RParam(info='Bus slack', name='buss', tex_name=r'B_{us,s}', model='Slack', src='bus', no_parse=True,) - self.genpv = RParam(info='gen of PV', - name='genpv', tex_name=r'g_{DG}', - model='PV', src='idx', - no_parse=True,) + # --- load --- self.pd = RParam(info='active demand', name='pd', tex_name=r'p_{d}', model='StaticLoad', src='p0', @@ -79,32 +80,25 @@ def __init__(self, system, config): model='mats', src='Pfinj', no_parse=True,) + # --- generation --- self.pg = Var(info='Gen active power', unit='p.u.', name='pg', tex_name=r'p_g', - model='StaticGen', src='p') + model='StaticGen', src='p', + v0=self.pg0) + + # --- bus --- self.aBus = Var(info='Bus voltage angle', unit='rad', name='aBus', tex_name=r'\theta_{bus}', model='Bus', src='a',) - self.cpv = VarSelect(u=self.pg, indexer='genpv', - name='cpv', tex_name=r'C_{PV}', - info='Select PV from pg', - no_parse=True,) - self.pg0 = RParam(info='Gen initial active power', - name='pg0', tex_name=r'p_{g, 0}', - unit='p.u.', model='StaticGen', - src='p0', no_parse=False,) - # --- power balance --- pb = 'Bbus@aBus + Pbusinj + Cl@pd + Csh@gsh - Cg@pg' self.pb = Constraint(name='pb', info='power balance', e_str=pb, is_eq=True,) - self.pvb = Constraint(name='pvb', info='PV generator', - e_str='cpv @ (pg - mul(ug, pg0))', - is_eq=True,) + # --- bus --- self.csb = VarSelect(info='select slack bus', name='csb', tex_name=r'c_{sb}', u=self.aBus, indexer='buss', @@ -112,16 +106,14 @@ def __init__(self, system, config): self.sba = Constraint(info='align slack bus angle', name='sbus', is_eq=True, e_str='csb@aBus',) + + # --- line flow --- self.plf = Expression(info='Line flow', name='plf', tex_name=r'p_{lf}', unit='p.u.', e_str='Bf@aBus + Pfinj', model='Line', src=None,) - self.obj = Objective(name='obj', - info='place holder', unit='$', - sense='min', e_str='0',) - def solve(self, **kwargs): """ Solve the routine optimization model. @@ -188,17 +180,30 @@ def _post_solve(self): idx=expr.get_idx(), value=expr.v) return True - def summary(self, **kwargs): - """ - # TODO: Print power flow summary. - """ - raise NotImplementedError - def enable(self, name): - raise NotImplementedError +class DCPF(DCPFBase): + """ + DC power flow. + """ - def disable(self, name): - raise NotImplementedError + def __init__(self, system, config): + DCPFBase.__init__(self, system, config) + self.info = 'DC Power Flow' + self.type = 'PF' - def dc2ac(self, name): - raise NotImplementedError + self.genpv = RParam(info='gen of PV', + name='genpv', tex_name=r'g_{DG}', + model='PV', src='idx', + no_parse=True,) + self.cpv = VarSelect(u=self.pg, indexer='genpv', + name='cpv', tex_name=r'C_{PV}', + info='Select PV from pg', + no_parse=True,) + + self.pvb = Constraint(name='pvb', info='PV generator', + e_str='cpv @ (pg - mul(ug, pg0))', + is_eq=True,) + + self.obj = Objective(name='obj', + info='place holder', unit='$', + sense='min', e_str='0',) diff --git a/tests/test_known_good.py b/tests/test_known_good.py index 7f72d894..e53d1839 100644 --- a/tests/test_known_good.py +++ b/tests/test_known_good.py @@ -164,8 +164,10 @@ def test_DCPF_case118(self): Test DC power flow for case118. """ self.sp.DCPF.run() - np.testing.assert_allclose(self.sp.DCPF.aBus.v * rad2deg, - np.array(self.mpres['case118']['DCPF']['aBus']).reshape(-1), + aBus_mp = np.array(self.mpres['case118']['DCPF']['aBus']).reshape(-1) + aBus_mp -= aBus_mp[0] + np.testing.assert_allclose((self.sp.DCPF.aBus.v - self.sp.DCPF.aBus.v[0]) * rad2deg, + aBus_mp, rtol=1e-2, atol=1e-2) np.testing.assert_allclose(self.sp.DCPF.pg.v.sum() * self.sp.config.mva,