import h5py
from typing import *
import logging
import numpy as np
import loompy
from .utils import get_loom_spec_version
class LoomValidator:
    """
    Validate Loom files against the file format specification and, optionally,
    against the attribute name/dtype conventions.

    Remarks:
        Problems are collected rather than raised. After a validation call,
        'self.errors', 'self.warnings' and 'self.summary' hold the findings.
        The lists are never cleared, so they accumulate across calls on the same instance.
    """

    # dtype names accepted for the main matrix, layers and numeric attributes
    _MATRIX_TYPES = ("float16", "float32", "float64", "int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64")
    # dtype names accepted for the graph vertex vectors 'a' and 'b'
    _VERTEX_TYPES = ("int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64")
    # dtype names accepted for the graph weight vector 'w'
    _WEIGHT_TYPES = ("float16", "float32", "float64")
    # (vector name, role used in messages, allowed dtypes, dtype family used in messages)
    _GRAPH_VECTORS = (
        ("a", "start vertices", _VERTEX_TYPES, "integer"),
        ("b", "end vertices", _VERTEX_TYPES, "integer"),
        ("w", "vertex weights", _WEIGHT_TYPES, "float"),
    )
    # Spec versions for which a missing graph group is reported as an error
    _GRAPH_VERSIONS = ("2.0.1", "3.0.0")

    def __init__(self, version: Optional[str] = None) -> None:
        """
        Args:
            version: The Loom file format version to validate against ("3.0.0", "2.0.1", "old"), or None to infer from file
        Remarks:
            "old" version will accept files that lack the "row_graphs" and "col_graphs" groups
        """
        self.version = version  #: Version of the spec to validate against
        self.errors: List[str] = []  #: Errors found during validation
        self.warnings: List[str] = []  #: Warnings triggered during validation
        self.summary: List[str] = []  #: Summary of the file structure

    def _check(self, condition: bool, message: str) -> bool:
        """Record 'message' as an error when 'condition' is false; return 'condition'."""
        if not condition:
            self.errors.append(message)
        return condition

    def _warn(self, condition: bool, message: str) -> bool:
        """Record 'message' as a warning when 'condition' is false; return 'condition'."""
        if not condition:
            self.warnings.append(message)
        return condition

    def validate(self, path: str, strictness: str = "speconly") -> bool:
        """
        Validate a file for conformance to the Loom specification
        Args:
            path: Full path to the file to be validated
            strictness: "speconly" or "conventions"
        Returns:
            True if every validation that was run passed, else False
        Remarks:
            In "speconly" mode, conformance is assessed relative to the file format specification
            at http://linnarssonlab.org/loompy/format/. In "conventions" mode, conformance is additionally
            assessed relative to attribute name and data type conventions given at http://linnarssonlab.org/loompy/conventions/.
            If no version was given to the constructor, it is inferred from the file and stored in 'self.version'.
        """
        with h5py.File(path, mode="r") as f:
            if self.version is None:
                self.version = get_loom_spec_version(f)
            valid_spec = self.validate_spec(f)
        if not valid_spec:
            self.errors.append("For help, see http://linnarssonlab.org/loompy/format/")

        valid_conventions = True
        if strictness == "conventions":
            with loompy.connect(path, mode="r") as ds:
                valid_conventions = self.validate_conventions(ds)
            if not valid_conventions:
                self.errors.append("For help, see http://linnarssonlab.org/loompy/conventions/")
        return valid_spec and valid_conventions

    @staticmethod
    def _is_string_array(values: Any) -> bool:
        """Return True for an object-dtype array whose first element is a string (an empty object array passes)."""
        if values.dtype != object:
            return False
        # isinstance covers both Python str and numpy.str_ (a str subclass)
        return values.size == 0 or isinstance(values.flat[0], str)

    @staticmethod
    def _element_type_name(values: Any) -> str:
        """Describe the element type of 'values' for error messages without assuming elements carry a dtype."""
        if values.size == 0:
            return str(values.dtype)
        first = values.flat[0]
        return str(getattr(first, "dtype", type(first).__name__))

    @staticmethod
    def _is_zero_based_range(values: Any) -> bool:
        """Return True if the distinct values are exactly 0, 1, ..., k-1 (an empty input passes)."""
        distinct = np.unique(values)  # sorted, so first/last are min/max
        if distinct.size == 0:
            return True
        return bool(distinct[0] == 0 and distinct[-1] == distinct.size - 1)

    @staticmethod
    def _is_one_to_one(ids: Any, names: Any) -> bool:
        """Return True if every id maps to exactly one name and every name to exactly one id."""
        for cid in np.unique(ids):
            if len(np.unique(names[ids == cid])) != 1:
                return False
        for cn in np.unique(names):
            if len(np.unique(ids[names == cn])) != 1:
                return False
        return True

    def _check_string_attr(self, values: Any, label: str, name: str, expected_len: int) -> None:
        """Check that an attribute is a 1-dimensional string array of 'expected_len' elements."""
        self._check(self._is_string_array(values), f"{label} attribute '{name}' must be an array of strings, not '{self._element_type_name(values)}'")
        self._check(values.shape == (expected_len,), f"{label} attribute '{name}' must be 1-dimensional array of {expected_len} elements")

    def _check_flag_attr(self, attrs: Any, label: str, name: str, expected_len: int) -> None:
        """Check an optional 0/1 integer attribute; warn (not error) when it is absent."""
        if name not in attrs:
            self.warnings.append(f"Optional {label.lower()} attribute '{name}' is missing")
            return
        values = getattr(attrs, name)
        # np.integer accepts every integer width; np.int_ would only match the platform default
        self._check(np.issubdtype(values.dtype, np.integer), f"{label} attribute '{name}' must be integer dtype, not '{values.dtype}'")
        self._check(bool(np.all(np.isin(values, [0, 1]))), f"{label} attribute '{name}' must be integers 0 or 1 only")
        self._check(values.shape == (expected_len,), f"{label} attribute '{name}' must be 1-dimensional array of {expected_len} elements")

    def validate_conventions(self, ds: "loompy.LoomConnection") -> bool:
        """
        Validate the LoomConnection object against the attribute name/dtype conventions.
        Args:
            ds: LoomConnection object
        Returns:
            True if this call recorded no new errors, else False
        Remarks:
            Upon return, the instance attributes 'self.errors' and 'self.warnings' contain
            lists of errors and warnings.
        """
        errors_before = len(self.errors)
        (n_genes, n_cells) = ds.shape

        for name in ("Description", "Journal", "Authors", "Title", "Year", "CreationDate"):
            self._warn(name in ds.attrs, f"Optional global attribute '{name}' is missing")

        cluster_ids = None
        if self._check("ClusterID" in ds.ca, "Column attribute 'ClusterID' is missing"):
            cluster_ids = ds.ca.ClusterID
            self._check(np.issubdtype(cluster_ids.dtype, np.integer), "Column attribute 'ClusterID' must be integer dtype")
            self._check(self._is_zero_based_range(cluster_ids), "Column attribute 'ClusterID' must be integers 0, 1, 2, ... with no missing values")
            self._check(cluster_ids.shape == (n_cells,), f"Column attribute 'ClusterID' must be 1-dimensional array of {n_cells} elements")

        if "ClusterName" in ds.ca:
            cluster_names = ds.ca.ClusterName
            self._check(self._is_string_array(cluster_names), "Column attribute 'ClusterName' must be an array of strings")
            self._check(cluster_names.shape == (n_cells,), f"Column attribute 'ClusterName' must be 1-dimensional array of {n_cells} elements")
            # The 1:1 test masks one array with the other, so it needs both present and equally shaped
            if cluster_ids is not None and cluster_names.shape == cluster_ids.shape:
                if not self._is_one_to_one(cluster_ids, cluster_names):
                    self._check(False, "ClusterName must correspond 1:1 with ClusterID")
        else:
            self.warnings.append("Optional column attribute 'ClusterName' is missing")

        if self._check("CellID" in ds.ca, "Column attribute 'CellID' is missing"):
            cell_ids = ds.ca.CellID
            self._check_string_attr(cell_ids, "Column", "CellID", n_cells)
            self._check(len(np.unique(cell_ids)) == n_cells, "Column attribute 'CellID' cannot contain duplicate values")

        self._check_flag_attr(ds.ca, "Column", "Valid", n_cells)
        self._check_flag_attr(ds.ca, "Column", "Outliers", n_cells)

        if self._check("Accession" in ds.ra, "Row attribute 'Accession' is missing"):
            accessions = ds.ra.Accession
            self._check_string_attr(accessions, "Row", "Accession", n_genes)
            self._check(len(np.unique(accessions)) == n_genes, "Row attribute 'Accession' cannot contain duplicate values")

        if self._check("Gene" in ds.ra, "Row attribute 'Gene' is missing"):
            self._check_string_attr(ds.ra.Gene, "Row", "Gene", n_genes)

        # Row attributes have one element per gene (row), not per cell
        self._check_flag_attr(ds.ra, "Row", "Valid", n_genes)
        self._check_flag_attr(ds.ra, "Row", "Selected", n_genes)

        return len(self.errors) == errors_before

    @staticmethod
    def _dtype_label(dtype: Any) -> str:
        """Return the dtype name used in the summary; fixed-length byte strings ('|S...') are shown as 'string'."""
        text = str(dtype)
        return "string" if text.startswith("|S") else text

    @staticmethod
    def _longest_name(names: Any) -> int:
        """Return the length of the longest name, or 0 when there are none."""
        return max((len(x) for x in names), default=0)

    def _validate_attrs(self, group: Any, label: str, axis_name: str, axis: int, matrix_shape: Any, string_type: Any, width: int) -> None:
        """Check every dataset of a row/column attribute group and add one summary line per attribute."""
        for name in group:
            dataset = group[name]
            attr_shape = dataset.shape
            if matrix_shape is not None:  # without a main matrix there is nothing to compare against
                self._check(attr_shape[0] == matrix_shape[axis], f"{label} attribute '{name}' shape {attr_shape[0]} first dimension does not match {axis_name} dimension {matrix_shape}")
            self._check(dataset.dtype in self._MATRIX_TYPES or np.issubdtype(dataset.dtype, string_type), f"{label} attribute '{name}' dtype {dataset.dtype} is not allowed")
            self.summary.append(f"{name: >{width}} {self._dtype_label(dataset.dtype)} {attr_shape if len(attr_shape) > 1 else ''}")
        if len(group) == 0:
            self.summary.append(" (none)")

    def _validate_graphs(self, file: Any, group_name: str, label: str) -> None:
        """Check the sparse graphs stored under 'group_name' and add one summary line per graph."""
        if group_name not in file:
            # Only the "old" format (or an unknown version) may omit the graph groups
            if self.version in self._GRAPH_VERSIONS:
                self._check(False, f"'{group_name}' group is missing (try spec_version='old')")
            return
        graphs = file[group_name]
        for name in graphs:
            graph = graphs[name]
            complete = True
            for vector, role, allowed, family in self._GRAPH_VECTORS:
                if self._check(vector in graph, f"{label} graph '{name}' is missing vector '{vector}', denoting {role}"):
                    self._check(graph[vector].dtype in allowed, f"/{group_name}/{name}/{vector}.dtype {graph[vector].dtype} must be {family}")
                else:
                    complete = False
            if complete:
                n_edges = graph["a"].shape[0]
                self._check(n_edges == graph["b"].shape[0] and n_edges == graph["w"].shape[0], f"{label} graph '{name}' sparse vectors a, b and w must have equal length")
            if "a" in graph:
                self.summary.append(f" '{name}' with {graph['a'].shape[0]} edges")
        if len(graphs) == 0:
            self.summary.append(" (none)")

    def validate_spec(self, file: "h5py.File") -> bool:
        """
        Validate the open HDF5 file against the format specification.
        Args:
            file: h5py File object
        Returns:
            True if this call recorded no new errors, else False
        Remarks:
            Upon return, the instance attributes 'self.errors' and 'self.warnings' contain
            lists of errors and warnings, and the 'self.summary' attribute contains a summary
            of the file contents.
        """
        errors_before = len(self.errors)
        is_v3 = self.version == "3.0.0"

        # Each required group is checked once; the results are reused for the summary below
        has_row_attrs = self._check("row_attrs" in file, "'row_attrs' group is missing")
        has_col_attrs = self._check("col_attrs" in file, "'col_attrs' group is missing")
        has_globals = False
        if is_v3:
            # In 3.0.0 the global attributes live in the '/attrs' group rather than in HDF5 attributes
            has_globals = self._check("attrs" in file, "Global attributes missing")

        # Column width used to right-align names in the summary
        width_ra = self._longest_name(file["row_attrs"].keys()) if has_row_attrs else 0
        width_ca = self._longest_name(file["col_attrs"].keys()) if has_col_attrs else 0
        if is_v3:
            width_globals = self._longest_name(file["attrs"].keys()) if has_globals else 0
        else:
            width_globals = self._longest_name(file.attrs.keys())
        width_layers = self._longest_name(file["layers"].keys()) if "layers" in file else 0
        width_layers = max(width_layers, len("Main matrix"))
        width = max(width_ca, width_ra, width_globals, width_layers)

        self.summary.append("Global attributes:")
        if is_v3:
            if has_globals:
                for key, value in file["attrs"].items():
                    if len(value.shape) > 0:
                        self.summary.append(f"{key: >{width}} {self._dtype_label(value.dtype)} {value.shape}")
                    else:
                        self.summary.append(f"{key: >{width}} {self._dtype_label(value.dtype)} (scalar)")
        else:
            for key, value in file.attrs.items():
                # Exact type tests on purpose: numpy string scalars subclass str/bytes but
                # are reported through their dtype in the final branch
                if type(value) is str:
                    self.warnings.append(f"Global attribute '{key}' has dtype string, which will be deprecated in future Loom versions")
                    self.summary.append(f"{key: >{width}} string")
                elif type(value) is bytes:
                    self.warnings.append(f"Global attribute '{key}' has dtype bytes, which will be deprecated in future Loom versions")
                    self.summary.append(f"{key: >{width}} bytes")
                else:
                    self.summary.append(f"{key: >{width}} {self._dtype_label(value.dtype)}")

        shape = None  # stays None when the main matrix is missing; dimension checks are then skipped
        if self._check("matrix" in file, "Main matrix missing"):
            matrix = file["matrix"]
            self._check(matrix.dtype in self._MATRIX_TYPES, f"Main matrix dtype={matrix.dtype} is not allowed")
            shape = matrix.shape
            self.summary.append(f"Layers shape={shape}:")
            self.summary.append(f"{'Main matrix': >{width}} {matrix.dtype}")
            if "layers" in file:
                for layer in file["layers"]:
                    dataset = file["layers"][layer]
                    self._check(dataset.shape == shape, f"Layer '{layer}' shape {dataset.shape} does not match main matrix shape {shape}")
                    self._check(dataset.dtype in self._MATRIX_TYPES, f"Layer '{layer}' dtype={dataset.dtype} is not allowed")
                    self.summary.append(f"{layer: >{width}} {dataset.dtype}")

        # String attributes are object dtype in 3.0.0 and byte strings before that
        # (np.bytes_ is the spelling that also exists in NumPy 2.x, where np.string_ was removed)
        string_type = np.object_ if is_v3 else np.bytes_

        self.summary.append("Row attributes:")
        if has_row_attrs:
            self._validate_attrs(file["row_attrs"], "Row", "row", 0, shape, string_type, width)

        self.summary.append("Column attributes:")
        if has_col_attrs:
            self._validate_attrs(file["col_attrs"], "Column", "column", 1, shape, string_type, width)

        self.summary.append("Row graphs:")
        self._validate_graphs(file, "row_graphs", "Row")

        self.summary.append("Column graphs:")
        self._validate_graphs(file, "col_graphs", "Column")

        return len(self.errors) == errors_before