# Source code for loompy.loom_layer

import numpy as np
from typing import *
import scipy
from loompy import timestamp


class MemoryLoomLayer:
    """
    A layer residing in memory (without a corresponding layer on disk), typically
    as part of a :class:`loompy.LoomView`.

    MemoryLoomLayer supports a subset of the operations supported for regular layers.
    """

    def __init__(self, name: str, matrix: np.ndarray) -> None:
        """
        Wrap an in-memory matrix as a named layer.

        Args:
            name:    Name of the layer
            matrix:  The values of the layer; stored by reference (not copied)
        """
        self.name = name  #: Name of the layer
        self.shape = matrix.shape  #: Shape of the layer
        self.values = matrix  #: The matrix holding the layer's values

    def __getitem__(self, slice: Tuple[Union[int, slice], Union[int, slice]]) -> np.ndarray:
        """Return the selected part of the layer (plain indexing of the wrapped matrix)."""
        return self.values[slice]

    def __setitem__(self, slice: Tuple[Union[int, slice], Union[int, slice]], data: np.ndarray) -> None:
        """Assign ``data`` to the selected part of the wrapped matrix, in place."""
        self.values[slice] = data

    def sparse(self, rows: Optional[np.ndarray] = None, cols: Optional[np.ndarray] = None) -> scipy.sparse.coo_matrix:
        """
        Return the layer as :class:`scipy.sparse.coo_matrix`

        Args:
            rows:  Rows to include, or None (the default) to include all rows
            cols:  Columns to include, or None (the default) to include all columns

        Returns:
            A sparse matrix holding the selected rows and columns.
        """
        selected = self.values
        # None must not reach the indexing expression: values[None, :] would
        # insert a new axis instead of selecting everything.
        if rows is not None:
            selected = selected[rows, :]
        if cols is not None:
            selected = selected[:, cols]
        return scipy.sparse.coo_matrix(selected)

    def permute(self, ordering: np.ndarray, *, axis: int) -> None:
        """
        Permute the layer along an axis

        Args:
            axis:      The axis to permute (0, permute the rows; 1, permute the columns)
            ordering:  The permutation vector

        Raises:
            ValueError: if axis is neither 0 nor 1
        """
        if axis == 0:
            self.values = self.values[ordering, :]
        elif axis == 1:
            self.values = self.values[:, ordering]
        else:
            raise ValueError("axis must be 0 or 1")
