"""MeaVis session."""
import datetime
import itertools
import os
import meavis._detail.debug
import meavis._detail.sqlite
import meavis.config
import meavis.filesystem.loaders
import meavis.filesystem.utils
import meavis.instruments
import meavis.utility_drivers.meavis_switch
[docs]class Manager(meavis._detail.sqlite.SQLiteAccess):
"""Provide access to the MeaVis filesystem.

The manager keeps the IDs of the current root, session, sample and
dataset and resolves them against the internal database it is
connected to.
"""
[docs] def __init__(self, access=None):
"""Connect to the internal database and load the last state.

``access`` is forwarded unchanged to the parent initializer. The current
root, session, sample and dataset IDs start as ``None`` and are then
restored from the database: when ``datasets`` holds at least one row, the
row with the highest ``dataset_id`` supplies all four IDs; otherwise the
highest ``root_id`` (and, presumably as part of the same fallback, the
highest ``session_id`` and ``sample_id``) is looked up in its own table.
The session and the sample are finally reloaded; if that raises, the
error is logged and the session and sample IDs are reset to ``None``.
"""
super().__init__(access)
self._meavis_logger.info("Connect session manager.")
# Nothing is selected until the database says otherwise.
self.root_id = None
self.session_id = None
self.sample_id = None
self.dataset_id = None
# No LIMIT clause: the descending sort plus fetchone() keeps only the
# row with the highest dataset_id.
cursor = self.execute(
"SELECT root_id,session_id,sample_id,dataset_id "
"FROM datasets ORDER BY dataset_id DESC"
)
result = cursor.fetchone()
if result:
self.root_id = result[0]
self.session_id = result[1]
self.sample_id = result[2]
self.dataset_id = result[3]
else:
# No dataset yet: fall back to the highest ID of each table.
# NOTE(review): indentation was lost in this listing; the session and
# sample lookups below are presumably part of this branch - confirm.
cursor = self.execute(
"SELECT root_id FROM roots ORDER BY root_id DESC"
)
result = cursor.fetchone()
if result:
self.root_id = result[0]
cursor = self.execute(
"SELECT session_id FROM sessions ORDER BY session_id DESC"
)
result = cursor.fetchone()
if result:
self.session_id = result[0]
cursor = self.execute(
"SELECT sample_id FROM samples ORDER BY sample_id DESC"
)
result = cursor.fetchone()
if result:
self.sample_id = result[0]
try:
self.reload_session()
self.reload_sample()
except Exception as err:
# A failing reload is logged, not propagated; the manager then starts
# without a session and without a sample (the root ID is kept).
self._meavis_logger.error("{}: {}".format(type(err).__name__, err))
self.sample_id = None
self.session_id = None
def new_setup(self):
    """Return a fresh setup, ready to be filled and sealed.

    The setup receives every ``(loader, kind)`` row of the ``loaders``
    table, sorted by ascending priority, together with this manager.
    """
    loader_rows = self.execute(
        "SELECT loader,kind FROM loaders ORDER BY priority ASC"
    ).fetchall()
    return meavis.filesystem.utils.Setup(loader_rows, self)
@property
def roots(self):
    """Return the set of all root names stored in the ``roots`` table."""
    rows = self.execute("SELECT root FROM roots").fetchall()
    return {name for (name,) in rows}
@property
def root(self):
    """Return the name of the current root.

    ``None`` is returned (and a debug message logged) when no root ID is
    set. A root ID without a matching row is reported through
    ``meavis.config.log_and_raise`` with ``RuntimeError``.
    """
    if self.root_id:
        row = self.execute(
            "SELECT root FROM roots WHERE root_id=?", (self.root_id,)
        ).fetchone()
        if not row:
            meavis.config.log_and_raise(
                RuntimeError, "Root ID {} not found.".format(self.root_id)
            )
        return row[0]
    self._meavis_logger.debug("Root ID undefined.")
    return None
