"""Main loop functions for running MeaVis measurements."""
import collections
import itertools
import logging
import threading
import meavis._detail.debug
import meavis.config
import meavis.engines.completer
import meavis.engines.synchroniser
import meavis.engines.tasks
import meavis.instruments
class LoopMeasurement:
    """Define a measurement running a loop.

    ``wait`` iterates over the Cartesian product of every parameter's
    ``data``. For each combination it starts one thread per parameter whose
    sample differs from ``_meavis_current``, joins them, and then starts one
    thread per measurement and joins those as well.
    """

    def __init__(self, parameters, measurements):
        """Create a loop measurement.

        :param parameters: sequence of parameter items; ``wait`` reads
            ``data`` and ``_meavis_current`` on each of them.
        :param measurements: sequence of measurement items; ``wait`` reads
            ``_meavis_invasive`` on each of them.
        """
        # NOTE(review): the helper only receives the argument *name*; it
        # presumably looks the value up in the caller's frame -- confirm in
        # meavis._detail.debug.
        meavis._detail.debug.parameter_isinstance(
            "parameters", collections.abc.Sequence
        )
        meavis._detail.debug.parameter_isinstance(
            "measurements", collections.abc.Sequence
        )
        self.parameters = parameters
        self.measurements = measurements
        self._meavis_logger = logging.getLogger("meavis")
        # The loop acts as its own handler.
        self._meavis_handler = self
        self._meavis_lock = threading.Lock()

    def trigger(self, handler):
        """Nothing to trigger."""

    def wait(self, handler):
        """Run the loop.

        ``handler`` is accepted for interface compatibility and is not used.
        """
        for samples in itertools.product(
            *(parameter.data for parameter in self.parameters)
        ):
            self._settle_parameters(samples)
            self._run_measurements()

    def _settle_parameters(self, samples):
        """Start a settle task per changed parameter, then join them all."""
        tasks = []
        # Chain of locks: every task receives the lock of the task started
        # before it and its own, already acquired, lock.
        # NOTE(review): settle presumably waits on the former and releases
        # the latter to enforce ordering -- confirm in meavis.engines.tasks.
        locks = [threading.Lock()]
        for parameter, sample in zip(self.parameters, samples):
            if sample == parameter._meavis_current:
                # Parameter already holds this value; nothing to settle.
                continue
            own_lock = threading.Lock()
            own_lock.acquire()
            locks.append(own_lock)
            task = threading.Thread(
                target=meavis.engines.tasks.settle,
                args=(parameter, sample, locks[-2], own_lock, True),
            )
            tasks.append(task)
            task.start()
        self._meavis_logger.debug("\tJoin parameter tasks.")
        for task in tasks:
            task.join()

    def _run_measurements(self):
        """Start a trigger/wait task per measurement, then join them all."""
        tasks = []
        # Same lock chain as for the parameters, plus one barrier lock per
        # task that stays acquired until this method releases it.
        locks = [threading.Lock()]
        barriers = []
        for measurement in self.measurements:
            own_lock = threading.Lock()
            own_lock.acquire()
            locks.append(own_lock)
            barrier = threading.Lock()
            barrier.acquire()
            barriers.append(barrier)
            task = threading.Thread(
                target=meavis.engines.tasks.trigger_wait,
                args=(measurement, locks[-2], own_lock, barrier),
            )
            tasks.append(task)
            if measurement._meavis_invasive:
                # Before starting an invasive measurement, release and join
                # every task started so far, and only keep tracking the
                # invasive one.
                for pending in barriers[:-1]:
                    pending.release()
                for running in tasks[:-1]:
                    running.join()
                barriers = barriers[-1:]
                tasks = tasks[-1:]
            task.start()
        self._meavis_logger.debug("\tRelease measurement waits.")
        for barrier in barriers:
            barrier.release()
        self._meavis_logger.debug("\tJoin measurement tasks.")
        for task in tasks:
            task.join()
