Source code for meavis.engines.loop

"""Main loop functions for running MeaVis measurements."""
import collections
import itertools
import logging
import threading

import meavis._detail.debug
import meavis.config
import meavis.engines.completer
import meavis.engines.synchroniser
import meavis.engines.tasks
import meavis.instruments


[docs]class LoopMeasurement: """Define a measurement running a loop."""
[docs] def __init__(self, parameters, measurements): """Ceate a loop measurement.""" meavis._detail.debug.parameter_isinstance( "parameters", collections.abc.Sequence ) meavis._detail.debug.parameter_isinstance( "measurements", collections.abc.Sequence ) self.parameters = parameters self.measurements = measurements self._meavis_logger = logging.getLogger("meavis") self._meavis_handler = self self._meavis_lock = threading.Lock()
[docs] def trigger(self, handler): """Nothing to trigger.""" pass
[docs] def wait(self, handler): """Run the loop.""" for samples in itertools.product( *(parameter.data for parameter in self.parameters) ): parameter_tasks = [] parameter_locks = [threading.Lock()] for parameter, sample in zip(self.parameters, samples): if sample == parameter._meavis_current: continue parameter_locks.append(threading.Lock()) parameter_locks[-1].acquire() parameter_tasks.append( threading.Thread( target=meavis.engines.tasks.settle, args=( parameter, sample, parameter_locks[-2], parameter_locks[-1], True, ), ) ) parameter_tasks[-1].start() self._meavis_logger.debug("\tJoin parameter tasks.") for task in parameter_tasks: task.join() measurement_tasks = [] measurement_locks = [threading.Lock()] measurement_barriers = [] for measurement in self.measurements: measurement_locks.append(threading.Lock()) measurement_locks[-1].acquire() measurement_barriers.append(threading.Lock()) measurement_barriers[-1].acquire() measurement_tasks.append( threading.Thread( target=meavis.engines.tasks.trigger_wait, args=( measurement, measurement_locks[-2], measurement_locks[-1], measurement_barriers[-1], ), ) ) if measurement._meavis_invasive: for barrier in measurement_barriers[:-1]: barrier.release() for task in measurement_tasks[:-1]: task.join() measurement_barriers = measurement_barriers[-1:] measurement_tasks = measurement_tasks[-1:] measurement_tasks[-1].start() self._meavis_logger.debug("\tRelease measurement waits.") for barrier in measurement_barriers: barrier.release() self._meavis_logger.debug("\tJoin measurement tasks.") for task in measurement_tasks: task.join()
[docs]class LoopEngine: """Define how a loop has to be processed.""" items_map = {}
[docs] def __init__(self, data): """Store a data structure as loop pattern.""" meavis._detail.debug.parameter_isinstance( "data", collections.abc.Mapping ) self.data = data self.logger = logging.getLogger("meavis")
[docs] @classmethod def clear(cls): """Clear MeaVis item maps.""" cls.items_map = {}
[docs] @classmethod def inject_items(cls, *items): """Inject MeaVis items in the LoopEngine.""" for item in items: logging.getLogger("meavis").debug( "Add MeaVis item {} to LoopEngine.".format(item._meavis_name) ) cls.items_map[item._meavis_name] = item
[docs] def complete(self): """Complete current data structure.""" for parameters_completion in ( meavis.engines.completer.CompleterEngine(self.data) .complete() .values() ): if not parameters_completion: continue self.logger.info( "Complete parameters with [{}]".format( ", ".join(parameters_completion) ) ) self.data["parameters"][:0] = parameters_completion
[docs] def create(self, *items, completion=True): """Create a measurement from the pattern.""" self.inject_items(*items) if completion: self.complete() effective_parameters = [ self.items_map[parameter] for parameter in self.data["parameters"] ] effective_measurements = [] for measurement in self.data["measurements"]: effective_measurements.append( self.items_map[measurement] if isinstance(measurement, str) else LoopEngine(measurement).create(completion=False) ) nested_loop = LoopMeasurement( effective_parameters, effective_measurements ) for key, value in self.data.items(): if key in ["parameters", "measurements"]: continue meavis_key = "_meavis_{}".format(key.lower().strip()) setattr(nested_loop, meavis_key, value) self.logger.debug( "\tAdd attribute self.{} = {} to nested loop.".format( meavis_key, value ) ) meavis.instruments.inject_default(nested_loop, "measurements") if hasattr(nested_loop, "_meavis_name"): self.items_map[nested_loop._meavis_name] = nested_loop return nested_loop
[docs] def synchronisers(self, state_parameters, completion=True): """Synchronise parameters group from the pattern.""" meavis._detail.debug.parameter_isinstance( "state_parameters", collections.abc.Sequence ) for parameter in state_parameters: self.items_map[parameter._meavis_name] = parameter if completion: self.complete() candidate_parameters = [ parameter._meavis_name for parameter in state_parameters ] loop_parameters = [ self.items_map[parameter] for parameter in self.data["parameters"] if parameter in candidate_parameters ] effective_synchronisers = [] for measurement in self.data["measurements"]: if isinstance(measurement, str): continue effective_synchronisers.extend( LoopEngine(measurement).synchronisers( state_parameters, completion=False ) ) return ( [ meavis.engines.synchroniser.LoopSynchroniser( state_parameters, loop_parameters, effective_synchronisers ) ] if loop_parameters else effective_synchronisers )