Source code for brian2.devices.device

Module containing the `Device` base class as well as the `RuntimeDevice`
implementation and some helper functions to access/set devices.

from weakref import WeakKeyDictionary
import numbers

import numpy as np

from brian2.memory.dynamicarray import DynamicArray, DynamicArray1D
from brian2.codegen.targets import codegen_targets
from brian2.codegen.runtime.numpy_rt import NumpyCodeObject
from brian2.core.names import find_name
from brian2.core.preferences import prefs
from brian2.core.variables import ArrayVariable, DynamicArrayVariable
from brian2.core.functions import Function
from brian2.units import ms
from brian2.utils.logger import get_logger
from brian2.utils.stringtools import code_representation, indent

# Public API of this module. The closing bracket of this list was missing,
# which swallowed the following statements into the list literal.
__all__ = ['Device', 'RuntimeDevice',
           'get_device', 'set_device',
           'all_devices', 'reinit_devices', 'reinit_and_delete',
           'reset_device', 'device', 'seed',
           ]

logger = get_logger(__name__)

#: Registry of the known devices, indexed by their name (see `set_device`)
all_devices = {}

prefs.register_preferences('devices', 'Device preferences')

#: caches the automatically determined code generation target
_auto_target = None

def auto_target():
    '''
    Automatically choose a code generation target (invoked when the
    `codegen.target` preference is set to ``'auto'``). Caches its result in
    the module-level ``_auto_target`` so that the check is only done once.
    Prefers cython > numpy.

    Returns
    -------
    target : class derived from `CodeObject`
        The target to use
    '''
    global _auto_target
    if _auto_target is not None:
        # Already determined during an earlier call
        return _auto_target

    # Only targets with a non-empty class name can be selected by name
    target_dict = dict((target.class_name, target)
                       for target in codegen_targets
                       if target.class_name)
    if 'cython' in target_dict and target_dict['cython'].is_available():
        _auto_target = target_dict['cython']
        logger.debug(('Chosing %r as the code generation '
                      'target.') % _auto_target.class_name)
    else:
        _auto_target = target_dict['numpy']
        logger.info('Cannot use compiled code, falling back to the numpy '
                    'code generation target. Note that this will likely '
                    'be slower than using compiled code. Set the code '
                    'generation to numpy manually to avoid this message:\n'
                    'prefs.codegen.target = "numpy"',
                    'codegen_fallback', once=True)

    return _auto_target
