Source code for chemreac.integrate

# -*- coding: utf-8 -*-
"""
chemreac.integrate
==================

This module provides functions for integrating the
system of ODEs which the ReactionDiffusion represent.
The main class representing a numerical integration of the system
of ODEs is :py:class:`Integration`.

If one does not want to hard code the choice of solver and solver parameters
(e.g. tolerances), one may use :py:func:`run` which defers those choices to
the user of the script through the use of environment variables.
"""

from __future__ import (absolute_import, division,
                        print_function, unicode_literals)


import time

import numpy as np

from chemreac import DENSE, BANDED
from chemreac.units import get_derived_unit, to_unitless
from chemreac.util.analysis import suggest_t0


DEFAULTS = {
    'atol': 1e-9,
    'rtol': 1e-7,
}


def _integrate_sundials(rd, y0, tout, mode=None, **kwargs):
    """
    see integrate.

    kwargs:
      method: linear multistep method: 'bdf' or 'adams'

    """
    from ._chemreac import sundials_integrate

    # Handle kwargs
    new_kwargs = {}
    if mode is not None:
        raise NotImplementedError(
            "Sundials integrator auto-selects banded for N>1")
    atol = np.asarray(kwargs.pop('atol', DEFAULTS['atol']))
    if atol.ndim == 0:
        atol = atol.reshape((1,))
    new_kwargs['atol'] = atol
    new_kwargs['rtol'] = kwargs.pop('rtol', DEFAULTS['rtol'])
    new_kwargs['method'] = kwargs.pop('method', 'bdf')
    new_kwargs['with_jacobian'] = kwargs.pop('with_jacobian', True)
    new_kwargs['iterative'] = kwargs.pop('iterative', 0)
    if kwargs != {}:
        raise KeyError("Unkown kwargs: {}".format(kwargs))

    # Run the integration
    rd.zero_out_counters()
    texec = time.time()
    try:
        yout = sundials_integrate(rd, np.asarray(y0).flatten(),
                                  np.asarray(tout).flatten(),
                                  **new_kwargs)
    except RuntimeError:
        yout = np.empty((len(tout), rd.N, rd.n), order='C')/0  # NaN
        success = False
    else:
        success = True
    texec = time.time() - texec

    info = new_kwargs.copy()
    info.update({
        'neval_f': rd.neval_f,
        'neval_j': rd.neval_j,
        'texec': texec,
        'success': success
    })
    if info['iterative'] > 0:
        info['nprec_setup'] = rd.nprec_setup
        info['nprec_solve'] = rd.nprec_solve
        info['njacvec_dot'] = rd.njacvec_dot
    return yout, tout, info


def _integrate_rk4(rd, y0, tout, **kwargs):
    """
    For demonstration purposes only, fixed step size
    give no error control and requires excessive work
    for accurate solutions. Unknown kwargs are simply
    ignored.

    see integrate
    """
    from ._chemreac import rk4
    texec = time.time()
    yout, Dyout = rk4(rd, y0, tout)
    texec = time.time() - texec
    info = {
        'neval_f': 4*(tout.size-1),
        'neval_j': 0,
        'texec': texec,
        'success': True,
    }
    return yout, tout, info


