Source code for meavis.filesystem.utils
"""Utility classes for the filesystem mechanism."""
import collections
import copy
import functools
import types
import meavis._detail.debug
import meavis._detail.sqlite
import meavis.config
import meavis.filesystem.loaders
import meavis.instruments
import meavis.utility_drivers.meavis_switch
[docs]class Metadata(
meavis._detail.sqlite.SQLiteAccess, collections.abc.MutableMapping
):
"""Proxy for the metadata of a dataset."""
[docs] def __init__(self, manager, access=None):
"""Initialise a proxy from the current selected dataset."""
super().__init__(access)
self.dataset_id = manager.dataset_id
self._meavis_logger.info(
"Metadata wrapper for dataset ID {}.".format(self.dataset_id)
)
def __setitem__(self, key, value):
"""Set the value of a metadata."""
cursor = self.execute(
"SELECT COUNT(*) FROM metadata WHERE dataset_id=? AND name=?",
(self.dataset_id, key),
)
result = cursor.fetchone()
if not result:
meavis.config.log_and_raise(KeyError, key)
if result[0] > 0:
self.execute(
"UPDATE metadata SET value=? WHERE dataset_id=? AND name=?",
(value, self.dataset_id, key),
)
else:
self.execute(
"INSERT INTO metadata (value,dataset_id,name) VALUES (?,?,?)",
(value, self.dataset_id, key),
)
self.commit()
self._meavis_logger.info(
"Set metadata {}={} for dataset ID {}.".format(
key, value, self.dataset_id
)
)
def __getitem__(self, key):
"""Get the value of a metadata."""
cursor = self.execute(
"SELECT value FROM metadata WHERE dataset_id=? AND name=?",
(self.dataset_id, key),
)
result = cursor.fetchone()
if result:
self._meavis_logger.debug(
"Get metadata {} for dataset ID {}.".format(
key, self.dataset_id
)
)
return result[0]
meavis.config.log_and_raise(KeyError, key)
def __delitem__(self, key):
"""Remove a metadata."""
self.execute(
"DELETE FROM metadata WHERE dataset_id=? AND name=?",
(self.dataset_id, key),
)
self.commit()
self._meavis_logger.info(
"Delete metadata {} for dataset ID {}.".format(
key, self.dataset_id
)
)
def __iter__(self):
"""Get an iterator throught all metadata."""
return (name for name, value in self.items())
def __len__(self):
"""Count the number of metadata."""
cursor = self.execute(
"SELECT COUNT(*) FROM metadata WHERE dataset_id=?",
(self.dataset_id,),
)
result = cursor.fetchone()
if not result:
meavis.config.log_and_raise(
RuntimeError,
"Metadata selection on ID {} fails.".format(self.dataset_id),
)
return result[0]
[docs] def items(self):
"""Provide a view of all metadata."""
cursor = self.execute(
"SELECT name,value FROM metadata WHERE dataset_id=?",
(self.dataset_id,),
)
return cursor
[docs] def clear(self):
"""Delete all metadata."""
self.execute(
"DELETE FROM metadata WHERE dataset_id=?", (self.dataset_id,)
)
self._meavis_logger.info(
"Delete all metadata for dataset ID {}.".format(self.dataset_id)
)
[docs] def update(self, mapping):
"""Update multiple of metadata."""
self.execute(
"DELETE FROM metadata WHERE dataset_id=? AND name IN ({})".format(
",".join(len(mapping) * ["?"])
),
(self.dataset_id,) + tuple(mapping.keys()),
)
self.executemany(
"INSERT INTO metadata (value,dataset_id,name) VALUES (?,?,?)",
((value, self.dataset_id, key) for key, value in mapping.items()),
)
for key, value in mapping.items():
self._meavis_logger.info(
"Set metadata {}={} for dataset ID {}.".format(
key, value, self.dataset_id
)
)
[docs]class Setup(meavis._detail.sqlite.SQLiteAccess, collections.abc.Iterable):
"""A group of MeaVis file descriptions."""
[docs] def __init__(self, loader_mapping, access=None):
"""Initialise required loaders and connection to the filesystem.
Initialisation from a loader mapping and connection.
"""
super().__init__(access)
self.loader_mapping = copy.deepcopy(loader_mapping)
self.filename_map = set()
self.loaders = meavis.filesystem.loaders.LoadDispatcher(
self.loader_mapping
)
def nested_add(loader_kind, nested_self, filename):
nested_self.add(filename, loader_kind)
return self
for (_loader, loader_kind) in self.loader_mapping:
setattr(
self,
"add_{}".format(loader_kind),
types.MethodType(
functools.partial(nested_add, loader_kind), self
),
)
self._sealed = None
def __iter__(self):
"""Get an iterator throught description files of a sealed Setup."""
if self._sealed is None:
meavis.config.log_and_raise(
RuntimeError, "Setup has to be sealed before iterate through."
)
return iter(self._sealed)
[docs] def add(self, filename, loader_kind):
"""Add a file to the Setup."""
self.filename_map.add((filename, loader_kind))
self._sealed = None
return self
[docs] def seal(self, key):
"""Seal the group inside the filesystem.
Return the corresponding group id.
"""
self._sealed = [
self.loaders.loads(file_obj, key, kind)
for file_obj, kind in self.filename_map
]
hash = "/".join(
sorted(
meavis._detail.debug.hash_mappable(item)
for item in self._sealed
)
)
self._meavis_logger.debug(
'Seal Setup for "{}" with hash [{}]'.format(key, hash)
)
cursor = self.execute(
"SELECT group_id FROM groups WHERE hash=?", (hash,),
)
result = cursor.fetchone()
if result:
return result[0]
else:
cursor = self.execute(
"INSERT INTO groups (hash) VALUES (?)", (hash,),
)
group_id = cursor.lastrowid
for file_obj, kind in self.filename_map:
cursor = self.execute(
"INSERT INTO {} (group_id,file_obj,kind) "
"VALUES (?,?,?)".format(key),
(group_id, file_obj, kind),
)
self._meavis_logger.info(
'New Setup {{{}}} for "{}" with hash [{}]'.format(
group_id, key, hash
)
)
return group_id