"""
Module defining `SpikeGeneratorGroup`.
"""
import numpy as np
from brian2.core.functions import timestep
from brian2.core.spikesource import SpikeSource
from brian2.core.variables import Variables
from brian2.groups.group import CodeRunner, Group
from brian2.units.allunits import second
from brian2.units.fundamentalunits import Quantity, check_units
from brian2.utils.logger import get_logger
__all__ = ["SpikeGeneratorGroup"]
logger = get_logger(__name__)
[docs]class SpikeGeneratorGroup(Group, CodeRunner, SpikeSource):
"""
SpikeGeneratorGroup(N, indices, times, dt=None, clock=None,
period=0*second, when='thresholds', order=0,
sorted=False, name='spikegeneratorgroup*',
codeobj_class=None)
A group emitting spikes at given times.
Parameters
----------
N : int
The number of "neurons" in this group
indices : array of integers
The indices of the spiking cells
times : `Quantity`
The spike times for the cells given in ``indices``. Has to have the
same length as ``indices``.
period : `Quantity`, optional
If this is specified, it will repeat spikes with this period. A
period of 0s means not repeating spikes.
dt : `Quantity`, optional
The time step to be used for the simulation. Cannot be combined with
the `clock` argument.
clock : `Clock`, optional
The update clock to be used. If neither a clock, nor the `dt` argument
is specified, the `defaultclock` will be used.
when : str, optional
When to run within a time step, defaults to the ``'thresholds'`` slot.
See :ref:`scheduling` for possible values.
order : int, optional
The priority of of this group for operations occurring at the same time
step and in the same scheduling slot. Defaults to 0.
sorted : bool, optional
Whether the given indices and times are already sorted. Set to ``True``
if your events are already sorted (first by spike time, then by index),
this can save significant time at construction if your arrays contain
large numbers of spikes. Defaults to ``False``.
Notes
-----
* If `sorted` is set to ``True``, the given arrays will not be copied
(only affects runtime mode)..
"""
@check_units(N=1, indices=1, times=second, period=second)
def __init__(
self,
N,
indices,
times,
dt=None,
clock=None,
period=0 * second,
when="thresholds",
order=0,
sorted=False,
name="spikegeneratorgroup*",
codeobj_class=None,
):
Group.__init__(self, dt=dt, clock=clock, when=when, order=order, name=name)
# We store the indices and times also directly in the Python object,
# this way we can use them for checks in `before_run` even in standalone
# TODO: Remove this when the checks in `before_run` have been moved to the template
#: Array of spiking neuron indices.
self._neuron_index = None
#: Array of spiking neuron times.
self._spike_time = None
#: "Dirty flag" that will be set when spikes are changed after the
#: `before_run` check
self._spikes_changed = True
# Let other objects know that we emit spikes events
self.events = {"spike": None}
self.codeobj_class = codeobj_class
if N < 1 or int(N) != N:
raise TypeError("N has to be an integer >=1.")
N = int(N)
self.start = 0
self.stop = N
self.variables = Variables(self)
self.variables.create_clock_variables(self._clock)
indices, times = self._check_args(
indices, times, period, N, sorted, self._clock.dt
)
self.variables.add_constant("N", value=N)
self.variables.add_array(
"period",
dimensions=second.dim,
size=1,
constant=True,
read_only=True,
scalar=True,
dtype=self._clock.variables["t"].dtype,
)
self.variables.add_arange("i", N)
self.variables.add_dynamic_array(
"spike_number",
values=np.arange(len(indices)),
size=len(indices),
dtype=np.int32,
read_only=True,
constant=True,
index="spike_number",
unique=True,
)
self.variables.add_dynamic_array(
"neuron_index",
values=indices,
size=len(indices),
dtype=np.int32,
index="spike_number",
read_only=True,
constant=True,
)
self.variables.add_dynamic_array(
"spike_time",
values=times,
size=len(times),
dimensions=second.dim,
index="spike_number",
read_only=True,
constant=True,
dtype=self._clock.variables["t"].dtype,
)
self.variables.add_dynamic_array(
"_timebins",
size=len(times),
index="spike_number",
read_only=True,
constant=True,
dtype=np.int32,
)
self.variables.add_array(
"_period_bins",
size=1,
constant=True,
read_only=True,
scalar=True,
dtype=np.int32,
)
self.variables.add_array("_spikespace", size=N + 1, dtype=np.int32)
self.variables.add_array(
"_lastindex", size=1, values=0, dtype=np.int32, read_only=True, scalar=True
)
#: Remember the dt we used the last time when we checked the spike bins
#: to not repeat the work for multiple runs with the same dt
self._previous_dt = None
CodeRunner.__init__(
self,
self,
code="",
template="spikegenerator",
clock=self._clock,
when=when,
order=order,
name=None,
)
# Activate name attribute access
self._enable_group_attributes()
self.variables["period"].set_value(period)
def _full_state(self):
state = super()._full_state()
# Store the internal information we use to decide whether to rebuild
# the time bins
state["_previous_dt"] = self._previous_dt
state["_spikes_changed"] = self._spikes_changed
return state
def _restore_from_full_state(self, state):
state = state.copy() # copy to avoid errors for multiple restores
self._previous_dt = state.pop("_previous_dt")
self._spikes_changed = state.pop("_spikes_changed")
super()._restore_from_full_state(state)
[docs] def before_run(self, run_namespace):
# Do some checks on the period vs. dt
dt = self.dt_[:] # make a copy
period = self.period_
if period < np.inf and period != 0:
if period < dt:
raise ValueError(
f"The period of '{self.name}' is {self.period[:]!s}, "
f"which is smaller than its dt of {dt*second!s}."
)
if self._spikes_changed:
current_t = self.variables["t"].get_value().item()
timesteps = timestep(self._spike_time, dt)
current_step = timestep(current_t, dt)
in_the_past = np.nonzero(timesteps < current_step)[0]
if len(in_the_past):
logger.warn(
"The SpikeGeneratorGroup contains spike times "
"earlier than the start time of the current run "
f"(t = {current_t*second!s}), these spikes will be "
"ignored.",
name_suffix="ignored_spikes",
)
self.variables["_lastindex"].set_value(in_the_past[-1] + 1)
else:
self.variables["_lastindex"].set_value(0)
# Check that we don't have more than one spike per neuron in a time bin
if self._previous_dt is None or dt != self._previous_dt or self._spikes_changed:
# We shift all the spikes by a tiny amount to make sure that spikes
# at exact multiples of dt do not end up in the previous time bin
# This shift has to be quite significant relative to machine
# epsilon, we use 1e-3 of the dt here
shift = 1e-3 * dt
timebins = np.asarray(
np.asarray(self._spike_time + shift) / dt, dtype=np.int32
)
# time is already in sorted order, so it's enough to check if the condition
# that timebins[i]==timebins[i+1] and self._neuron_index[i]==self._neuron_index[i+1]
# is ever both true
if (
np.logical_and(np.diff(timebins) == 0, np.diff(self._neuron_index) == 0)
).any():
raise ValueError(
f"Using a dt of {self.dt!s}, some neurons of "
f"SpikeGeneratorGroup '{self.name}' spike more than "
"once during a time step."
)
self.variables["_timebins"].set_value(timebins)
period_bins = np.round(period / dt)
max_int = np.iinfo(np.int32).max
if period_bins > max_int:
logger.warn(
f"Periods longer than {max_int} timesteps "
f"(={max_int*dt*second!s}) are not "
"supported, the period will therefore be "
"considered infinite. Set the period to 0*second "
"to avoid this "
"warning.",
"spikegenerator_long_period",
)
period = period_bins = 0
if np.abs(period_bins * dt - period) > period * np.finfo(dt.dtype).eps:
raise NotImplementedError(
f"The period of '{self.name}' is "
f"{self.period[:]!s}, which is "
"not an integer multiple of its dt "
f"of {dt*second!s}."
)
self.variables["_period_bins"].set_value(period_bins)
self._previous_dt = dt
self._spikes_changed = False
super().before_run(run_namespace=run_namespace)
@check_units(indices=1, times=second, period=second)
def set_spikes(self, indices, times, period=0 * second, sorted=False):
"""
set_spikes(indices, times, period=0*second, sorted=False)
Change the spikes that this group will generate.
This can be used to set the input for a second run of a model based on
the output of a first run (if the input for the second run is already
known before the first run, then all the information should simply be
included in the initial `SpikeGeneratorGroup` initializer call,
instead).
Parameters
----------
indices : array of integers
The indices of the spiking cells
times : `Quantity`
The spike times for the cells given in ``indices``. Has to have the
same length as ``indices``.
period : `Quantity`, optional
If this is specified, it will repeat spikes with this period. A
period of 0s means not repeating spikes.
sorted : bool, optional
Whether the given indices and times are already sorted. Set to
``True`` if your events are already sorted (first by spike time,
then by index), this can save significant time at construction if
your arrays contain large numbers of spikes. Defaults to ``False``.
"""
indices, times = self._check_args(
indices, times, period, self.N, sorted, self.dt
)
self.variables["period"].set_value(period)
self.variables["neuron_index"].resize(len(indices))
self.variables["spike_time"].resize(len(indices))
self.variables["spike_number"].resize(len(indices))
self.variables["spike_number"].set_value(np.arange(len(indices)))
self.variables["_timebins"].resize(len(indices))
self.variables["neuron_index"].set_value(indices)
self.variables["spike_time"].set_value(times)
# _lastindex and _timebins will be set as part of before_run
def _check_args(self, indices, times, period, N, sorted, dt):
times = Quantity(times)
if len(indices) != len(times):
raise ValueError(
"Length of the indices and times array must "
f"match, but {len(indices)} != {len(times)}"
)
if period < 0 * second:
raise ValueError("The period cannot be negative.")
elif len(times) and period != 0 * second:
period_bins = np.round(period / dt)
# Note: we have to use the timestep function here, to use the same
# binning as in the actual simulation
max_bin = timestep(np.max(times), dt)
if max_bin >= period_bins:
raise ValueError(
"The period has to be greater than the maximum of the spike times"
)
if len(times) and np.min(times) < 0 * second:
raise ValueError("Spike times cannot be negative")
if len(indices) and (np.min(indices) < 0 or np.max(indices) >= N):
raise ValueError(f"Indices have to lie in the interval [0, {int(N)}[")
times = np.asarray(times)
indices = np.asarray(indices)
if not sorted:
# sort times and indices first by time, then by indices
sort_indices = np.lexsort((indices, times))
indices = indices[sort_indices]
times = times[sort_indices]
# We store the indices and times also directly in the Python object,
# this way we can use them for checks in `before_run` even in standalone
# TODO: Remove this when the checks in `before_run` have been moved to the template
self._neuron_index = indices
self._spike_time = times
self._spikes_changed = True
return indices, times
@property
def spikes(self):
"""
The spikes returned by the most recent thresholding operation.
"""
# Note that we have to directly access the ArrayVariable object here
# instead of using the Group mechanism by accessing self._spikespace
# Using the latter would cut _spikespace to the length of the group
spikespace = self.variables["_spikespace"].get_value()
return spikespace[: spikespace[-1]]
def __repr__(self):
cls = self.__class__.__name__
size = self.variables["neuron_index"].size
return (
f"{cls}({self.N}, indices=<length {size} array>, times=<length"
f" {size} array>)"
)