@property
def root_path(self):
    """Return the path stored for the current root.

    ``None`` is returned (and a debug message logged) when no root ID is
    set. A root ID without a matching row is reported through
    ``meavis.config.log_and_raise`` with ``RuntimeError``.
    """
    if self.root_id:
        row = self.execute(
            "SELECT root_path FROM roots WHERE root_id=?", (self.root_id,)
        ).fetchone()
        if not row:
            meavis.config.log_and_raise(
                RuntimeError, "Root ID {} not found.".format(self.root_id)
            )
        return row[0]
    self._meavis_logger.debug("Root ID undefined.")
    return None
@root.setter
def root(self, value):
    """Make the root named ``value`` the current root.

    An unknown name is reported through ``meavis.config.log_and_raise``
    with ``RuntimeError``.
    """
    row = self.execute(
        "SELECT root_id FROM roots WHERE root=?", (value,)
    ).fetchone()
    if not row:
        meavis.config.log_and_raise(
            RuntimeError, "No root named {{ {} }}.".format(value)
        )
    self.root_id = row[0]
def register_root(self, value, folder):
    """Register a new root named ``value`` located at ``folder``.

    ``folder`` is created, together with any missing parent directory,
    when it does not exist yet. A ``(value, folder)`` row is then inserted
    into the ``roots`` table. The current root is left unchanged.
    """
    # exist_ok=True creates the directory without a separate existence
    # check, so a directory appearing between "check" and "create" (another
    # process, a concurrent call) no longer makes the call fail.
    os.makedirs(folder, exist_ok=True)
    self.execute(
        "INSERT INTO roots (root,root_path) VALUES (?, ?)",
        (value, folder),
    )
@property
def sessions(self):
    """Return the names of the sessions owning datasets in the current root.

    When a sample is selected, only sessions with a dataset recorded for a
    sample of the same name are kept. ``None`` is returned (and a debug
    message logged) when no root ID is set.
    """
    if not self.root_id:
        self._meavis_logger.debug("Root ID undefined.")
        return None
    if self.sample_id:
        statement = (
            "SELECT sessions.session FROM ((sessions INNER JOIN "
            "datasets ON sessions.session_id=datasets.session_id) "
            "INNER JOIN samples ON datasets.sample_id=samples.sample_id) "
            "WHERE datasets.root_id=? AND samples.sample=?"
        )
        arguments = (self.root_id, self.sample)
    else:
        statement = (
            "SELECT sessions.session FROM (sessions INNER JOIN "
            "datasets ON sessions.session_id=datasets.session_id) "
            "WHERE datasets.root_id=?"
        )
        arguments = (self.root_id,)
    rows = self.execute(statement, arguments)
    return {name for (name,) in rows}
@property
def session(self):
    """Return the name of the current session.

    An empty string is returned (and a debug message logged) when no
    session ID is set. A session ID without a matching row is reported
    through ``meavis.config.log_and_raise`` with ``RuntimeError``.
    """
    if self.session_id:
        row = self.execute(
            "SELECT session FROM sessions WHERE session_id=?",
            (self.session_id,),
        ).fetchone()
        if not row:
            meavis.config.log_and_raise(
                RuntimeError,
                "Session ID {} not found.".format(self.session_id),
            )
        return row[0]
    self._meavis_logger.debug("Session ID undefined.")
    return ""
@session.setter
def session(self, value):
    """Select the session named ``value`` and reload it.

    When several rows share the name, the one with the highest
    ``session_id`` is selected. A missing root or an unknown session name
    is reported through ``meavis.config.log_and_raise`` with
    ``RuntimeError``.
    """
    if not self.root_id:
        meavis.config.log_and_raise(RuntimeError, "Root undefined.")
    row = self.execute(
        "SELECT session_id FROM sessions "
        "WHERE session=? ORDER BY session_id DESC",
        (value,),
    ).fetchone()
    if not row:
        meavis.config.log_and_raise(
            RuntimeError, "Session {{ {} }} doesn't exist.".format(value),
        )
    found_id = row[0]
    self._meavis_logger.debug(
        "Get session {{ {} }} ID {}.".format(value, found_id)
    )
    self.session_id = found_id
    self.reload_session()
