Module shoji.io.workspace_io

Expand source code
from typing import List, Union, Optional
import fdb
import shoji
import pickle
from .enums import Compartment


"""
# General functions for managing workspaces and their content
"""

@fdb.transactional
def get_entity(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str) -> Optional[Union[shoji.Dimension, shoji.Tensor, "shoji.WorkspaceManager"]]:
        t = get_tensor(tr, wsm, name)
        if t is not None:
                return t
        d = get_dimension(tr, wsm, name)
        if d is not None:
                return d
        s = get_workspace(tr, wsm, name)
        if s is not None:
                return s
        return None


@fdb.transactional
def get_workspace(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str) -> Optional["shoji.WorkspaceManager"]:
        subdir = wsm._subdir
        if subdir.exists(tr, name):
                child = subdir.open(tr, name)
                wsm = shoji.WorkspaceManager(wsm._db, child, wsm._path + (name,))
                wsm._name = name
                return wsm
        return None


@fdb.transactional
def get_dimension(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str) -> Optional[shoji.Dimension]:
        subdir = wsm._subdir
        val = tr[subdir[Compartment.Dimensions][name]]
        if val.present():
                dim = pickle.loads(val.value)
                dim.name = name
                dim.wsm = wsm
                return dim
        return None


@fdb.transactional
def get_tensor(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str, include_initializing: bool = False) -> Optional[shoji.Tensor]:
        subdir = wsm._subdir
        val = tr[subdir.pack((Compartment.Tensors, name))]
        if val.present():
                tensor = pickle.loads(val.value)
                tensor.name = name
                tensor.wsm = wsm
                if tensor.initializing and not include_initializing:
                        return None
                return tensor
        return None


