Source code for yapss._private.problem

"""

Module problem.

"""

# future imports
from __future__ import annotations

__all__ = ["Problem"]

import inspect

# standard imports
from collections.abc import Sequence
from types import FrameType, SimpleNamespace
from typing import TYPE_CHECKING, Any, Callable

# third party imports
import numpy as np
from numpy import float64

# package imports
from .bounds import Bounds
from .guess import Guess
from .ipopt_options import IpoptOptions
from .solver import solve
from .types_ import LimitOptions, Protected

if TYPE_CHECKING:
    # third party imports
    from numpy.typing import NDArray

    # package imports
    import yapss

    from .input_args import (
        ContinuousFunction,
        ContinuousHessianFunction,
        ContinuousJacobianFunction,
        DiscreteFunction,
        DiscreteHessianFunction,
        DiscreteJacobianFunction,
        ObjectiveFunction,
        ObjectiveGradientFunction,
        ObjectiveHessianFunction,
    )
    from .solution import Solution

    Array = NDArray[float64]

# default options
DEFAULT_NUMBER_OF_SEGMENTS = 10
DEFAULT_NUMBER_OF_COLLOCATION_POINTS = 10
DEFAULT_SPECTRAL_METHOD = "lgl"
DEFAULT_DERIVATIVE_METHOD = "auto"
DEFAULT_DERIVATIVE_ORDER = "second"


