Source code for pyos.fs.nodes

# -*- coding: utf-8 -*-
import abc
import collections.abc
import copy
import functools
import io
from typing import Dict, Sequence, Optional, Iterable, TextIO, Type

import anytree
import columnize
import pandas as pd

import mincepy

from pyos import db
from pyos import exceptions
from pyos import fmt
from pyos import os
from pyos import pathlib
from pyos import results
from pyos import utils

# Names exported by a star-import of this module
__all__ = (
    'BaseNode',
    'ContainerNode',
    'DirectoryNode',
    'ObjectNode',
    'ResultsNode',
    'to_node',
    'TABLE_VIEW',
    'LIST_VIEW',
    'TREE_VIEW',
    'SINGLE_COLUMN_VIEW',
)

# Identifiers of the view modes a container node can render itself in
LIST_VIEW = 'list'
TREE_VIEW = 'tree'
TABLE_VIEW = 'table'
SINGLE_COLUMN_VIEW = 'single'

# Key under which a filesystem entry dictionary may carry its children
CHILDREN = 'children'

# Empty-tuple placeholder for a parent or children value that has not been set
UNSET = ()


class BaseNode(collections.abc.Sequence, results.BaseResults, metaclass=abc.ABCMeta):
    """Common base class for all nodes of the pyos object system.

    A node carries a name, a parent and a sequence of children, and behaves as a sequence
    over those children.
    """
    __slots__ = '_name', '_parent', '_children', '_hist'

    def __init__(self,
                 name: str,
                 parent: 'BaseNode' = UNSET,
                 historian: mincepy.Historian = None):
        """
        :param name: the name of this node
        :param parent: the parent node
        :param historian: the historian to use, if not supplied ``db.get_historian()`` is used
        """
        super().__init__()
        self._name = name
        self._parent = parent
        self._children = UNSET
        self._hist = historian or db.get_historian()

    def __getitem__(self, item):
        """Get children by position (int or slice) or a single child by name (str).

        :raises ValueError: if a name is given and no child has that name
        :raises TypeError: if ``item`` is not an int, a slice or a str
        """
        if isinstance(item, (int, slice)):
            return self.children[item]

        if not isinstance(item, str):
            raise TypeError(f"Got unsupported item type '{item.__class__.__name__}'")

        for node in self.children:
            if node.name == item:
                return node

        raise ValueError(f'No child has name {item}')

    def __len__(self) -> int:
        return len(self.children)

    @property
    def name(self):
        """The name this node was created with"""
        return self._name

    @property
    def parent(self) -> Optional['BaseNode']:
        """Get the parent node"""
        return self._parent

    @property
    def children(self) -> Sequence['BaseNode']:
        """The child nodes of this node"""
        return self._children

    @property
    def height(self) -> int:
        """Get the maximum number of steps from this node to a leaf"""
        # A node without children has height 0, otherwise it is one more than its tallest child
        return max((0, *(node.height + 1 for node in self._children)))

    def delete(self):
        """Delete this node and any descendents"""
        for node in self.children:
            node.delete()
        self._invalidate_cache()

    def move(self, dest: os.PathSpec, overwrite=False):
        """Move this object (with any children) into the directory given by dest

        :param dest: the destination to move the node to
        :param overwrite: overwrite if exists
        """
        for node in self.children:
            node.move(dest, overwrite)

    def _invalidate_cache(self):
        """Reset the parent and children back to the unset state"""
        self._children = UNSET
        self._parent = UNSET