class Device(object):
    '''
    Base Device object.

    Most methods raise `NotImplementedError` (or do nothing) and are meant to
    be overridden by concrete devices.
    '''
    def __init__(self):
        #: The network schedule that this device supports. If the device only
        #: supports a specific, fixed schedule, it has to set this attribute to
        #: the respective schedule (see `Network.schedule` for details). If it
        #: supports arbitrary schedules, it should be set to ``None`` (the
        #: default).
        self.network_schedule = None

        #: The default clock of this device, created in `activate`
        self.defaultclock = None

        self._maximum_run_time = None
        self._state_tuple = (self.__module__, self.__class__.__name__)

    def _set_maximum_run_time(self, maximum_run_time):
        '''
        Sets a maximum time for a run before it will break. Used primarily for
        testing purposes. Not guaranteed to be respected by a device.
        '''
        self._maximum_run_time = maximum_run_time

    def get_array_name(self, var, access_data=True):
        '''
        Return a globally unique name for `var`.

        Parameters
        ----------
        access_data : bool, optional
            For `DynamicArrayVariable` objects, specifying `True` here means the
            name for the underlying data is returned. If specifying `False`,
            the name of object itself is returned (e.g. to allow resizing).

        Returns
        -------
        name : str
            The name for `var`.
        '''
        raise NotImplementedError()

    def get_len(self, array):
        '''
        Return the length of the array.

        Parameters
        ----------
        array : `ArrayVariable`
            The array for which the length is requested.

        Returns
        -------
        l : int
            The length of the array.
        '''
        raise NotImplementedError()

    def add_array(self, var):
        '''
        Add an array to this device.

        Parameters
        ----------
        var : `ArrayVariable`
            The array to add.
        '''
        raise NotImplementedError()

    def init_with_zeros(self, var, dtype):
        '''
        Initialize an array with zeros.

        Parameters
        ----------
        var : `ArrayVariable`
            The array to initialize with zeros.
        dtype : `dtype`
            The data type to use for the array.
        '''
        raise NotImplementedError()

    def init_with_arange(self, var, start, dtype):
        '''
        Initialize an array with an integer range.

        Parameters
        ----------
        var : `ArrayVariable`
            The array to fill with the integer range.
        start : int
            The start value for the integer range
        dtype : `dtype`
            The data type to use for the array.
        '''
        raise NotImplementedError()

    def fill_with_array(self, var, arr):
        '''
        Fill an array with the values given in another array.

        Parameters
        ----------
        var : `ArrayVariable`
            The array to fill.
        arr : `ndarray`
            The array values that should be copied to `var`.
        '''
        raise NotImplementedError()

    def spike_queue(self, source_start, source_end):
        '''
        Create and return a new `SpikeQueue` for this `Device`.

        Parameters
        ----------
        source_start : int
            The start index of the source group (necessary for subgroups)
        source_end : int
            The end index of the source group (necessary for subgroups)
        '''
        raise NotImplementedError()

    def resize(self, var, new_size):
        '''
        Resize a `DynamicArrayVariable`.

        Parameters
        ----------
        var : `DynamicArrayVariable`
            The variable that should be resized.
        new_size : int
            The new size of the variable
        '''
        raise NotImplementedError()

    def resize_along_first(self, var, new_size):
        '''
        Resize a `DynamicArrayVariable` along its first dimension.

        The default implementation simply delegates to `resize`; it can be
        overwritten with a better implementation.
        '''
        return self.resize(var, new_size)

    def seed(self, seed=None):
        '''
        Set the seed for the random number generator.

        Parameters
        ----------
        seed : int, optional
            The seed value for the random number generator, or ``None`` (the
            default) to set a random seed.
        '''
        raise NotImplementedError()

    def code_object_class(self, codeobj_class=None,
                          fallback_pref='codegen.target'):
        '''
        Return `CodeObject` class according to input/default settings

        Parameters
        ----------
        codeobj_class : a `CodeObject` class, optional
            If this is keyword is set to None or no arguments are given, this
            method will return the default.
        fallback_pref : str, optional
            String describing which attribute of prefs to access to retrieve
            the 'default' target. Usually this is ``codegen.target``, but in
            some cases we want to use object-specific targets such as
            ``codegen.string_expression_target``.

        Returns
        -------
        codeobj_class : class
            The `CodeObject` class that should be used

        Raises
        ------
        TypeError
            If `codeobj_class` is given as a string.
        ValueError
            If the preference names a target that is not part of
            ``codegen_targets``.
        '''
        if isinstance(codeobj_class, str):
            raise TypeError("codeobj_class argument given to code_object_class device method "
                            "should be a CodeObject class, not a string. You can, however, "
                            "send a string description of the target desired for the CodeObject "
                            "under the keyword fallback_pref")
        if codeobj_class is None:
            codeobj_class = prefs[fallback_pref]

        if not isinstance(codeobj_class, str):
            # Either given explicitly or the preference stores a class
            return codeobj_class

        if codeobj_class == 'auto':
            return auto_target()
        for target in codegen_targets:
            if target.class_name == codeobj_class:
                return target

        # No target found
        targets = ['auto'] + [target.class_name
                              for target in codegen_targets
                              if target.class_name]
        raise ValueError("Unknown code generation target: %s, should be "
                         " one of %s" % (codeobj_class, targets))

    def code_object(self, owner, name, abstract_code, variables, template_name,
                    variable_indices, codeobj_class=None,
                    template_kwds=None, override_conditional_write=None,
                    compiler_kwds=None):
        '''
        Translate abstract code into a compiled code object.

        Resolves the code object class (see `code_object_class`), lets its
        generator translate `abstract_code`, renders the template named
        `template_name` with the result, instantiates the code object class
        with the generated code and calls its ``compile`` method.

        Parameters
        ----------
        owner
            The object the code belongs to (passed on to generator, template
            and code object).
        name : str
            Name suggestion for the code object, made unique via `find_name`.
        abstract_code
            The abstract code to translate.
        variables : dict
            Mapping from variable names to variable objects.
        template_name : str
            Name of the template attribute of the class' ``templater``.
        variable_indices
            Passed on unchanged to generator, template and code object.
        codeobj_class : class, optional
            The code object class to use, see `code_object_class`.
        template_kwds : dict, optional
            Additional keywords for the template. The dictionary is copied,
            the caller's object is not modified.
        override_conditional_write
            Passed on unchanged to the generator.
        compiler_kwds : dict, optional
            Keywords passed on to the code object. Defaults to an empty dict.

        Returns
        -------
        codeobj
            The compiled code object.
        '''
        if compiler_kwds is None:
            compiler_kwds = {}
        name = find_name(name)
        codeobj_class = self.code_object_class(codeobj_class)
        template = getattr(codeobj_class.templater, template_name)
        iterate_all = template.iterate_all
        generator = codeobj_class.generator_class(variables=variables,
                                                  variable_indices=variable_indices,
                                                  owner=owner,
                                                  iterate_all=iterate_all,
                                                  codeobj_class=codeobj_class,
                                                  override_conditional_write=override_conditional_write,
                                                  allows_scalar_write=template.allows_scalar_write,
                                                  name=name,
                                                  template_name=template_name)
        # Work on a copy so that the caller's dictionary stays untouched
        if template_kwds is None:
            template_kwds = dict()
        else:
            template_kwds = template_kwds.copy()

        logger.diagnostic('%s abstract code:\n%s' % (name, indent(code_representation(abstract_code))))

        scalar_code, vector_code, kwds = generator.translate(abstract_code,
                                                             dtype=prefs['core.default_float_dtype'])
        # Add the array names as keywords as well
        for varname, var in variables.items():
            if isinstance(var, ArrayVariable):
                pointer_name = generator.get_array_name(var)
                if var.scalar:
                    pointer_name += '[0]'
                template_kwds[varname] = pointer_name
                if hasattr(var, 'resize'):
                    dyn_array_name = generator.get_array_name(var,
                                                              access_data=False)
                    template_kwds['_dynamic_'+varname] = dyn_array_name

        template_kwds.update(kwds)
        logger.diagnostic('%s snippet (scalar):\n%s' % (name, indent(code_representation(scalar_code))))
        logger.diagnostic('%s snippet (vector):\n%s' % (name, indent(code_representation(vector_code))))

        code = template(scalar_code, vector_code,
                        owner=owner, variables=variables, codeobj_name=name,
                        variable_indices=variable_indices,
                        get_array_name=generator.get_array_name,
                        **template_kwds)
        logger.diagnostic('%s code:\n%s' % (name, indent(code_representation(code))))

        codeobj = codeobj_class(owner, code, variables, variable_indices,
                                template_name=template_name,
                                template_source=template.template_source,
                                name=name,
                                compiler_kwds=compiler_kwds)
        codeobj.compile()
        return codeobj

    def activate(self, build_on_run=True, **kwargs):
        '''
        Called when this device is set as the current device.

        Creates the default clock (``dt=0.1*ms``) if the device does not have
        one yet, resets the maximum run time and stores `build_on_run` and
        the remaining keyword arguments (as ``build_options``).
        '''
        from brian2.core.clocks import Clock  # avoid import issues
        if self.defaultclock is None:
            self.defaultclock = Clock(dt=0.1*ms, name='defaultclock')
        self._set_maximum_run_time(None)
        self.build_on_run = build_on_run
        self.build_options = dict(kwargs)

    def insert_device_code(self, slot, code):
        '''
        Deprecated, always raises an `AttributeError` pointing to
        `insert_code`.
        '''
        # Deprecated
        raise AttributeError("The method 'insert_device_code' has been renamed "
                             "to 'insert_code'.")

    def insert_code(self, slot, code):
        '''
        Insert code directly into a given slot in the device. By default does
        nothing (except for logging a warning).
        '''
        logger.warn("Ignoring device code, unknown slot: %s, code: %s" % (slot, code))

    def build(self, **kwds):
        '''
        For standalone projects, called when the project is ready to be built.
        Does nothing for runtime mode.
        '''
        pass

    def reinit(self):
        '''
        Reinitialize the device. For standalone devices, clears all the internal
        state of the device.
        '''
        pass

    def delete(self, data=True, code=True, directory=True, force=False):
        '''
        Delete code and/or data generated/stored by the device. Does nothing
        in this base implementation.

        Parameters
        ----------
        data : bool, optional
            Whether to delete the data generated by the simulation (final values
            of state variables, data stored in monitors, etc.). Defaults to
            ``True``.
        code : bool, optional
            Whether to delete the code generated by the simulation. Includes the
            numerical values used for initialization of state variables in
            assignments not using strings. Defaults to ``True``.
        directory : bool, optional
            Whether to delete the project directory generated by the simulation.
            Will not delete directories that contain files not created by Brian
            unless the ``force`` option is specified. Defaults to ``True``.
        force : bool, optional
            Whether to delete the project directory with all its content, even
            if it contains files that were not created by Brian. Useful only
            when the ``directory`` option is set to ``True`` as well. Defaults
            to ``False``.
        '''
        pass

    def get_random_state(self):
        '''
        Return a (picklable) representation of the current random number
        generator state. Providing the returned object (e.g. a dict) to
        `.Device.set_random_state` should restore the random number generator
        state.

        Returns
        -------
        state
            The state of the random number generator in a representation that
            can be passed as an argument to `.Device.set_random_state`.
        '''
        raise NotImplementedError('Device does not support getting the state '
                                  'of the random number generator.')

    def set_random_state(self, state):
        '''
        Reset the random number generator state to a previously stored state
        (see `.Device.get_random_state`).

        Parameters
        ----------
        state
            A random number generator state as provided by
            `Device.get_random_state`.
        '''
        raise NotImplementedError('Device does not support setting the state '
                                  'of the random number generator.')
