Source code for brian2.codegen.runtime.numpy_rt.numpy_rt

"""
Module providing `NumpyCodeObject`.
"""

import sys
from collections.abc import Iterable

import numpy as np

from brian2.core.base import BrianObjectException
from brian2.core.functions import Function
from brian2.core.preferences import BrianPreference, prefs
from brian2.core.variables import (
    ArrayVariable,
    AuxiliaryVariable,
    DynamicArrayVariable,
    Subexpression,
)

from ...codeobject import CodeObject, check_compiler_kwds, constant_or_scalar
from ...generators.numpy_generator import NumpyCodeGenerator
from ...targets import codegen_targets
from ...templates import Templater

__all__ = ["NumpyCodeObject"]

# Preferences
prefs.register_preferences(
    "codegen.runtime.numpy",
    "Numpy runtime codegen preferences",
    discard_units=BrianPreference(
        default=False,
        docs="""
        Whether to change the namespace of user-specifed functions to remove
        units.
        """,
    ),
)


[docs] class LazyArange(Iterable): """ A class that can be used as a `~numpy.arange` replacement (with an implied step size of 1) but does not actually create an array of values until necessary. It is somewhat similar to the ``range()`` function in Python 3, but does not use a generator. It is tailored to a special use case, the ``_vectorisation_idx`` variable in numpy templates, and not meant for general use. The ``_vectorisation_idx`` is used for stateless function calls such as ``rand()`` and for the numpy codegen target determines the number of values produced by such a call. This will often be the number of neurons or synapses, and this class avoids creating a new array of that size at every code object call when all that is needed is the *length* of the array. Examples -------- >>> from brian2.codegen.runtime.numpy_rt.numpy_rt import LazyArange >>> ar = LazyArange(10) >>> len(ar) 10 >>> len(ar[:5]) 5 >>> type(ar[:5]) <class 'brian2.codegen.runtime.numpy_rt.numpy_rt.LazyArange'> >>> ar[5] 5 >>> for value in ar[3:7]: ... print(value) ... 3 4 5 6 >>> len(ar[np.array([1, 2, 3])]) 3 """ def __init__(self, stop, start=0, indices=None): self.start = start self.stop = stop self.indices = indices def __len__(self): if self.indices is None: return self.stop - self.start else: return len(self.indices) def __getitem__(self, item): if isinstance(item, slice): if self.indices is None: start, stop, step = item.start, item.stop, item.step if step not in [None, 1]: raise NotImplementedError("Step should be 1") if start is None: start = 0 if stop is None: stop = len(self) return LazyArange( start=self.start + start, stop=min([self.start + stop, self.stop]) ) else: raise NotImplementedError("Cannot slice LazyArange with indices") elif isinstance(item, np.ndarray): if item.dtype == np.dtype(bool): item = np.nonzero(item)[0] # convert boolean array into integers if len(item) == 0: return np.array([], dtype=np.int32) if np.min(item) < 0 or np.max(item) > len(self): raise IndexError("Indexing array contains out-of-bounds values") return LazyArange(start=self.start, stop=self.stop, indices=item) elif isinstance(item, int): if self.indices is None: index = self.start + item if index >= self.stop: raise IndexError(index) return index else: return self.indices[item] else: raise TypeError("Can only index with integer, numpy array, or slice.") def __iter__(self): if self.indices is None: return iter(np.arange(self.start, self.stop)) else: return iter(self.indices) # Allow conversion to a proper array with np.array(...) def __array__(self, dtype=None): if self.indices is None: return np.arange(self.start, self.stop) else: return self.indices + self.start # Allow basic arithmetics (used when shifting stuff for subgroups) def __add__(self, other): if isinstance(other, int): return LazyArange(start=self.start + other, stop=self.stop + other) else: return NotImplemented def __radd__(self, other): return self.__add__(other) def __sub__(self, other): if isinstance(other, int): return LazyArange(start=self.start - other, stop=self.stop - other) else: return NotImplemented
[docs] class NumpyCodeObject(CodeObject): """ Execute code using Numpy Default for Brian because it works on all platforms. """ templater = Templater( "brian2.codegen.runtime.numpy_rt", ".py_", env_globals={"constant_or_scalar": constant_or_scalar}, ) generator_class = NumpyCodeGenerator class_name = "numpy" def __init__( self, owner, code, variables, variable_indices, template_name, template_source, compiler_kwds, name="numpy_code_object*", ): check_compiler_kwds(compiler_kwds, [], "numpy") from brian2.devices.device import get_device self.device = get_device() self.namespace = { "_owner": owner, # TODO: This should maybe go somewhere else "logical_not": np.logical_not, } CodeObject.__init__( self, owner, code, variables, variable_indices, template_name, template_source, compiler_kwds=compiler_kwds, name=name, ) self.variables_to_namespace()
[docs] @classmethod def is_available(cls): # no test necessary for numpy return True
[docs] def variables_to_namespace(self): # Variables can refer to values that are either constant (e.g. dt) # or change every timestep (e.g. t). We add the values of the # constant variables here and add the names of non-constant variables # to a list # A list containing tuples of name and a function giving the value self.nonconstant_values = [] for name, var in self.variables.items(): if isinstance(var, (AuxiliaryVariable, Subexpression)): continue try: if not hasattr(var, "get_value"): raise TypeError() value = var.get_value() except TypeError: # Either a dummy Variable without a value or a Function object if isinstance(var, Function): impl = var.implementations[self.__class__].get_code(self.owner) self.namespace[name] = impl else: self.namespace[name] = var continue if isinstance(var, ArrayVariable): self.namespace[self.generator_class.get_array_name(var)] = value if var.scalar and var.constant: self.namespace[name] = value[0] else: self.namespace[name] = value if isinstance(var, DynamicArrayVariable): dyn_array_name = self.generator_class.get_array_name( var, access_data=False ) self.namespace[dyn_array_name] = self.device.get_value( var, access_data=False ) # Also provide the Variable object itself in the namespace (can be # necessary for resize operations, for example) self.namespace[f"_var_{name}"] = var # There is one type of objects that we have to inject into the # namespace with their current value at each time step: dynamic # arrays that change in size during runs (i.e. not synapses but # e.g. the structures used in monitors) if isinstance(var, DynamicArrayVariable) and var.needs_reference_update: self.nonconstant_values.append( ( self.generator_class.get_array_name(var, self.variables), var.get_value, ) )
[docs] def update_namespace(self): # update the values of the non-constant values in the namespace for name, func in self.nonconstant_values: self.namespace[name] = func()
[docs] def compile_block(self, block): code = getattr(self.code, block, "").strip() if not code or "EMPTY_CODE_BLOCK" in code: return None return compile(code, "(string)", "exec")
[docs] def run_block(self, block): compiled_code = self.compiled_code[block] if not compiled_code: return try: exec(compiled_code, self.namespace) except Exception as exc: code = getattr(self.code, block) message = ( "An exception occured during the execution of the " f"'{block}' block of code object {self.name}.\n" ) lines = code.split("\n") message += "The error was raised in the following line:\n" _, _, tb = sys.exc_info() tb = tb.tb_next # Line in the code object's code message += f"{lines[tb.tb_lineno - 1]}\n" raise BrianObjectException(message, self.owner) from exc # output variables should land in the variable name _return_values if "_return_values" in self.namespace: return self.namespace["_return_values"]
codegen_targets.add(NumpyCodeObject)