@property
def last_session(self):
    """Return the name of the newest session of the current root.

    "Newest" means the highest ``session_id`` among the sessions that own
    a dataset in the current root. An empty string is returned when no
    root ID is set or when no such session exists.
    """
    if not self.root_id:
        self._meavis_logger.debug("Root ID undefined.")
        return ""
    row = self.execute(
        "SELECT sessions.session FROM (sessions INNER JOIN "
        "datasets ON sessions.session_id=datasets.session_id) "
        "WHERE datasets.root_id=? ORDER BY sessions.session_id DESC",
        (self.root_id,),
    ).fetchone()
    if row:
        return row[0]
    return ""
def new_session(self, instruments=None, instances=None):
    """Add a new session with the given setups and make it current.

    The new session is a sibling of the current one: its name keeps the
    parent components of the current session name and ends with a
    ``session_<timestamp>`` component. ``instruments`` and ``instances``
    default to the setups of the current session. Both setups are sealed
    and the session is reloaded afterwards.

    A missing root, or a newest session whose name does not start with
    the current session name, is reported through
    ``meavis.config.log_and_raise`` with ``RuntimeError``.
    """
    if not self.root_id:
        meavis.config.log_and_raise(RuntimeError, "Root undefined.")
    if self.last_session and not self.last_session.startswith(self.session):
        meavis.config.log_and_raise(
            RuntimeError,
            "Session {{ {} }} too old compared to {{ {} }}.".format(
                self.session, self.last_session
            ),
        )
    current_setups = self.all_setups
    if instruments is None:
        instruments = current_setups[0]
    if instances is None:
        instances = current_setups[1]
    # Drop the last component (the current session itself) and any empty
    # component before appending the timestamped one.
    parent_parts = [part for part in self.session.split(".")[:-1] if part]
    stamp = "session_{:%Y-%m-%d_%Hh%M}".format(datetime.datetime.now())
    session_name = ".".join(parent_parts + [stamp])
    instruments_group = instruments.seal("instruments")
    instances_group = instances.seal("instances")
    cursor = self.execute(
        "INSERT INTO sessions (session,instruments_id,instances_id) "
        "VALUES (?,?,?)",
        (session_name, instruments_group, instances_group),
    )
    self.session_id = cursor.lastrowid
    self._meavis_logger.info(
        "Create session {{ {} }} with ID {}.".format(
            session_name, self.session_id
        )
    )
    self.reload_session()
def up(self):
    """Select the parent session.

    The parent name is the current session name without its last
    dot-separated component (empty when there is no dot).
    """
    parent_name, _dot, _leaf = self.session.rpartition(".")
    self.session = parent_name
def add_subsession(self, instruments=None, instances=None):
    """Add a subsession of the current session and make it current.

    The new name is the current session name followed by a
    ``.session_<timestamp>`` component. ``instruments`` and ``instances``
    default to the setups of the current session. Both setups are sealed
    and the session is reloaded afterwards.

    A missing root, or a newest session whose name does not start with
    the current session name, is reported through
    ``meavis.config.log_and_raise`` with ``RuntimeError``.
    """
    if not self.root_id:
        meavis.config.log_and_raise(RuntimeError, "Root undefined.")
    if not self.last_session.startswith(self.session):
        meavis.config.log_and_raise(
            RuntimeError,
            "Session {{ {} }} too old compared to {{ {} }}.".format(
                self.session, self.last_session
            ),
        )
    current_setups = self.all_setups
    if instruments is None:
        instruments = current_setups[0]
    if instances is None:
        instances = current_setups[1]
    session_name = "{}.session_{:%Y-%m-%d_%Hh%M}".format(
        self.session, datetime.datetime.now()
    )
    instruments_group = instruments.seal("instruments")
    instances_group = instances.seal("instances")
    cursor = self.execute(
        "INSERT INTO sessions (session,instruments_id,instances_id) "
        "VALUES (?,?,?)",
        (session_name, instruments_group, instances_group),
    )
    self.session_id = cursor.lastrowid
    self._meavis_logger.info(
        "Create (sub)-session {{ {} }} with ID {}.".format(
            session_name, self.session_id
        )
    )
    self.reload_session()