class RuntimeDevice(Device):
    '''
    The default device used in Brian, state variables are stored as numpy
    arrays in memory.
    '''
    def __init__(self):
        super(RuntimeDevice, self).__init__()
        #: Mapping from `Variable` objects to numpy arrays (or `DynamicArray`
        #: objects). Arrays in this dictionary will disappear as soon as the
        #: last reference to the `Variable` object used as a key is gone
        self.arrays = WeakKeyDictionary()
        # Note that the buffers only store a pointer to the actual random
        # numbers -- the buffer will be filled in Cython code
        self.randn_buffer = np.zeros(1, dtype=np.intp)
        self.rand_buffer = np.zeros(1, dtype=np.intp)
        self.randn_buffer_index = np.zeros(1, dtype=np.int32)
        self.rand_buffer_index = np.zeros(1, dtype=np.int32)

    def get_array_name(self, var, access_data=True):
        '''
        Return the name used for the array of `var`.

        Parameters
        ----------
        var : `ArrayVariable`
            The variable for which a name is requested.
        access_data : bool, optional
            Only relevant for `DynamicArrayVariable` objects: if ``True``
            (the default), the name has the ``_array_`` prefix, otherwise
            the ``_dynamic_array_`` prefix.

        Returns
        -------
        name : str
            Prefix, owner name and variable name, joined by underscores.

        Raises
        ------
        TypeError
            If `var` is not an `ArrayVariable`.
        '''
        # if no owner is set, this is a temporary object (e.g. the array
        # of indices when doing G.x[indices] = ...). The name is not
        # necessarily unique over several CodeObjects in this case.
        owner_name = getattr(var.owner, 'name', 'temporary')

        if isinstance(var, DynamicArrayVariable):
            prefix = '_array_' if access_data else '_dynamic_array_'
        elif isinstance(var, ArrayVariable):
            prefix = '_array_'
        else:
            raise TypeError(('Do not have a name for variable of type '
                             '%s') % type(var))
        return prefix + owner_name + '_' + var.name

    def add_array(self, var):
        '''
        Create the storage for `var` and register it in ``self.arrays``.
        '''
        # This creates the actual numpy arrays (or DynamicArray objects)
        if isinstance(var, DynamicArrayVariable):
            if var.ndim == 1:
                arr = DynamicArray1D(var.size, dtype=var.dtype)
            else:
                arr = DynamicArray(var.size, dtype=var.dtype)
        else:
            arr = np.empty(var.size, dtype=var.dtype)

        self.arrays[var] = arr

    def get_value(self, var, access_data=True):
        '''
        Return the array stored for `var`. For a `DynamicArrayVariable`, the
        ``data`` attribute of the stored object is returned unless
        `access_data` is ``False``.
        '''
        if isinstance(var, DynamicArrayVariable) and access_data:
            return self.arrays[var].data
        else:
            return self.arrays[var]

    def set_value(self, var, value):
        '''
        Assign `value` to all elements of the array stored for `var`.
        '''
        self.arrays[var][:] = value

    def resize(self, var, new_size):
        '''
        Resize the array stored for `var` to `new_size`.
        '''
        self.arrays[var].resize(new_size)

    def resize_along_first(self, var, new_size):
        '''
        Resize the array stored for `var` via its ``resize_along_first``
        method.
        '''
        self.arrays[var].resize_along_first(new_size)

    def init_with_zeros(self, var, dtype):
        '''
        Set all elements of the array stored for `var` to zero. The `dtype`
        argument is not used here, the existing array is filled in place.
        '''
        self.arrays[var][:] = 0

    def init_with_arange(self, var, start, dtype):
        '''
        Fill the array stored for `var` with consecutive integers, starting
        at `start`.
        '''
        self.arrays[var][:] = np.arange(start, stop=var.get_len()+start, dtype=dtype)

    def fill_with_array(self, var, arr):
        '''
        Copy the values of `arr` into the array stored for `var`.
        '''
        self.arrays[var][:] = arr

    def spike_queue(self, source_start, source_end):
        '''
        Create and return a new `SpikeQueue`, falling back to the Python
        implementation if the compiled one cannot be imported.
        '''
        # Use the C++ version of the SpikeQueue when available
        try:
            from brian2.synapses.cythonspikequeue import SpikeQueue
            logger.diagnostic('Using the C++ SpikeQueue', once=True)
        except ImportError:
            from brian2.synapses.spikequeue import SpikeQueue
            logger.diagnostic('Using the Python SpikeQueue', once=True)

        return SpikeQueue(source_start=source_start, source_end=source_end)

    def seed(self, seed=None):
        '''
        Set the seed for the random number generator.

        Parameters
        ----------
        seed : int, optional
            The seed value for the random number generator, or ``None`` (the
            default) to set a random seed.
        '''
        np.random.seed(seed)
        # Reset the positions in the random number buffers as well
        self.rand_buffer_index[:] = 0
        self.randn_buffer_index[:] = 0

    def get_random_state(self):
        '''
        Return a dictionary with numpy's random state and copies of the
        buffers/buffer indices, suitable for `set_random_state`.
        '''
        return {'numpy_state': np.random.get_state(),
                'rand_buffer_index': np.array(self.rand_buffer_index),
                'rand_buffer': np.array(self.rand_buffer),
                'randn_buffer_index': np.array(self.randn_buffer_index),
                'randn_buffer': np.array(self.randn_buffer)
                }

    def set_random_state(self, state):
        '''
        Restore a state previously returned by `get_random_state`. The
        buffers are updated in place.
        '''
        np.random.set_state(state['numpy_state'])
        self.rand_buffer_index[:] = state['rand_buffer_index']
        self.rand_buffer[:] = state['rand_buffer']
        self.randn_buffer_index[:] = state['randn_buffer_index']
        self.randn_buffer[:] = state['randn_buffer']