class FilesystemNode(BaseNode):
    """Base node for representing an object in the virtual filesystem.

    The filesystem entry behind the node can be identified by any one of:

    * the entry dictionary itself
    * the entry id
    * a path
    """
    __slots__ = '_abspath', '_entry'

    def __init__(self,
                 path: os.PathSpec = None,
                 parent: BaseNode = None,
                 entry_id=None,
                 entry: Dict = None,
                 *,
                 historian: mincepy.Historian = None):
        """
        :param path: the path this node represents
        :param parent: parent node
        :param entry_id: the id of the filesystem entry, used to look the entry up if ``entry``
            is not supplied
        :param entry: the filesystem entry dictionary, if already known
        :param historian: the historian to use, if not supplied ``db.get_historian()`` is used
        :raises pyos.exceptions.FileNotFoundError: if no entry is found for the id or path
        :raises ValueError: if none of ``entry``, ``entry_id`` and ``path`` is supplied
        """
        historian = historian or db.get_historian()

        if entry is None:
            entry = self._fetch_entry(path, entry_id, historian)
        if path is None:
            path = self._path_of_entry(entry)

        abspath = pathlib.PurePath(os.path.abspath(path))
        super().__init__(abspath.name, parent, historian=historian)
        self._abspath = abspath
        self._entry = entry

    @staticmethod
    def _fetch_entry(path, entry_id, historian) -> Dict:
        """Look up the filesystem entry in the database, preferring the entry id over the path"""
        if entry_id is not None:
            found = db.fs.get_entry(entry_id, include_path=True, historian=historian)  # DB HIT
            if found is None:
                raise exceptions.FileNotFoundError(entry_id)
            return found

        if path is not None:
            found = db.fs.find_entry(os.withdb.to_fs_path(path), historian=historian)  # DB HIT
            if found is None:
                raise exceptions.FileNotFoundError(path)
            return found

        raise ValueError('Must supply filesystem entry, an entry id or a path')

    @staticmethod
    def _path_of_entry(entry: Dict):
        """Get the path for an entry, asking the database if the entry does not carry one"""
        stored = db.fs.Entry.path(entry)
        if stored is not None:
            return os.withdb.from_fs_path(stored)

        return os.withdb.from_fs_path(db.fs.get_paths(db.fs.Entry.id(entry)))[0]

    @property
    def abspath(self) -> 'pathlib.PurePath':
        """The absolute path of this node"""
        return self._abspath

    @abc.abstractmethod
    def rename(self, new_name: str):
        """Rename this filesystem node"""

    @property
    def entry_id(self):
        """The id of the filesystem entry this node represents"""
        return db.fs.Entry.id(self._entry)