def _integrate_scipy(rd, y0, tout, mode=None,
                     integrator_name='vode', dense_output=False,
                     **kwargs):
    """
    see integrate

    Parameters
    ----------
    tout: array-like
        at what times to report, e.g.:
        - np.linspace(t0, tend, nt+1)
        - np.logspace(np.log10(t0 + 1e-12), np.log10(tend), nt+1)
    integrator_name: string (default: vode)
    dense_output: bool (default: False)
        if True, tout is taken to be length 2 tuple (t0, tend)

    Returns
    =======
    yout: numpy array of shape (len(tout), rd.N, rd.n)

    """

    from scipy import __version__ as __scipy_version__
    from scipy.integrate import ode
    scipy_version = tuple(map(int, __scipy_version__.split('.')[:3]))

    new_kwargs = {}
    y0 = np.asarray(y0)
    if y0.size != rd.n*rd.N:
        fmtstr = "y0.size (={})not compatible with rd.n*rd.N (={})"
        raise ValueError(fmtstr.format(y0.size, rd.n*rd.N))

    if mode is None:
        if rd.N == 1:
            mode = DENSE
        elif rd.N > 1:
            mode = BANDED
        else:
            raise NotImplementedError

    if mode == BANDED:
        new_kwargs['lband'] = rd.n
        new_kwargs['uband'] = rd.n

    new_kwargs['atol'] = kwargs.pop('atol', DEFAULTS['atol'])
    new_kwargs['rtol'] = kwargs.pop('rtol', DEFAULTS['rtol'])
    new_kwargs['method'] = kwargs.pop('method', 'bdf')
    new_kwargs['with_jacobian'] = kwargs.pop('with_jacobian', True)
    if kwargs != {}:
        raise KeyError("Unkown kwargs: {}".format(kwargs))

    # Create python callbacks with right signature
    fout = np.empty(rd.n*rd.N)

    def f(t, y, *f_args):
        # Python function closure circumvents reallocation
        f.neval += 1
        rd.f(t, y, fout)
        return fout
    f.neval = 0

    if mode == DENSE:
        jout = rd.alloc_jout(banded=False, order='F')
    elif mode == BANDED:
        if scipy_version[0] <= 0 and scipy_version[1] <= 14:
            # Currently SciPy <= v0.14 needs extra padding
            jout = rd.alloc_jout(banded=True, order='F', pad=rd.n)
        else:
            # SciPy >= v0.15 need no extra padding
            jout = rd.alloc_jout(banded=True, order='F')
    else:
        raise NotImplementedError

    def jac(t, y, *j_args):
        jac.neval += 1
        jout[...] = 0  # <--- this is very important (clear old LU decomp)
        if mode == DENSE:
            rd.dense_jac_cmaj(t, y, jout)
        else:
            rd.banded_packed_jac_cmaj(t, y, jout)
        return jout
    jac.neval = 0

    runner = ode(f, jac=jac if new_kwargs['with_jacobian'] else None)
    runner.set_integrator(integrator_name, **new_kwargs)
    runner.set_initial_value(y0.flatten(), tout[0])

    texec = time.time()
    if dense_output:
        import warnings
        if not len(tout) == 2:
            raise ValueError("dense_output implies tout == (t0, tend)")
        # suppress warning printed by Fortran
        runner._integrator.iwork[2] = -1
        warnings.filterwarnings("ignore", category=UserWarning)
        yout = [y0]
        tstep = [tout[0]]
        while runner.t < tout[1]:
            runner.integrate(tout[1], step=True)
            tstep.append(runner.t)
            yout.append(runner.y)
        warnings.resetwarnings()
        tout = np.array(tstep)
        yout = np.array(yout)
    else:
        yout = np.empty((len(tout), rd.n*rd.N), order='C')
        yout[0, :] = y0
        for i in range(1, len(tout)):
            runner.integrate(tout[i])
            yout[i, :] = runner.y

    texec = time.time() - texec

    info = new_kwargs.copy()
    info.update({
        'integrator_name': integrator_name,
        'success': runner.successful(),
        'texec': texec,
        'neval_f': f.neval,
        'neval_j': jac.neval,
    })
    return yout.reshape((len(tout), rd.N, rd.n)), tout, info


