# -*- coding: utf-8 -*-
"""
chemreac.core
=============
In chemreac.core you will find :py:class:`ReactionDiffusion` which
is the class describing the system of ODEs.
"""
import numpy as np
from .util.pyutil import monotonic
from .units import unitof, get_derived_unit, to_unitless
from .constants import get_unitless_constant
from ._chemreac import CppReactionDiffusion, diag_data_len
DENSE, BANDED, SPARSE = range(3)
FLAT, CYLINDRICAL, SPHERICAL = range(3)
Geom_names = {FLAT: 'Flat', CYLINDRICAL: 'Cylindrical', SPHERICAL: 'Spherical'}
GEOM_ORDER = ('Flat', 'Cylindrical', 'Spherical')
class ReactionDiffusionBase(object):
def to_Reaction(self, ri):
"""
Convenience method for making a Reaction instance
for reaction index ri
"""
from .chemistry import Reaction
return Reaction(
{self.substance_names[i]: self.stoich_active[ri].count(i) for
i in range(self.n)},
{self.substance_names[i]: self.stoich_prod[ri].count(i) for
i in range(self.n)},
{self.substance_names[i]: self.stoich_inactv[ri].count(i) for
i in range(self.n)},
k=self.k[ri])
def alloc_fout(self):
return np.zeros(self.n*self.N)
def alloc_jout(self, banded=True, order='C', pad=0):
if order == 'C':
rpad, cpad = 0, pad
elif order == 'F':
rpad, cpad = pad, 0
else:
raise ValueError("Order must be 'C' or 'F'")
if banded:
return np.zeros((self.n*2 + 1 + rpad, self.n*self.N + cpad),
order=order)
else:
return np.zeros((self.n*self.N + rpad, self.n*self.N + cpad),
order=order)
def alloc_jout_compressed(self, ndiag):
# TODO: ndiag from nstencil
return np.zeros(self.n*self.n*self.N + 2*diag_data_len(
self.N, self.n, ndiag))
@property
def ny(self):
return self.N*self.n
def get_unit(units, key):
try:
return get_derived_unit(units, key)
except KeyError:
return dict(
radyield=units['amount']/get_unit(units, 'energy'),
field=get_unit(units, 'energy')/(get_unit(units, 'time') *
get_unit(units, 'length')**3)
)[key]
[docs]class ReactionDiffusion(CppReactionDiffusion, ReactionDiffusionBase):
"""
Object representing the numerical model, with callbacks for evaluating
derivatives and jacobian.
The instance provides methods:
- ``f(t, y, fout)``
- ``dense_jac_rmaj(t, y, jout)``
- ``dense_jac_cmaj(t, y, jout)``
- ``banded_jac_cmaj(t, y, jout)``
- ``banded_packed_jac_cmaj(t, y, jout)``
some of which are used by chemreac.integrate.integrate_scipy
Additional convenience attributes (not used by underlying C++ class):
- :py:attr:`substance_names`
- :py:attr:`substance_tex_names`
Parameters
----------
n: integer
number of species
stoich_active: list of lists of integer indices
reactant index lists per reaction.
stoich_prod: list of lists of integer indices
product index lists per reaction.
k: 1-dimensional array
reaction rate coefficients
N: integer
number of compartments (default: 1 if x==None else len(x)-1)
D: sequence of floats
diffusion coefficients (of length n)
z_chg: sequence of integers
1-dimensional array ion charges
mobility: sequence of floats
mobility of ions
x: sequence of floats or pair of flats or float, optional
compartment boundaries (of length N+1), default: linspace(0, 1, N+1)
if x is a pair of floats it is expanded into linspace(x[0], x[1], N+1).
if x is a float it is expanded into linspace(0, x, N+1)
stoich_inactv: list of lists of integer indices
list of inactive reactant index lists per reaction.n, default: []
geom: integer
any of (FLAT, SPHERICAL, CYLINDRICAL)
logy: bool
f and \*_jac_\* routines operate on log(concentration)
logt: bool
f and \*_jac_\* routines operate on log(time)
logx: bool
f and \*_jac_\* routines operate on log(space)
nstencil: integer
number of points used in finite difference scheme
lrefl: bool
reflective left boundary (default: True)
rrefl: bool
reflective right boundary (default: True)
auto_efield: bool
calculate electric field from concentrations (default: False)
surf_chg: pair of floats
total charge of surface (defaut: (0.0, 0.0))
eps_rel: float
relative permitivity of medium (dielectric constant)
g_values: sequence of sequences of floats
per specie yields (amount / energy) per field type
g_value_parents: sequence of integers (optional)
indices of parents for each g_value. -1 denotes no concentration
dependence of g_values. default: [-1]*len(g_values)
fields: sequence of sequence of floats
per bin field strength (energy / (volume time)) per field type.
May be calculated as product between density and doserate
modulated_rxns: sequence of integers
Indicies of reactions subject to per bin modulation
modulation: sequence of sequences of floats
Per bin modulation vectors for each index in modulated_rxns
units: dict (optional)
default: None, see ``chemreac.units.SI_base`` for an
example.
Attributes
----------
units: dict
"""
# not used by C++ class
extra_attrs = ['substance_names', 'substance_tex_names']
# subset of extra_attrs optionally passed by user
kwarg_attrs = ['substance_names', 'substance_tex_names']
_substance_names = None
_substance_tex_names = None
@property
def substance_names(self):
return self._substance_names or list(map(str, range(self.n)))
@substance_names.setter
def substance_names(self, names):
self._substance_names = names
@property
def substance_tex_names(self):
return self._substance_tex_names or list(map(str, range(self.n)))
@substance_tex_names.setter
def substance_tex_names(self, tex_names):
self._substance_tex_names = tex_names
def __new__(cls, n, stoich_active, stoich_prod, k, N=0, D=None, z_chg=None,
mobility=None, x=None, stoich_inactv=None, geom=FLAT,
logy=False, logt=False, logx=False, nstencil=None,
lrefl=True, rrefl=True, auto_efield=False, surf_chg=None,
eps_rel=1.0, g_values=None, g_value_parents=None,
fields=None,
modulated_rxns=None,
modulation=None,
units=None,
faraday=None, # deprecated
vacuum_permittivity=None, # deprecated
k_unitless=None,
**kwargs):
if N == 0:
if x is None:
N = 1
else:
N = len(x)-1
nstencil = nstencil or (1 if N == 1 else 3)
if N < nstencil:
raise ValueError("N must be >= nstencil")
if z_chg is None:
z_chg = list([0]*n)
if mobility is None:
mobility = np.zeros(n)*get_unit(units, 'electrical_mobility')
if N > 1:
assert n == len(D)
assert n == len(z_chg)
assert n == len(mobility)
else:
if D is None:
D = np.zeros(n)*get_unit(units, 'diffusion')
if x is None:
x = 1.0*get_unit(units, 'length')
try:
if len(x) == N+1:
if not monotonic(x, 1, True):
raise ValueError("x must be strictly positive monotonic")
_x = x
elif len(x) == 2:
_x = np.linspace(x[0], x[1], N+1)
elif len(x) == 1:
raise TypeError
else:
raise ValueError("Don't know what to do with len(x) == %d" %
len(x))
except TypeError:
_x = np.linspace(0*unitof(x), x, N+1)
if stoich_inactv is None:
stoich_inactv = list([[]]*len(stoich_active))
assert len(stoich_inactv) == len(stoich_active)
assert len(stoich_active) == len(stoich_prod)
if k is not None:
assert len(stoich_active) == len(k)
else:
assert len(stoich_active) == len(k_unitless)
assert geom in (FLAT, CYLINDRICAL, SPHERICAL)
if surf_chg is None:
surf_chg = (0.0*get_unit(units, 'charge'),
0.0*get_unit(units, 'charge'))
# Handle g_values
if g_values is None:
g_values = []
else:
if fields is not None:
assert len(g_values) == len(fields)
for gv in g_values:
assert len(gv) == n
if g_value_parents is None:
g_value_parents = [-1]*len(g_values)
if fields is None:
fields = [[0.0*get_unit(units, 'field')]*N]*len(g_values)
else:
assert len(fields) == len(g_values)
for fld in fields:
assert len(fld) == N
reac_orders = map(len, stoich_active)
if k_unitless is None:
k_unitless = [to_unitless(kval, kunit) for kval, kunit in
zip(k, cls.k_units(units, reac_orders))]
else:
if k is not None:
raise ValueError(
"When passing k_unitless you must set k to None")
if modulated_rxns is not None:
if len(modulated_rxns) != len(modulation):
raise ValueError("len(modulated_rxns) != len(modulation)")
if modulation is not None:
if modulated_rxns is None:
raise ValueError("modulated_rxns missing.")
if any(len(arr) != N for arr in modulation):
raise ValueError("An array in modulation of size != N")
rd = super(ReactionDiffusion, cls).__new__(
cls, n, stoich_active, stoich_prod,
np.asarray(k_unitless),
N,
to_unitless(D, get_unit(units, 'diffusion')),
z_chg,
to_unitless(mobility, get_unit(units, 'electrical_mobility')),
to_unitless(_x, get_unit(units, 'length')),
stoich_inactv, geom, logy, logt, logx,
[np.asarray([to_unitless(yld, yld_unit) for yld in gv]) for gv,
yld_unit in zip(g_values, cls.g_units(units, g_value_parents))],
g_value_parents,
[to_unitless(fld, get_unit(units, 'field')) for fld in fields],
modulated_rxns or [],
modulation or [],
nstencil, lrefl, rrefl, auto_efield,
(to_unitless(surf_chg[0], get_unit(units, 'charge')),
to_unitless(surf_chg[1], get_unit(units, 'charge'))),
eps_rel,
faraday or get_unitless_constant(units, 'Faraday_constant'),
vacuum_permittivity or get_unitless_constant(
units, 'vacuum_permittivity'),
)
rd.units = units
for attr in cls.kwarg_attrs:
if attr in kwargs:
setattr(rd, '_' + attr, kwargs.pop(attr))
if kwargs:
raise KeyError("Unkown kwargs: ", kwargs.keys())
return rd
@property
def fields(self):
return np.asarray(self._fields)*get_unit(self.units, 'field')
@fields.setter
def fields(self, value):
self._fields = to_unitless(value, get_unit(self.units, 'field'))
@staticmethod
def g_units(units, g_value_parents):
g_units = []
for parent in g_value_parents:
if parent == -1:
g_units.append(get_unit(units, 'radyield'))
else:
g_units.append(get_unit(units, 'radyield') /
get_unit(units, 'concentration'))
return g_units
@property
def g_values(self):
return [np.asarray(gv)*gu for gv, gu in zip(
self._g_values, self.g_units(self.units, self.g_value_parents))]
@staticmethod
def k_units(units, reaction_orders):
return [get_unit(units, 'concentration')**(
1-order)/get_unit(units, 'time') for order in reaction_orders]
@property
def k(self):
reac_orders = map(len, self.stoich_active)
return [kv*ku for kv, ku in zip(self._k, self.k_units(
self.units, reac_orders))]
@k.setter
def k(self, value):
reac_orders = map(len, self.stoich_active)
self._k = [to_unitless(kv, ku) for kv, ku in zip(value, self.k_units(
reac_orders))]
@property
def D(self):
return np.asarray(self._D)*get_unit(self.units, 'diffusion')
@D.setter
def D(self, value):
self._D = to_unitless(value, get_unit(self.units, 'diffusion'))
@property
def mobility(self):
return np.asarray(self._mobility)*get_unit(self.units, 'mobility')
@mobility.setter
def mobility(self, value):
self._mobility = to_unitless(value, get_unit(self.units, 'mobility'))