class ContainerNode(BaseNode):
    """A node that contains children that can be either directory nodes or object nodes"""
    # The properties that can be displayed for each child (see `show`)
    VIEW_PROPERTIES = (
        'loaded',  # Indication of whether the object is loaded in memory or not
        'type',  # The object type
        'creator',
        'version',
        'ctime',
        'mtime',
        'name',
        'str',
        'relpath',
        'abspath',
    )

    JUSTIFICATIONS = {
        'loaded': 'left',
        'type': 'left',
        'creator': 'right',
        'version': 'right',
        'ctime': 'right',
        'mtime': 'right',
        'name': 'right',
        'str': 'right',
        'relpath': 'left',
        'abspath': 'left'
    }

    # Class level defaults, `show` and the `view_mode` setter replace these per instance
    _view_mode = TABLE_VIEW
    _show = {'name'}

    def __contains__(self, item):
        """Check if a path or an object id is contained in this node.

        :param item: a pure path (absolute or relative to this container) or an object id
        :return: True if a matching child was found, False otherwise (including for unsupported
            item types)
        """
        if isinstance(item, pathlib.PurePath):
            path = pathlib.Path(item)

            if path.is_absolute():
                # Compare against the direct children of the kind the path refers to
                candidates = self.directories if path.is_dir() else self.objects
                if any(path == node.abspath for node in candidates):
                    return True

                # Always check within directories
                return any(path in node for node in self.directories)

            # It's relative
            parts = path.parts
            if not parts:
                # An empty relative path cannot name any child
                return False

            first = parts[0]
            if len(parts) > 1:
                # Rebuild the remainder of the path one component at a time so that the
                # components stay separate, then check the matching sub-directories
                subpath = pathlib.PurePath(parts[1])
                for part in parts[2:]:
                    subpath = subpath / part

                return any(
                    node.abspath.name == first and subpath in node for node in self.directories)

            candidates = self.directories if path.is_dir() else self.objects
            return any(node.abspath.name == first for node in candidates)

        if self._hist.is_obj_id(item):
            return any(item == node.obj_id for node in self.objects)

        return False

    def __getitem__(self, item):
        """Get children by position or name.

        A slice returns a new `ResultsNode` holding copies of the selected children and using the
        same view settings as this container, anything else returns what the base class returns.
        """
        items = super().__getitem__(item)
        if isinstance(item, slice):
            res = ResultsNode()
            # Transfer the view mode
            res.show(*self._show, mode=self._view_mode)
            for entry in items:
                res.append(copy.copy(entry))
            return res

        return items

    def __repr__(self):
        with io.StringIO() as stream:
            self.__stream_out__(stream)
            return stream.getvalue()

    def __stream_out__(self, stream: TextIO):
        """Write this container to the stream using the current view mode"""
        if self._view_mode == TREE_VIEW:
            self._render_tree(stream)
        elif self._view_mode == TABLE_VIEW:
            self._render_table(stream)
        elif self._view_mode == LIST_VIEW:
            self._render_list(stream)
        elif self._view_mode == SINGLE_COLUMN_VIEW:
            self._render_single(stream)

    @property
    def directories(self) -> Iterable['DirectoryNode']:
        """Iterate over the children that are directory nodes"""
        return filter(lambda node: isinstance(node, DirectoryNode), self.children)

    @property
    def objects(self) -> Iterable['ObjectNode']:
        """Iterate over the children that are object nodes"""
        return filter(lambda node: isinstance(node, ObjectNode), self.children)

    @property
    def showing(self) -> set:
        """Returns the current view properties that are being displayed (if the view mode
        supports them)"""
        return self._show

    @property
    def view_mode(self) -> str:
        """The current view mode"""
        return self._view_mode

    @view_mode.setter
    def view_mode(self, new_mode: str):
        assert new_mode in (TREE_VIEW, LIST_VIEW, TABLE_VIEW, SINGLE_COLUMN_VIEW)
        self._view_mode = new_mode

    def show(self, *properties, mode: str = None):
        """Set the properties to display and, optionally, the view mode.

        :param properties: the view properties to show, if none are given the current ones are
            kept
        :param mode: the new view mode, if None the current one is kept
        """
        if mode is not None:
            self._view_mode = mode
        if properties:
            self._show = set(properties)

    def _get_row(self, child) -> Sequence[str]:
        """Build the row of values for a child containing the properties currently shown.

        Properties that a child does not have are left empty.
        """
        # pylint: disable=too-many-branches
        empty = ''
        row = []

        if 'loaded' in self._show:
            try:
                row.append('*' if child.loaded else '')
            except AttributeError:
                row.append(empty)

        if 'type' in self._show:
            try:
                row.append(fmt.pretty_type_string(child.type))
            except AttributeError:
                # Nodes without a type are displayed as directories
                row.append('directory')
            except TypeError:
                row.append(str(child.type_id))

        if 'creator' in self._show:
            row.append(getattr(child, 'creator', empty))

        if 'version' in self._show:
            row.append(str(getattr(child, 'version', empty)))

        if 'ctime' in self._show:
            try:
                row.append(fmt.pretty_datetime(child.ctime))
            except AttributeError:
                row.append(empty)

        if 'mtime' in self._show:
            try:
                # Nodes expose the modification time as `mtime`; there is no `stime` attribute
                row.append(fmt.pretty_datetime(child.mtime))
            except AttributeError:
                row.append(empty)

        if 'name' in self._show:
            row.append(getattr(child, 'name', empty))

        if 'str' in self._show:
            try:
                # Only the first 30 characters of the string representation are shown
                row.append(str(getattr(child, 'obj', empty))[:30])
            except (TypeError, mincepy.ObjectDeleted):
                row.append(empty)

        if 'abspath' in self._show:
            row.append(str(getattr(child, 'abspath', empty)))

        if 'relpath' in self._show:
            try:
                row.append(os.path.relpath(child.abspath))
            except AttributeError:
                row.append(empty)

        return row

    def _render_tree(self, stream: TextIO):
        """Render this node as a tree"""
        for child in self.directories:
            for pre, _, node in anytree.RenderTree(child, childiter=iter):
                stream.write(f'{pre}{node.name}\n')
        for child in self.objects:
            stream.write(f'{child}\n')

    def _render_table(self, stream: TextIO):
        """Render this node as a table"""
        if self._deeply_nested():
            # Do the objects first, like linux's 'ls'
            table = self._get_table(self.objects)
            if table:
                stream.write(pd.DataFrame(table).to_string(index=False, header=False))
                stream.write('\n')

            for directory in self.directories:
                stream.write(f'{directory.name}:')
                table = self._get_table(directory)
                if table:
                    stream.write(pd.DataFrame(table).to_string(index=False, header=False))
                    stream.write('\n')
        else:
            table = self._get_table(self.directories)
            table.extend(self._get_table(self.objects))
            if table:
                stream.write(pd.DataFrame(table).to_string(index=False, header=False))
                stream.write('\n')

    def _render_list(self, stream: TextIO):
        """Render the children as a list, arranged in columns if the stream is a terminal"""
        if stream.isatty():
            repr_list = ['-'.join(self._get_row(child)) for child in self]
            stream.write(columnize.columnize(repr_list, displaywidth=utils.get_terminal_width()))
        else:
            for child in self:
                stream.write('-'.join(self._get_row(child)) + '\n')

    def _render_single(self, stream: TextIO):
        """Render the children one per line"""
        for child in self:
            stream.write('-'.join(self._get_row(child)) + '\n')

    def _get_table(self, entry) -> list:
        """Get a list containing the row for each child in entry"""
        return [self._get_row(child) for child in entry]

    def _deeply_nested(self) -> bool:
        """Returns True if we have any nodes that themselves have children"""
        for directory in self.directories:
            if len(directory) > 0:
                return True
        return False