@property
def all_setups(self):
    """Return the current setups as a 4-tuple.

    The tuple holds, in order: session instruments, session instances,
    sample instruments and sample instances. Each entry is a new setup
    filled with the ``(file_obj, kind)`` rows linked to the current
    session or sample; it stays empty when the matching ID is ``None``.
    """

    def filled_setup(owner_id, statement):
        # One fresh setup per call, filled only when the owner is defined.
        setup = self.new_setup()
        if owner_id is not None:
            for file_obj, kind in self.execute(statement, (owner_id,)):
                setup.add(file_obj, kind)
        return setup

    session_instruments = filled_setup(
        self.session_id,
        "SELECT instruments.file_obj,instruments.kind "
        "FROM (sessions INNER JOIN instruments ON "
        "sessions.instruments_id=instruments.group_id) "
        "WHERE sessions.session_id=?",
    )
    session_instances = filled_setup(
        self.session_id,
        "SELECT instances.file_obj,instances.kind "
        "FROM (sessions INNER JOIN instances ON "
        "sessions.instances_id=instances.group_id) "
        "WHERE sessions.session_id=?",
    )
    sample_instruments = filled_setup(
        self.sample_id,
        "SELECT instruments.file_obj,instruments.kind "
        "FROM (samples INNER JOIN instruments ON "
        "samples.instruments_id=instruments.group_id) "
        "WHERE samples.sample_id=?",
    )
    sample_instances = filled_setup(
        self.sample_id,
        "SELECT instances.file_obj,instances.kind "
        "FROM (samples INNER JOIN instances ON "
        "samples.instances_id=instances.group_id) "
        "WHERE samples.sample_id=?",
    )
    return (
        session_instruments,
        session_instances,
        sample_instruments,
        sample_instances,
    )
def reload_session(self):
    """Reload the current session.

    Does nothing when no session ID is set. Otherwise:

    1. ``meavis.instruments.clear`` is called for ``"parameters"``,
       ``"measurements"`` and ``"instruments"``, and the instruments of
       the ``meavis_switch`` utility driver are injected again.
    2. The session instruments setup is sealed. When the sealed group ID
       differs from the one stored for the session, a new ``sessions`` row
       is inserted with the same name, the new instruments group and the
       unchanged instances group, and it becomes the current session.
       Every item of the setup is then injected.
    3. The same is done for the session instances setup (keeping the
       instruments group unchanged); its items are registered.
    4. The active samples are registered.
    """
    if self.session_id is None:
        return
    meavis.instruments.clear("parameters")
    meavis.instruments.clear("measurements")
    meavis.instruments.clear("instruments")
    meavis.instruments.inject(
        meavis.utility_drivers.meavis_switch._meavis_instruments
    )
    (
        setup_session_instruments,
        setup_session_instances,
        _setup_sample_instruments,
        _setup_sample_instances,
    ) = self.all_setups

    # --- instruments -----------------------------------------------------
    cursor = self.execute(
        "SELECT instruments.group_id "
        "FROM (sessions INNER JOIN instruments ON "
        "sessions.instruments_id=instruments.group_id) "
        "WHERE sessions.session_id=?",
        (self.session_id,),
    )
    current_group_id = cursor.fetchone()
    if current_group_id:
        current_group_id = current_group_id[0]
    group_id = setup_session_instruments.seal("instruments")
    if current_group_id != group_id:
        # The stored group is outdated: record a new session row that keeps
        # the name and the instances group, and switch to it.
        cursor = self.execute(
            "SELECT session, instances_id FROM sessions WHERE session_id=?",
            (self.session_id,),
        )
        result = cursor.fetchone()
        cursor = self.execute(
            "INSERT INTO sessions (session,instruments_id,instances_id) "
            "VALUES (?,?,?)",
            (result[0], group_id, result[1]),
        )
        self.session_id = cursor.lastrowid
        self._meavis_logger.debug(
            "Update session {{ {} }} with ID {}.".format(
                result[0], self.session_id
            )
        )
    for instruments in setup_session_instruments:
        meavis.instruments.inject(instruments)

    # --- instances -------------------------------------------------------
    cursor = self.execute(
        "SELECT instances.group_id "
        "FROM (sessions INNER JOIN instances ON "
        "sessions.instances_id=instances.group_id) "
        "WHERE sessions.session_id=?",
        (self.session_id,),
    )
    current_group_id = cursor.fetchone()
    if current_group_id:
        current_group_id = current_group_id[0]
    group_id = setup_session_instances.seal("instances")
    if current_group_id != group_id:
        # Here the instruments group is the one to carry over, so it is
        # instruments_id that must be read back (reading instances_id would
        # store the old instances group in the instruments_id column).
        cursor = self.execute(
            "SELECT session, instruments_id FROM sessions WHERE session_id=?",
            (self.session_id,),
        )
        result = cursor.fetchone()
        cursor = self.execute(
            "INSERT INTO sessions (session,instruments_id,instances_id) "
            "VALUES (?,?,?)",
            (result[0], result[1], group_id),
        )
        self.session_id = cursor.lastrowid
        self._meavis_logger.debug(
            "Update session {{ {} }} with ID {}.".format(
                result[0], self.session_id
            )
        )
    for instances in setup_session_instances:
        meavis.instruments.register(instances)
    self.register_active_samples()
