Source code for brian2.devices.device

'''
Module containing the `Device` base class as well as the `RuntimeDevice`
implementation and some helper functions to access/set devices.
'''
from weakref import WeakKeyDictionary
import numbers

import numpy as np

from brian2.memory.dynamicarray import DynamicArray, DynamicArray1D
from brian2.codegen.targets import codegen_targets
from brian2.codegen.runtime.numpy_rt import NumpyCodeObject
from brian2.core.names import find_name
from brian2.core.preferences import prefs
from brian2.core.variables import ArrayVariable, DynamicArrayVariable
from brian2.core.functions import Function
from brian2.units import ms
from brian2.utils.logger import get_logger
from brian2.utils.stringtools import code_representation, indent

# Public names exported by ``from brian2.devices.device import *``
__all__ = ['Device', 'RuntimeDevice',
           'get_device', 'set_device',
           'all_devices', 'reinit_devices',
           'reset_device', 'device', 'seed'
           ]

# Module-level logger, named after this module
logger = get_logger(__name__)

#: Registry of the known devices, keyed by name. String arguments given to
#: `set_device`/`reset_device` are looked up in this dictionary.
all_devices = {}


# Register the 'devices' preference category (described as
# 'Device preferences') with the preference system
prefs.register_preferences('devices', 'Device preferences')


#: Caches the automatically determined code generation target. Stays ``None``
#: until `auto_target` has been called for the first time.
_auto_target = None

def auto_target():
    '''
    Automatically choose a code generation target (invoked when the
    `codegen.target` preference is set to `'auto'`). Caches its result so it
    only does the check once. Prefers weave > cython > numpy.

    Returns
    -------
    target : class derived from `CodeObject`
        The target to use
    '''
    global _auto_target
    # The decision is made only once, later calls return the cached target
    if _auto_target is not None:
        return _auto_target

    # Map the names of all named targets to their classes
    named_targets = {}
    for target in codegen_targets:
        if target.class_name:
            named_targets[target.class_name] = target

    # Try the compiled targets in order of preference
    for preferred in ('weave', 'cython'):
        if (preferred in named_targets and
                named_targets[preferred].is_available()):
            _auto_target = named_targets[preferred]
            logger.debug(('Chosing %r as the code generation '
                          'target.') % _auto_target.class_name)
            break
    else:
        # No compiled target can be used, fall back to numpy
        _auto_target = named_targets['numpy']
        logger.info('Cannot use compiled code, falling back to the numpy '
                    'code generation target. Note that this will likely '
                    'be slower than using compiled code. Set the code '
                    'generation to numpy manually to avoid this message:\n'
                    'prefs.codegen.target = "numpy"',
                    'codegen_fallback', once=True)

    return _auto_target
def get_default_codeobject_class(pref='codegen.target'):
    '''
    Returns the default `CodeObject` class from the preferences.

    Parameters
    ----------
    pref : str, optional
        The name of the preference to read, defaults to ``'codegen.target'``.

    Returns
    -------
    codeobj_class : class
        The value of the preference if it is not a string. For a string,
        the result of `auto_target` (for ``'auto'``) or the first entry of
        ``codegen_targets`` whose ``class_name`` equals the string.

    Raises
    ------
    ValueError
        If the preference is a string that names no known target.
    '''
    chosen = prefs[pref]
    # Anything that is not a string is returned unchanged
    if not isinstance(chosen, str):
        return chosen

    if chosen == 'auto':
        return auto_target()

    for candidate in codegen_targets:
        if candidate.class_name == chosen:
            return candidate

    # No target with this name exists, list the valid names in the error
    valid_names = ['auto']
    valid_names.extend(candidate.class_name
                       for candidate in codegen_targets
                       if candidate.class_name)
    raise ValueError("Unknown code generation target: %s, should be "
                     " one of %s" % (chosen, valid_names))
