Module shoji.io.chunked_io

Expand source code
from typing import List, Tuple, Any, Union
import numpy as np
import fdb
import blosc
from numpy.ma.core import MaskedArray
import time
from multiprocessing import Pool, cpu_count
from itertools import chain


"""
# Chunked storage API

Shoji uses FoundationDB, a scalable and resilient key-value database, as its backing store. In order to bridge the mismatch between
a key-value store and a tensor database, the chunked storage API layer implements an N-dimensional compressed chunk storage layer.

Chunks are arbitrary numpy arrays, although they are intended to store chunks of N-dimensional tensors. Chunks are 
addressed using N-tuples of ordered integers, such as (0, 10, 9). Addresses can be viewed simply as abstract pointers, 
although they are intended to correspond to chunk offsets along each dimension of an N-dimensional tensor.

Chunks are stored in a FoundationDB subspace and under a specific key prefix (intended to store a single tensor). All chunks that 
are in the same subspace and using the same key prefix must use addresses of the same length (intended to correspond to the rank of a tensor). 

The chunked storage API layer provides functions for reading and writing sets of chunks using on-the-fly compression.

Note that the chunk address space need not be densely filled. That is, if a chunk exists at (10, 9, 3), this does not mean that 
chunks must exist at (9, 8, 1) or any other address. Reading from an empty address returns None. Writing to a non-empty address
silently overwrites the existing chunk.

"""


@fdb.transactional
def write_chunks(tr: fdb.impl.Transaction, subspace: fdb.directory_impl.DirectorySubspace, key_prefix: Tuple[Any], addresses: np.ndarray, chunks: List[Union[np.ndarray, MaskedArray]]) -> int:
        """
        Write a list of chunks to the database, optionally using mask to write only partially

        Args:
                tr: Transaction object
                subspace: The fdb DirectorySubspace under which the chunks are stored
                key_prefix: The tuple to use as prefix when storing the chunks
                addresses: An (n_chunks, n_dim) numpy array giving the addresses of the desired chunks, along each dimension
                chunks: List of chunks, each of which can optionally be a numpy masked array

        Returns:
                The number of bytes written
        
        Remarks:
                Chunks can be given as numpy masked arrays, and masked values will be filled by the corresponding values from 
                the current chunk at the same address (which must exist). This can be used to selectively update
                only parts of chunks, e.g. when updating part of a tensor or appending values that are nonaligned with chunk edges.
        """
        # logging.info(f"Writing {addresses.shape[0]} chunks starting at {addresses[0]}")
        n_bytes_written = 0
        if len(addresses) == 0:  # writing a scalar
                key = subspace.pack(key_prefix)
                encoded = blosc.pack_array(chunks[0])
                n_bytes_written += len(key) + len(encoded)
                tr[key] = encoded
                return n_bytes_written

        for address, chunk in zip(addresses, chunks):
                key = subspace.pack(key_prefix + tuple(int(x) for x in address))
                if isinstance(chunk, np.ma.MaskedArray):
                        mask = np.ma.getmask(chunk)
                        if np.any(mask):
                                prev_value = read_chunks(tr, subspace, key_prefix, address[None, :])[0]
                                if prev_value is not None:
                                        chunk[mask] = prev_value[mask]
                        chunk = chunk.data
                # chunk is now an ndarray (not masked)
                encoded = blosc.pack_array(chunk)
                n_bytes_written += len(key) + len(encoded)
                tr[key] = encoded
        # logging.info(f"Wrote {addresses.shape[0]} chunks starting at {addresses[0]}, total of {n_bytes_written:,} bytes")
        return n_bytes_written

@fdb.transactional
def read_chunks(tr: fdb.impl.Transaction, subspace: fdb.directory_impl.DirectorySubspace, key_prefix: Tuple[Any], addresses: np.ndarray) -> List[np.ndarray]:
        """
        Read a list of chunks from the database, using a transaction

        Args:
                tr: Transaction object
                subspace: The fdb DirectorySubspace under which the chunks are stored
                key_prefix: The tuple to use as prefix when storing the chunks
                addresses: An (n_chunks, n_dim) numpy array giving the addresses of the desired chunks, along each dimension

        Returns:
                chunks: A list of np.ndarray objects representing the desired chunks
        
        Remarks:
                Chunks that don't exist in the database are returned as None
        """
        # logging.info(f"Reading {addresses.shape[0]} chunks starting at {addresses[0]}")
        n_bytes_read = 0

        if len(addresses) == 0:  # writing a scalar
                key = subspace.pack(key_prefix)
                data = tr[key].value
                decoded = blosc.unpack_array(data)
                n_bytes_read += len(key) + len(data)
                return [decoded]

        chunks: List[np.ndarray] = []
        for address in addresses:
                key = subspace.pack(key_prefix + tuple(int(x) for x in address))
                data = tr[key].value
                if data is None:
                        chunks.append(None)
                else:
                        decoded = blosc.unpack_array(data)
                        n_bytes_read += len(key) + len(data)
                        chunks.append(decoded)
        # logging.info(f"Read {addresses.shape[0]} chunks starting at {addresses[0]}, total of {n_bytes_read:,} bytes")
        return chunks