@property
def samples(self):
    """Return the names of the samples owning datasets in the current root.

    When a session is selected, only samples with a dataset recorded for a
    session of the same name are kept. ``None`` is returned (and a debug
    message logged) when no root ID is set.
    """
    if not self.root_id:
        self._meavis_logger.debug("Root ID undefined.")
        return None
    # The filter below is on the session name, so it has to depend on a
    # session being selected (not on a sample being selected).
    if self.session_id:
        cursor = self.execute(
            "SELECT samples.sample FROM ((samples INNER JOIN "
            "datasets ON samples.sample_id=datasets.sample_id) "
            "INNER JOIN sessions ON "
            "datasets.session_id=sessions.session_id) "
            "WHERE datasets.root_id=? AND sessions.session=?",
            (self.root_id, self.session),
        )
    else:
        cursor = self.execute(
            "SELECT samples.sample FROM (samples INNER JOIN "
            "datasets ON samples.sample_id=datasets.sample_id) "
            "WHERE datasets.root_id=?",
            (self.root_id,),
        )
    return {sample_name for (sample_name,) in cursor}
@property
def active_samples(self):
    """Return the names of the active samples.

    These are the ``_meavis_name`` of every object found in the
    ``meavis.parameters`` namespace whose ``_meavis_is_path`` attribute is
    present and true. ``None`` is returned (and a debug message logged)
    when no root ID or no session ID is set.
    """
    if not self.root_id:
        self._meavis_logger.debug("Root ID undefined.")
        return None
    if not self.session_id:
        self._meavis_logger.debug("Session ID undefined.")
        return None
    return {
        candidate._meavis_name
        for candidate in meavis.parameters.__dict__.values()
        if getattr(candidate, "_meavis_is_path", False)
    }
