Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

# -*- coding: utf-8 -*- 

 

""" 

chemreac.core 

============= 

In chemreac.core you will find :py:class:`ReactionDiffusion` which 

is the class describing the system of ODEs. 

 

""" 

 

import numpy as np 

 

from .util.pyutil import monotonic 

from .units import unitof, get_derived_unit, to_unitless 

from .constants import get_unitless_constant 

 

from ._chemreac import CppReactionDiffusion, diag_data_len 

 

DENSE, BANDED, SPARSE = range(3) 

FLAT, CYLINDRICAL, SPHERICAL = range(3) 

Geom_names = {FLAT: 'Flat', CYLINDRICAL: 'Cylindrical', SPHERICAL: 'Spherical'} 

GEOM_ORDER = ('Flat', 'Cylindrical', 'Spherical') 

 

 

class ReactionDiffusionBase(object): 

    def to_Reaction(self, ri): 

        """ 

        Convenience method for making a Reaction instance 

        for reaction index ri 

        """ 

        from .chemistry import Reaction 

        return Reaction( 

            {self.substance_names[i]: self.stoich_active[ri].count(i) for 

             i in range(self.n)}, 

            {self.substance_names[i]: self.stoich_prod[ri].count(i) for 

             i in range(self.n)}, 

            {self.substance_names[i]: self.stoich_inactv[ri].count(i) for 

             i in range(self.n)}, 

            k=self.k[ri]) 

 

    def alloc_fout(self): 

        return np.zeros(self.n*self.N) 

 

    def alloc_jout(self, banded=True, order='C', pad=0): 

        if order == 'C': 

            rpad, cpad = 0, pad 

        elif order == 'F': 

            rpad, cpad = pad, 0 

        else: 

            raise ValueError("Order must be 'C' or 'F'") 

 

        if banded: 

            return np.zeros((self.n*2 + 1 + rpad, self.n*self.N + cpad), 

                            order=order) 

        else: 

            return np.zeros((self.n*self.N + rpad, self.n*self.N + cpad), 

                            order=order) 

 

    def alloc_jout_compressed(self, ndiag): 

        # TODO: ndiag from nstencil 

        return np.zeros(self.n*self.n*self.N + 2*diag_data_len( 

            self.N, self.n, ndiag)) 

 

    @property 

    def ny(self): 

        return self.N*self.n 

 

 

def get_unit(units, key): 

    try: 

        return get_derived_unit(units, key) 

    except KeyError: 

        return dict( 

            radyield=units['amount']/get_unit(units, 'energy'), 

            field=get_unit(units, 'energy')/(get_unit(units, 'time') * 

                                             get_unit(units, 'length')**3) 

        )[key] 

 

 