[docs]def sigm(x, lim=150., n=8): """ Algebraic sigmoid to avoid overflow/underflow of 'double exp(double)' """ return x/((x/lim)**n+1)**(1./n)
[docs]class Integration(object): """ Model kinetcs by integrating system of ODEs using user specified solver. Parameters ---------- solver: string "sundials" or "scipy" where scipy uses VODE as the solver. rd: ReactionDiffusion instance C0: array initial concentrations (untransformed, i.e. linear) tout: array times for which to report solver results (untransformed) sigm_damp: bool or tuple of (lim: float, n: int) conditionally damp C0 with an algebraic sigmoid when rd.logy == True. s(x) = x/((x/lim)**n+1)**(1./n) if sigm==True then `lim` and `n` are the default of `sigm()` C0_is_log: bool If True: passed values in C0 are taken to be the natural logarithm of initial concentrations. If False and rd.logy == True: a very small number is added to C0 to avoid applying log to zero (see `tiny`). tiny: float added to C0 when rd.logy==True and C0_is_log==False. Note that if you explicitly want to avoid adding tiny you need to set it to zero (e.g. when manually setting any C0==0 to some epsilon). (default: None => numpy.finfo(np.float64).tiny) **kwargs: mode: not supported by Sundials solver (current wrapper code auto selects banded for N>1 and uses dense mode for N==1) atol: float or sequence absolute tolerance of solution rtol: float relative tolerance of solution Attributes ---------- Cout: array linear output concentrations yout: array output from solver: log(concentrations) if rd.logy == True info: dict Information from solver. Guaranteed to contain: - 'texec': execution time in seconds. - 'atol': float or array, absolute tolerance(s). - 'rtol': float, relative tolerance Methods ------- _integrate() performs the integration, automatically called by __init__ """ _callbacks = { 'sundials': _integrate_sundials, 'scipy': _integrate_scipy, 'rk4': _integrate_rk4, } def __init__(self, solver, rd, C0, tout, sigm_damp=False, C0_is_log=False, tiny=None, **kwargs): if solver not in self._callbacks: raise KeyError("Unknown solver %s" % solver) self.solver = solver self.rd = rd self.C0 = to_unitless(C0, get_derived_unit( rd.units, 'concentration')).flatten() self.tout = to_unitless(tout, get_derived_unit( rd.units, 'time')) self.sigm_damp = sigm_damp self.C0_is_log = C0_is_log self.tiny = tiny or np.finfo(np.float64).tiny self.kwargs = kwargs self.yout = None self.info = None self.Cout = None self._sanity_checks() self._integrate() def _sanity_checks(self): if not self.C0_is_log: if np.any(self.C0 < 0): raise ValueError("Negative concentrations encountered in C0") def with_units(self, value, key): return value*get_derived_unit(self.rd.units, key) def _integrate(self): """ Performs the integration by calling the callback chosen by self.solver. If rd.logy == True, a transformation of self.C0 to log(C0) will be performed before running the integration (the same is done for self.tout / rd.logt == True). After the integration is done the attributes `Cout`, `info` and `yout` are set. Cout is guaranteed to be linear concentrations (transformed from yout by calling exp if rd.logy==True) and yout is the unprocessed output from the solver. """ C0 = self.C0 # Transform initial concentrations if self.rd.logy: if not self.C0_is_log: C0 = np.log(C0 + self.tiny) if self.sigm_damp is True: y0 = sigm(C0) elif isinstance(self.sigm_damp, tuple): y0 = sigm(C0, *self.sigm_damp) else: y0 = C0 else: if self.C0_is_log: if self.sigm_damp is True: y0 = np.exp(sigm(C0)) elif isinstance(self.sigm_damp, tuple): y0 = np.exp(sigm(C0, *self.sigm_damp)) else: y0 = np.exp(C0) else: y0 = C0 # Transform time tout = self.tout if tout[0] == 0.0 and self.rd.logt: t0_set = True t0 = suggest_t0(self.rd, y0) t = np.log(tout + t0) # conserve total time else: t0_set = False t = np.log(tout) if self.rd.logt else tout # Run the integration self.yout, self.internal_t, self.info = self._callbacks[self.solver]( self.rd, y0, t, **self.kwargs) self.info['t0_set'] = t0 if t0_set else False # Back-transform independent variable into linear time self.tout = self.with_units( np.exp(self.internal_t) if self.rd.logt else self.internal_t, 'time') # Back-transform integration output into linear concentration self.Cout = self.with_units( np.exp(self.yout) if self.rd.logy else self.yout, 'concentration')
[docs]def run(*args, **kwargs): """ ``run`` is provided for environment variable directed solver choice. Set ``CHEMREAC_SOLVER`` to indicate what integrator to use (default: "scipy"). Set ``CHEMREAC_SOLVER_KWARGS`` to a string which can be eval'd to a python dictionary. e.g. "{'atol': 1e-4, 'rtol'=1e-7}" """ import os environ_kwargs = os.getenv('CHEMREAC_SOLVER_KWARGS', None) if environ_kwargs: environ_kwargs = eval(environ_kwargs) if not isinstance(environ_kwargs, dict): fmtstr = "CHEMREAC_SOLVER_KWARGS not evaluated to a dictinary: {}" raise TypeError(fmtstr.format(environ_kwargs)) kwargs.update(environ_kwargs) return Integration( os.getenv('CHEMREAC_SOLVER', 'scipy'), *args, **kwargs)