def register_active_samples(self):
    """Register the current active samples.

    Make them available through the MeaVis injection and registration
    mechanism: for every object of the ``meavis.parameters`` namespace
    whose ``_meavis_is_path`` attribute is present and true, its
    instruments and instances are sealed into groups and a ``samples`` row
    is inserted unless an identical one (same name and groups) exists.
    """
    for candidate in meavis.parameters.__dict__.values():
        if not getattr(candidate, "_meavis_is_path", False):
            continue
        instruments_setup = self.new_setup()
        for entry in candidate._meavis_instruments:
            instruments_setup.add(*entry)
        instruments_group = instruments_setup.seal("instruments")
        instances_setup = self.new_setup()
        for entry in candidate._meavis_instances:
            instances_setup.add(*entry)
        instances_group = instances_setup.seal("instances")
        sample_row = (
            candidate._meavis_name,
            instruments_group,
            instances_group,
        )
        already_known = self.execute(
            "SELECT COUNT(*) FROM samples "
            "WHERE sample=? AND instruments_id=? AND instances_id=?",
            sample_row,
        ).fetchone()[0]
        if already_known > 0:
            continue
        self.execute(
            "INSERT INTO samples (sample,instruments_id,instances_id) "
            "VALUES (?,?,?)",
            sample_row,
        )
        self._meavis_logger.info(
            "Register sample : {}.".format(candidate._meavis_name)
        )
@property
def sample(self):
    """Return the name of the current sample.

    ``None`` is returned (and a debug message logged) when no sample ID is
    set. A sample ID without a matching row is reported through
    ``meavis.config.log_and_raise`` with ``RuntimeError``.
    """
    if self.sample_id:
        row = self.execute(
            "SELECT sample FROM samples WHERE sample_id=?", (self.sample_id,),
        ).fetchone()
        if not row:
            meavis.config.log_and_raise(
                RuntimeError, "Sample ID {} not found.".format(self.sample_id)
            )
        return row[0]
    self._meavis_logger.debug("Sample ID undefined.")
    return None
@sample.setter
def sample(self, value):
    """Select the sample named ``value``, creating it when unknown.

    When several rows share the name, the one with the highest
    ``sample_id`` is selected. When none exists, a row holding only the
    name is inserted and looked up again. The sample is reloaded
    afterwards. A missing root or session is reported through
    ``meavis.config.log_and_raise`` with ``RuntimeError``.
    """
    if not self.root_id:
        meavis.config.log_and_raise(RuntimeError, "Root undefined.")
    if not self.session_id:
        meavis.config.log_and_raise(RuntimeError, "Session undefined.")
    while True:
        row = self.execute(
            "SELECT sample_id FROM samples "
            "WHERE sample=? ORDER BY sample_id DESC",
            (value,),
        ).fetchone()
        if row:
            break
        # Unknown sample: insert it, then loop to read its ID back.
        self.execute("INSERT INTO samples (sample) VALUES (?)", (value,))
        self._meavis_logger.info("Create sample {{ {} }}.".format(value))
    found_id = row[0]
    self._meavis_logger.debug(
        "Get sample {{ {} }} ID {}.".format(value, found_id)
    )
    self.sample_id = found_id
    self.reload_sample()
def reload_sample(self):
    """Reload the current sample.

    Does nothing when no sample ID is set. Otherwise the sample
    instruments setup is sealed and each of its items injected, then the
    sample instances setup is sealed and each of its items registered.
    """
    if self.sample_id is None:
        return
    current_setups = self.all_setups
    sample_instruments = current_setups[2]
    sample_instances = current_setups[3]
    sample_instruments.seal("instruments")
    for item in sample_instruments:
        meavis.instruments.inject(item)
    sample_instances.seal("instances")
    for item in sample_instances:
        meavis.instruments.register(item)