# Note: no @fdb.transactional decorator since this uses multiple transanctions inside the function
def read_chunks_multibatch(db: fdb.impl.Database, subspace: fdb.directory_impl.DirectorySubspace, key_prefix: Tuple[Any, ...], addresses: np.ndarray) -> List[np.ndarray]:
        chunks = []
        n_total = len(addresses)
        n = min(200, n_total)
        # Read the first 100 chunks and measure the time it takes
        time_at_start = time.time()
        try:
                chunks += read_chunks(db, subspace, key_prefix, addresses[:200])
                ix = n
                time_per_chunk = (time.time() - time_at_start) / n
                # Aim for 2s batches
                n = max(2, min(int(1 / time_per_chunk), n_total))
        except fdb.impl.FDBError as e:
                if e.code not in (1004, 1007, 1031, 2101):  # Too many bytes or too long time
                        raise e
                ix = 0
                time_per_chunk = (time.time() - time_at_start) / n
                # Aim for 0.2s batches
                n = max(1, min(int(0.2 / time_per_chunk), n_total))
        # print(f"Reading {key_prefix[-1]} at {time_per_chunk * 1000:.2} ms/chunk")
        while ix < n_total:
                try:
                        chunks += read_chunks(db, subspace, key_prefix, addresses[ix: ix + n])
                except fdb.impl.FDBError as e:
                        if e.code in (1004, 1007, 1031, 2101) and n > 1:  # Too many bytes or too long time, so try again with less
                                n = max(1, n // 2)
                                continue
                        else:
                                raise e
                ix += n
        return chunks

Functions

def read_chunks(tr: fdb.impl.Transaction, subspace: fdb.directory_impl.DirectorySubspace, key_prefix: Tuple[Any], addresses: numpy.ndarray) ‑> List[numpy.ndarray]

Read a list of chunks from the database, using a transaction

Args

tr
Transaction object
subspace
The fdb DirectorySubspace under which the chunks are stored
key_prefix
The tuple to use as prefix when storing the chunks
addresses
An (n_chunks, n_dim) numpy array giving the addresses of the desired chunks, along each dimension

Returns

chunks
A list of np.ndarray objects representing the desired chunks

Remarks

Chunks that don't exist in the database are returned as None

Expand source code
@fdb.transactional
def read_chunks(tr: fdb.impl.Transaction, subspace: fdb.directory_impl.DirectorySubspace, key_prefix: Tuple[Any], addresses: np.ndarray) -> List[np.ndarray]:
        """
        Read a list of chunks from the database, using a transaction

        Args:
                tr: Transaction object
                subspace: The fdb DirectorySubspace under which the chunks are stored
                key_prefix: The tuple to use as prefix when storing the chunks
                addresses: An (n_chunks, n_dim) numpy array giving the addresses of the desired chunks, along each dimension

        Returns:
                chunks: A list of np.ndarray objects representing the desired chunks
        
        Remarks:
                Chunks that don't exist in the database are returned as None
        """
        # logging.info(f"Reading {addresses.shape[0]} chunks starting at {addresses[0]}")
        n_bytes_read = 0

        if len(addresses) == 0:  # writing a scalar
                key = subspace.pack(key_prefix)
                data = tr[key].value
                decoded = blosc.unpack_array(data)
                n_bytes_read += len(key) + len(data)
                return [decoded]

        chunks: List[np.ndarray] = []
        for address in addresses:
                key = subspace.pack(key_prefix + tuple(int(x) for x in address))
                data = tr[key].value
                if data is None:
                        chunks.append(None)
                else:
                        decoded = blosc.unpack_array(data)
                        n_bytes_read += len(key) + len(data)
                        chunks.append(decoded)
        # logging.info(f"Read {addresses.shape[0]} chunks starting at {addresses[0]}, total of {n_bytes_read:,} bytes")
        return chunks
def read_chunks_multibatch(db: fdb.impl.Database, subspace: fdb.directory_impl.DirectorySubspace, key_prefix: Tuple[Any, ...], addresses: numpy.ndarray) ‑> List[numpy.ndarray]
Expand source code
def read_chunks_multibatch(db: fdb.impl.Database, subspace: fdb.directory_impl.DirectorySubspace, key_prefix: Tuple[Any, ...], addresses: np.ndarray) -> List[np.ndarray]:
        chunks = []
        n_total = len(addresses)
        n = min(200, n_total)
        # Read the first 100 chunks and measure the time it takes
        time_at_start = time.time()
        try:
                chunks += read_chunks(db, subspace, key_prefix, addresses[:200])
                ix = n
                time_per_chunk = (time.time() - time_at_start) / n
                # Aim for 2s batches
                n = max(2, min(int(1 / time_per_chunk), n_total))
        except fdb.impl.FDBError as e:
                if e.code not in (1004, 1007, 1031, 2101):  # Too many bytes or too long time
                        raise e
                ix = 0
                time_per_chunk = (time.time() - time_at_start) / n
                # Aim for 0.2s batches
                n = max(1, min(int(0.2 / time_per_chunk), n_total))
        # print(f"Reading {key_prefix[-1]} at {time_per_chunk * 1000:.2} ms/chunk")
        while ix < n_total:
                try:
                        chunks += read_chunks(db, subspace, key_prefix, addresses[ix: ix + n])
                except fdb.impl.FDBError as e:
                        if e.code in (1004, 1007, 1031, 2101) and n > 1:  # Too many bytes or too long time, so try again with less
                                n = max(1, n // 2)
                                continue
                        else:
                                raise e
                ix += n
        return chunks
def write_chunks(tr: fdb.impl.Transaction, subspace: fdb.directory_impl.DirectorySubspace, key_prefix: Tuple[Any], addresses: numpy.ndarray, chunks: List[Union[numpy.ndarray, numpy.ma.core.MaskedArray]]) ‑> int

Write a list of chunks to the database, optionally using mask to write only partially

Args

tr
Transaction object
subspace
The fdb DirectorySubspace under which the chunks are stored
key_prefix
The tuple to use as prefix when storing the chunks
addresses
An (n_chunks, n_dim) numpy array giving the addresses of the desired chunks, along each dimension
chunks
List of chunks, each of which can optionally be a numpy masked array

Returns

The number of bytes written

Remarks

Chunks can be given as numpy masked arrays, and masked values will be filled by the corresponding values from the current chunk at the same address (which must exist). This can be used to selectively update only parts of chunks, e.g. when updating part of a tensor or appending values that are nonaligned with chunk edges.

Expand source code
@fdb.transactional
def write_chunks(tr: fdb.impl.Transaction, subspace: fdb.directory_impl.DirectorySubspace, key_prefix: Tuple[Any], addresses: np.ndarray, chunks: List[Union[np.ndarray, MaskedArray]]) -> int:
        """
        Write a list of chunks to the database, optionally using mask to write only partially

        Args:
                tr: Transaction object
                subspace: The fdb DirectorySubspace under which the chunks are stored
                key_prefix: The tuple to use as prefix when storing the chunks
                addresses: An (n_chunks, n_dim) numpy array giving the addresses of the desired chunks, along each dimension
                chunks: List of chunks, each of which can optionally be a numpy masked array

        Returns:
                The number of bytes written
        
        Remarks:
                Chunks can be given as numpy masked arrays, and masked values will be filled by the corresponding values from 
                the current chunk at the same address (which must exist). This can be used to selectively update
                only parts of chunks, e.g. when updating part of a tensor or appending values that are nonaligned with chunk edges.
        """
        # logging.info(f"Writing {addresses.shape[0]} chunks starting at {addresses[0]}")
        n_bytes_written = 0
        if len(addresses) == 0:  # writing a scalar
                key = subspace.pack(key_prefix)
                encoded = blosc.pack_array(chunks[0])
                n_bytes_written += len(key) + len(encoded)
                tr[key] = encoded
                return n_bytes_written

        for address, chunk in zip(addresses, chunks):
                key = subspace.pack(key_prefix + tuple(int(x) for x in address))
                if isinstance(chunk, np.ma.MaskedArray):
                        mask = np.ma.getmask(chunk)
                        if np.any(mask):
                                prev_value = read_chunks(tr, subspace, key_prefix, address[None, :])[0]
                                if prev_value is not None:
                                        chunk[mask] = prev_value[mask]
                        chunk = chunk.data
                # chunk is now an ndarray (not masked)
                encoded = blosc.pack_array(chunk)
                n_bytes_written += len(key) + len(encoded)
                tr[key] = encoded
        # logging.info(f"Wrote {addresses.shape[0]} chunks starting at {addresses[0]}, total of {n_bytes_written:,} bytes")
        return n_bytes_written