class LoomLayer:
    """
    Represents a layer (matrix) of values in the loom file, which can be accessed by slicing.
    """

    def __init__(self, name: str, ds: Any) -> None:
        """
        Args:
            name:  Name of the layer; the empty string denotes the main matrix
            ds:    The connection object this layer belongs to
        """
        self.ds = ds  #: The :class:`.LoomConnection` object this layer belongs to
        self.name = name  #: Name of the layer (str)
        self.shape = ds.shape  #: Shape of the layer, tuple of (n_rows, n_cols)
        self.dtype = self._dataset().dtype  #: Datatype of the layer

    def _dataset(self) -> Any:
        """Return the dataset object in ``ds._file`` that stores this layer."""
        # The unnamed layer is stored at "/matrix"; named layers under "/layers/".
        if self.name == "":
            return self.ds._file["/matrix"]
        return self.ds._file["/layers/" + self.name]

    def last_modified(self) -> str:
        """
        Return a compact ISO8601 timestamp (UTC timezone) indicating when the layer was last modified

        Note: if the layer does not contain a timestamp, and the mode is 'r+', a new timestamp will
        be set and returned. Otherwise, the current time in UTC will be returned.
        """
        attrs = self._dataset().attrs
        if "last_modified" in attrs:
            return attrs["last_modified"]
        if self.ds._file.mode == 'r+':
            attrs["last_modified"] = timestamp()
            self.ds._file.flush()
            # Read the value back so the caller gets exactly what was stored
            return attrs["last_modified"]
        return timestamp()

    def __getitem__(self, slice: Tuple[Union[int, slice], Union[int, slice]]) -> np.ndarray:
        """Return the selected part of the layer, read from the underlying dataset."""
        return self._dataset().__getitem__(slice)

    def __setitem__(self, slice: Tuple[Union[int, slice], Union[int, slice]], data: np.ndarray) -> None:
        """
        Write ``data`` to the selected part of the layer, then update the
        "last_modified" attribute of both the layer and the file and flush.
        """
        dataset = self._dataset()
        dataset[slice] = data
        # One timestamp for both attributes, so the layer and the file always agree
        now = timestamp()
        dataset.attrs["last_modified"] = now
        self.ds._file.attrs["last_modified"] = now
        self.ds._file.flush()

    def sparse(self, rows: np.ndarray = None, cols: np.ndarray = None) -> scipy.sparse.coo_matrix:
        """
        Return the layer (or a selection of it) as :class:`scipy.sparse.coo_matrix`

        The layer is read in batches through ``ds.scan()`` along axis 1, and only
        the nonzero values of each batch are kept.

        Args:
            rows:  Rows to include (indices or boolean mask), or None to include all rows
            cols:  Columns to include (indices or boolean mask), or None to include all columns

        Returns:
            A sparse matrix of shape (number of selected rows, number of selected columns)
        """
        # Boolean masks are converted to index vectors so that their length
        # equals the number of selected items
        if rows is not None and np.issubdtype(rows.dtype, np.bool_):
            rows = np.where(rows)[0]
        if cols is not None and np.issubdtype(cols.dtype, np.bool_):
            cols = np.where(cols)[0]
        n_genes = self.ds.shape[0] if rows is None else rows.shape[0]
        n_cells = self.ds.shape[1] if cols is None else cols.shape[0]

        data: List[np.ndarray] = []
        row: List[np.ndarray] = []
        col: List[np.ndarray] = []
        offset = 0  # Number of columns already consumed by previous batches
        for (_, selection, view) in self.ds.scan(items=cols, axis=1, layers=[self.name], what=["layers"]):
            if rows is not None:
                vals = view.layers[self.name][rows, :]
            else:
                vals = view.layers[self.name][:, :]
            nonzeros = np.where(vals != 0)
            data.append(vals[nonzeros])
            row.append(nonzeros[0])
            # Column indices are local to the batch; shift them into place
            col.append(nonzeros[1] + offset)
            offset += selection.shape[0]

        if not data:
            # No batch was produced; np.concatenate() would raise on an empty list
            return scipy.sparse.coo_matrix((n_genes, n_cells), dtype=self.dtype)
        return scipy.sparse.coo_matrix(
            (np.concatenate(data), (np.concatenate(row), np.concatenate(col))),
            shape=(n_genes, n_cells)
        )

    def _resize(self, size: Tuple[int, int], axis: int = None) -> None:
        """Resize the dataset, or the specified axis.

        The dataset must be stored in chunked format; it can be resized up to the "maximum shape"
        (keyword maxshape) specified at creation time. The rank of the dataset cannot be changed.
        "Size" should be a shape tuple, or if an axis is specified, an integer.

        BEWARE: This functions differently than the NumPy resize() method! The data is not
        "reshuffled" to fit in the new shape; each axis is grown or shrunk independently.
        The coordinates of existing data are fixed.
        """
        self._dataset().resize(size, axis)

    def map(self, f_list: List[Callable[[np.ndarray], int]], axis: int = 0, chunksize: int = 1000, selection: np.ndarray = None) -> List[np.ndarray]:
        """
        Apply a function along an axis without loading the entire dataset in memory.

        Args:
            f_list (list of func):      Function(s) that takes a numpy ndarray as argument
            axis (int):                 Axis along which to apply the function (0 = rows, 1 = columns)
            chunksize (int):            Number of rows (columns) to load per chunk
            selection (array of bool):  Columns (rows) to include

        Returns:
            A list of numpy arrays, one per function in f_list, each holding the
            result of applying that function to every row (column).

            Supplying several functions at once is more efficient than repeatedly
            calling map() one function at a time.

        Raises:
            ValueError: if f_list is a function rather than a list of functions,
                if axis is neither 0 nor 1, or if chunksize is not positive
        """
        if callable(f_list):
            raise ValueError("f_list must be a list of functions, not a function itself")
        if axis not in (0, 1):
            raise ValueError("axis must be 0 or 1")
        if chunksize < 1:
            # A chunk of zero items would never advance through the layer
            raise ValueError("chunksize must be a positive integer")

        n_items = self.shape[axis]
        result = [np.zeros(n_items) for _ in f_list]
        for start in range(0, n_items, chunksize):
            stop = min(start + chunksize, n_items)
            if axis == 0:
                chunk = self[start:stop, :]
                if selection is not None:
                    chunk = chunk[:, selection]
            else:
                chunk = self[:, start:stop]
                if selection is not None:
                    chunk = chunk[selection, :]
            # Rows are reduced along axis 1 of the chunk, columns along axis 0
            for out, f in zip(result, f_list):
                out[start:stop] = np.apply_along_axis(f, 1 - axis, chunk)
        return result

    def _permute(self, ordering: np.ndarray, *, axis: int) -> None:
        """
        Permute the stored layer in place along an axis, one chunk at a time.

        Args:
            ordering:  The permutation vector
            axis:      The axis to permute (0, permute the rows; 1, permute the columns)

        Raises:
            ValueError: if axis is neither 0 nor 1
        """
        obj = self._dataset()
        n_rows, n_cols = self.shape[0], self.shape[1]
        if axis == 0:
            # Reorder the rows, working through blocks of columns
            chunksize = 5000
            for start in range(0, n_cols, chunksize):
                submatrix = obj[:, start:start + chunksize]
                obj[:, start:start + chunksize] = submatrix[ordering, :]
        elif axis == 1:
            # Reorder the columns, working through blocks of rows. The block is
            # sized to about 100 million values, but never fewer than one row;
            # guarding the divisor avoids a ZeroDivisionError for empty layers.
            chunksize = max(1, 100000000 // max(1, n_cols))
            for start in range(0, n_rows, chunksize):
                submatrix = obj[start:start + chunksize, :]
                obj[start:start + chunksize, :] = submatrix[:, ordering]
        else:
            raise ValueError("axis must be 0 or 1")