from __future__ import annotations
from typing import TypeVar, TYPE_CHECKING
import os
import re
import time
import warnings
from copy import deepcopy
import numpy as np
import scipy.sparse as sparse
from ruamel.yaml import YAML, add_constructor, SafeConstructor
if TYPE_CHECKING: # pragma: no cover
from ._experiment import Experiment
from ._solutions import BaseSolution, StepSolution, CycleSolution
Solution = TypeVar('Solution', bound='BaseSolution')
[docs]
class Model:
"""Circuit model."""
def __init__(self, params: dict | str = 'params.yaml'):
"""
A class to construct and run the model. Provide the parameters using
either a dictionary or a '.yaml' file. Note that the number of Rj and
Cj attributes must be consistent with the num_RC_pairs value. See the
notes for more information on the callable parameters.
Parameters
----------
params : dict | str
Mapping of model parameter names to their values. Can be either
a dict or absolute/relateive file path to a yaml file (str). The
keys/value pair descriptions are given below. The default uses an
internal yaml file.
============= ========================== ================
Key Value *type*, units
============= ========================== ================
num_RC_pairs number of RC pairs *int*, -
soc0 initial state of charge *float*, -
capacity maximum battery capacity *float*, Ah
ce coulombic efficiency *float*, -
mass total battery mass *float*, kg
isothermal flag for isothermal model *bool*, -
Cp specific heat capacity *float*, J/kg/K
T_inf room/air temperature *float*, K
h_therm convective coefficient *float*, W/m2/K
A_therm heat loss area *float*, m2
ocv open circuit voltage *callable*, V
R0 series resistance *callable*, Ohm
Rj resistance in RCj *callable*, Ohm
Cj capacity in RCj *callable*, F
============= ========================== ================
Raises
------
TypeError
'params' must be type dict or str.
ValueError
'params' contains invalid and/or excess key/value pairs.
Warning
-------
A pre-processor runs at the end of the model initialization. If you
modify any parameters after class instantiation, you will need to
re-run the pre-processor (i.e., the ``pre()`` method) afterward.
Notes
-----
The 'ocv' property needs a signature like ``f(soc: float) -> float``,
where 'soc' is the state of charge. All R0, Rj, and Cj properties need
signatures like ``f(soc: float, T_cell: float) -> float``. 'T_cell' is
the cell temperature in K.
Rj and Cj are not real property names. These are used generally in the
documentation. If ``num_RC_pairs=1`` then in addition to R0, you should
define R1 and C1. If ``num_RC_pairs=2`` then you should also give R2
and C2, etc. For the special case where ``num_RC_pairs=0``, you should
not provide any resistance or capacitance values besides the series
resistance R0, which is always required.
"""
if isinstance(params, dict):
params = params.copy()
elif isinstance(params, str):
params = _yaml_reader(params)
self._repr_keys = [
'num_RC_pairs',
'soc0',
'capacity',
'ce',
'mass',
'isothermal',
'Cp',
'T_inf',
'h_therm',
'A_therm',
]
self.num_RC_pairs = params.pop('num_RC_pairs')
self.soc0 = params.pop('soc0')
self.capacity = params.pop('capacity')
self.ce = params.pop('ce')
self.mass = params.pop('mass')
self.isothermal = params.pop('isothermal')
self.Cp = params.pop('Cp')
self.T_inf = params.pop('T_inf')
self.h_therm = params.pop('h_therm')
self.A_therm = params.pop('A_therm')
self.ocv = params.pop('ocv')
self.R0 = params.pop('R0')
for j in range(1, self.num_RC_pairs + 1):
setattr(self, 'R' + str(j), params.pop('R' + str(j)))
setattr(self, 'C' + str(j), params.pop('C' + str(j)))
if len(params) != 0:
extra_keys = list(params.keys())
raise ValueError("'params' contains invalid and/or excess"
f" key/value pairs: {extra_keys=}")
self.pre()
def __repr__(self) -> str: # pragma: no cover
"""
Return a readable repr string.
Returns
-------
readable : str
A console-readable instance representation.
"""
keys = self._repr_keys
values = [getattr(self, k) for k in keys]
summary = "\n ".join([f"{k}={v}," for k, v in zip(keys, values)])
readable = f"Model(\n {summary}\n)"
return readable
[docs]
def pre(self, initial_state: bool | Solution = True) -> None:
"""
Pre-process and prepare the model for running experiments.
This method builds solution pointers, registers algebraic variable
indices, stores the mass matrix, and initializes the battery state.
Parameters
----------
initial_state : bool | Solution
Control how the model state is initialized. If True (default), the
state is set to a rested condition at 'soc0'. If False, the state
is left untouched and only the parameters and pointers are updated.
Given a Solution instance, the state is set to the final state of
the solution. See notes for more information.
Returns
-------
None.
Warning
-------
This method runs during the class initialization. It generally does not
have to be run again unless you modify model properties or attributes.
You should manually re-run the pre-processor if you change properties
after initialization. Forgetting to re-run the pre-processor can cause
inconsistencies between the updated properties and the pointers, state,
etc. If you are updating properties, but want the model's internal state
to not be reset back to a rested condition, use the ``initial_state``
option.
Notes
-----
Using ``initial_state=False`` will raise an error if you are changing
the size of your circuit (e.g., updating from one to two RC pairs).
Without re-initializing, the model's state vector would be a different
size than the circuit it is trying to solve. For this same reason, when
initializing based on a Solution instance, the solution must also be
the same size as the current model. In other words, a 1RC-pair model
cannot be initialized given a solution from a 2RC-pair circuit.
"""
from ._solutions import BaseSolution
missing_attrs = []
for j in range(1, self.num_RC_pairs + 1):
if not hasattr(self, 'R' + str(j)):
missing_attrs.append('R' + str(j))
if not hasattr(self, 'C' + str(j)):
missing_attrs.append('C' + str(j))
if missing_attrs:
raise AttributeError(f"Model is missing attrs {missing_attrs} to"
" be consistent with 'num_RC_pairs'.")
extra_attrs = []
pattern = re.compile(r"^[RC](\d+)")
for attr in list(self.__dict__.keys()):
matches = pattern.match(attr)
if matches is None:
pass
elif int(matches.group(1)) > self.num_RC_pairs:
extra_attrs.append(attr)
if extra_attrs:
short_warn(f"Extra RC attributes {extra_attrs} are present, beyond"
" what was expected based on 'num_RC_pairs'.")
ptr = {}
ptr['soc'] = 0
ptr['T_cell'] = 1
ptr['eta_j'] = np.arange(2, 2 + self.num_RC_pairs)
ptr['V_cell'] = self.num_RC_pairs + 2
ptr['size'] = self.num_RC_pairs + 3
algidx = [ptr['V_cell']]
mass_matrix = np.zeros(ptr['size'])
mass_matrix[ptr['soc']] = 1.
mass_matrix[ptr['T_cell']] = self.mass*self.Cp*self.T_inf
mass_matrix[ptr['eta_j']] = 1.
sv0 = np.zeros(ptr['size'])
sv0[ptr['soc']] = self.soc0
sv0[ptr['T_cell']] = 1.
sv0[ptr['eta_j']] = 0.
sv0[ptr['V_cell']] = self.ocv(self.soc0)
svdot0 = np.zeros_like(sv0)
self._ptr = ptr
self._algidx = algidx
self._mass_matrix = sparse.diags(mass_matrix)
self._t0 = 0.
if isinstance(initial_state, BaseSolution):
soln = deepcopy(initial_state)
if soln.y[-1].size != sv0.size:
raise ValueError("Cannot initialize state based on Solution"
" object given in 'initial_state'. The model"
" and solution have incompatible sizes.")
self._sv0 = soln.y[-1].copy()
self._svdot0 = soln.yp[-1].copy()
elif not initial_state:
if self._sv0.size != sv0.size:
raise ValueError("The pre-processor failed. The model state is"
" changing sizes but 'initial_state=False'.")
elif initial_state:
self._sv0 = sv0
self._svdot0 = svdot0
[docs]
def rhs_funcs(self, t: float, sv: np.ndarray, inputs: dict) -> np.ndarray:
"""
Right hand side functions.
Returns the right hand side for the DAE system. For any differential
variable i, rhs[i] must be equivalent to M[i, i]*y[i] where M is the
mass matrix and y is an array of states. For algebraic variables rhs[i]
must be an expression that equals zero.
Parameters
----------
t : float
Value of time [s].
sv : 1D np.array
State variables at time t.
inputs : dict
Dictionary detailing an experimental step.
Returns
-------
rhs : 1D np.array
The right hand side values of the DAE system.
"""
rhs = np.zeros(self._ptr['size'])
# state
soc = sv[self._ptr['soc']]
T_cell = sv[self._ptr['T_cell']]*self.T_inf
eta_j = sv[self._ptr['eta_j']]
voltage = sv[self._ptr['V_cell']]
# state-dependent properties
ocv = self.ocv(soc)
R0 = self.R0(soc, T_cell)
# calculated current and power
current = -(voltage - ocv + np.sum(eta_j)) / R0
power = current*voltage
# state of charge (differential)
ce = 1. if current >= 0. else self.ce
rhs[self._ptr['soc']] = -ce*current / 3600. / self.capacity
# temperature (differential)
Q_gen = current*(ocv - voltage)
Q_conv = self.h_therm*self.A_therm*(self.T_inf - T_cell)
rhs[self._ptr['T_cell']] = (Q_gen + Q_conv) * (1 - self.isothermal)
# RC overpotentials (differential)
for j, ptr in enumerate(self._ptr['eta_j'], start=1):
Rj = getattr(self, 'R' + str(j))(soc, T_cell)
Cj = getattr(self, 'C' + str(j))(soc, T_cell)
rhs[ptr] = -sv[ptr] / (Rj*Cj) + current / Cj
# cell voltage (algebraic)
mode = inputs['mode']
units = inputs['units']
value = inputs['value']
if mode == 'current' and units == 'A':
rhs[self._ptr['V_cell']] = current - value(t)
elif mode == 'current' and units == 'C':
rhs[self._ptr['V_cell']] = current - self.capacity*value(t)
elif mode == 'voltage':
rhs[self._ptr['V_cell']] = voltage - value(t)
elif mode == 'power':
rhs[self._ptr['V_cell']] = power - value(t)
# values for rootfns
total_time = self._t0 + t
roots = {
'soc': soc,
'temperature_K': T_cell,
'current_A': current,
'current_C': current / self.capacity,
'voltage_V': voltage,
'power_W': power,
'capacity_Ah': soc*self.capacity,
'time_s': total_time,
'time_min': total_time / 60.,
'time_h': total_time / 3600.,
}
inputs['roots'] = roots
return rhs
[docs]
def residuals(self, t: float, sv: np.ndarray, svdot: np.ndarray,
inputs: dict) -> np.ndarray:
"""
Return the DAE residuals.
The DAE residuals should be near zero at each time step. The solver
requires the DAE to be written in terms of its residuals in order to
minimize their values.
Parameters
----------
t : float
Value of time [s].
sv : 1D np.array
State variables at time t.
svdot : 1D np.array
State variable time derivatives at time t.
inputs : dict
Dictionary detailing an experimental step.
Returns
-------
res : 1D np.array
DAE residuals, res = M*yp - rhs(t, y).
"""
return self._mass_matrix.dot(svdot) - self.rhs_funcs(t, sv, inputs)
[docs]
def run_step(self, exp: Experiment, stepidx: int) -> StepSolution:
"""
Run a single experimental step.
Parameters
----------
exp : Experiment
An experiment instance.
stepidx : int
Step index to run. The first step has index 0.
Returns
-------
:class:`~thevenin.StepSolution`
Solution to the experimental step.
Warning
-------
The model's internal state is changed at the end of each experimental
step. Consequently, you should not run steps out of order. You should
always start with ``stepidx = 0`` and then progress to the subsequent
steps afterward. Run ``pre()`` after your last step to reset the state
back to a rested condition at 'soc0', if needed. Alternatively, you
can continue running experiments back-to-back without a pre-processing
in between if you want the following experiment to pick up from the
same state that the last experiment ended.
See also
--------
Experiment : Build an experiment.
StepSolution : Wrapper for a single-step solution.
Notes
-----
Using the ``run()`` loops through all steps in an experiment and then
stitches their solutions together. Most of the time, this is more
convenient. However, advantages for running step-by-step is that it
makes it easier to fine tune solver options, and allows for analyses
or control decisions in the middle of an experiment.
"""
from ._ida_solver import IDASolver
from ._solutions import StepSolution
step = exp.steps[stepidx].copy()
kwargs = exp._kwargs[stepidx].copy()
if not callable(step['value']):
value = step['value']
step['value'] = lambda t: value
kwargs['userdata'] = step
kwargs['calc_initcond'] = 'yp0'
kwargs['algebraic_idx'] = self._algidx
if step['limits'] is not None:
_setup_rootfn(step['limits'], kwargs)
solver = IDASolver(self._residuals, **kwargs)
start = time.time()
ida_soln = solver.solve(step['tspan'], self._sv0, self._svdot0)
timer = time.time() - start
soln = StepSolution(self, ida_soln, timer)
self._t0 = soln.t[-1]
self._sv0 = soln.y[-1].copy()
self._svdot0 = soln.yp[-1].copy()
return soln
[docs]
def run(self, exp: Experiment, reset_state: bool = True,
t_shift: float = 1e-3) -> CycleSolution:
"""
Run a full experiment.
Parameters
----------
exp : Experiment
An experiment instance.
reset_state : bool
If True (default), the internal state of the model will be reset
back to a rested condition at 'soc0' at the end of all steps. When
False, the state does not reset. Instead it will update to match
the final state of the last experimental step.
t_shift : float
Time (in seconds) to shift step solutions by when stitching them
together. If zero the end time of each step overlaps the starting
time of its following step. The default is 1e-3.
Returns
-------
:class:`~thevenin.CycleSolution`
A stitched solution with all experimental steps.
Warning
-------
The default behavior resets the model's internal state back to a rested
condition at 'soc0' by calling the ``pre()`` method at the end of all
steps. This means that if you run a second experiment afterward, it
will not start where the previous one left off. Instead, it will start
from the original rested condition that the model initialized with. You
can bypass this by using ``reset_state=False``, which keeps the state
at the end of the final experimental step.
See also
--------
Experiment : Build an experiment.
CycleSolution : Wrapper for an all-steps solution.
"""
from ._solutions import CycleSolution
solns = []
for i in range(exp.num_steps):
solns.append(self.run_step(exp, i))
soln = CycleSolution(*solns, t_shift=t_shift)
self._t0 = 0.
if reset_state:
self.pre()
return soln
def _residuals(self, t: float, sv: np.ndarray, svdot: np.ndarray,
res: np.ndarray, inputs: dict) -> None:
"""
Solver-structured residuals.
The IDASolver requires a residuals function in this exact form.
Rather than outputting the residuals, the function returns None,
but fills the 'res' input array with the DAE residuals.
Parameters
----------
t : float
Value of time [s].
sv : 1D np.array
State variables at time t.
svdot : 1D np.array
State variable time derivatives at time t.
res : 1D np.array
DAE residuals, res = M*yp - rhs(t, y).
inputs : dict
Dictionary detailing an experimental step.
Returns
-------
None.
"""
res[:] = self.residuals(t, sv, svdot, inputs)
class _RootFunction:
"""Root function callables."""
def __init__(self, limits: tuple[str, float]) -> None:
"""
This class is a generalized root function callable.
Parameters
----------
limits : tuple[str, float]
A tuple of root function criteria arranged as ordered pairs of
limit names and values, e.g., ('time_h', 10., 'voltage_V', 4.2).
"""
self.keys = limits[0::2]
self.values = limits[1::2]
self.size = len(self.keys)
def __call__(self, t: float, sv: np.ndarray, svdot: np.ndarray,
events: np.ndarray, inputs: dict) -> None:
"""
Solver-structured root function.
The IDASolver requires a root function in this exact form. Rather
than outputting the events array, the function returns None, but
fills the 'events' input array with root functions. If any 'events'
index equals zero, the solver will exit prior to 'tspan[-1]'.
Parameters
----------
t : float
Value of time [s].
sv : 1D np.array
State variables at time t.
svdot : 1D np.array
State variable time derivatives at time t.
events : 1D np.array
An array of root/event functions. During integration, the solver
will exit prior to 'tspan[-1]' if any 'events' index equals zero.
inputs : dict
Dictionary detailing an experimental step, with the 'roots' key
added and filled within the `rhs_funcs()' method.
Returns
-------
None.
"""
for i, (key, value) in enumerate(zip(self.keys, self.values)):
events[i] = inputs['roots'][key] - value
def _setup_rootfn(limits: tuple[str, float], kwargs: dict) -> None:
"""
Set up a root function for the IDASolver.
The IDASolver requires two keyword arguments to be set when using root
functions. The first is 'rootfn' which requires a Callable. The second
is 'num_events' with allocates memory to an array that stores the root
function values.
Parameters
----------
limits : tuple[str, float]
A tuple of root function criteria arranged as ordered pairs of limit
names and values, e.g., ('time_h', 10., 'voltage_V', 4.2).
kwargs : dict
The IDASolver keyword argumnents dictionary. Both the 'eventsfn' and
'num_events' keyword arguments must be added to 'kwargs'.
Returns
-------
None.
"""
rootfn = _RootFunction(limits)
kwargs['eventsfn'] = rootfn
kwargs['num_events'] = rootfn.size
def _yaml_reader(file: str) -> dict:
"""
Read a yaml file.
This yaml reader has a custom ``!eval`` constructor, which allows you to
evaluate ``np`` expressions and ``lambda`` functions.
Parameters
----------
file : str
An absolute or relative path to a '.yaml' file. The extension will
be added if not included.
Raises
------
ValueError
Invalid file. Only supports '.yaml' files.
FileNotFoundError
File does not exist.
Returns
-------
data : dict
Data dictionary corresponding to the input file.
"""
_, extension = os.path.splitext(file)
if extension == '':
file += '.yaml'
elif extension != '.yaml':
raise ValueError("Invalid file. Only supports '.yaml' files.")
here = os.path.dirname(__file__)
templates = here + '/templates'
if file in os.listdir(templates):
short_warn(f"Using the default parameter file '{file}'.")
file = templates + '/' + file
def eval_constructor(loader, node):
return eval(node.value)
if not os.path.exists(file):
raise FileNotFoundError(f"File '{file}' does not exist.")
reader = YAML(typ='safe')
add_constructor('!eval', eval_constructor, constructor=SafeConstructor)
with open(file, 'r') as f:
data = reader.load(f)
return data
def formatwarning(message, category, filename, lineno, line=None):
"""Shortened warning format - used for parameter/pre warnings."""
return f"\n[thevenin {category.__name__}] {message}\n\n"
def short_warn(message, category=None, stacklevel=1, source=None):
"""Print a warning with the short format from ``formatwarning``."""
original_format = warnings.formatwarning
warnings.formatwarning = formatwarning
warnings.warn(message, category, stacklevel, source)
warnings.formatwarning = original_format