Module shoji.io.tensor_io

Expand source code
from typing import List, Tuple, Any, Union, Type, Dict, Optional
import numpy as np
import fdb
import shoji
import pickle
import logging
from .enums import Compartment


"""
# Tensor storage API

The tensor storage API handles reading and writing subsets of tensors defined by indices along each dimension,
which are translated to and from chunks as needed.
"""

@fdb.transactional
def create_tensor(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str, tensor: shoji.Tensor) -> None:
        """
        Creates a new tensor (but does not write the inits)

        If inits were provided, the tensor is marked as initializing, and will be invisible until the inits have been written
        """
        subdir = wsm._subdir
        # Check that name doesn't already exist
        existing = shoji.io.get_entity(tr, wsm, name)
        if existing is not None:
                raise AttributeError(f"Cannot overwrite {type(existing)} '{existing}' with a new shoji.Tensor (you must delete it first)")
        else:
                # Check that the dimensions of the tensor exist
                for ix, d in enumerate(tensor.dims):
                        if isinstance(d, str):
                                dim = shoji.io.get_dimension(tr, wsm, d)
                                if dim is None:
                                        raise KeyError(f"Tensor dimension '{d}' is not defined")
                                if dim.shape is not None:  # This is a fixed-length dimension
                                        if tensor.inits is not None and tensor.shape[ix] != dim.shape:
                                                raise IndexError(f"Mismatch between the declared shape {dim.shape} of dimension '{d}' and the shape {tensor.shape} of values")
                        elif isinstance(d, int):
                                if tensor.inits is not None and tensor.shape[ix] != d:
                                        raise IndexError(f"Mismatch between the declared shape {d} of dimension '{ix}' and the shape {tensor.shape} of values")

        key = subdir.pack((Compartment.Tensors, name))
        if tensor.rank > 0:
                tensor.shape = (0,) * tensor.rank
        if tensor.inits is not None:
                tensor.initializing = True
        tr[key] = pickle.dumps(tensor, protocol=4)
                

def initialize_tensor(wsm: "shoji.WorkspaceManager", name: str, tensor: shoji.Tensor) -> None:
        if tensor.inits is not None:
                if tensor.rank == 0:
                        write_at_indices(wsm._db.transaction, wsm, (Compartment.TensorValues, name), indices=[], chunk_sizes=(), values=tensor.inits.values)
                else:
                        # Hide the true dimensions so the append will not fail due to consistency checks
                        update_tensor(wsm._db.transaction, wsm, name, dims=(None,) * tensor.rank)
                        longest_axis = np.argmax(tensor.inits.shape)
                        append_values_multibatch(wsm, [name], [tensor.inits], axes=(longest_axis,))
                        # Unhide the dims and set the shape of the tensor
                        update_tensor(wsm._db.transaction, wsm, name, dims=tensor.dims, shape=tensor.inits.shape)
                # Complete the initialization in one atomic operation
                finish_initialization(wsm._db.transaction, wsm, name)


@fdb.transactional
def finish_initialization(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str) -> None:
        """Clear the initializing flag on a tensor and update its dimensions, making the tensor visible"""
        tensor = shoji.io.get_tensor(tr, wsm, name, include_initializing=True)
        assert tensor.initializing
        tensor.initializing = False
        # Update the tensor definition to clear the initializing flag
        subdir = wsm._subdir
        key = subdir.pack((Compartment.Tensors, name))
        tr[key] = pickle.dumps(tensor, protocol=4)

        # Update the dimensions
        if tensor.rank > 0:
                for shape, dname in zip(tensor.shape, tensor.dims):
                        if isinstance(dname, str):
                                dim = wsm._get_dimension(dname)
                                if dim.length == 0:
                                        dim.length = shape
                                        shoji.io.create_dimension(tr, wsm, dname, dim)
                                elif dim.length != shape:
                                        raise ValueError(f"Length {shape} of new tensor '{name}' does not match length {dim.length} of dimension '{dname}' ")


@fdb.transactional
def update_tensor(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str, *, dims: Optional[Tuple[Union[str, int, None], ...]] = None, shape: Optional[Tuple[int, ...]] = None) -> None:
        """Update the dims and/or shape recorded in a tensor definition (stored values are not touched)"""
        subdir = wsm._subdir
        tensor = wsm._get_tensor(name, include_initializing=True)
        if dims is not None:
                tensor.dims = dims
        if shape is not None:
                tensor.shape = shape
        key = subdir.pack((Compartment.Tensors, name))
        tr[key] = pickle.dumps(tensor, protocol=4)

@fdb.transactional
def write_at_indices(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", key_prefix: Tuple[Any, ...], indices: List[np.ndarray], chunk_sizes: Tuple[int, ...], values: np.ndarray) -> int:
        """
        Write values corresponding to indices along each dimension (row indices, column indices, ...), automatically managing chunks as needed

        Args:
                tr: Transaction object
                wsm: The WorkspaceManager whose subdirectory holds the chunks
                key_prefix: The tuple to use as prefix when storing the chunks
                indices: A list of numpy arrays giving the indices of the desired elements along each dimension
                chunk_sizes: A tuple of ints giving the size of chunks in each dimension
                values: An ndarray of values corresponding to the intersection of indices

        Returns:
                The number of bytes written
        """
        subspace = wsm._subdir

        rank = len(chunk_sizes)
        if len(indices) != rank:
                raise ValueError("indices and chunk_sizes must be same length")
        if rank == 0:
                # Write a single chunk since this is a scalar
                return shoji.io.write_chunks(tr, subspace, key_prefix, (), [values])

        # Figure out which chunks need to be written
        addresses_per_dim = [np.unique(ind // sz) for ind, sz in zip(indices, chunk_sizes)]
        # All combinations of addresses along each dimension
        addresses = np.array(np.meshgrid(*addresses_per_dim)).T.reshape(-1, len(indices))
        chunks = []
        for address in addresses:
                # At this point, we have a chunk address, and we have the indices
                # into the whole tensor. We need to figure out the relevant indices for this chunk,
                # and their offsets in the chunk, so that we can place the right values at the right place in
                # the chunk for writing. We also need to construct a mask if the chunk is not fully covered
                chunk_indices = []
                tensor_indices = []
                lengths = []
                for a, ind, sz in zip(address, indices, chunk_sizes):
                        start = np.searchsorted(ind, a * sz, side='left')
                        end = np.searchsorted(ind, (a + 1) * sz, side='left')
                        chunk_indices.append(ind[start:end] - a * sz)
                        tensor_indices.append(ind[start])
                        lengths.append(end - start)
                chunk = np.empty_like(values, shape=chunk_sizes)
                # Now figure out which part of the values tensor they correspond to (always a dense sub-tensor)
                starts = []
                for ind, min_ind in zip(indices, tensor_indices):
                        start = np.searchsorted(ind, min_ind, side='left')
                        starts.append(start)
                values_slices = tuple(slice(s, s + l) for s, l in zip(starts, lengths))

                # Finally, copy the correct subtensor of values into the right slots in the chunk
                chunk[np.ix_(*chunk_indices)] = values[values_slices]

                mask = np.ones(chunk_sizes, dtype=bool)
                mask[np.ix_(*chunk_indices)] = False
                if np.any(mask):
                        chunks.append(np.ma.masked_array(chunk, mask=mask))
                else:
                        chunks.append(chunk)

        return shoji.io.write_chunks(tr, subspace, key_prefix, addresses, chunks)


def read_at_indices(wsm: "shoji.WorkspaceManager", tensor: str, indices: List[np.ndarray], chunk_sizes: Tuple[int, ...], transactional: bool = True) -> np.ndarray:
        """
        Read values corresponding to indices along each dimension (row indices, column indices, ...), automatically managing chunks as needed

        Args:
                wsm: The workspace containing the tensor
                tensor: Name of the tensor
                indices: A list of numpy arrays giving the indices of the desired elements along each dimension
                chunk_sizes: A tuple of ints giving the size of chunks in each dimension
                transactional: If False, read chunks in multiple batches adaptively

        Returns:
                data: The values at the intersection of each set of indices

        Remarks:
                All the relevant chunks must exist, or this function will raise an exception
        """
        subspace = wsm._subdir
        rank = len(chunk_sizes)
        if len(indices) != rank:
                raise ValueError("indices and chunk_sizes must be same length")
        if rank == 0:
                # Read a single chunk since this is a scalar
                return shoji.io.read_chunks(wsm._db.transaction, subspace, (Compartment.TensorValues, tensor), ())[0]

        # Figure out which chunks need to be read
        addresses_per_dim = [np.unique(ind // sz) for ind, sz in zip(indices, chunk_sizes)]
        # All combinations of addresses along each dimension
        addresses = np.array(np.meshgrid(*addresses_per_dim)).T.reshape(-1, len(indices))
        # Read the chunk data and unravel it into the result ndarray
        if transactional:
                chunks = shoji.io.read_chunks(wsm._db.transaction, subspace, (Compartment.TensorValues, tensor), addresses)
        else:
                chunks = shoji.io.read_chunks_multibatch(wsm._db.transaction, subspace, (Compartment.TensorValues, tensor), addresses)
        result = np.empty_like(chunks[0], shape=[len(i) for i in indices])
        for (address, chunk) in zip(addresses, chunks):
                # At this point, we have a chunk at a particular address, and we have the indices
                # into the whole tensor. We need to figure out the relevant indices for this chunk,
                # and their offsets in the chunk, so that we can extract the right values from
                # the chunk.
                # We then need to figure out the offsets of those indices into the
                # result tensor so that we can write the values in the right place.
                # The chunk_extract should be placed as a dense ndarray into the result ndarray,
                # so we only need to figure out the offsets along each dimension. This is
                # equivalent to the number of indices belonging to lower addresses
                # in all dimensions.
                chunk_indices = []
                lowest_indices = []
                for a, ind, sz in zip(address, indices, chunk_sizes):
                        start = np.searchsorted(ind, a * sz, side='left')
                        end = np.searchsorted(ind, (a + 1) * sz, side='left')
                        chunk_indices.append(ind[start:end] - a * sz)
                        lowest_indices.append(ind[start])
                chunk_extract = chunk[np.ix_(*chunk_indices)]

                offsets = []
                for ind, min_ind in zip(indices, lowest_indices):
                        offset = np.searchsorted(ind, min_ind, side='left')
                        offsets.append(offset)
                result[tuple([slice(a, a + b) for a, b in zip(offsets, chunk_extract.shape)])] = chunk_extract
        return result


def dtype_class(dtype) -> Union[Type[int], Type[float], Type[bool], Type[str]]:
        """Return the Python type corresponding to a shoji dtype string"""
        if dtype in ("uint8", "uint16", "uint32", "uint64", "int8", "int16", "int32", "int64"):
                return int
        elif dtype in ("float16", "float32", "float64"):
                return float
        elif dtype == "bool":
                return bool
        elif dtype == "string":
                return str
        else:
                raise TypeError(f"Invalid dtype '{dtype}'")
        

@fdb.transactional
def append_values(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", names: List[str], values: List[shoji.TensorValue], axes: Tuple[int, ...]) -> int:
        """
        Returns:
                Number of bytes written
        
        Remarks:
                This function uses a transaction to preserve the invariant that all tensors that share a dimension have the same length
                along that dimension (or zero length)
        """
        subspace = wsm._subdir
        for name, vals, axis in zip(names, values, axes):
                assert isinstance(vals, shoji.TensorValue), f"Input values must be shoji.TensorValue, but '{name}' was {type(vals)}"
                assert vals.rank >= 1, f"Input values must be at least 1-dimensional, but '{name}' was scalar"

        n_bytes_written = 0
        all_tensors = {t.name: t for t in shoji.io.list_tensors(tr, wsm, include_initializing=True)}
        tensors: Dict[str, shoji.Tensor] = {}

        # Check that all the tensors exist, and have the right dimensions
        dname = None
        for name, vals, axis in zip(names, values, axes):
                if name not in all_tensors:
                        raise NameError(f"Tensor '{name}' does not exist in the workspace")
                tensor = all_tensors[name]
                tensors[name] = tensor
                if tensor.rank == 0:
                        raise ValueError(f"Cannot append to scalar tensor '{name}'")
                if tensor.dims[axis] is not None:
                        if isinstance(tensor.dims[axis], int):
                                if tensor.shape[axis] != 0 or vals.shape[axis] != tensor.dims[axis]:
                                        raise ValueError(f"Cannot append to fixed-length axis {axis} of tensor '{name}'")
                        if dname is None:
                                dname = tensor.dims[axis]
                        elif tensor.dims[axis] != dname:
                                raise ValueError(f"Cannot append to axis {axis} of tensor '{name}' because its dimension '{tensor.dims[axis]}' conflicts with dimension '{dname}' of another tensor")

        # Check the rank of the values, and the size along each axis
        new_length = 0
        for name, tensor, vals, axis in zip(names, tensors.values(), values, axes):
                if tensor.jagged:
                        for row in vals:
                                if tensor.rank != row.ndim + 1:  # type: ignore
                                        raise ValueError(f"Tensor '{name}' of rank {tensor.rank} cannot be appended with rank-{row.ndim + 1} array")  # type: ignore
                else:
                        if tensor.rank != vals.rank:  # type: ignore
                                raise ValueError(f"Tensor '{name}' of rank {tensor.rank} cannot be appended with rank-{vals.rank} array")  # type: ignore
                for i in range(tensor.rank):
                        if i == axis:
                                if new_length > 0 and tensor.shape[i] + vals.shape[i] != new_length:
                                        raise ValueError(f"Cannot append {vals.shape[i]} elements to tensor of length {tensor.shape[i]} along axis {i}, when another tensor will be {new_length} long along the same dimension")
                                new_length = tensor.shape[i] + vals.shape[i]  # new_length will be the same for every tensor, since they start the same, and we checked above that the values are the same length
                        else:
                                if tensor.shape[i] != 0 and tensor.shape[i] != vals.shape[i] and not tensor.jagged:
                                        raise ValueError(f"Cannot append values of shape {vals.shape} to tensor of shape {tensor.shape} along axis {axis}")

        # Check that all relevant tensors will have the right length after appending
        all_tensors = {n: t for n, t in all_tensors.items() if not t.initializing}  # Omit initializing tensors
        if dname is not None:
                for tensor in all_tensors.values():
                        if tensor.name not in tensors:
                                if dname in tensor.dims:
                                        length_along_dim = tensor.shape[tensor.dims.index(dname)]
                                        if length_along_dim != new_length and not np.prod(tensor.shape) == 0:
                                                raise ValueError(f"Length {length_along_dim} of tensor '{tensor.name}' along dimension '{dname}' would conflict with length {new_length} after appending")

        for name, tensor, vals, axis in zip(names, tensors.values(), values, axes):
                if tensor.jagged:
                        added_shape = vals.shape[axis]
                        # Write row by row
                        for i, row in enumerate(vals):
                                ix = tensor.shape[axis] + i
                                if axis == 0:
                                        indices = [np.array([ix])] + [np.arange(l) for l in row.shape]  # Just fill all the axes
                                else:
                                        indices = [np.array([0])] + [np.arange(l) for l in row.shape]  # Just fill all the axes
                                        indices[axis] += tensor.shape[axis]  # Except the one we're appending, which starts at end of axis
                                n_bytes_written += write_at_indices(tr, wsm, (Compartment.TensorValues, name), indices, tensor.chunks, row[None, ...])
                                # Update row tensor shape
                                key = subspace.pack((Compartment.TensorRowShapes, name, ix))
                                tr[key] = fdb.tuple.pack(tuple(int(x) for x in row.shape))

                        # Update tensor shape (zero-length dimensions take the shape of the values just written)
                        shape = list(tensor.shape)
                        shape[axis] += added_shape
                        for i in range(len(shape)):
                                if shape[i] == 0:
                                        shape[i] = vals.shape[i]
                        update_tensor(tr, wsm, tensor.name, shape=tuple(shape))
                else:
                        indices = [np.arange(l) for l in vals.shape]  # Just fill all the axes
                        indices[axis] += tensor.shape[axis]  # Except the one we're appending, which starts at end of axis
                        if tensor.rank == 1:
                                # Write the index
                                for i, value in enumerate(vals):
                                        casted_value = dtype_class(tensor.dtype)(value)
                                        key = subspace.pack((Compartment.TensorIndex, name, casted_value, int(indices[0][i])))
                                        n_bytes_written += len(key)
                                        tr[key] = b''
                        n_bytes_written += write_at_indices(tr, wsm, (Compartment.TensorValues, name), indices, tensor.chunks, vals.values)
                        # Update tensor shape
                        shape = list(tensor.shape)
                        shape[axis] += vals.shape[axis]
                        for i in range(len(shape)):
                                if shape[i] == 0:
                                        shape[i] = vals.shape[i]
                        update_tensor(tr, wsm, tensor.name, shape=tuple(shape))
        # Update the dimension length
        if isinstance(dname, str):
                dim = shoji.io.get_dimension(tr, wsm, dname)
                dim.length = new_length
                shoji.io.create_dimension(tr, wsm, dname, dim)
        return n_bytes_written

def append_values_multibatch(wsm: "shoji.WorkspaceManager", tensors: List[str], values: List[shoji.TensorValue], axes: Tuple[int, ...]) -> int:
        """
        Append values to a set of tensors, using multiple batches (transactions) if needed.

        Args:
                tensors: the names of the tensors to which values will be appended
                values: a list of TensorValue objects to append (in the same order as the tensors)
                axes: a tuple giving the axis to which values should be appended, for each tensor

        Remarks:
                Values are appended along the given axis of each tensor.
                
                The batch size used when appending is adapted dynamically to maximize performance,
                while ensuring that the same number of (generalized) rows are appended to each tensor
                in each transaction.

                For each batch (transaction), the validity of the append is re-checked, to
                ensure safe concurrency.
        """
        n_total = values[0].shape[axes[0]]
        total_bytes = sum([val.size_in_bytes() for val in values])
        bytes_per_row = max(1, total_bytes // n_total)  # guard against rows smaller than one byte
        n = int(max(1, 10_000_000 // bytes_per_row))  # target ~10 MB per transaction
        total_bytes_written = 0
        n_bytes_written = 0
        ix = 0
        max_retries = 3
        while ix < n_total:
                # logging.info(f"Appending values to {tensors} with {n} rows per batch and at {ix}")
                try:
                        # Slice the values along the appending axis, without making copies (as np.take would do)
                        batches = []
                        for axis, vals in zip(axes, values):
                                slices = [slice(None)] * vals.rank
                                slices[axis] = slice(ix, ix + n)
                                batches.append(vals[tuple(slices)])
                        n_bytes_written = append_values(wsm._db.transaction, wsm, tensors, batches, axes)
                        total_bytes_written += n_bytes_written
                except fdb.impl.FDBError as e:
                        if e.code in (1004, 1007, 1031, 2101):
                        if n > 1:  # Transaction too large or too slow, so try again with a smaller batch
                                        n = max(1, n // 2)
                                        continue
                                else:
                                        max_retries -= 1
                                        if max_retries > 0:
                                                print(f"Retrying after writing {n_bytes_written} of {total_bytes_written} bytes in {n} rows")
                                                continue
                                        else:
                                                raise e
                        else:
                                raise e
                ix += n
        return total_bytes_written
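
As a concrete illustration of the index-to-chunk translation described in the module docstring, here is a minimal numpy-only sketch (the 100x100 chunk size and the requested indices are invented for the example) of how the functions below decompose requested indices into chunk addresses:

import numpy as np

# Indices requested along each dimension of a 2-D tensor, chunked 100 x 100
row_addr = np.unique(np.array([0, 5, 130]) // 100)   # -> [0, 1]
col_addr = np.unique(np.array([7, 250]) // 100)      # -> [0, 2]

# All combinations of per-dimension addresses, exactly as read_at_indices and
# write_at_indices build them
addresses = np.array(np.meshgrid(row_addr, col_addr)).T.reshape(-1, 2)
# -> [[0, 0], [0, 2], [1, 0], [1, 2]]: every chunk touched by the request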

Functions

def append_values(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, names: List[str], values: List[TensorValue], axes: Tuple[int, ...]) ‑> int

Returns

Number of bytes written

Remarks

This function uses a transaction to preserve the invariant that all tensors that share a dimension have the same length along that dimension (or zero length)

Expand source code
@fdb.transactional
def append_values(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", names: List[str], values: List[shoji.TensorValue], axes: Tuple[int, ...]) -> int:
        """
        Returns:
                Number of bytes written
        
        Remarks:
                This function uses a transaction to preserve the invariant that all tensors that share a dimension have the same length
                along that dimension (or zero length)
        """
        subspace = wsm._subdir
        for name, vals, axis in zip(names, values, axes):
                assert isinstance(vals, shoji.TensorValue), f"Input values must be shoji.TensorValue, but '{name}' was {type(vals)}"
                assert vals.rank >= 1, f"Input values must be at least 1-dimensional, but '{name}' was scalar"

        n_bytes_written = 0
        all_tensors = {t.name: t for t in shoji.io.list_tensors(tr, wsm, include_initializing=True)}
        tensors: Dict[str, shoji.Tensor] = {}

        # Check that all the tensors exist, and have the right dimensions
        dname = None
        for name, vals, axis in zip(names, values, axes):
                if name not in all_tensors:
                        raise NameError(f"Tensor '{name}' does not exist in the workspace")
                tensor = all_tensors[name]
                tensors[name] = tensor
                if tensor.rank == 0:
                        raise ValueError(f"Cannot append to scalar tensor '{name}'")
                if tensor.dims[axis] is not None:
                        if isinstance(tensor.dims[axis], int):
                                if tensor.shape[axis] != 0 or vals.shape[axis] != tensor.dims[axis]:
                                        raise ValueError(f"Cannot append to fixed-length axis {axis} of tensor '{name}'")
                        if dname is None:
                                dname = tensor.dims[axis]
                        elif tensor.dims[axis] != dname:
                                raise ValueError(f"Cannot append to axis {axis} of tensor '{name}' because its dimension '{tensor.dims[axis]}' conflicts with dimension '{dname}' of another tensor")

        # Check the rank of the values, and the size along each axis
        new_length = 0
        for name, tensor, vals, axis in zip(names, tensors.values(), values, axes):
                if tensor.jagged:
                        for row in vals:
                                if tensor.rank != row.ndim + 1:  # type: ignore
                                        raise ValueError(f"Tensor '{name}' of rank {tensor.rank} cannot be appended with rank-{row.ndim + 1} array")  # type: ignore
                else:
                        if tensor.rank != vals.rank:  # type: ignore
                                raise ValueError(f"Tensor '{name}' of rank {tensor.rank} cannot be appended with rank-{vals.rank} array")  # type: ignore
                for i in range(tensor.rank):
                        if i == axis:
                                if new_length > 0 and tensor.shape[i] + vals.shape[i] != new_length:
                                        raise ValueError(f"Cannot append {vals.shape[i]} elements to tensor of length {tensor.shape[i]} along axis {i}, when another tensor will be {new_length} long along the same dimension")
                                new_length = tensor.shape[i] + vals.shape[i]  # new_length will be the same for every tensor, since they start the same, and we checked above that the values are the same length
                        else:
                                if tensor.shape[i] != 0 and tensor.shape[i] != vals.shape[i] and not tensor.jagged:
                                        raise ValueError(f"Cannot append values of shape {vals.shape} to tensor of shape {tensor.shape} along axis {axis}")

        # Check that all relevant tensors will have the right length after appending
        all_tensors = {n: t for n, t in all_tensors.items() if not t.initializing}  # Omit initializing tensors
        if dname is not None:
                for tensor in all_tensors.values():
                        if tensor.name not in tensors:
                                if dname in tensor.dims:
                                        length_along_dim = tensor.shape[tensor.dims.index(dname)]
                                        if length_along_dim != new_length and not np.prod(tensor.shape) == 0:
                                                raise ValueError(f"Length {length_along_dim} of tensor '{tensor.name}' along dimension '{dname}' would conflict with length {new_length} after appending")

        for name, tensor, vals, axis in zip(names, tensors.values(), values, axes):
                if tensor.jagged:
                        added_shape = vals.shape[axis]
                        # Write row by row
                        for i, row in enumerate(vals):
                                ix = tensor.shape[axis] + i
                                if axis == 0:
                                        indices = [np.array([ix])] + [np.arange(l) for l in row.shape]  # Just fill all the axes
                                else:
                                        indices = [np.array([0])] + [np.arange(l) for l in row.shape]  # Just fill all the axes
                                        indices[axis] += tensor.shape[axis]  # Except the one we're appending, which starts at end of axis
                                n_bytes_written += write_at_indices(tr, wsm, (Compartment.TensorValues, name), indices, tensor.chunks, row[None, ...])
                                # Update row tensor shape
                                key = subspace.pack((Compartment.TensorRowShapes, name, ix))
                                tr[key] = fdb.tuple.pack(tuple(int(x) for x in row.shape))

                        # Update tensor shape (zero-length dimensions take the shape of the values just written)
                        shape = list(tensor.shape)
                        shape[axis] += added_shape
                        for i in range(len(shape)):
                                if shape[i] == 0:
                                        shape[i] = vals.shape[i]
                        update_tensor(tr, wsm, tensor.name, shape=tuple(shape))
                else:
                        indices = [np.arange(l) for l in vals.shape]  # Just fill all the axes
                        indices[axis] += tensor.shape[axis]  # Except the one we're appending, which starts at end of axis
                        if tensor.rank == 1:
                                # Write the index
                                for i, value in enumerate(vals):
                                        casted_value = dtype_class(tensor.dtype)(value)
                                        key = subspace.pack((Compartment.TensorIndex, name, casted_value, int(indices[0][i])))
                                        n_bytes_written += len(key)
                                        tr[key] = b''
                        n_bytes_written += write_at_indices(tr, wsm, (Compartment.TensorValues, name), indices, tensor.chunks, vals.values)
                        # Update tensor shape
                        shape = list(tensor.shape)
                        shape[axis] += vals.shape[axis]
                        for i in range(len(shape)):
                                if shape[i] == 0:
                                        shape[i] = vals.shape[i]
                        update_tensor(tr, wsm, tensor.name, shape=tuple(shape))
        # Update the dimension length
        if isinstance(dname, str):
                dim = shoji.io.get_dimension(tr, wsm, dname)
                dim.length = new_length
                shoji.io.create_dimension(tr, wsm, dname, dim)
        return n_bytes_written
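
To make the index arithmetic above concrete, here is a minimal numpy-only sketch (shapes invented for illustration) of how the write indices are constructed for a non-jagged append:

import numpy as np

# Appending a (5, 20) block along axis 0 of a tensor whose current shape is (100, 20)
vals_shape = (5, 20)
current_length = 100                          # tensor.shape[axis] before the append
axis = 0

indices = [np.arange(l) for l in vals_shape]  # fill every axis of the new block
indices[axis] += current_length               # shift the append axis past the existing rows
assert indices[0][0] == 100 and indices[0][-1] == 104
# The new rows land at global positions 100..104, and write_at_indices routes
# them into whichever chunks those positions touch
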
def append_values_multibatch(wsm: shoji.WorkspaceManager, tensors: List[str], values: List[TensorValue], axes: Tuple[int, ...]) ‑> int

Append values to a set of tensors, using multiple batches (transactions) if needed.

Args

tensors: the names of the tensors to which values will be appended
values: a list of TensorValue objects to append (in the same order as the tensors)
axes: a tuple giving the axis to which values should be appended, for each tensor

Remarks

Values are appended along the given axis of each tensor.

The batch size used when appending is adapted dynamically to maximize performance, while ensuring that the same number of (generalized) rows are appended to each tensor in each transaction.

For each batch (transaction), the validity of the append is re-checked, to ensure safe concurrency.

Expand source code
def append_values_multibatch(wsm: "shoji.WorkspaceManager", tensors: List[str], values: List[shoji.TensorValue], axes: Tuple[int, ...]) -> int:
        """
        Append values to a set of tensors, using multiple batches (transactions) if needed.

        Args:
                tensors: the names of the tensors to which values will be appended
                values: a list of TensorValue objects to append (in the same order as the tensors)
                axes: a tuple giving the axis to which values should be appended, for each tensor

        Remarks:
                Values are appended along the given axis of each tensor.
                
                The batch size used when appending is adapted dynamically to maximize performance,
                while ensuring that the same number of (generalized) rows are appended to each tensor
                in each transaction.

                For each batch (transaction), the validity of the append is re-checked, to
                ensure safe concurrency.
        """
        n_total = values[0].shape[axes[0]]
        total_bytes = sum([val.size_in_bytes() for val in values])
        bytes_per_row = max(1, total_bytes // n_total)  # guard against rows smaller than one byte
        n = int(max(1, 10_000_000 // bytes_per_row))  # target ~10 MB per transaction
        total_bytes_written = 0
        n_bytes_written = 0
        ix = 0
        max_retries = 3
        while ix < n_total:
                # logging.info(f"Appending values to {tensors} with {n} rows per batch and at {ix}")
                try:
                        # Slice the values along the appending axis, without making copies (as np.take would do)
                        batches = []
                        for axis, vals in zip(axes, values):
                                slices = [slice(None)] * vals.rank
                                slices[axis] = slice(ix, ix + n)
                                batches.append(vals[tuple(slices)])
                        n_bytes_written = append_values(wsm._db.transaction, wsm, tensors, batches, axes)
                        total_bytes_written += n_bytes_written
                except fdb.impl.FDBError as e:
                        if e.code in (1004, 1007, 1031, 2101):
                        if n > 1:  # Transaction too large or too slow, so try again with a smaller batch
                                        n = max(1, n // 2)
                                        continue
                                else:
                                        max_retries -= 1
                                        if max_retries > 0:
                                                print(f"Retrying after writing {n_bytes_written} of {total_bytes_written} bytes in {n} rows")
                                                continue
                                        else:
                                                raise e
                        else:
                                raise e
                ix += n
        return total_bytes_written
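
The initial batch size targets roughly 10 MB per transaction (FoundationDB limits a transaction to 10 MB of affected data and five seconds). A worked example of the arithmetic, with invented sizes:

# 100,000 rows totalling ~400 MB, i.e. ~4 kB per generalized row
n_total = 100_000
total_bytes = 400_000_000
bytes_per_row = max(1, total_bytes // n_total)    # 4000
n = int(max(1, 10_000_000 // bytes_per_row))      # 2500 rows per transaction
assert n == 2500
# On FDBError 1004, 1007, 1031 or 2101 the loop halves n until the batch fits
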
def create_tensor(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, name: str, tensor: Tensor) ‑> NoneType

Creates a new tensor (but does not write the inits)

If inits were provided, the tensor is marked as initializing, and will be invisible until the inits have been written

Expand source code
@fdb.transactional
def create_tensor(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str, tensor: shoji.Tensor) -> None:
        """
        Creates a new tensor (but does not write the inits)

        If inits were provided, the tensor is marked as initializing, and will be invisible until the inits have been written
        """
        subdir = wsm._subdir
        # Check that name doesn't already exist
        existing = shoji.io.get_entity(tr, wsm, name)
        if existing is not None:
                raise AttributeError(f"Cannot overwrite {type(existing)} '{existing}' with a new shoji.Tensor (you must delete it first)")
        else:
                # Check that the dimensions of the tensor exist
                for ix, d in enumerate(tensor.dims):
                        if isinstance(d, str):
                                dim = shoji.io.get_dimension(tr, wsm, d)
                                if dim is None:
                                        raise KeyError(f"Tensor dimension '{d}' is not defined")
                                if dim.shape is not None:  # This is a fixed-length dimension
                                        if tensor.inits is not None and tensor.shape[ix] != dim.shape:
                                                raise IndexError(f"Mismatch between the declared shape {dim.shape} of dimension '{d}' and the shape {tensor.shape} of values")
                        elif isinstance(d, int):
                                if tensor.inits is not None and tensor.shape[ix] != d:
                                        raise IndexError(f"Mismatch between the declared shape {d} of dimension '{ix}' and the shape {tensor.shape} of values")

        key = subdir.pack((Compartment.Tensors, name))
        if tensor.rank > 0:
                tensor.shape = (0,) * tensor.rank
        if tensor.inits is not None:
                tensor.initializing = True
        tr[key] = pickle.dumps(tensor, protocol=4)
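
A hedged sketch of the full creation lifecycle this function starts. It assumes `wsm` is an open shoji.WorkspaceManager, `tensor` is a shoji.Tensor whose inits hold the initial values (neither is constructed here), the tensor name "Expression" is invented, and these functions are re-exported on shoji.io, as the calls in this module suggest:

import shoji

# Phase 1: store the definition; because inits are present, the tensor is
# flagged as initializing and stays invisible
shoji.io.create_tensor(wsm._db.transaction, wsm, "Expression", tensor)
# Phases 2 and 3: write the inits in batches, then clear the flag atomically
shoji.io.initialize_tensor(wsm, "Expression", tensor)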
def dtype_class(dtype) ‑> Union[Type[int], Type[float], Type[bool], Type[str]]
Expand source code
def dtype_class(dtype) -> Union[Type[int], Type[float], Type[bool], Type[str]]:
        """Return the Python type corresponding to a shoji dtype string"""
        if dtype in ("uint8", "uint16", "uint32", "uint64", "int8", "int16", "int32", "int64"):
                return int
        elif dtype in ("float16", "float32", "float64"):
                return float
        elif dtype == "bool":
                return bool
        elif dtype == "string":
                return str
        else:
                raise TypeError(f"Invalid dtype '{dtype}'")
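
For example, append_values uses this to cast numpy scalars into plain Python values that can be packed into a FoundationDB tuple-layer key:

import numpy as np

value = np.uint16(42)
key_component = dtype_class("uint16")(value)   # -> 42, a plain Python int
assert key_component == 42 and type(key_component) is int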
def finish_initialization(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, name: str) ‑> NoneType
Expand source code
@fdb.transactional
def finish_initialization(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str) -> None:
        """Clear the initializing flag on a tensor and update its dimensions, making the tensor visible"""
        tensor = shoji.io.get_tensor(tr, wsm, name, include_initializing=True)
        assert tensor.initializing
        tensor.initializing = False
        # Update the tensor definition to clear the initializing flag
        subdir = wsm._subdir
        key = subdir.pack((Compartment.Tensors, name))
        tr[key] = pickle.dumps(tensor, protocol=4)

        # Update the dimensions
        if tensor.rank > 0:
                for shape, dname in zip(tensor.shape, tensor.dims):
                        if isinstance(dname, str):
                                dim = wsm._get_dimension(dname)
                                if dim.length == 0:
                                        dim.length = shape
                                        shoji.io.create_dimension(tr, wsm, dname, dim)
                                elif dim.length != shape:
                                        raise ValueError(f"Length {shape} of new tensor '{name}' does not match length {dim.length} of dimension '{dname}' ")
def initialize_tensor(wsm: shoji.WorkspaceManager, name: str, tensor: Tensor) ‑> NoneType
Expand source code
def initialize_tensor(wsm: "shoji.WorkspaceManager", name: str, tensor: shoji.Tensor) -> None:
        if tensor.inits is not None:
                if tensor.rank == 0:
                        write_at_indices(wsm._db.transaction, wsm, (Compartment.TensorValues, name), indices=[], chunk_sizes=(), values=tensor.inits.values)
                else:
                        # Hide the true dimensions so the append will not fail due to consistency checks
                        update_tensor(wsm._db.transaction, wsm, name, dims=(None,) * tensor.rank)
                        longest_axis = np.argmax(tensor.inits.shape)
                        append_values_multibatch(wsm, [name], [tensor.inits], axes=(longest_axis,))
                        # Unhide the dims and set the shape of the tensor
                        update_tensor(wsm._db.transaction, wsm, name, dims=tensor.dims, shape=tensor.inits.shape)
                # Complete the initialization in one atomic operation
                finish_initialization(wsm._db.transaction, wsm, name)
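
Note that the inits are appended along the longest axis, which gives append_values_multibatch the most generalized rows to split into transaction-sized batches. A small illustration (the shape is invented):

import numpy as np

inits_shape = (200, 30_000)
longest_axis = int(np.argmax(inits_shape))   # -> 1: batches are cut along the 30,000 columns
assert longest_axis == 1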
def read_at_indices(wsm: shoji.WorkspaceManager, tensor: str, indices: List[numpy.ndarray], chunk_sizes: Tuple[int, ...], transactional: bool = True) ‑> numpy.ndarray

Read values corresponding to indices along each dimension (row indices, column indices, …), automatically managing chunks as needed

Args

wsm: the workspace containing the tensor
tensor: name of the tensor
indices: A list of numpy arrays giving the indices of the desired elements along each dimension
chunk_sizes: A tuple of ints giving the size of chunks in each dimension
transactional: If False, read chunks in multiple batches adaptively

Returns

data: The values at the intersection of each set of indices

Remarks

All the relevant chunks must exist, or this function will raise an exception

Expand source code
def read_at_indices(wsm: "shoji.WorkspaceManager", tensor: str, indices: List[np.ndarray], chunk_sizes: Tuple[int, ...], transactional: bool = True) -> np.ndarray:
        """
        Read values corresponding to indices along each dimension (row indices, column indices, ...), automatically managing chunks as needed

        Args:
                wsm: The workspace containing the tensor
                tensor: Name of the tensor
                indices: A list of numpy arrays giving the indices of the desired elements along each dimension
                chunk_sizes: A tuple of ints giving the size of chunks in each dimension
                transactional: If False, read chunks in multiple batches adaptively

        Returns:
                data: The values at the intersection of each set of indices

        Remarks:
                All the relevant chunks must exist, or this function will raise an exception
        """
        subspace = wsm._subdir
        rank = len(chunk_sizes)
        if len(indices) != rank:
                raise ValueError("indices and chunk_sizes must be same length")
        if rank == 0:
                # Read a single chunk since this is a scalar
                return shoji.io.read_chunks(wsm._db.transaction, subspace, (Compartment.TensorValues, tensor), ())[0]

        # Figure out which chunks need to be read
        addresses_per_dim = [np.unique(ind // sz) for ind, sz in zip(indices, chunk_sizes)]
        # All combinations of addresses along each dimension
        addresses = np.array(np.meshgrid(*addresses_per_dim)).T.reshape(-1, len(indices))
        # Read the chunk data and unravel it into the result ndarray
        if transactional:
                chunks = shoji.io.read_chunks(wsm._db.transaction, subspace, (Compartment.TensorValues, tensor), addresses)
        else:
                chunks = shoji.io.read_chunks_multibatch(wsm._db.transaction, subspace, (Compartment.TensorValues, tensor), addresses)
        result = np.empty_like(chunks[0], shape=[len(i) for i in indices])
        for (address, chunk) in zip(addresses, chunks):
                # At this point, we have a chunk at a particular address, and we have the indices
                # into the whole tensor. We need to figure out the relevant indices for this chunk,
                # and their offsets in the chunk, so that we can extract the right values from
                # the chunk.
                # We then need to figure out the offsets of those indices into the
                # result tensor so that we can write the values in the right place.
                # The chunk_extract should be placed as a dense ndarray into the result ndarray,
                # so we only need to figure out the offsets along each dimension. This is
                # equivalent to the number of indices belonging to lower addresses
                # in all dimensions.
                chunk_indices = []
                lowest_indices = []
                for a, ind, sz in zip(address, indices, chunk_sizes):
                        start = np.searchsorted(ind, a * sz, side='left')
                        end = np.searchsorted(ind, (a + 1) * sz, side='left')
                        chunk_indices.append(ind[start:end] - a * sz)
                        lowest_indices.append(ind[start])
                chunk_extract = chunk[np.ix_(*chunk_indices)]

                offsets = []
                for ind, min_ind in zip(indices, lowest_indices):
                        offset = np.searchsorted(ind, min_ind, side='left')
                        offsets.append(offset)
                result[tuple([slice(a, a + b) for a, b in zip(offsets, chunk_extract.shape)])] = chunk_extract
        return result
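
A numpy-only sketch of the per-chunk bookkeeping above, for a 1-D read with chunk size 8 (the requested indices are invented):

import numpy as np

ind = np.array([2, 9, 11])                               # requested global indices
sz, a = 8, 1                                             # chunk address 1 covers offsets 8..15

start = np.searchsorted(ind, a * sz, side='left')        # 1
end = np.searchsorted(ind, (a + 1) * sz, side='left')    # 3
chunk_indices = ind[start:end] - a * sz                  # [1, 3]: offsets within the chunk
offset = np.searchsorted(ind, ind[start], side='left')   # 1: the extract fills result[1:3]
assert list(chunk_indices) == [1, 3] and offset == 1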
def update_tensor(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, name: str, *, dims: Union[Tuple[Union[str, int, NoneType], ...], NoneType] = None, shape: Union[Tuple[int, ...], NoneType] = None) ‑> NoneType
Expand source code
@fdb.transactional
def update_tensor(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", name: str, *, dims: Optional[Tuple[Union[str, int, None], ...]] = None, shape: Optional[Tuple[int, ...]] = None) -> None:
        """Update the dims and/or shape recorded in a tensor definition (stored values are not touched)"""
        subdir = wsm._subdir
        tensor = wsm._get_tensor(name, include_initializing=True)
        if dims is not None:
                tensor.dims = dims
        if shape is not None:
                tensor.shape = shape
        key = subdir.pack((Compartment.Tensors, name))
        tr[key] = pickle.dumps(tensor, protocol=4)
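
A hedged sketch of the dims-hiding trick that initialize_tensor builds on (the tensor and dimension names are invented, and `wsm` is assumed to be an open WorkspaceManager):

# Hide the declared dims so the consistency checks cannot fail during a bulk append
update_tensor(wsm._db.transaction, wsm, "Expression", dims=(None, None))
# ... append values with append_values_multibatch ...
# Restore the dims and record the final shape
update_tensor(wsm._db.transaction, wsm, "Expression", dims=("cells", "genes"), shape=(3000, 120))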
def write_at_indices(tr: fdb.impl.Transaction, wsm: shoji.WorkspaceManager, key_prefix: Tuple[Any, ...], indices: List[numpy.ndarray], chunk_sizes: Tuple[int, ...], values: numpy.ndarray) ‑> int

Write values corresponding to indices along each dimension (row indices, column indices, …), automatically managing chunks as needed

Args

tr: Transaction object
wsm: The WorkspaceManager whose subdirectory holds the chunks
key_prefix: The tuple to use as prefix when storing the chunks
indices: A list of numpy arrays giving the indices of the desired elements along each dimension
chunk_sizes: A tuple of ints giving the size of chunks in each dimension
values: An ndarray of values corresponding to the intersection of indices

Returns

The number of bytes written

Expand source code
@fdb.transactional
def write_at_indices(tr: fdb.impl.Transaction, wsm: "shoji.WorkspaceManager", key_prefix: Tuple[Any, ...], indices: List[np.ndarray], chunk_sizes: Tuple[int, ...], values: np.ndarray) -> int:
        """
        Write values corresponding to indices along each dimension (row indices, column indices, ...), automatically managing chunks as needed

        Args:
                tr: Transaction object
                wsm: The WorkspaceManager whose subdirectory holds the chunks
                key_prefix: The tuple to use as prefix when storing the chunks
                indices: A list of numpy arrays giving the indices of the desired elements along each dimension
                chunk_sizes: A tuple of ints giving the size of chunks in each dimension
                values: An ndarray of values corresponding to the intersection of indices

        Returns:
                The number of bytes written
        """
        subspace = wsm._subdir

        rank = len(chunk_sizes)
        if len(indices) != rank:
                raise ValueError("indices and chunk_sizes must be same length")
        if rank == 0:
                # Write a single chunk since this is a scalar
                return shoji.io.write_chunks(tr, subspace, key_prefix, (), [values])

        # Figure out which chunks need to be written
        addresses_per_dim = [np.unique(ind // sz) for ind, sz in zip(indices, chunk_sizes)]
        # All combinations of addresses along each dimension
        addresses = np.array(np.meshgrid(*addresses_per_dim)).T.reshape(-1, len(indices))
        chunks = []
        for address in addresses:
                # At this point, we have a chunk address, and we have the indices
                # into the whole tensor. We need to figure out the relevant indices for this chunk,
                # and their offsets in the chunk, so that we can place the right values at the right place in
                # the chunk for writing. We also need to construct a mask if the chunk is not fully covered
                chunk_indices = []
                tensor_indices = []
                lengths = []
                for a, ind, sz in zip(address, indices, chunk_sizes):
                        start = np.searchsorted(ind, a * sz, side='left')
                        end = np.searchsorted(ind, (a + 1) * sz, side='left')
                        chunk_indices.append(ind[start:end] - a * sz)
                        tensor_indices.append(ind[start])
                        lengths.append(end - start)
                chunk = np.empty_like(values, shape=chunk_sizes)
                # Now figure out which part of the values tensor they correspond to (always a dense sub-tensor)
                starts = []
                for ind, min_ind in zip(indices, tensor_indices):
                        start = np.searchsorted(ind, min_ind, side='left')
                        starts.append(start)
                values_slices = tuple(slice(s, s + l) for s, l in zip(starts, lengths))

                # Finally, copy the correct subtensor of values into the right slots in the chunk
                chunk[np.ix_(*chunk_indices)] = values[values_slices]

                mask = np.ones(chunk_sizes, dtype=bool)
                mask[np.ix_(*chunk_indices)] = False
                if np.any(mask):
                        chunks.append(np.ma.masked_array(chunk, mask=mask))
                else:
                        chunks.append(chunk)

        return shoji.io.write_chunks(tr, subspace, key_prefix, addresses, chunks)
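
A numpy-only sketch of the masking step above: cells of a chunk not covered by the write are masked, so that the untouched cells can be recognized (and preserved) downstream. The chunk size and indices are invented for the example:

import numpy as np

chunk_sizes = (4, 4)
chunk_indices = [np.array([1, 2]), np.array([3])]   # rows 1-2, column 3 of this chunk

chunk = np.zeros(chunk_sizes)
chunk[np.ix_(*chunk_indices)] = [[7.0], [8.0]]

mask = np.ones(chunk_sizes, dtype=bool)
mask[np.ix_(*chunk_indices)] = False                # False marks cells actually written
masked = np.ma.masked_array(chunk, mask=mask)       # only (1, 3) and (2, 3) carry data
assert masked.count() == 2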