class Device(object):
    '''
    Base Device object.
    '''
    def __init__(self):
        #: The network schedule that this device supports. If the device only
        #: supports a specific, fixed schedule, it has to set this attribute to
        #: the respective schedule (see `Network.schedule` for details). If it
        #: supports arbitrary schedules, it should be set to ``None`` (the
        #: default).
        self.network_schedule = None

        #: The default clock of this device, created in `activate` if it
        #: is still ``None`` at that point.
        self.defaultclock = None

        self._maximum_run_time = None

        #: Build settings, overwritten by `activate`. They are initialised
        #: here so that they can be read on a device that has not been
        #: activated yet; the defaults are the fallback values `set_device`
        #: uses for devices without these attributes.
        self.build_on_run = True
        self.build_options = {}

    def _set_maximum_run_time(self, maximum_run_time):
        '''
        Sets a maximum time for a run before it will break. Used primarily for
        testing purposes. Not guaranteed to be respected by a device.
        '''
        self._maximum_run_time = maximum_run_time

    def get_array_name(self, var, access_data=True):
        '''
        Return a globally unique name for `var`.

        Parameters
        ----------
        access_data : bool, optional
            For `DynamicArrayVariable` objects, specifying `True` here means the
            name for the underlying data is returned. If specifying `False`,
            the name of object itself is returned (e.g. to allow resizing).

        Returns
        -------
        name : str
            The name for `var`.
        '''
        raise NotImplementedError()

    def get_len(self, array):
        '''
        Return the length of the array.

        Parameters
        ----------
        array : `ArrayVariable`
            The array for which the length is requested.

        Returns
        -------
        l : int
            The length of the array.
        '''
        raise NotImplementedError()

    def add_array(self, var):
        '''
        Add an array to this device.

        Parameters
        ----------
        var : `ArrayVariable`
            The array to add.
        '''
        raise NotImplementedError()

    def init_with_zeros(self, var, dtype):
        '''
        Initialize an array with zeros.

        Parameters
        ----------
        var : `ArrayVariable`
            The array to initialize with zeros.
        dtype : `dtype`
            The data type to use for the array.
        '''
        raise NotImplementedError()

    def init_with_arange(self, var, start, dtype):
        '''
        Initialize an array with an integer range.

        Parameters
        ----------
        var : `ArrayVariable`
            The array to fill with the integer range.
        start : int
            The start value for the integer range
        dtype : `dtype`
            The data type to use for the array.
        '''
        raise NotImplementedError()

    def fill_with_array(self, var, arr):
        '''
        Fill an array with the values given in another array.

        Parameters
        ----------
        var : `ArrayVariable`
            The array to fill.
        arr : `ndarray`
            The array values that should be copied to `var`.
        '''
        raise NotImplementedError()

    def spike_queue(self, source_start, source_end):
        '''
        Create and return a new `SpikeQueue` for this `Device`.

        Parameters
        ----------
        source_start : int
            The start index of the source group (necessary for subgroups)
        source_end : int
            The end index of the source group (necessary for subgroups)
        '''
        raise NotImplementedError()

    def resize(self, var, new_size):
        '''
        Resize a `DynamicArrayVariable`.

        Parameters
        ----------
        var : `DynamicArrayVariable`
            The variable that should be resized.
        new_size : int
            The new size of the variable
        '''
        raise NotImplementedError()

    def resize_along_first(self, var, new_size):
        '''
        Resize a variable along its first dimension.

        The base implementation simply delegates to `resize`; devices can
        overwrite it with a better implementation.
        '''
        return self.resize(var, new_size)

    def seed(self, seed=None):
        '''
        Set the seed for the random number generator.

        Parameters
        ----------
        seed : int, optional
            The seed value for the random number generator, or ``None`` (the
            default) to set a random seed.
        '''
        raise NotImplementedError()

    def code_object_class(self, codeobj_class=None):
        '''
        Return the code object class to use.

        Parameters
        ----------
        codeobj_class : class, optional
            The requested class. If ``None`` (the default), the class given
            by `get_default_codeobject_class` is used instead.

        Returns
        -------
        codeobj_class : class
            The given class, or the default class from the preferences.
        '''
        if codeobj_class is None:
            codeobj_class = get_default_codeobject_class()
        return codeobj_class

    def code_object(self, owner, name, abstract_code, variables, template_name,
                    variable_indices, codeobj_class=None,
                    template_kwds=None, override_conditional_write=None):
        '''
        Translate abstract code into a compiled code object.

        Creates the generator of the code object class, translates
        `abstract_code` into scalar and vector code, renders the template
        `template_name` with it and returns the resulting code object after
        calling its ``compile`` method.

        Parameters
        ----------
        owner : object
            Passed on to the generator, the template and the code object.
        name : str
            The requested name; the final name is obtained via `find_name`.
        abstract_code
            The code handed to the generator's ``translate`` method.
        variables : dict-like
            Mapping from variable names to the objects they refer to.
        template_name : str
            Name of the template, looked up as an attribute of the
            ``templater`` of the code object class.
        variable_indices
            Passed on to the generator, the template and the code object.
        codeobj_class : class, optional
            The code object class, see `code_object_class`.
        template_kwds : dict, optional
            Additional keywords for the template. The dictionary is copied,
            the caller's object is not modified.
        override_conditional_write
            Passed on unchanged to the generator.

        Returns
        -------
        codeobj
            The compiled code object.

        Raises
        ------
        NotImplementedError
            If a `Function` in `variables` has no implementation for the
            code object class (and the class is not `NumpyCodeObject`).
        '''
        codeobj_class = self.code_object_class(codeobj_class)
        template = getattr(codeobj_class.templater, template_name)
        iterate_all = template.iterate_all
        generator = codeobj_class.generator_class(
            variables=variables,
            variable_indices=variable_indices,
            owner=owner,
            iterate_all=iterate_all,
            codeobj_class=codeobj_class,
            override_conditional_write=override_conditional_write,
            allows_scalar_write=template.allows_scalar_write,
            name=name,
            template_name=template_name)
        # Work on a copy so that the caller's dictionary stays untouched
        if template_kwds is None:
            template_kwds = dict()
        else:
            template_kwds = template_kwds.copy()

        # Check that all functions are available
        # (``items`` instead of ``iteritems``: works on Python 2 and 3)
        for varname, value in variables.items():
            if isinstance(value, Function):
                try:
                    value.implementations[codeobj_class]
                except KeyError as ex:
                    # if we are dealing with numpy, add the default
                    # implementation
                    if codeobj_class is NumpyCodeObject:
                        value.implementations.add_numpy_implementation(
                            value.pyfunc)
                    else:
                        raise NotImplementedError(('Cannot use function '
                                                   '%s: %s') % (varname, ex))

        logger.diagnostic('%s abstract code:\n%s' %
                          (name, indent(code_representation(abstract_code))))

        scalar_code, vector_code, kwds = generator.translate(
            abstract_code, dtype=prefs['core.default_float_dtype'])

        # Add the array names as keywords as well
        for varname, var in variables.items():
            if isinstance(var, ArrayVariable):
                pointer_name = generator.get_array_name(var)
                if var.scalar:
                    pointer_name += '[0]'
                template_kwds[varname] = pointer_name
                if hasattr(var, 'resize'):
                    dyn_array_name = generator.get_array_name(
                        var, access_data=False)
                    template_kwds['_dynamic_'+varname] = dyn_array_name

        # Keywords returned by the translation take precedence
        template_kwds.update(kwds)
        logger.diagnostic('%s snippet (scalar):\n%s' %
                          (name, indent(code_representation(scalar_code))))
        logger.diagnostic('%s snippet (vector):\n%s' %
                          (name, indent(code_representation(vector_code))))

        name = find_name(name)

        code = template(scalar_code, vector_code,
                        owner=owner, variables=variables, codeobj_name=name,
                        variable_indices=variable_indices,
                        get_array_name=generator.get_array_name,
                        **template_kwds)
        logger.diagnostic('%s code:\n%s' %
                          (name, indent(code_representation(code))))

        codeobj = codeobj_class(owner, code, variables, variable_indices,
                                template_name=template_name,
                                template_source=template.template_source,
                                name=name)
        codeobj.compile()
        return codeobj

    def activate(self, build_on_run=True, **kwargs):
        '''
        Called when this device is set as the current device.

        Creates the default clock (``dt=0.1*ms``) if the device does not
        have one yet, resets the maximum run time to ``None`` and stores
        `build_on_run` and a copy of `kwargs` (as ``build_options``).
        '''
        from brian2.core.clocks import Clock  # avoid import issues
        if self.defaultclock is None:
            self.defaultclock = Clock(dt=0.1*ms, name='defaultclock')
        self._set_maximum_run_time(None)
        self.build_on_run = build_on_run
        self.build_options = dict(kwargs)

    def insert_device_code(self, slot, code):
        '''
        Deprecated, always raises an `AttributeError` pointing to the new
        name `insert_code`.
        '''
        # Deprecated
        raise AttributeError("The method 'insert_device_code' has been renamed "
                             "to 'insert_code'.")

    def insert_code(self, slot, code):
        '''
        Insert code directly into a given slot in the device. The base
        implementation does not insert anything, it only logs a warning
        that the code is ignored.
        '''
        logger.warn("Ignoring device code, unknown slot: %s, code: %s" %
                    (slot, code))

    def build(self, **kwds):
        '''
        For standalone projects, called when the project is ready to be built.
        Does nothing for runtime mode.
        '''
        pass

    def reinit(self):
        '''
        Reinitialize the device. For standalone devices, clears all the internal
        state of the device.
        '''
        pass
