Source code for pyos.db.database

# -*- coding: utf-8 -*-
import getpass
from typing import Optional, Sequence

import mincepy
import mincepy.archives

from . import schema
from . import fs

__all__ = 'connect', 'init', 'get_historian', 'reset', 'get_session'

_GLOBAL_SESSION: Optional['Session'] = None


class Session(mincepy.archives.ArchiveListener):

    def __init__(self, historian: mincepy.Historian, cwd: fs.Path = None):
        """Start a new session"""
        self._historian = historian

        self._cwd = None
        if cwd:
            self.set_cwd(cwd)
        else:
            try:
                # Default working directory for a session is simply a folder in root with the user's name
                self.set_cwd(_get_homedir())
            except ValueError:
                self.set_cwd(fs.ROOT_PATH)

        historian.archive.add_archive_listener(self)

    @property
    def historian(self) -> mincepy.Historian:
        return self._historian

    @property
    def cwd(self) -> fs.Path:
        return self._cwd

    def set_cwd(self, path: fs.Path):
        """Set the current working directory"""
        if fs.find_entry(path, historian=self._historian) is None:
            raise ValueError(f'Path does not exist: {path}')

        self._cwd = path

    def close(self):
        """Close this session.  This object cannot be used after this call"""
        self._historian.archive.remove_archive_listener(self)

        del self._cwd
        del self._historian

    def on_bulk_write(self, archive: mincepy.Archive, ops: Sequence[mincepy.operations.Operation]):
        """Called when an archive is about to perform a sequence of write operations but has not performed them yet.
        The listener must not assume that the operations will be completed as there are a number of reasons why this
        process could be interrupted.
        """
        assert archive is self._historian.archive
        new_objects = []  # Keep track of the new objects being saved
        deleted_objects = []
        for oper in ops:
            if isinstance(oper, mincepy.operations.Insert):
                if oper.snapshot_id.version == 0:
                    new_objects.append(oper.obj_id)
                elif oper.record.is_deleted_record():
                    deleted_objects.append(oper.obj_id)

        if new_objects:
            fs.execute_instructions([
                fs.SetObjPath(obj_id, self._cwd + (str(obj_id),), only_new=True)
                for obj_id in new_objects
            ])

        if deleted_objects:
            fs.remove_objs(tuple(deleted_objects))


def connect(uri: str = '', use_globally=True) -> mincepy.Historian:
    historian = mincepy.connect(uri, use_globally=use_globally)
    init(historian, use_globally)

    return historian


[docs]def init(historian: mincepy.Historian = None, use_globally=True) -> mincepy.Historian: """Initialise a Historian such that it is ready to be used with pyOS""" global _GLOBAL_SESSION # pylint: disable=global-statement historian = historian or mincepy.get_historian() schema.ensure_up_to_date(historian) # Create the global session session = Session(historian) if use_globally: _GLOBAL_SESSION = session return historian
[docs]def get_historian() -> mincepy.Historian: """Get the active historian in pyos""" global _GLOBAL_SESSION # pylint: disable=global-statement, global-variable-not-assigned if _GLOBAL_SESSION is None: raise RuntimeError( 'A global pyOS session has not been initialised. Call connect() or init() first.') return _GLOBAL_SESSION.historian
def get_session() -> Session: global _GLOBAL_SESSION # pylint: disable=global-variable-not-assigned return _GLOBAL_SESSION def reset(): global _GLOBAL_SESSION # pylint: disable=global-statement, global-variable-not-assigned if _GLOBAL_SESSION is not None: _GLOBAL_SESSION.close() def _get_homedir() -> fs.Path: """Get the home directory for the current user""" return ( '/', getpass.getuser(), )