class Dummy(object):
    '''
    Dummy object

    Attribute access, calls, item access and entering a ``with`` block all
    return a new `Dummy`; item assignment and leaving a ``with`` block do
    nothing.
    '''
    def __getattr__(self, name):
        # Only invoked for attributes that are not found the regular way
        return Dummy()

    def __call__(self, *args, **kwds):
        return Dummy()

    def __enter__(self):
        return Dummy()

    def __exit__(self, type, value, traceback):
        # Returns None, i.e. exceptions are not suppressed
        pass

    def __getitem__(self, i):
        return Dummy()

    def __setitem__(self, i, val):
        pass
class CurrentDeviceProxy(object):
    '''
    Method proxy for access to the currently active device
    '''
    def __getattr__(self, name):
        # Forward everything the active device provides
        if hasattr(active_device, name):
            return getattr(active_device, name)

        if name.startswith('_'):
            # Do not fake private/magic attributes
            raise AttributeError(('Active device does not have an '
                                  'attribute %s') % name)

        # Unknown public attribute: warn and hand out a do-nothing object
        logger.warn(("Active device does not have an attribute '%s', "
                     "ignoring this") % name)
        return Dummy()
#: Proxy object to access methods of the current device
device = CurrentDeviceProxy()

#: The currently active device (set with `set_device`)
active_device = None
def get_device():
    '''
    Gets the active `Device` object

    Returns
    -------
    device : `Device` or None
        The value of the module-level ``active_device``.
    '''
    # Read-only access to the module-level name, no ``global`` needed
    return active_device