class RuntimeDevice(Device):
    '''
    The default device used in Brian, state variables are stored as numpy
    arrays in memory.
    '''
    def __init__(self):
        super(RuntimeDevice, self).__init__()
        #: Mapping from `Variable` objects to numpy arrays (or `DynamicArray`
        #: objects). Arrays in this dictionary will disappear as soon as the
        #: last reference to the `Variable` object used as a key is gone
        self.arrays = WeakKeyDictionary()
        # Note that the buffers only store a pointer to the actual random
        # numbers -- the buffer will be filled in weave/Cython code
        self.rand_buffer = np.zeros(1, dtype=np.intp)
        self.rand_buffer_index = np.zeros(1, dtype=np.int32)
        self.randn_buffer = np.zeros(1, dtype=np.intp)
        self.randn_buffer_index = np.zeros(1, dtype=np.int32)

    def get_array_name(self, var, access_data=True):
        '''
        Return the name used for `var`, built from a prefix, the name of the
        variable's owner and the name of the variable itself.

        Raises
        ------
        TypeError
            If `var` is neither a `DynamicArrayVariable` nor an
            `ArrayVariable`.
        '''
        # if no owner is set, this is a temporary object (e.g. the array
        # of indices when doing G.x[indices] = ...). The name is not
        # necessarily unique over several CodeObjects in this case.
        owner_name = getattr(var.owner, 'name', 'temporary')

        if isinstance(var, DynamicArrayVariable):
            # The dynamic array object itself gets a name that is different
            # from the name of its underlying data
            prefix = '_array_' if access_data else '_dynamic_array_'
        elif isinstance(var, ArrayVariable):
            prefix = '_array_'
        else:
            raise TypeError(('Do not have a name for variable of type '
                             '%s') % type(var))
        return prefix + owner_name + '_' + var.name

    def add_array(self, var):
        '''
        Create the storage for `var` and register it in ``self.arrays``: a
        `DynamicArray1D`/`DynamicArray` for dynamic array variables, an
        uninitialized numpy array otherwise.
        '''
        if not isinstance(var, DynamicArrayVariable):
            storage = np.empty(var.size, dtype=var.dtype)
        elif var.dimensions == 1:
            storage = DynamicArray1D(var.size, dtype=var.dtype)
        else:
            storage = DynamicArray(var.size, dtype=var.dtype)
        self.arrays[var] = storage

    def get_value(self, var, access_data=True):
        '''
        Return the object stored for `var`. For a `DynamicArrayVariable`,
        its ``data`` attribute is returned instead if `access_data` is true.
        '''
        stored = self.arrays[var]
        wants_data = isinstance(var, DynamicArrayVariable) and access_data
        return stored.data if wants_data else stored

    def set_value(self, var, value):
        '''
        Assign `value` to all elements of the array stored for `var`.
        '''
        target = self.arrays[var]
        target[:] = value

    def resize(self, var, new_size):
        '''
        Resize the array stored for `var` to `new_size`.
        '''
        stored = self.arrays[var]
        stored.resize(new_size)

    def resize_along_first(self, var, new_size):
        '''
        Resize the array stored for `var` along its first dimension.
        '''
        stored = self.arrays[var]
        stored.resize_along_first(new_size)

    def init_with_zeros(self, var, dtype):
        '''
        Set all elements of the array stored for `var` to zero (`dtype` is
        not used by this implementation).
        '''
        self.arrays[var][:] = 0

    def init_with_arange(self, var, start, dtype):
        '''
        Fill the array stored for `var` with consecutive integers, beginning
        with `start`.
        '''
        stop = var.get_len() + start
        self.arrays[var][:] = np.arange(start, stop=stop, dtype=dtype)

    def fill_with_array(self, var, arr):
        '''
        Copy the values of `arr` into the array stored for `var`.
        '''
        self.arrays[var][:] = arr

    def spike_queue(self, source_start, source_end):
        '''
        Create and return a new `SpikeQueue`, using the implementation from
        ``brian2.synapses.cythonspikequeue`` if it can be imported and the
        one from ``brian2.synapses.spikequeue`` otherwise.
        '''
        # Use the C++ version of the SpikeQueue when available
        try:
            from brian2.synapses.cythonspikequeue import SpikeQueue
            logger.diagnostic('Using the C++ SpikeQueue', once=True)
        except ImportError:
            from brian2.synapses.spikequeue import SpikeQueue
            logger.diagnostic('Using the Python SpikeQueue', once=True)

        queue = SpikeQueue(source_start=source_start, source_end=source_end)
        return queue

    def seed(self, seed=None):
        '''
        Set the seed for the random number generator.

        Parameters
        ----------
        seed : int, optional
            The seed value for the random number generator, or ``None`` (the
            default) to set a random seed.
        '''
        np.random.seed(seed)
        # Reset the positions in both random number buffers
        for buffer_index in (self.rand_buffer_index, self.randn_buffer_index):
            buffer_index[:] = 0