@fdb.transactional
def list_workspaces(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager") -> List["shoji.WorkspaceManager"]:
        return [get_workspace(tr, wsm, name) for name in wsm._subdir.list(tr)]


@fdb.transactional
def list_dimensions(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager") -> List[shoji.Dimension]:
        result = []
        for kv in tr[wsm._subdir[Compartment.Dimensions].range()]:
                dim = pickle.loads(kv.value)
                dim.name = wsm._subdir[Compartment.Dimensions].unpack(kv.key)[0]
                dim.wsm = wsm
                result.append(dim)
        return result


@fdb.transactional
def list_tensors(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", include_initializing: bool = False) -> List[shoji.Tensor]:
        result = []
        for kv in tr[wsm._subdir[Compartment.Tensors].range()]:
                tensor = pickle.loads(kv.value)
                if tensor.initializing and not include_initializing:
                        continue
                tensor.name = wsm._subdir[Compartment.Tensors].unpack(kv.key)[0]
                tensor.wsm = wsm
                result.append(tensor)
        return result


@fdb.transactional
def delete_entity(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str) -> None:
        subdir = wsm._subdir
        if subdir.exists(tr, name):
                tr.clear_range_startswith(subdir.key())
                subdir.open(tr, name).remove(tr)
        elif tr[subdir[Compartment.Dimensions][name]].present():
                tr.clear_range_startswith(subdir[Compartment.Dimensions][name].key())
        elif tr[subdir[Compartment.Tensors][name]].present():
                tr.clear_range_startswith(subdir[Compartment.Tensors][name].key())
                tr.clear_range_startswith(subdir[Compartment.TensorValues][name].key())
                tr.clear_range_startswith(subdir[Compartment.TensorIndex][name].key())
                tr.clear_range_startswith(subdir[Compartment.TensorRowShapes][name].key())


@fdb.transactional
def create_dimension(tr, wsm: "shoji.WorkspaceManager", name: str, dim: shoji.Dimension):
        """
        Create a dimension in the workspace, overwriting any existing dimension of the same name
        """
        subdir = wsm._subdir
        # Check that name doesn't already exist
        existing = get_entity(tr, wsm, name)
        if existing is not None:
                if not isinstance(existing, shoji.Dimension):
                        raise AttributeError(f"Cannot overwrite {type(existing)} '{existing}' with a new shoji.Dimension (you must delete it first)")
                # Update an existing dimension
                prev_dim = existing
                if prev_dim.shape != dim.shape:
                        if isinstance(dim.shape, int):
                                # Changing to a fixed shape, so we must check that all relevant tensors agree
                                all_tensors: List[shoji.Tensor] = list_tensors(tr, wsm)
                                for tensor in all_tensors:
                                        if tensor.rank > 0 and tensor.dims[0] == name:
                                                if tensor.shape[0] != dim.shape:
                                                        raise AttributeError(f"New shape {dim.shape} of existing dimension '{name}' conflicts with length {tensor.shape[0]} of existing tensor '{tensor.name}'")
                                dim.length = dim.shape
                        else:
                                dim.length = prev_dim.length
        # Create or update the dimension
        tr[subdir[Compartment.Dimensions][name]] = pickle.dumps(dim, protocol=4)

Functions

def create_dimension(tr, wsm: shoji.WorkspaceManager, name: str, dim: Dimension)

Create a dimension in the workspace, overwriting any existing dimension of the same name

Expand source code
@fdb.transactional
def create_dimension(tr, wsm: "shoji.WorkspaceManager", name: str, dim: shoji.Dimension):
        """
        Create a dimension in the workspace, overwriting any existing dimension of the same name
        """
        subdir = wsm._subdir
        # Check that name doesn't already exist
        existing = get_entity(tr, wsm, name)
        if existing is not None:
                if not isinstance(existing, shoji.Dimension):
                        raise AttributeError(f"Cannot overwrite {type(existing)} '{existing}' with a new shoji.Dimension (you must delete it first)")
                # Update an existing dimension
                prev_dim = existing
                if prev_dim.shape != dim.shape:
                        if isinstance(dim.shape, int):
                                # Changing to a fixed shape, so we must check that all relevant tensors agree
                                all_tensors: List[shoji.Tensor] = list_tensors(tr, wsm)
                                for tensor in all_tensors:
                                        if tensor.rank > 0 and tensor.dims[0] == name:
                                                if tensor.shape[0] != dim.shape:
                                                        raise AttributeError(f"New shape {dim.shape} of existing dimension '{name}' conflicts with length {tensor.shape[0]} of existing tensor '{tensor.name}'")
                                dim.length = dim.shape
                        else:
                                dim.length = prev_dim.length
        # Create or update the dimension
        tr[subdir[Compartment.Dimensions][name]] = pickle.dumps(dim, protocol=4)
def delete_entity(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, name: str) ‑> NoneType
Expand source code
@fdb.transactional
def delete_entity(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str) -> None:
        subdir = wsm._subdir
        if subdir.exists(tr, name):
                tr.clear_range_startswith(subdir.key())
                subdir.open(tr, name).remove(tr)
        elif tr[subdir[Compartment.Dimensions][name]].present():
                tr.clear_range_startswith(subdir[Compartment.Dimensions][name].key())
        elif tr[subdir[Compartment.Tensors][name]].present():
                tr.clear_range_startswith(subdir[Compartment.Tensors][name].key())
                tr.clear_range_startswith(subdir[Compartment.TensorValues][name].key())
                tr.clear_range_startswith(subdir[Compartment.TensorIndex][name].key())
                tr.clear_range_startswith(subdir[Compartment.TensorRowShapes][name].key())
def get_dimension(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, name: str) ‑> Union[Dimension, NoneType]
Expand source code
@fdb.transactional
def get_dimension(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str) -> Optional[shoji.Dimension]:
        subdir = wsm._subdir
        val = tr[subdir[Compartment.Dimensions][name]]
        if val.present():
                dim = pickle.loads(val.value)
                dim.name = name
                dim.wsm = wsm
                return dim
        return None
def get_entity(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, name: str) ‑> Union[DimensionTensorWorkspaceManager, NoneType]
Expand source code
@fdb.transactional
def get_entity(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str) -> Optional[Union[shoji.Dimension, shoji.Tensor, "shoji.WorkspaceManager"]]:
        t = get_tensor(tr, wsm, name)
        if t is not None:
                return t
        d = get_dimension(tr, wsm, name)
        if d is not None:
                return d
        s = get_workspace(tr, wsm, name)
        if s is not None:
                return s
        return None
def get_tensor(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, name: str, include_initializing: bool = False) ‑> Union[Tensor, NoneType]
Expand source code
@fdb.transactional
def get_tensor(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str, include_initializing: bool = False) -> Optional[shoji.Tensor]:
        subdir = wsm._subdir
        val = tr[subdir.pack((Compartment.Tensors, name))]
        if val.present():
                tensor = pickle.loads(val.value)
                tensor.name = name
                tensor.wsm = wsm
                if tensor.initializing and not include_initializing:
                        return None
                return tensor
        return None
def get_workspace(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, name: str) ‑> Union[WorkspaceManager, NoneType]
Expand source code
@fdb.transactional
def get_workspace(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str) -> Optional["shoji.WorkspaceManager"]:
        subdir = wsm._subdir
        if subdir.exists(tr, name):
                child = subdir.open(tr, name)
                wsm = shoji.WorkspaceManager(wsm._db, child, wsm._path + (name,))
                wsm._name = name
                return wsm
        return None
def list_dimensions(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager) ‑> List[Dimension]
Expand source code
@fdb.transactional
def list_dimensions(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager") -> List[shoji.Dimension]:
        result = []
        for kv in tr[wsm._subdir[Compartment.Dimensions].range()]:
                dim = pickle.loads(kv.value)
                dim.name = wsm._subdir[Compartment.Dimensions].unpack(kv.key)[0]
                dim.wsm = wsm
                result.append(dim)
        return result
def list_tensors(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, include_initializing: bool = False) ‑> List[Tensor]
Expand source code
@fdb.transactional
def list_tensors(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", include_initializing: bool = False) -> List[shoji.Tensor]:
        result = []
        for kv in tr[wsm._subdir[Compartment.Tensors].range()]:
                tensor = pickle.loads(kv.value)
                if tensor.initializing and not include_initializing:
                        continue
                tensor.name = wsm._subdir[Compartment.Tensors].unpack(kv.key)[0]
                tensor.wsm = wsm
                result.append(tensor)
        return result
def list_workspaces(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager) ‑> List[WorkspaceManager]
Expand source code
@fdb.transactional
def list_workspaces(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager") -> List["shoji.WorkspaceManager"]:
        return [get_workspace(tr, wsm, name) for name in wsm._subdir.list(tr)]