#: A stack of previously set devices as a tuple with their options (see
#: `set_device`): (device, build_on_run, build_options)
previous_devices = []
def set_device(device, build_on_run=True, **kwargs):
    '''
    Set the device used for simulations.

    The previously active device (if any) is pushed onto the
    ``previous_devices`` stack together with its build options.

    Parameters
    ----------
    device : `Device` or str
        The `Device` object or the name of the device.
    build_on_run : bool, optional
        Whether a call to `run` (or `Network.run`) should directly trigger a
        `Device.build`. This is only relevant for standalone devices and means
        that a run call directly triggers the start of a simulation. If the
        simulation consists of multiple run calls, set ``build_on_run`` to
        ``False`` and call `Device.build` explicitly. Defaults to ``True``.
    kwargs : dict, optional
        Only relevant when ``build_on_run`` is ``True``: additional arguments
        that will be given to the `Device.build` call.
    '''
    if active_device is not None:
        # Remember the current device and its options for `reset_device`
        remembered = (active_device,
                      getattr(active_device, 'build_on_run', True),
                      getattr(active_device, 'build_options', {}))
        previous_devices.append(remembered)
    _do_set_device(device, build_on_run, **kwargs)
def _do_set_device(device, build_on_run=True, **kwargs):
    '''
    Make `device` the active device and activate it.

    A device given as a string is looked up in ``all_devices``. If the
    previously active device had a default clock, its ``dt`` is copied to
    the default clock of the new device after activation.
    '''
    global active_device

    if isinstance(device, str):
        device = all_devices[device]

    # Remember the dt of the outgoing device (if there is one)
    previous_dt = None
    if active_device is not None and active_device.defaultclock is not None:
        previous_dt = active_device.defaultclock.dt

    active_device = device
    active_device.activate(build_on_run=build_on_run, **kwargs)

    if previous_dt is not None:
        # Copy over the dt information of the defaultclock
        active_device.defaultclock.dt = previous_dt