class DirectoryNode(ContainerNode, FilesystemNode):
    """A node representing an object system directory"""

    def __init__(self,
                 path: os.PathSpec,
                 parent: BaseNode = UNSET,
                 entry: Dict = None,
                 *,
                 historian: mincepy.Historian = None):
        """
        :param path: the path of the directory
        :param parent: the parent node
        :param entry: the filesystem entry of the directory, if already known
        :param historian: the historian to use
        :raises pyos.exceptions.NotADirectoryError: if the filesystem entry is not a directory
        """
        super().__init__(path=pathlib.PurePath(os.path.abspath(path)),
                         parent=parent,
                         entry=entry,
                         historian=historian)
        if not db.fs.Entry.is_dir(self._entry):
            raise exceptions.NotADirectoryError(path)

    def __repr__(self):
        with io.StringIO() as stream:
            self.__stream_out__(stream)
            return stream.getvalue()

    def __copy__(self):
        """Create a copy with no parent"""
        # The entry and historian have to be passed by keyword: the second positional parameter
        # of the constructor is the parent, not the entry
        dir_node = DirectoryNode(self.abspath, entry=self._entry, historian=self._hist)
        dir_node._children = copy.copy(self._children)
        return dir_node

    def __contains__(self, item):
        # Have to expand if we're not already otherwise contains could incorrectly fail
        if not self.children:
            self.expand()

        return super().__contains__(item)

    def expand(self, depth=1, populate_objects=False):
        """Populate the children with what is currently in the database

        :param depth: expand to the given depth, 0 means no expansion, 1 means my child nodes, etc.
            A negative depth is passed on to the child directories as -1.
        :param populate_objects: if True objects will have their records fetched immediately (as
            opposed to lazily when needed).  This gives a large speedup when the client knows that
            the all or most of the details of the child objects will be needed as they can be
            fetched in one call.  (Currently not used by this method.)
        """
        # pylint: disable=unused-argument
        self._children = []
        if depth == 0:
            return

        if depth > 0:
            child_expand_depth = depth - 1
        else:
            child_expand_depth = -1

        if CHILDREN in self._entry:
            # The entry already carries its children, use those
            self._children = self._entry[CHILDREN]
        else:
            from pyos import psh_lib

            def yield_results():
                for child in db.fs.iter_children(self.entry_id, historian=self._hist):
                    path = os.path.join(self._abspath, db.fs.Entry.name(child))

                    if db.fs.Entry.is_dir(child):
                        dir_node = DirectoryNode(path,
                                                 parent=self,
                                                 entry=child,
                                                 historian=self._hist)
                        if abs(child_expand_depth) > 0:
                            dir_node.expand(child_expand_depth)
                        yield dir_node
                    else:
                        obj_node = ObjectNode(db.fs.Entry.id(child),
                                              path=path,
                                              parent=self,
                                              entry=child,
                                              historian=self._hist)
                        yield obj_node

            self._children = psh_lib.results.CachingResults(yield_results())

    def delete(self):
        """Delete this directory along with all the objects and directories below it"""
        # 1. Find all filesystem entries that need to be deleted
        descendents = tuple(db.fs.iter_descendents(self.entry_id, historian=self._hist))
        obj_ids = tuple(
            db.fs.Entry.id(entry) for entry in descendents if db.fs.Entry.is_obj(entry))

        # 2. Delete the objects
        if obj_ids:
            with self._hist.transaction():
                self._hist.delete(*obj_ids)

        # 3. Delete the filesystem entries
        # pylint: disable=protected-access
        db.fs._delete_entries(*map(db.fs.Entry.id, descendents + (self._entry,)),
                              historian=self._hist)
        self._invalidate_cache()

    def move(self, dest: os.PathSpec, overwrite=False):
        """Move this directory into the directory given by dest

        :param dest: the directory to move this directory into
        :param overwrite: not used by this implementation
        """
        dest = pathlib.Path(dest).resolve() / self.name
        os.rename(self._abspath, dest)
        self._abspath = dest

    def rename(self, new_name: str):
        """Rename this directory, keeping it in the same parent directory

        :param new_name: the new name for the directory
        """
        new_path = pathlib.Path(self.abspath.parent / new_name)
        os.rename(self.abspath, new_path)
        self._abspath = new_path
        # Keep the name in sync with the new path
        self._name = new_path.name