class Dummy(object):
    '''
    Placeholder object: attribute access, calls, item access and use as a
    context manager all succeed and give a new `Dummy` object (or nothing).
    '''
    def __getattr__(self, name):
        '''Return a new `Dummy` for any attribute not found otherwise.'''
        return Dummy()

    def __call__(self, *args, **kwds):
        '''Accept arbitrary arguments and return a new `Dummy`.'''
        return Dummy()

    def __enter__(self):
        '''Entering a ``with`` block gives a new `Dummy`.'''
        return Dummy()

    def __exit__(self, type, value, traceback):
        '''Do nothing; exceptions raised in the block are not suppressed.'''
        return None

    def __getitem__(self, i):
        '''Return a new `Dummy` for any index or key.'''
        return Dummy()

    def __setitem__(self, i, val):
        '''Silently ignore item assignments.'''
        return None
class CurrentDeviceProxy(object):
    '''
    Method proxy for access to the currently active device
    '''
    def __getattr__(self, name):
        '''
        Forward the attribute lookup to the active device. For an attribute
        that the active device does not have, raise an `AttributeError` if
        the name starts with an underscore, otherwise log a warning and
        return a `Dummy` object.
        '''
        if hasattr(active_device, name):
            return getattr(active_device, name)

        if name.startswith('_'):
            # Do not fake private/magic attributes
            raise AttributeError(('Active device does not have an '
                                  'attribute %s') % name)

        logger.warn(("Active device does not have an attribute '%s', "
                     "ignoring this") % name)
        return Dummy()