def reset_device(device=None):
    '''
    Reset to a previously used device. Restores also the previously specified
    build options (see `set_device`) for the device. Mostly useful for internal
    Brian code and testing on various devices.

    Parameters
    ----------
    device : `Device` or str, optional
        The device to go back to. If none is specified, go back to the device
        chosen with `set_device` before the current one. If there is no such
        device, the runtime device is used with default options.
    '''
    if isinstance(device, str):
        device = all_devices[device]

    if device is not None:
        # The attributes are only set in `Device.activate`; fall back to the
        # same defaults as `set_device` for a device that was never activated
        build_on_run = getattr(device, 'build_on_run', True)
        build_options = getattr(device, 'build_options', {})
    elif previous_devices:
        device, build_on_run, build_options = previous_devices.pop()
    else:
        # Nothing to go back to
        device = runtime_device
        build_on_run = True
        build_options = {}

    _do_set_device(device, build_on_run, **build_options)
def reinit_devices():
    '''
    Reinitialize all devices, call `Device.activate` again on the current
    device and reset the preferences. Used as a "teardown" function in testing,
    if users want to reset their device (e.g. for multiple standalone runs in a
    single script), calling ``device.reinit()`` followed by
    ``device.activate()`` should normally be sufficient.

    Notes
    -----
    This also resets the `defaultclock`, i.e. a non-standard ``dt`` has to be
    set again.
    '''
    from brian2 import restore_initial_state  # avoids circular import

    for registered_device in all_devices.values():
        registered_device.reinit()

    if active_device is not None:
        # Reactivate the current device
        reset_device(active_device)

    restore_initial_state()
def reinit_and_delete():
    '''
    Calls `reinit_devices` and additionally asks the current device to delete
    what it left behind (``delete(directory=True, force=True)``), e.g. the
    files of the standalone mode in the temporary directory.

    Notes
    -----
    This function does not catch exceptions itself; how errors during the
    deletion are handled is up to the ``delete`` implementation of the device.
    '''
    reinit_devices()
    # Goes through the proxy, i.e. acts on the currently active device
    device.delete(directory=True, force=True)
def seed(seed=None):
    '''
    Set the seed for the random number generator.

    Parameters
    ----------
    seed : int, optional
        The seed value for the random number generator, or ``None`` (the
        default) to set a random seed.

    Raises
    ------
    TypeError
        If `seed` is neither ``None`` nor an integer.

    Notes
    -----
    This function delegates the call to `Device.seed` of the current device.
    '''
    is_valid = seed is None or isinstance(seed, numbers.Integral)
    if not is_valid:
        raise TypeError('Seed has to be None or an integer, was '
                        '%s' % type(seed))
    get_device().seed(seed)
#: The runtime device, registered under the name ``'runtime'``
runtime_device = RuntimeDevice()
all_devices['runtime'] = runtime_device