def query(self, **metadata):
    """Execute a generic query to the database over metadata key/value.

    Every keyword becomes a selected column. A key that is not a column of
    ``datasets`` joined with ``roots``, ``sessions`` and ``samples`` is
    read from the ``metadata`` table (rows whose ``name`` equals the key).
    A key with a true value also becomes a filter: a string value may list
    several accepted alternatives separated by ``|``; any other value
    (an integer ID for instance) is matched as it is. Keys with a false
    value (``None``, ``""``, ``0``) are selected but not filtered on.

    Returns a pair ``(column_names, cursor)`` where ``column_names`` is a
    generator over the names of the result columns.

    NOTE(review): keys are formatted into the SQL text as identifiers
    (only values are bound as parameters), so they must never come from
    untrusted input.
    """

    def alternatives(value):
        # Only strings carry "|"-separated alternatives; calling str
        # methods on other values (e.g. an integer dataset_id) would raise.
        return value.split("|") if isinstance(value, str) else [value]

    joined_table_query = "FROM datasets {}".format(
        " ".join(
            "INNER JOIN {}s ON datasets.{}_id={}s.{}_id".format(
                table, table, table, table
            )
            for table in ["root", "session", "sample"]
        )
    )
    # Run the bare join once to learn which column names already exist.
    cursor = self.execute("SELECT * {}".format(joined_table_query))
    excluded_fields = tuple(column[0] for column in cursor.description)

    metadata_keys = [key for key in metadata if key not in excluded_fields]
    filters = [
        (key, alternatives(value)) for key, value in metadata.items() if value
    ]

    metadata_joins = " ".join(
        "LEFT JOIN (SELECT dataset_id, value AS {} "
        "FROM metadata WHERE metadata.name=?) AS table_{} "
        "ON datasets.dataset_id=table_{}.dataset_id".format(key, key, key)
        for key in metadata_keys
    )
    where_clause = ""
    if filters:
        where_clause = " WHERE ({})".format(
            ") AND (".join(
                " OR ".join(["{}=?".format(key)] * len(accepted))
                for key, accepted in filters
            )
        )
    statement = "SELECT {} {} {}{}".format(
        ",".join(metadata), joined_table_query, metadata_joins, where_clause
    )
    # Placeholders appear in this order: one metadata name per LEFT JOIN,
    # then the accepted values of each filter.
    arguments = tuple(
        itertools.chain(
            metadata_keys, *(accepted for _key, accepted in filters)
        )
    )
    cursor = self.execute(statement, arguments)
    return (column[0] for column in cursor.description), cursor
def query_current(self, **kwargs):
    """Query with the current root, session, sample and dataset.

    The current root, session and sample names and the current dataset ID
    are passed to ``query`` as ``root``, ``session``, ``sample`` and
    ``dataset_id``; ``kwargs`` are forwarded as additional keys.
    """
    current_root = self.root
    current_session = self.session
    current_sample = self.sample
    current_dataset = self.dataset_id
    return self.query(
        root=current_root,
        session=current_session,
        sample=current_sample,
        dataset_id=current_dataset,
        **kwargs
    )
def new_dataset(self, name):
    """Add a dataset and return its filename and a metadata proxy.

    The filename is built from the root path, the dot-separated components
    of the sample name, those of the session name, and a final
    ``<timestamp>__<name>`` component. The dataset row is inserted and
    committed, and its ID becomes the current dataset ID.

    A missing root, session or sample is reported through
    ``meavis.config.log_and_raise`` with ``RuntimeError``.
    """
    if not self.root_id:
        meavis.config.log_and_raise(RuntimeError, "Root undefined.")
    if not self.session_id:
        meavis.config.log_and_raise(RuntimeError, "Session undefined.")
    if not self.sample_id:
        meavis.config.log_and_raise(RuntimeError, "Sample undefined.")
    path_parts = [self.root_path]
    path_parts.extend(self.sample.split("."))
    path_parts.extend(self.session.split("."))
    path_parts.append(
        "{:%Y-%m-%d_%Hh%M_%Ss}__{}".format(datetime.datetime.now(), name)
    )
    filename = os.path.join(*path_parts)
    cursor = self.execute(
        "INSERT INTO datasets (root_id,session_id,sample_id,file) "
        "VALUES (?,?,?,?)",
        (self.root_id, self.session_id, self.sample_id, filename),
    )
    self.commit()
    self.dataset_id = cursor.lastrowid
    metadata_proxy = meavis.filesystem.utils.Metadata(self, self)
    return filename, metadata_proxy