class ReactionDiffusion(CppReactionDiffusion, ReactionDiffusionBase): 

    """ 

    Object representing the numerical model, with callbacks for evaluating 

    derivatives and jacobian. 

 

    The instance provides methods: 

 

    - ``f(t, y, fout)`` 

    - ``dense_jac_rmaj(t, y, jout)`` 

    - ``dense_jac_cmaj(t, y, jout)`` 

    - ``banded_jac_cmaj(t, y, jout)`` 

    - ``banded_packed_jac_cmaj(t, y, jout)`` 

 

    some of which are used by chemreac.integrate.integrate_scipy 

 

    Additional convenience attributes (not used by underlying C++ class): 

 

    - :py:attr:`substance_names` 

    - :py:attr:`substance_tex_names` 

 

    Parameters 

    ---------- 

    n: integer 

        number of species 

    stoich_active: list of lists of integer indices 

        reactant index lists per reaction. 

    stoich_prod: list of lists of integer indices 

        product index lists per reaction. 

    k: 1-dimensional array 

        reaction rate coefficients 

    N: integer 

        number of compartments (default: 1 if x==None else len(x)-1) 

    D: sequence of floats 

        diffusion coefficients (of length n) 

    z_chg: sequence of integers 

        1-dimensional array ion charges 

    mobility: sequence of floats 

        mobility of ions 

    x: sequence of floats or pair of flats or float, optional 

        compartment boundaries (of length N+1), default: linspace(0, 1, N+1) 

        if x is a pair of floats it is expanded into linspace(x[0], x[1], N+1). 

        if x is a float it is expanded into linspace(0, x, N+1) 

    stoich_inactv: list of lists of integer indices 

        list of inactive reactant index lists per reaction.n, default: [] 

    geom: integer 

        any of (FLAT, SPHERICAL, CYLINDRICAL) 

    logy: bool 

        f and \*_jac_\* routines operate on log(concentration) 

    logt: bool 

        f and \*_jac_\* routines operate on log(time) 

    logx: bool 

        f and \*_jac_\* routines operate on log(space) 

    nstencil: integer 

        number of points used in finite difference scheme 

    lrefl: bool 

        reflective left boundary (default: True) 

    rrefl: bool 

        reflective right boundary (default: True) 

    auto_efield: bool 

        calculate electric field from concentrations (default: False) 

    surf_chg: pair of floats 

        total charge of surface (defaut: (0.0, 0.0)) 

    eps_rel: float 

        relative permitivity of medium (dielectric constant) 

    g_values: sequence of sequences of floats 

        per specie yields (amount / energy) per field type 

    g_value_parents: sequence of integers (optional) 

        indices of parents for each g_value. -1 denotes no concentration 

        dependence of g_values. default: [-1]*len(g_values) 

    fields: sequence of sequence of floats 

        per bin field strength (energy / (volume time)) per field type. 

        May be calculated as product between density and doserate 

    modulated_rxns: sequence of integers 

        Indicies of reactions subject to per bin modulation 

    modulation: sequence of sequences of floats 

        Per bin modulation vectors for each index in modulated_rxns 

    units: dict (optional) 

        default: None, see ``chemreac.units.SI_base`` for an 

        example. 

 

    Attributes 

    ---------- 

    units: dict 

 

    """ 

    # not used by C++ class 

    extra_attrs = ['substance_names', 'substance_tex_names'] 

 

    # subset of extra_attrs optionally passed by user 

    kwarg_attrs = ['substance_names', 'substance_tex_names'] 

 

    _substance_names = None 

    _substance_tex_names = None 

 

    @property 

    def substance_names(self): 

        return self._substance_names or list(map(str, range(self.n))) 

 

    @substance_names.setter 

    def substance_names(self, names): 

        self._substance_names = names 

 

    @property 

    def substance_tex_names(self): 

        return self._substance_tex_names or list(map(str, range(self.n))) 

 

    @substance_tex_names.setter 

    def substance_tex_names(self, tex_names): 

        self._substance_tex_names = tex_names 

 

    def __new__(cls, n, stoich_active, stoich_prod, k, N=0, D=None, z_chg=None, 

                mobility=None, x=None, stoich_inactv=None, geom=FLAT, 

                logy=False, logt=False, logx=False, nstencil=None, 

                lrefl=True, rrefl=True, auto_efield=False, surf_chg=None, 

                eps_rel=1.0, g_values=None, g_value_parents=None, 

                fields=None, 

                modulated_rxns=None, 

                modulation=None, 

                units=None, 

                faraday=None,  # deprecated 

                vacuum_permittivity=None,  # deprecated 

                k_unitless=None, 

                **kwargs): 

        if N == 0: 

            if x is None: 

                N = 1 

            else: 

                N = len(x)-1 

        nstencil = nstencil or (1 if N == 1 else 3) 

        if N < nstencil: 

            raise ValueError("N must be >= nstencil") 

 

        if z_chg is None: 

            z_chg = list([0]*n) 

        if mobility is None: 

            mobility = np.zeros(n)*get_unit(units, 'electrical_mobility') 

        if N > 1: 

            assert n == len(D) 

            assert n == len(z_chg) 

            assert n == len(mobility) 

        else: 

            if D is None: 

                D = np.zeros(n)*get_unit(units, 'diffusion') 

 

        if x is None: 

            x = 1.0*get_unit(units, 'length') 

 

        try: 

            if len(x) == N+1: 

                if not monotonic(x, 1, True): 

                    raise ValueError("x must be strictly positive monotonic") 

                _x = x 

            elif len(x) == 2: 

                _x = np.linspace(x[0], x[1], N+1) 

            elif len(x) == 1: 

                raise TypeError 

            else: 

                raise ValueError("Don't know what to do with len(x) == %d" % 

                                 len(x)) 

        except TypeError: 

            _x = np.linspace(0*unitof(x), x, N+1) 

 

        if stoich_inactv is None: 

            stoich_inactv = list([[]]*len(stoich_active)) 

        assert len(stoich_inactv) == len(stoich_active) 

        assert len(stoich_active) == len(stoich_prod) 

 

        if k is not None: 

            assert len(stoich_active) == len(k) 

        else: 

            assert len(stoich_active) == len(k_unitless) 

        assert geom in (FLAT, CYLINDRICAL, SPHERICAL) 

 

        if surf_chg is None: 

            surf_chg = (0.0*get_unit(units, 'charge'), 

                        0.0*get_unit(units, 'charge')) 

 

        # Handle g_values 

        if g_values is None: 

            g_values = [] 

        else: 

            if fields is not None: 

                assert len(g_values) == len(fields) 

            for gv in g_values: 

                assert len(gv) == n 

        if g_value_parents is None: 

            g_value_parents = [-1]*len(g_values) 

 

        if fields is None: 

            fields = [[0.0*get_unit(units, 'field')]*N]*len(g_values) 

        else: 

            assert len(fields) == len(g_values) 

            for fld in fields: 

                assert len(fld) == N 

 

        reac_orders = map(len, stoich_active) 

        if k_unitless is None: 

            k_unitless = [to_unitless(kval, kunit) for kval, kunit in 

                          zip(k, cls.k_units(units, reac_orders))] 

        else: 

            if k is not None: 

                raise ValueError( 

                    "When passing k_unitless you must set k to None") 

 

        if modulated_rxns is not None: 

            if len(modulated_rxns) != len(modulation): 

                raise ValueError("len(modulated_rxns) != len(modulation)") 

        if modulation is not None: 

            if modulated_rxns is None: 

                raise ValueError("modulated_rxns missing.") 

            if any(len(arr) != N for arr in modulation): 

                raise ValueError("An array in modulation of size != N") 

 

        rd = super(ReactionDiffusion, cls).__new__( 

            cls, n, stoich_active, stoich_prod, 

            np.asarray(k_unitless), 

            N, 

            to_unitless(D, get_unit(units, 'diffusion')), 

            z_chg, 

            to_unitless(mobility, get_unit(units, 'electrical_mobility')), 

            to_unitless(_x, get_unit(units, 'length')), 

            stoich_inactv, geom, logy, logt, logx, 

            [np.asarray([to_unitless(yld, yld_unit) for yld in gv]) for gv, 

             yld_unit in zip(g_values, cls.g_units(units, g_value_parents))], 

            g_value_parents, 

            [to_unitless(fld, get_unit(units, 'field')) for fld in fields], 

            modulated_rxns or [], 

            modulation or [], 

            nstencil, lrefl, rrefl, auto_efield, 

            (to_unitless(surf_chg[0], get_unit(units, 'charge')), 

             to_unitless(surf_chg[1], get_unit(units, 'charge'))), 

            eps_rel, 

            faraday or get_unitless_constant(units, 'Faraday_constant'), 

            vacuum_permittivity or get_unitless_constant( 

                units, 'vacuum_permittivity'), 

        ) 

 

        rd.units = units 

 

        for attr in cls.kwarg_attrs: 

            if attr in kwargs: 

                setattr(rd, '_' + attr, kwargs.pop(attr)) 

        if kwargs: 

            raise KeyError("Unkown kwargs: ", kwargs.keys()) 

        return rd 

 

    @property 

    def fields(self): 

        return np.asarray(self._fields)*get_unit(self.units, 'field') 

 

    @fields.setter 

    def fields(self, value): 

        self._fields = to_unitless(value, get_unit(self.units, 'field')) 

 

    @staticmethod 

    def g_units(units, g_value_parents): 

        g_units = [] 

        for parent in g_value_parents: 

            if parent == -1: 

                g_units.append(get_unit(units, 'radyield')) 

            else: 

                g_units.append(get_unit(units, 'radyield') / 

                               get_unit(units, 'concentration')) 

        return g_units 

 

    @property 

    def g_values(self): 

        return [np.asarray(gv)*gu for gv, gu in zip( 

            self._g_values, self.g_units(self.units, self.g_value_parents))] 

 

    @staticmethod 

    def k_units(units, reaction_orders): 

        return [get_unit(units, 'concentration')**( 

            1-order)/get_unit(units, 'time') for order in reaction_orders] 

 

    @property 

    def k(self): 

        reac_orders = map(len, self.stoich_active) 

        return [kv*ku for kv, ku in zip(self._k, self.k_units( 

            self.units, reac_orders))] 

 

    @k.setter 

    def k(self, value): 

        reac_orders = map(len, self.stoich_active) 

        self._k = [to_unitless(kv, ku) for kv, ku in zip(value, self.k_units( 

            reac_orders))] 

 

    @property 

    def D(self): 

        return np.asarray(self._D)*get_unit(self.units, 'diffusion') 

 

    @D.setter 

    def D(self, value): 

        self._D = to_unitless(value, get_unit(self.units, 'diffusion')) 

 

    @property 

    def mobility(self): 

        return np.asarray(self._mobility)*get_unit(self.units, 'mobility') 

 

    @mobility.setter 

    def mobility(self, value): 

        self._mobility = to_unitless(value, get_unit(self.units, 'mobility'))