#: Proxy object to access methods of the current device device = CurrentDeviceProxy() #: The currently active device (set with `set_device`) active_device = None
def get_device():
    '''
    Gets the active `Device` object

    Returns
    -------
    device : `Device` or None
        The current value of the module-level ``active_device``.
    '''
    # Reading a module-level name does not require a ``global`` statement
    return active_device
#: A stack of previously set devices as a tuple with their options (see #: `set_device`): (device, build_on_run, build_options) previous_devices = []
def set_device(device, build_on_run=True, **kwargs):
    '''
    Set the device used for simulations.

    The device that is active at the time of the call (if any) is stored,
    together with its build settings, so that `reset_device` can return to it.

    Parameters
    ----------
    device : `Device` or str
        The `Device` object or the name of the device.
    build_on_run : bool, optional
        Whether a call to `run` (or `Network.run`) should directly trigger a
        `Device.build`. This is only relevant for standalone devices and means
        that a run call directly triggers the start of a simulation. If the
        simulation consists of multiple run calls, set ``build_on_run`` to
        ``False`` and call `Device.build` explicitly. Defaults to ``True``.
    kwargs : dict, optional
        Only relevant when ``build_on_run`` is ``True``: additional arguments
        that will be given to the `Device.build` call.
    '''
    global previous_devices
    current = active_device
    if current is not None:
        # Remember the current device and its settings; a device without
        # the attributes is stored with the default settings
        remembered = (current,
                      getattr(current, 'build_on_run', True),
                      getattr(current, 'build_options', {}))
        previous_devices.append(remembered)
    _do_set_device(device, build_on_run, **kwargs)
