Source code for meavis.filesystem.session

"""MeaVis session."""
import datetime
import itertools
import os

import meavis._detail.debug
import meavis._detail.sqlite
import meavis.config
import meavis.filesystem.loaders
import meavis.filesystem.utils
import meavis.instruments
import meavis.utility_drivers.meavis_switch


[docs]class Manager(meavis._detail.sqlite.SQLiteAccess): """Provides an access to the MeaVis filesystem."""
[docs] def __init__(self, access=None): """Connect to the internal database and loads the last state.""" super().__init__(access) self._meavis_logger.info("Connect session manager.") self.root_id = None self.session_id = None self.sample_id = None self.dataset_id = None cursor = self.execute( "SELECT root_id,session_id,sample_id,dataset_id " "FROM datasets ORDER BY dataset_id DESC" ) result = cursor.fetchone() if result: self.root_id = result[0] self.session_id = result[1] self.sample_id = result[2] self.dataset_id = result[3] else: cursor = self.execute( "SELECT root_id FROM roots ORDER BY root_id DESC" ) result = cursor.fetchone() if result: self.root_id = result[0] cursor = self.execute( "SELECT session_id FROM sessions ORDER BY session_id DESC" ) result = cursor.fetchone() if result: self.session_id = result[0] cursor = self.execute( "SELECT sample_id FROM samples ORDER BY sample_id DESC" ) result = cursor.fetchone() if result: self.sample_id = result[0] try: self.reload_session() self.reload_sample() except Exception as err: self._meavis_logger.error("{}: {}".format(type(err).__name__, err)) self.sample_id = None self.session_id = None
[docs] def new_setup(self): """Get a new setup to be filled and sealed.""" cursor = self.execute( "SELECT loader,kind FROM loaders ORDER BY priority ASC" ) return meavis.filesystem.utils.Setup(cursor.fetchall(), self)
@property def roots(self): """Get all roots.""" cursor = self.execute("SELECT root FROM roots") return set(root_name for (root_name,) in cursor.fetchall()) @property def root(self): """Get current root.""" if not self.root_id: self._meavis_logger.debug("Root ID undefined.") return None cursor = self.execute( "SELECT root FROM roots WHERE root_id=?", (self.root_id,) ) result = cursor.fetchone() if not result: meavis.config.log_and_raise( RuntimeError, "Root ID {} not found.".format(self.root_id) ) return result[0] @property def root_path(self): """Get current root path.""" if not self.root_id: self._meavis_logger.debug("Root ID undefined.") return None cursor = self.execute( "SELECT root_path FROM roots WHERE root_id=?", (self.root_id,) ) result = cursor.fetchone() if not result: meavis.config.log_and_raise( RuntimeError, "Root ID {} not found.".format(self.root_id) ) return result[0] @root.setter def root(self, value): """Set the root.""" cursor = self.execute( "SELECT root_id FROM roots WHERE root=?", (value,) ) result = cursor.fetchone() if not result: meavis.config.log_and_raise( RuntimeError, "No root named {{ {} }}.".format(value) ) self.root_id = result[0]
[docs] def register_root(self, value, folder): """Register a new root.""" if not os.path.exists(folder): os.makedirs(folder) self.execute( "INSERT INTO roots (root,root_path) VALUES (?, ?)", (value, folder), )
@property def sessions(self): """Get all sessions regarding the current root and sample.""" if not self.root_id: self._meavis_logger.debug("Root ID undefined.") return None if self.sample_id: cursor = self.execute( "SELECT sessions.session FROM ((sessions INNER JOIN " "datasets ON sessions.session_id=datasets.session_id) " "INNER JOIN samples ON datasets.sample_id=samples.sample_id) " "WHERE datasets.root_id=? AND samples.sample=?", (self.root_id, self.sample), ) else: cursor = self.execute( "SELECT sessions.session FROM (sessions INNER JOIN " "datasets ON sessions.session_id=datasets.session_id) " "WHERE datasets.root_id=?", (self.root_id,), ) return set(session_name for (session_name,) in cursor) @property def session(self): """Get the current session.""" if not self.session_id: self._meavis_logger.debug("Session ID undefined.") return "" cursor = self.execute( "SELECT session FROM sessions WHERE session_id=?", (self.session_id,), ) result = cursor.fetchone() if not result: meavis.config.log_and_raise( RuntimeError, "Session ID {} not found.".format(self.session_id), ) return result[0] @session.setter def session(self, value): """Set the current session.""" if not self.root_id: meavis.config.log_and_raise(RuntimeError, "Root undefined.") result = None cursor = self.execute( "SELECT session_id FROM sessions " "WHERE session=? ORDER BY session_id DESC", (value,), ) result = cursor.fetchone() if not result: meavis.config.log_and_raise( RuntimeError, "Session {{ {} }} doesn't exist.".format(value), ) self._meavis_logger.debug( "Get session {{ {} }} ID {}.".format(value, result[0]) ) self.session_id = result[0] self.reload_session() @property def last_session(self): """Get the newest session.""" if not self.root_id: self._meavis_logger.debug("Root ID undefined.") return "" cursor = self.execute( "SELECT sessions.session FROM (sessions INNER JOIN " "datasets ON sessions.session_id=datasets.session_id) " "WHERE datasets.root_id=? ORDER BY sessions.session_id DESC", (self.root_id,), ) result = cursor.fetchone() return result[0] if result else ""
[docs] def new_session(self, instruments=None, instances=None): """Add a new session with the giver setups.""" if not self.root_id: meavis.config.log_and_raise(RuntimeError, "Root undefined.") if self.last_session and not self.last_session.startswith( self.session ): meavis.config.log_and_raise( RuntimeError, "Session {{ {} }} too old compared to {{ {} }}.".format( self.session, self.last_session ), ) ( setup_session_instruments, setup_session_instances, _setup_sample_instruments, _setup_sample_instances, ) = self.all_setups if instruments is None: instruments = setup_session_instruments if instances is None: instances = setup_session_instances session_name = ".".join( itertools.chain( (s for s in self.session.split(".")[:-1] if s), ["session_{:%Y-%m-%d_%Hh%M}".format(datetime.datetime.now())], ) ) cursor = self.execute( "INSERT INTO sessions (session,instruments_id,instances_id) " "VALUES (?,?,?)", ( session_name, instruments.seal("instruments"), instances.seal("instances"), ), ) self.session_id = cursor.lastrowid self._meavis_logger.info( "Create session {{ {} }} with ID {}.".format( session_name, self.session_id ) ) self.reload_session()
[docs] def up(self): """Select the parent session.""" self.session = ".".join(self.session.split(".")[:-1])
[docs] def add_subsession(self, instruments=None, instances=None): """Add a subsesssion.""" if not self.root_id: meavis.config.log_and_raise(RuntimeError, "Root undefined.") if not self.last_session.startswith(self.session): meavis.config.log_and_raise( RuntimeError, "Session {{ {} }} too old compared to {{ {} }}.".format( self.session, self.last_session ), ) ( setup_session_instruments, setup_session_instances, _setup_sample_instruments, _setup_sample_instances, ) = self.all_setups if instruments is None: instruments = setup_session_instruments if instances is None: instances = setup_session_instances session_name = "{}.session_{:%Y-%m-%d_%Hh%M}".format( self.session, datetime.datetime.now() ) cursor = self.execute( "INSERT INTO sessions (session,instruments_id,instances_id) " "VALUES (?,?,?)", ( session_name, instruments.seal("instruments"), instances.seal("instances"), ), ) self.session_id = cursor.lastrowid self._meavis_logger.info( "Create (sub)-session {{ {} }} with ID {}.".format( session_name, self.session_id ) ) self.reload_session()
@property def all_setups(self): """Get the current setups. Instruments and instances setups for session and sample. """ setup_session_instruments = self.new_setup() if self.session_id is not None: cursor = self.execute( "SELECT instruments.file_obj,instruments.kind " "FROM (sessions INNER JOIN instruments ON " "sessions.instruments_id=instruments.group_id) " "WHERE sessions.session_id=?", (self.session_id,), ) for (file_obj, kind) in cursor: setup_session_instruments.add(file_obj, kind) setup_session_instances = self.new_setup() if self.session_id is not None: cursor = self.execute( "SELECT instances.file_obj,instances.kind " "FROM (sessions INNER JOIN instances ON " "sessions.instances_id=instances.group_id) " "WHERE sessions.session_id=?", (self.session_id,), ) for (file_obj, kind) in cursor: setup_session_instances.add(file_obj, kind) setup_sample_instruments = self.new_setup() if self.sample_id is not None: cursor = self.execute( "SELECT instruments.file_obj,instruments.kind " "FROM (samples INNER JOIN instruments ON " "samples.instruments_id=instruments.group_id) " "WHERE samples.sample_id=?", (self.sample_id,), ) for (file_obj, kind) in cursor: setup_sample_instruments.add(file_obj, kind) setup_sample_instances = self.new_setup() if self.sample_id is not None: cursor = self.execute( "SELECT instances.file_obj,instances.kind " "FROM (samples INNER JOIN instances ON " "samples.instances_id=instances.group_id) " "WHERE samples.sample_id=?", (self.sample_id,), ) for (file_obj, kind) in cursor: setup_sample_instances.add(file_obj, kind) return ( setup_session_instruments, setup_session_instances, setup_sample_instruments, setup_sample_instances, )
[docs] def reload_session(self): """Reload the current session.""" if self.session_id is None: return meavis.instruments.clear("parameters") meavis.instruments.clear("measurements") meavis.instruments.clear("instruments") meavis.instruments.inject( meavis.utility_drivers.meavis_switch._meavis_instruments ) ( setup_session_instruments, setup_session_instances, _setup_sample_instruments, _setup_sample_instances, ) = self.all_setups cursor = self.execute( "SELECT instruments.group_id " "FROM (sessions INNER JOIN instruments ON " "sessions.instruments_id=instruments.group_id) " "WHERE sessions.session_id=?", (self.session_id,), ) current_group_id = cursor.fetchone() if current_group_id: current_group_id = current_group_id[0] group_id = setup_session_instruments.seal("instruments") if current_group_id != group_id: cursor = self.execute( "SELECT session, instances_id FROM sessions WHERE session_id=?", (self.session_id,), ) result = cursor.fetchone() cursor = self.execute( "INSERT INTO sessions (session,instruments_id,instances_id) " "VALUES (?,?,?)", (result[0], group_id, result[1]), ) self.session_id = cursor.lastrowid self._meavis_logger.debug( "Update session {{ {} }} with ID {}.".format( result[0], self.session_id ) ) for instruments in setup_session_instruments: meavis.instruments.inject(instruments) cursor = self.execute( "SELECT instances.group_id " "FROM (sessions INNER JOIN instances ON " "sessions.instances_id=instances.group_id) " "WHERE sessions.session_id=?", (self.session_id,), ) current_group_id = cursor.fetchone() if current_group_id: current_group_id = current_group_id[0] group_id = setup_session_instances.seal("instances") if current_group_id != group_id: cursor = self.execute( "SELECT session, instances_id FROM sessions WHERE session_id=?", (self.session_id,), ) result = cursor.fetchone() cursor = self.execute( "INSERT INTO sessions (session,instruments_id,instances_id) " "VALUES (?,?,?)", (result[0], result[1], group_id), ) self.session_id = cursor.lastrowid self._meavis_logger.debug( "Update session {{ {} }} with ID {}.".format( result[0], self.session_id ) ) for instances in setup_session_instances: meavis.instruments.register(instances) self.register_active_samples()
@property def samples(self): """Get samples regarding root and session.""" if not self.root_id: self._meavis_logger.debug("Root ID undefined.") return None if self.sample_id: cursor = self.execute( "SELECT samples.sample FROM ((samples INNER JOIN " "datasets ON samples.sample_id=datasets.sample_id) " "INNER JOIN sessions ON " "datasets.session_id=sessions.session_id) " "WHERE datasets.root_id=? AND sessions.session=?", (self.root_id, self.session), ) else: cursor = self.execute( "SELECT samples.sample FROM (samples INNER JOIN " "datasets ON samples.sample_id=datasets.sample_id) " "WHERE datasets.root_id=?", (self.root_id,), ) return set(sample_name for (sample_name,) in cursor) @property def active_samples(self): """Get active samples regarding the setups.""" if not self.root_id: self._meavis_logger.debug("Root ID undefined.") return None if not self.session_id: self._meavis_logger.debug("Session ID undefined.") return None return set( value._meavis_name for value in meavis.parameters.__dict__.values() if hasattr(value, "_meavis_is_path") and value._meavis_is_path )
[docs] def register_active_samples(self): """Register the current active samples. Make them aviable throught the MeaVis injection and registration mechanism. """ for value in meavis.parameters.__dict__.values(): if ( not hasattr(value, "_meavis_is_path") or not value._meavis_is_path ): continue instruments_group = self.new_setup() for instrument in value._meavis_instruments: instruments_group.add(*instrument) instrument_id = instruments_group.seal("instruments") instances_group = self.new_setup() for instance in value._meavis_instances: instances_group.add(*instance) instance_id = instances_group.seal("instances") cursor = self.execute( "SELECT COUNT(*) FROM samples " "WHERE sample=? AND instruments_id=? AND instances_id=?", (value._meavis_name, instrument_id, instance_id), ) if cursor.fetchone()[0] > 0: continue self.execute( "INSERT INTO samples (sample,instruments_id,instances_id) " "VALUES (?,?,?)", (value._meavis_name, instrument_id, instance_id), ) self._meavis_logger.info( "Register sample : {}.".format(value._meavis_name) )
@property def sample(self): """Get the current sample.""" if not self.sample_id: self._meavis_logger.debug("Sample ID undefined.") return None cursor = self.execute( "SELECT sample FROM samples WHERE sample_id=?", (self.sample_id,), ) result = cursor.fetchone() if not result: meavis.config.log_and_raise( RuntimeError, "Sample ID {} not found.".format(self.sample_id) ) return result[0] @sample.setter def sample(self, value): """Set the current sample.""" if not self.root_id: meavis.config.log_and_raise(RuntimeError, "Root undefined.") if not self.session_id: meavis.config.log_and_raise(RuntimeError, "Session undefined.") result = None while not result: cursor = self.execute( "SELECT sample_id FROM samples " "WHERE sample=? ORDER BY sample_id DESC", (value,), ) result = cursor.fetchone() if result: continue self.execute("INSERT INTO samples (sample) VALUES (?)", (value,)) self._meavis_logger.info("Create sample {{ {} }}.".format(value)) self._meavis_logger.debug( "Get sample {{ {} }} ID {}.".format(value, result[0]) ) self.sample_id = result[0] self.reload_sample()
[docs] def reload_sample(self): """Reload the current sample.""" if self.sample_id is None: return ( _setup_session_instruments, _setup_session_instances, setup_sample_instruments, setup_sample_instances, ) = self.all_setups setup_sample_instruments.seal("instruments") for instruments in setup_sample_instruments: meavis.instruments.inject(instruments) setup_sample_instances.seal("instances") for instances in setup_sample_instances: meavis.instruments.register(instances)
[docs] def query(self, **metadata): """Execute a generic query to the database over metadata key/value.""" joined_table_query = "FROM datasets {}".format( " ".join( "INNER JOIN {}s ON datasets.{}_id={}s.{}_id".format( table, table, table, table ) for table in ["root", "session", "sample"] ) ) cursor = self.execute("SELECT * {}".format(joined_table_query)) excluded_fields = tuple(column[0] for column in cursor.description) cursor = self.execute( "SELECT {} {} {}{}{}{}".format( ",".join(metadata.keys()), joined_table_query, " ".join( "LEFT JOIN (SELECT dataset_id, value AS {} " "FROM metadata WHERE metadata.name=?) AS table_{} " "ON datasets.dataset_id=table_{}.dataset_id".format( key, key, key ) for key in metadata if key not in excluded_fields ), " WHERE (" if any(metadata.values()) else "", ") AND (".join( " OR ".join(["{}=?".format(key)] * (1 + value.count("|"))) for key, value in metadata.items() if value ), ")" if any(metadata.values()) else "", ), tuple( itertools.chain( (key for key in metadata if key not in excluded_fields), *(value.split("|") for value in metadata.values() if value) ) ), ) return (column[0] for column in cursor.description), cursor
[docs] def query_current(self, **kwargs): """Query regarding the current root/session/sample and dataset.""" return self.query( root=self.root, session=self.session, sample=self.sample, dataset_id=self.dataset_id, **kwargs )
[docs] def new_dataset(self, name): """Add a dataset, return formated filename and metadata proxy.""" if not self.root_id: meavis.config.log_and_raise(RuntimeError, "Root undefined.") if not self.session_id: meavis.config.log_and_raise(RuntimeError, "Session undefined.") if not self.sample_id: meavis.config.log_and_raise(RuntimeError, "Sample undefined.") filename = os.path.join( self.root_path, *self.sample.split("."), *self.session.split("."), "{:%Y-%m-%d_%Hh%M_%Ss}__{}".format(datetime.datetime.now(), name) ) cursor = self.execute( "INSERT INTO datasets (root_id,session_id,sample_id,file) " "VALUES (?,?,?,?)", (self.root_id, self.session_id, self.sample_id, filename), ) self.commit() self.dataset_id = cursor.lastrowid return filename, meavis.filesystem.utils.Metadata(self, self)