Source code for loompy.layer_manager

from typing import *
import numpy as np
import loompy
import logging
import scipy.sparse as sparse


[docs]class LayerManager: """ Manage a set of layers with a backing HDF5 file store """
[docs] def __init__(self, ds: Any) -> None: # Note: can't give type for ds because it will be circular and mypy doesn't support it """ Create a LayerManager object. """ setattr(self, "!ds", ds) storage: Dict[str, np.ndarray] = {} setattr(self, "!storage", storage) if ds is not None: if "matrix" in ds._file: self.__dict__["storage"][""] = None if "layers" in ds._file: for key in self.ds._file["layers"].keys(): self.__dict__["storage"][key] = None elif ds.mode == "r+": ds._file.create_group('/layers')
[docs] def last_modified(self, name: str = None) -> str: """ Return a compact ISO8601 timestamp (UTC timezone) indicating when the layer was last modified Note: if name is None, the modification time of the most recently modified layer is returned """ if name is not None: return self[name].last_modified() ts = "" for name in self.keys(): if ts is None: ts = self[name].last_modified() else: if self[name].last_modified() > ts: ts = self[name].last_modified() return ts
[docs] def keys(self) -> List[str]: return list(self.__dict__["storage"].keys())
[docs] def items(self) -> Iterable[Tuple[str, np.ndarray]]: for key in self.keys(): yield (key, self[key])
def __len__(self) -> int: return len(self.keys()) def __contains__(self, name: str) -> bool: return name in self.keys() def __iter__(self) -> Iterator[str]: for key in self.keys(): yield key def __getitem__(self, thing: Any) -> np.ndarray: """ Access a layer by name, or slice through all the layers Args: thing: if string, return the specified layer ("" is the default layer) if slice 2-tuple, return a new LayerManager with all layers sliced """ if type(thing) is str: return self.__getattr__(thing) else: # Assume some kind of slice lm = LayerManager(None) for key, layer in self.items(): lm[key] = loompy.MemoryLoomLayer(key, layer[thing]) return lm def __getattr__(self, name: str) -> np.ndarray: try: vals = self.__dict__["storage"][name] if vals is None: # Read values from the HDF5 file return loompy.LoomLayer(name, self.ds) return vals except KeyError: raise AttributeError(f"'{type(self)}' object has no attribute '{name}'") def __setitem__(self, name: str, val: np.ndarray) -> None: return self.__setattr__(name, val) def __setattr__(self, name: str, val: np.ndarray) -> None: if name.startswith("!"): super(LayerManager, self).__setattr__(name[1:], val) elif "/" in name: raise KeyError("Layer name cannot contain slash (/)") else: if self.ds is not None: if type(val) is str: # val specifies the dtype of an empty layer matrix: np.ndarray = None dtype = val shape = self.ds.shape elif sparse.issparse(val): # val is a sparse matrix matrix = None dtype = val.dtype shape = val.shape else: # val is a matrix that will be the layer matrix = val dtype = matrix.dtype shape = matrix.shape if not np.isfinite(matrix).all(): raise ValueError("INF and NaN not allowed in loom matrix") if name != "" and shape != self.ds.shape: raise ValueError(f"All layers must have same shape {self.ds.shape}") if self.ds._file.mode != "r+": raise IOError("Cannot save layers when connected in read-only mode") if not (np.issubdtype(dtype, np.integer) or np.issubdtype(dtype, np.floating)): raise ValueError("Matrix elements must be integer or float") if not self.ds._file.__contains__("/layers"): self.ds._file.create_group("/layers") # make sure chunk size is not bigger than actual matrix size chunks = (min(64, shape[0]), min(64, shape[1])) path = "/layers/" + name if name == "": path = "/matrix" if self.ds._file.__contains__(path): del self.ds._file[path] # Save the matrix self.ds._file.create_dataset( path, data=matrix, dtype=dtype, shape=shape, maxshape=(shape[0], None), chunks=chunks, fletcher32=False, compression="gzip", shuffle=False, compression_opts=2 ) if name == "": self.ds.shape = shape self.__dict__["storage"][name] = None # Fill the matrix with sparse data if sparse.issparse(val): m = val.tocsc() window = 6400 ix = 0 while ix < val.shape[1]: window = min(window, m.shape[1] - ix) if window == 0: break self.ds._file[path][:, ix:ix + window] = m[:, ix: ix + window].toarray() ix += window self.ds._file.flush() else: self.__dict__["storage"][name] = val def __delitem__(self, name: str) -> None: return self.__delattr__(name) def __delattr__(self, name: str) -> None: if self.ds is not None: if name == "": raise ValueError("Cannot delete default layer") else: path = "/layers/" + name if self.ds._file.__contains__(path): del self.ds._file[path] self.ds._file.flush() else: if name in self.__dict__["storage"]: del self.__dict__["storage"][name] def _permute(self, ordering: np.ndarray, *, axis: int) -> None: for key in self.keys(): self[key]._permute(ordering, axis=axis)