def _do_set_device(device, build_on_run=True, **kwargs):
    '''
    Make `device` the active device and activate it.

    A device given as a string is looked up in ``all_devices``. If the
    previously active device had a default clock, its ``dt`` is copied to the
    default clock of the new device after the activation.
    '''
    global active_device
    if isinstance(device, str):
        device = all_devices[device]

    # Remember the dt of the device that is replaced (if there is one)
    previous_dt = None
    if active_device is not None:
        old_clock = active_device.defaultclock
        if old_clock is not None:
            previous_dt = old_clock.dt

    active_device = device
    active_device.activate(build_on_run=build_on_run, **kwargs)

    if previous_dt is not None:
        # Copy over the dt information of the defaultclock
        active_device.defaultclock.dt = previous_dt
def reset_device(device=None):
    '''
    Reset to a previously used device. Restores also the previously specified
    build options (see `set_device`) for the device. Mostly useful for internal
    Brian code and testing on various devices.

    Parameters
    ----------
    device : `Device` or str, optional
        The device to go back to. If none is specified, go back to the device
        chosen with `set_device` before the current one. If no such device
        exists, the runtime device is used with the default settings.
    '''
    global previous_devices
    if isinstance(device, str):
        device = all_devices[device]

    if device is not None:
        # An explicitly given device is re-activated with its own settings.
        # These attributes are only set in `Device.activate`, a device that
        # has never been activated therefore gets the same defaults that
        # `set_device` uses (instead of raising an AttributeError).
        build_on_run = getattr(device, 'build_on_run', True)
        build_options = getattr(device, 'build_options', {})
    elif len(previous_devices) == 0:
        # Nothing to go back to: use the runtime device
        device = runtime_device
        build_on_run = True
        build_options = {}
    else:
        device, build_on_run, build_options = previous_devices.pop()

    _do_set_device(device, build_on_run, **build_options)
def reinit_devices():
    '''
    Reinitialize all devices, call `Device.activate` again on the current
    device and reset the preferences. Used as a "teardown" function in testing,
    if users want to reset their device (e.g. for multiple standalone runs in a
    single script), calling ``device.reinit()`` followed by
    ``device.activate()`` should normally be sufficient.

    Notes
    -----
    This also resets the `defaultclock`, i.e. a non-standard ``dt`` has to be
    set again.
    '''
    from brian2 import restore_initial_state  # avoids circular import

    # ``values`` (instead of the Python-2-only ``itervalues``) works on both
    # Python 2 and 3. The loop variable is deliberately not called ``device``
    # to avoid confusion with the module-level proxy of that name.
    for registered_device in all_devices.values():
        registered_device.reinit()

    if active_device is not None:
        # Reactivate the current device
        reset_device(active_device)

    restore_initial_state()
def seed(seed=None):
    '''
    Set the seed for the random number generator.

    Parameters
    ----------
    seed : int, optional
        The seed value for the random number generator, or ``None`` (the
        default) to set a random seed.

    Raises
    ------
    TypeError
        If `seed` is neither ``None`` nor an integer.

    Notes
    -----
    This function delegates the call to `Device.seed` of the current device.
    '''
    is_valid = seed is None or isinstance(seed, numbers.Integral)
    if not is_valid:
        message = 'Seed has to be None or an integer, was %s' % type(seed)
        raise TypeError(message)
    current_device = get_device()
    current_device.seed(seed)
# The runtime device instance, registered under the name 'runtime' so that
# it can be selected by name (e.g. ``set_device('runtime')``)
runtime_device = RuntimeDevice()
all_devices['runtime'] = runtime_device