class ObjectNode(FilesystemNode):
    """A node that represents an object"""
    # '_children' is not listed here because the base node already declares that slot
    __slots__ = '_obj_id', '_record'

    @classmethod
    def from_path(cls, path: os.PathLike, historian: mincepy.Historian = None):
        """Create an object node from the path of the object

        :param path: the path of the object
        :param historian: the historian to use
        :raises ValueError: if there is no filesystem entry at the path
        :raises pyos.exceptions.IsADirectoryError: if the path refers to a directory
        """
        full_path = os.path.abspath(path)
        entry = db.fs.find_entry(os.withdb.to_fs_path(full_path), historian=historian)
        if entry is None:
            raise ValueError(f"'{full_path}' is not a valid object path")
        if db.fs.Entry.is_dir(entry):
            raise exceptions.IsADirectoryError(path)

        obj_id = db.fs.Entry.id(entry)
        return ObjectNode(obj_id, path, entry=entry, historian=historian)

    def __init__(self,
                 obj_id,
                 path: os.PathSpec,
                 record: mincepy.DataRecord = None,
                 parent=None,
                 entry: Dict = None,
                 historian: mincepy.Historian = None):
        """
        :param obj_id: the id of the object
        :param path: the path of the object
        :param record: the data record of the object, if None it is loaded lazily when needed
        :param parent: the parent node
        :param entry: the filesystem entry of the object, if already known
        :param historian: the historian to use
        :raises pyos.exceptions.FileNotFoundError: if the filesystem entry is not an object
        :raises ValueError: if the id of the filesystem entry is not ``obj_id``
        """
        if record:
            assert obj_id == record.obj_id, "Obj id and record don't match!"

        super().__init__(entry_id=obj_id,
                         path=path,
                         parent=parent,
                         entry=entry,
                         historian=historian)

        if not db.fs.Entry.is_obj(self._entry):
            raise exceptions.FileNotFoundError(path)

        if not db.fs.Entry.id(self._entry) == obj_id:
            raise ValueError(
                f'Object id ({obj_id}) and entry id ({db.fs.Entry.id(self._entry)}) mismatch')

        self._obj_id = obj_id
        self._record = record  # This will be lazily loaded if None
        self._children = tuple()  # Can't have any children

    def __contains__(self, item):
        """Object nodes have no children and so do not contain anything"""
        return False

    def __copy__(self):
        """Make a copy with no parent"""
        return ObjectNode(
            self._obj_id,
            path=self._abspath,
            entry=self._entry,
            record=self._record,
            historian=self._hist,
        )

    @property
    def record(self) -> mincepy.DataRecord:
        """The data record of the object, fetched from the historian on first access"""
        if self._record is None:
            # Lazily load
            self._record = self._hist.records.get(self.obj_id)
        return self._record

    @property
    def loaded(self):
        """True if the historian can currently supply the object, False if it raises NotFound"""
        try:
            self._hist.get_obj(self._obj_id)
            return True
        except mincepy.NotFound:
            return False

    @property
    def obj(self) -> object:
        """The object itself, loaded from its record"""
        return self.record.load()

    @property
    def obj_id(self):
        """The id of the object"""
        return self._obj_id

    @property
    def type_id(self):
        """The type id stored in the filesystem entry"""
        return db.fs.Entry.type_id(self._entry)

    @property
    def type(self) -> Type:
        """The object type the historian gives for the type id"""
        return self._hist.get_obj_type(self.type_id)

    @property
    def ctime(self):
        """The ctime stored in the filesystem entry"""
        return db.fs.Entry.ctime(self._entry)

    @property
    def version(self):
        """The version stored in the filesystem entry"""
        return db.fs.Entry.ver(self._entry)

    @property
    def mtime(self):
        """The modification time, taken from the stime of the filesystem entry"""
        return db.fs.Entry.stime(self._entry)

    @property
    def creator(self):
        """The value of the 'created by' extra of the object's record"""
        return self.record.get_extra(mincepy.ExtraKeys.CREATED_BY)

    @property
    def meta(self) -> Optional[Dict]:
        """The metadata the historian holds for the object"""
        return self._hist.meta.get(self._obj_id)

    def delete(self):
        """Delete the object using the historian"""
        self._hist.delete(self._obj_id, imperative=False)

    def move(self, dest: os.PathSpec, overwrite=False):
        """Move this object into the directory given by dest

        :param dest: the directory to move the object into
        :param overwrite: not used by this implementation
        """
        dest = pathlib.Path(dest).resolve() / self.name
        db.rename(self.obj_id, dest)
        self._abspath = dest

    def rename(self, new_name: str):
        """Rename this object, keeping it in the same directory

        :param new_name: the new name for the object
        :raises pyos.exceptions.IsADirectoryError: if the new path is a directory
        :raises RuntimeError: if the rename fails because of a duplicate key
        """
        new_path: pathlib.Path = pathlib.Path(self.abspath.parent / new_name)
        if new_path.is_dir():
            raise exceptions.IsADirectoryError(new_path)

        try:
            db.rename(self._obj_id, new_path)
        except mincepy.DuplicateKeyError:
            raise RuntimeError(f"File with the name '{new_path}' already exists") from None

        # Keep the node in sync with its new location, as `move` does
        self._abspath = new_path
        self._name = new_path.name