[docs] class Problem(Protected): """Instances of the `Problem` class define the optimal control problem. Parameters ---------- name : str Name of the optimal control problem nx : Sequence[int] Number of states in each phase nu : Optional[Sequence[int]] Number of controls in each phase nq : Sequence[int], optional Number of integrals in each phase nh : Sequence[int], optional Number of path constraints in each phase ns : int, optional Number of parameters in problem nd : int, optional Number of discrete constraints in problem Attributes ---------- name : str The name of the problem. np : int The number of phases in the problem. nx : tuple[int, ...] The number of state variables in each phase. nu : tuple[int, ...] The number of control variables in each phase. nq : tuple[int, ...] The number of integrals in each phase. nh : tuple[int, ...] The number of path constraints in each phase. nd : int The number of discrete constraints in the problem. ns : int The number of parameters in the problem. auxdata : Auxdata The auxiliary data for the problem. bounds : Bounds The bounds object structure for the problem. derivatives : Derivatives The derivative options for the problem. Attributes: `method`, `order`. functions : UserFunctions The user-defined functions for the problem. Attributes: `objective`, `continuous`, `discrete`, `objective_gradient`, `objective_hessian`, `continuous_jacobian`, `continuous_hessian`, `discrete_jacobian`, `discrete_hessian`. guess : Guess The initial guess for the problem. **Attributes**: `parameter`, `phase[p].time`, `phase[p].state`, `phase[p].control`, `phase[p].integral`. ipopt_options : IpoptOptions The user-selected Ipopt options for the problem. To select a particular Ipopt option, use the `ipopt_options` attribute of the `Problem` class. For example, for a Problem instance `problem`, set the `tol` option to 1e-6 by setting ``problem.ipopt_options.tol = 1e-6``. scale : Scale The scaling data structure for the problem. mesh : Mesh The mesh data structure for the problem. spectral_method : {"lg", "lgr", "lgl"} The type of interpolation used for the problem. """ auxdata: Auxdata catch_keyboard_interrupt: LimitOptions[bool] = LimitOptions((True, False)) name: str np: int nx: tuple[int, ...] nu: tuple[int, ...] nq: tuple[int, ...] nh: tuple[int, ...] ns: int nd: int bounds: Bounds functions: UserFunctions guess: Guess derivatives: Derivatives ipopt_options: IpoptOptions scale: Scale mesh: Mesh spectral_method: LimitOptions[str] = LimitOptions(("lgl", "lgr", "lg")) # TODO: mesh should be ReadOnlyProperty def __init__( # noqa: PLR0913 self, *, name: str, nx: Sequence[int], nu: Sequence[int] | None = None, nq: Sequence[int] | None = None, nh: Sequence[int] | None = None, ns: int | None = 0, nd: int | None = 0, ) -> None: """Initialize the optimal control problem name and dimensions. Parameters ---------- name : str Name of the optimal control problem nx : {List[int], tuple[int, ...]} Number of states in each phase nu : {List[int], tuple[int, ...]} Number of controls in each phase nq : {List[int], tuple[int, ...]} Number of integrals in each phase nh : {List[int], tuple[int, ...]} Number of path constraints in each phase ns : int Number of parameters in problem nd : int Number of discrete constraints in problem """ self.name = name self._nx = nx self._nu = nu self._nq = nq self._nh = nh self.ns: int = ns if ns is not None else 0 self.nd: int = nd if nd is not None else 0 self._np: int # validate input and put in canonical form self._check_input() # fields for user input self.auxdata: Auxdata = Auxdata() self.bounds = Bounds(self) self.derivatives: Derivatives = Derivatives() self.functions = UserFunctions() self.guess = Guess(self) self.ipopt_options = IpoptOptions() self.scale = self._init_scale() self.mesh = Mesh(self) self.__dict__["_ipopt_source"] = "default" self._abort: bool = False self.catch_keyboard_interrupt = True self._allowed_del_attrs = () self._allowed_attrs = ( "spectral_method", "_spectral_method", "catch_keyboard_interrupt", "ipopt_source", "_abort", "_catch_keyboard_interrupt", ) self.spectral_method = DEFAULT_SPECTRAL_METHOD # ipopt_source getter @property def ipopt_source(self) -> str: value = self.__dict__["_ipopt_source"] if not isinstance(value, str): msg = "Internal error: 'ipopt_source' must have type 'str'" raise TypeError(msg) return value @ipopt_source.setter def ipopt_source(self, value: str) -> None: if not isinstance(value, str): msg = f"'ipopt_source' must have type 'str', not {type(value)}" # type: ignore[unreachable] raise TypeError(msg) self.__dict__["_ipopt_source"] = value
[docs] def solve(self) -> Solution: """Solve the optimal control problem. Returns ------- solution : Solution The solution to the optimal control problem. """ return solve(self)
[docs] def validate(self) -> None: """Validate the optimal control problem input. Raises ------ ValueError If the problem is invalid. """ self.bounds.validate() self.guess.validate() self.mesh.validate() self._validate_functions()
def _validate_functions(self) -> None: """Validate the user-defined functions. Raises ------ ValueError If the functions are invalid. """ if self.functions.objective is None: msg = "'functions.objective' function is required." raise ValueError(msg) if self.np > 0 and self.functions.continuous is None: msg = "'functions.continuous' function is required." raise ValueError(msg) if self.nd > 0 and self.functions.discrete is None: msg = "'functions.discrete' function is required." raise ValueError(msg) if self.derivatives.method == "user": if self.functions.objective_gradient is None: msg = "'functions.objective_gradient' function is required." raise ValueError(msg) if self.np > 0 and self.functions.continuous_jacobian is None: msg = "'functions.continuous_jacobian' function is required." raise ValueError(msg) if self.nd > 0 and self.functions.discrete_jacobian is None: msg = "'functions.discrete_jacobian' function is required." raise ValueError(msg) if self.derivatives.order == "second": if self.functions.objective_hessian is None: msg = "'functions.objective_hessian' function is required." raise ValueError(msg) if self.np > 0 and self.functions.continuous_hessian is None: msg = "'functions.continuous_hessian' function is required." raise ValueError(msg) if self.nd > 0 and self.functions.discrete_hessian is None: msg = "'functions.discrete_hessian' function is required." raise ValueError(msg) def __str__(self) -> str: """Return repr(self).""" return ( f"Problem(\n" f" name='{self.name}',\n" f" nx={self.nx},\n" f" nu={self.nu},\n" f" nq={self.nq},\n" f" nh={self.nh},\n" f" nd={self.nd},\n" f" ns={self.ns}\n" f")" ) def _validate_integer_array( self, array: Sequence[int] | None, arg_name: str, ) -> tuple[int, ...]: if array is None: return self.np * (0,) if arg_name == "nx": msg = f"Keyword '{arg_name}' must be a tuple or list of positive integers." else: msg = ( f"Keyword '{arg_name}' must be a tuple or list of nonnegative integers, " f"or None." ) if not isinstance(array, (tuple, list)): raise TypeError(msg) if not all(isinstance(item, int) for item in array): raise TypeError(msg) if not all(item >= (0 if arg_name == "nx" else 0) for item in array): raise ValueError(msg) if arg_name != "nx" and len(array) != self.np: msg = f"Length of '{arg_name}' must be the same as length of 'nx'." raise ValueError(msg) return tuple(array) def _check_input(self) -> None: """Check the validity of the arguments. * The user must supply a value for `name` that is a string * The value of `nx` must be a list or tuple of positive integers * Each of `nu`, `nq`, and `nh` must be `None`, or a list or tuple of non-negative integers the same length as `nx` * ns, nd must be nonnegative integers Raises ------ TypeError, ValueError """ msg = "Value of keyword 'name' must be a nonempty string." if not isinstance(self.name, str): raise TypeError(msg) if len(self.name) == 0: raise ValueError(msg) self.nx = self._validate_integer_array(self._nx, "nx") self.np = len(self.nx) self.nu = self._validate_integer_array(self._nu, "nu") self.nq = self._validate_integer_array(self._nq, "nq") self.nh = self._validate_integer_array(self._nh, "nh") msg = "Argument 'ns' must a nonnegative integer or None." if not isinstance(self.ns, (int, type(None))): raise TypeError(msg) if self.ns < 0: raise ValueError(msg) msg = "Argument 'nd' must a nonnegative integer or None." if not isinstance(self.nd, (int, type(None))): raise TypeError(msg) if self.nd < 0: raise ValueError(msg) def _init_scale(self) -> Scale: """Initialize the scaling object.""" def make_scale(n: int) -> SimpleNamespace: scale_shift = SimpleNamespace() scale_shift.scale = np.ones(n, dtype=float64) return scale_shift scales = SimpleNamespace() scales.parameter = make_scale(self.ns) scales.discrete = make_scale(self.nd) scales.objective = SimpleNamespace() scales.objective.scale = 1.0 scales.phase = self.np * [None] p: int scales.phase = [] for p in range(self.np): phase = SimpleNamespace() phase.state = make_scale(self.nx[p]) phase.control = make_scale(self.nu[p]) phase.dynamics = make_scale(self.nx[p]) phase.integral = make_scale(self.nq[p]) phase.path = make_scale(self.nh[p]) phase.initial_time = make_scale(1) phase.final_time = make_scale(1) scales.phase.append(phase) return Scale(self) def _signal_handler(self, signum: int, frame: FrameType | None) -> None: # noqa: ARG002 self._abort = True def _intermediate_cb(self, *args: Any) -> bool: # noqa: ARG002 if self._abort: self._abort = False return False return True
class Auxdata(SimpleNamespace): """Auxiliary problem data, which can be anything.""" class ScaleArray(Protected): """Scale array.""" name: str def __set_name__(self, owner: type, name: str) -> None: """Set the name of the attribute.""" self.name = name def __get__(self, instance: ScalePhase | Scale | None, owner: type) -> Array: """Get the value of the scale array.""" if instance is None: msg = f"attribute '{self.name}' can be accessed on instance objects only." raise AttributeError(msg) value = getattr(instance, "_" + self.name) if not isinstance(value, np.ndarray): msg = f"Internal error: '{self.name}' must be a numpy array." raise TypeError(msg) return value def __set__( self, instance: ScalePhase | Scale, value: Sequence[float] | Array, ) -> None: """Set the value of the scale array.""" scale = np.array(value, dtype=float64) shape = getattr(instance, "_" + self.name).shape if scale.shape != shape: if hasattr(instance, "_p"): msg = ( f"Scale '{self.name}' in phase {instance._p} must be an array of length " f"{shape[0]}." ) else: msg = f"Scale '{self.name}' must be an array of length {shape[0]}." raise ValueError(msg) setattr(instance, "_" + self.name, scale) class ScalePhase(Protected): """One phase of the scaling object. Attributes ---------- time : float state : ScaleArray control : ScaleArray integral : ScaleArray dynamics : ScaleArray path : ScaleArray """ _allowed_attrs = ( "state", "control", "integral", "dynamics", "path", "time", "_state", "_control", "_integral", "_dynamics", "_path", "_p", "p", ) state: ScaleArray = ScaleArray() """State scaling array for a single phase.""" control: ScaleArray = ScaleArray() """Control scaling array for a single phase.""" integral: ScaleArray = ScaleArray() """Integral scaling array for a single phase.""" dynamics: ScaleArray = ScaleArray() """Dynamics scaling array for a single phase.""" path: ScaleArray = ScaleArray() """Path scaling array for a single phase.""" def __init__(self, problem: yapss.Problem, p: int) -> None: """Initialize the scaling object. Initialize the scaling object for phase `p` of the problem, based on the `problem` object. Parameters ---------- problem : Problem The problem object. p : int The phase index. """ self.time = 1.0 self._p: int = p self._state: Array = np.ones([problem.nx[p]], dtype=float) self._control: Array = np.ones([problem.nu[p]], dtype=float) self._integral: Array = np.ones([problem.nq[p]], dtype=float) self._dynamics: Array = np.ones([problem.nx[p]], dtype=float) self._path: Array = np.ones([problem.nh[p]], dtype=float) class Scale(Protected): """Scaling object.""" _allowed_attrs = ( "_phase", "_discrete", "_parameter", "objective", "phase", "discrete", "parameter", ) phase: tuple[ScalePhase, ...] discrete: ScaleArray = ScaleArray() parameter: ScaleArray = ScaleArray() def __init__(self, ocp: Problem) -> None: self.phase = tuple(ScalePhase(ocp, p) for p in range(ocp.np)) self._discrete: Array = np.ones([ocp.nd], dtype=float) self._parameter: Array = np.ones([ocp.ns], dtype=float) self.objective = 1.0 def __getitem__(self, item: tuple[int, str, int]) -> float: # TODO: not correct """Get the scale value for a given item.""" p, v, i = item if v == "s": return float(self._parameter[i]) if v in ("x", "x0", "xf"): return float(self.phase[p].state[i]) if v == "u": return float(self.phase[p].control[i]) if v in ("t", "t0", "tf"): return self.phase[p].time if v == "q": return float(self.phase[p].integral[i]) raise RuntimeError
[docs] class Derivatives(Protected): """Derivative class. Attributes ---------- method : {"auto", "central-difference", "central-difference-full", "user"} order : {"first", "second"} """ _allowed_attrs = ("_method", "_order", "method", "order") order: LimitOptions[str] = LimitOptions(("first", "second")) """Order of derivatives used in search for optimum.""" method: LimitOptions[str] = LimitOptions( ("auto", "central-difference", "central-difference-full", "user"), ) """Method used to compute derivatives.""" def __init__(self) -> None: """Initialize the derivative object with default values.""" super().__init__() self._method = DEFAULT_DERIVATIVE_METHOD self._order = DEFAULT_DERIVATIVE_ORDER
class UserFunctions: """Container for the user-defined callback functions and their derivatives. The `functions` attribute of a `Problem` instance is an instance of the `UserFunctions` class, which stores the user-defined callback functions and their derivatives. Every optimal control problem must have at least an objective function. Most problems will have one or more phases with dynamics, path constraints, and/or integrands, and these problems require at least a `continuous` callback function. Problems with discrete constraints require at least a `discrete` callback function. For problems that use automatic differentiation, or differentiation by finite differences, no further callbacks are required. For problems that use user-supplied derivatives, additional callback functions are required. The `objective_gradient` callback is required for problems that use user-supplied gradients, and the `objective_hessian` callback is required for problems that use user-supplied Hessians. The `continuous_jacobian`, `continuous_hessian`, `discrete_jacobian`, and `discrete_hessian` callbacks are required as appropriate for problems that use user-supplied derivatives. Attributes ---------- objective : ObjectiveFunction | None continuous : ContinuousFunction | None discrete : DiscreteFunction | None objective_gradient : ObjectiveGradientFunction | None continuous_jacobian : ContinuousJacobianFunction | None discrete_jacobian : DiscreteJacobianFunction | None objective_hessian : ObjectiveHessianFunction | None continuous_hessian : ContinuousHessianFunction | None discrete_hessian : DiscreteHessianFunction | None """ def __init__(self) -> None: """Initialize the user functions.""" self.objective: ObjectiveFunction | None = None self.objective_gradient: ObjectiveGradientFunction | None = None self.objective_hessian: ObjectiveHessianFunction | None = None self.continuous: ContinuousFunction | None = None self.continuous_jacobian: ContinuousJacobianFunction | None = None self.continuous_hessian: ContinuousHessianFunction | None = None self.discrete: DiscreteFunction | None = None self.discrete_jacobian: DiscreteJacobianFunction | None = None self.discrete_hessian: DiscreteHessianFunction | None = None _function_names = ( "objective", "objective_gradient", "objective_hessian", "continuous", "continuous_jacobian", "continuous_hessian", "discrete", "discrete_jacobian", "discrete_hessian", ) def __setattr__(self, key: str, value: Callable[[Any], None] | None) -> None: """Set the value of the attribute.""" if key not in self._function_names: msg = f"Cannot set 'UserFunctions' attribute '{key}'" raise AttributeError(msg) # None is a valid value if value is None: super().__setattr__(key, value) return # Check if the value is a callable object with one argument msg = f"Value of '{key}' must be a callable object with one argument, or None." if not callable(value): raise TypeError(msg) sig = inspect.signature(value) # Count the parameters in the signature params = sig.parameters if len(params) != 1 or not all( p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.POSITIONAL_ONLY) for p in params.values() ): raise TypeError(msg) super().__setattr__(key, value) def __delattr__(self, key: str) -> None: """Raise an error if the attribute is deleted.""" if key not in self._function_names: msg = f"Cannot delete function '{key}'; set to None instead." raise AttributeError(msg) msg = f"Cannot delete 'UserFunctions' attribute '{key}'" raise AttributeError(msg) class MeshPhase(Protected): """MeshPhase instances represent the mesh structure of a phase of the NLP.""" _allowed_attrs = ( "collocation_points", "fraction", "_fraction", "_collocation_points", ) _allowed_del_attrs = () def __init__(self) -> None: self._fraction: Sequence[float] = 10 * (0.1,) self._collocation_points: Sequence[int] = 10 * (10,) @property def fraction(self) -> Sequence[float]: return self._fraction @fraction.setter def fraction(self, value: Sequence[float]) -> None: if not all(isinstance(f, (int, float)) and f > 0 for f in value): msg = "fraction must be a sequence of floats" raise TypeError(msg) s = sum(value) if not np.isclose(s, 1.0, atol=0.01): msg = f"Sum of mesh fractions must be close to 1.0. Sum is {s}" raise ValueError(msg) self._fraction = tuple(fraction / s for fraction in value) @property def collocation_points(self) -> Sequence[int]: return self._collocation_points @collocation_points.setter def collocation_points(self, value: Sequence[int]) -> None: if not isinstance(value, Sequence): msg = f"collocation_points must be a sequence of positive integers, not {value}" # type: ignore[unreachable] raise TypeError(msg) # TODO: Change to minimum number of collocation points. 3? if not all(isinstance(i, int) and i > 0 for i in value): msg = "collocation_points must be a sequence of positive integers" raise ValueError(msg) self._collocation_points = tuple(value) class Mesh(Protected): """Mesh data for a problem.""" phase: tuple[MeshPhase, ...] def __init__(self, problem: yapss.Problem) -> None: """Initialize the mesh object.""" self.phase = tuple(MeshPhase() for _ in range(problem.np)) segments = DEFAULT_NUMBER_OF_SEGMENTS points = DEFAULT_NUMBER_OF_COLLOCATION_POINTS for p in range(problem.np): self.phase[p].collocation_points = segments * (segments,) self.phase[p].fraction = segments * (1 / points,) # TODO: should just init mesh inside class? def validate(self) -> None: """Validate the user-supplied mesh geometry. The method raises a `ValueError` or `TypeError` if the user supplied _mesh data is invalid. Raises ------ ValueError """ # fix below to use enumerate for p, phase in enumerate(self.phase): cp = phase.collocation_points f = phase.fraction if len(cp) != len(f): # TODO: Check error message msg = ( "mesh.phase[{}].col_points and mesh.phase[{}].fraction must be the same length" ) raise ValueError(msg.format(p, p))