class LoopEngine:
    """Define how a loop has to be processed."""

    # Registry of MeaVis items by ``_meavis_name``; stored on the class, so
    # it is shared by every LoopEngine instance (nested engines included).
    items_map = {}

    def __init__(self, data):
        """Store a data structure as loop pattern.

        :param data: mapping describing the loop; the other methods read its
            ``"parameters"`` and ``"measurements"`` entries.
        """
        meavis._detail.debug.parameter_isinstance(
            "data", collections.abc.Mapping
        )
        self.data = data
        self.logger = logging.getLogger("meavis")

    @classmethod
    def clear(cls):
        """Clear MeaVis item maps."""
        cls.items_map = {}

    @classmethod
    def inject_items(cls, *items):
        """Inject MeaVis items in the LoopEngine.

        Each item is registered under its ``_meavis_name``; an item already
        registered under the same name is replaced.
        """
        for item in items:
            logging.getLogger("meavis").debug(
                "Add MeaVis item {} to LoopEngine.".format(item._meavis_name)
            )
            cls.items_map[item._meavis_name] = item

    def complete(self):
        """Complete current data structure.

        Every non-empty group of parameter names returned by the
        CompleterEngine is inserted at the front of ``data["parameters"]``.
        """
        completions = meavis.engines.completer.CompleterEngine(
            self.data
        ).complete()
        for parameters_completion in completions.values():
            if not parameters_completion:
                continue
            self.logger.info(
                "Complete parameters with [{}]".format(
                    ", ".join(parameters_completion)
                )
            )
            # Slice assignment prepends in place; a later group therefore
            # ends up in front of the groups inserted before it.
            self.data["parameters"][:0] = parameters_completion

    def create(self, *items, completion=True):
        """Create a measurement from the pattern.

        :param items: MeaVis items to register before resolving names.
        :param completion: when true, run ``complete`` first.
        :return: the LoopMeasurement built from the pattern.
        :raises KeyError: if a referenced name is not in ``items_map``.
        """
        self.inject_items(*items)
        if completion:
            self.complete()
        effective_parameters = [
            self.items_map[parameter] for parameter in self.data["parameters"]
        ]
        # A string refers to a registered item; anything else is treated as
        # a nested loop pattern and built recursively without completion.
        effective_measurements = [
            self.items_map[measurement]
            if isinstance(measurement, str)
            else LoopEngine(measurement).create(completion=False)
            for measurement in self.data["measurements"]
        ]
        nested_loop = LoopMeasurement(
            effective_parameters, effective_measurements
        )
        # Every remaining entry of the pattern becomes a ``_meavis_<key>``
        # attribute of the loop.
        reserved_keys = ("parameters", "measurements")
        for key, value in self.data.items():
            if key in reserved_keys:
                continue
            meavis_key = "_meavis_{}".format(key.lower().strip())
            setattr(nested_loop, meavis_key, value)
            self.logger.debug(
                "\tAdd attribute self.{} = {} to nested loop.".format(
                    meavis_key, value
                )
            )
        meavis.instruments.inject_default(nested_loop, "measurements")
        # A loop that received a name (from a "name" entry or from the
        # injected defaults) is registered so other patterns can refer to it.
        if hasattr(nested_loop, "_meavis_name"):
            self.items_map[nested_loop._meavis_name] = nested_loop
        return nested_loop

    def synchronisers(self, state_parameters, completion=True):
        """Synchronise parameters group from the pattern.

        :param state_parameters: sequence of parameter items; each is
            registered in ``items_map`` under its ``_meavis_name``.
        :param completion: when true, run ``complete`` first.
        :return: a one-element list holding a LoopSynchroniser when this
            loop uses at least one of the state parameters, otherwise the
            synchronisers collected from the nested loops.
        """
        meavis._detail.debug.parameter_isinstance(
            "state_parameters", collections.abc.Sequence
        )
        for parameter in state_parameters:
            self.items_map[parameter._meavis_name] = parameter
        if completion:
            self.complete()
        # The names are already used as dictionary keys above, hence they
        # are hashable and a set gives constant-time membership tests.
        candidate_names = {
            parameter._meavis_name for parameter in state_parameters
        }
        loop_parameters = [
            self.items_map[parameter]
            for parameter in self.data["parameters"]
            if parameter in candidate_names
        ]
        effective_synchronisers = []
        for measurement in self.data["measurements"]:
            if isinstance(measurement, str):
                # Plain item reference, not a nested loop pattern.
                continue
            effective_synchronisers.extend(
                LoopEngine(measurement).synchronisers(
                    state_parameters, completion=False
                )
            )
        if not loop_parameters:
            return effective_synchronisers
        return [
            meavis.engines.synchroniser.LoopSynchroniser(
                state_parameters, loop_parameters, effective_synchronisers
            )
        ]