class ResultsNode(ContainerNode):
    """A parentless container that nodes can be appended to"""

    def __init__(self, name='results', parent=None, historian: mincepy.Historian = None):
        """
        :param name: the name of the results node
        :param parent: has to be None
        :param historian: the historian to use
        """
        super().__init__(name, parent, historian=historian)
        assert parent is None
        self._children = []

    def append(self, node: FilesystemNode, display_name: str = None):
        """Append a node to the results

        :param node: the node to add, this results node becomes its parent
        :param display_name: the display name to give the node, defaults to the name of the node
        """
        # pylint: disable=protected-access
        node._parent = self
        node.display_name = display_name or node.name
        self._children.append(node)

    def extend(self, other: ContainerNode):
        """Extend this results using incorporating the entries of the other container"""
        for node in other:
            self.append(node)
class FrozenResultsNode(ContainerNode):
    """A parentless container whose children are supplied at construction"""

    def __init__(self,
                 children: Iterable[FilesystemNode],
                 name='results',
                 parent=None,
                 historian: mincepy.Historian = None):
        """
        :param children: the nodes this container holds, stored as passed
        :param name: the name of the results node
        :param parent: has to be None
        :param historian: the historian to use
        """
        super().__init__(name, parent, historian=historian)
        assert parent is None
        self._children = children
@functools.singledispatch
def to_node(entry, historian: mincepy.Historian = None) -> FilesystemNode:
    """Get the node for a given object.

    This can be either:

    1.  A directory path -> DirectoryNode
    2.  An object path -> ObjectNode

    This is the fallback used for entry types that have no registered implementation.

    :param entry: the entry to get the node for
    :param historian: the historian to use
    :raises ValueError: always, as the entry type is not supported
    """
    message = f'Unknown entry type: {entry}'
    raise ValueError(message)
@to_node.register(FilesystemNode)
def _(entry: FilesystemNode, historian: mincepy.Historian = None):
    """A filesystem node is returned as is"""
    return entry


@to_node.register(os.PathLike)
def _(path: os.PathLike, historian: mincepy.Historian = None):
    """Look up the filesystem entry at the path and create the corresponding node.

    :raises ValueError: if there is no filesystem entry at the path
    """
    # Make sure we've got a pure path so we don't actually check that database
    path = os.path.abspath(path)

    found = db.fs.find_entry(os.withdb.to_fs_path(path), historian=historian)
    if found is None:
        raise ValueError(f"'{path}' is not a valid object path")

    if not db.fs.Entry.is_dir(found):
        # Must be object
        return ObjectNode(db.fs.Entry.id(found), path=path, entry=found, historian=historian)

    return DirectoryNode(path, entry=found, historian=historian)