Source code for pysmFISH.dots_calling

from sympy import Point, Line, Segment
from skimage import feature, measure
import numpy as np
from scipy import ndimage as nd

[docs]def thr_calculator(filtered_img,min_distance,stringency): """ Function used to calculate the threshold to use for the dots counting in a 2D image. Parameters: ----------- filtered_img: np.array float64 preprocessed image used to count the dots. min_distance: int minimum distance that two maxima need to have in order to be defined as separete peaks. stringency: int integer used to select the stringency of the generated threshold. By adding stringency to the thr_idx we can select a Thr with higher value from the thr_array. Returns: ----------- counting_dict : dict dictionary containing all the counting infos: selected_thr: float64 Thr used for counting after application of the stringency. calculated_thr: float64 Calculated Thr selected_peaks: int64 2D coords of the peaks defined using the selected_thr. thr_array: float64 Thr array of 100 points distributed between (Img.min(),Img.max()). peaks_coords: float64 list of all the 3D coords calculated using the Thr array. total_peaks: list of int List of the peaks counts. thr_idx: int64 index of the calculated threshold. stringency: int64 stringency used for the identification of the selected_peaks """ # List with the total peaks calculated for each threshold total_peaks = [] # List of ndarrays with the coords of the peaks calculated for each threshold peaks_coords = [] # Define the Thr array to be tested thr_array = np.linspace(filtered_img.min(),filtered_img.max(),num=100) # Calculate the number of peaks for each threshold. In this calculation # the size of the objects is not considered for thr in thr_array: # The border is excluded from the counting peaks = feature.peak_local_max(filtered_img,min_distance=min_distance,\ threshold_abs=thr,exclude_border=False, indices=True,\ num_peaks=np.inf, footprint=None,labels=None) # Stop the counting when the number of peaks detected falls below 3 if len(peaks)<=3: stop_thr = thr # Move in the upper loop so you will stop at the previous thr break else: peaks_coords.append(peaks) total_peaks.append(len(peaks)) # Consider the case of no detectected peaks or if there is only one Thr # that create peaks (list total_peaks have only one element and ) # if np.array(total_peaks).sum()>0 or len(total_peaks)>1: if len(total_peaks)>1: # Trim the threshold array in order to match the stopping point # used the [0][0] to get the first number and then take it out from list thr_array = thr_array[:np.where(thr_array==stop_thr)[0][0]] # Calculate the gradient of the number of peaks distribution grad = np.gradient(total_peaks) # Restructure the data in order to avoid to consider the min_peak in the # calculations # Coord of the gradient min_peak grad_min_peak_coord = np.argmin(grad) # Trim the data to remove the peak. trimmed_thr_array = thr_array[grad_min_peak_coord:] trimmed_grad = grad[grad_min_peak_coord:] if trimmed_thr_array.shape>(1,): # Trim the coords array in order to maintain the same length of the # tr and pk trimmed_peaks_coords = peaks_coords[grad_min_peak_coord:] trimmed_total_peaks = total_peaks[grad_min_peak_coord:] # To determine the threshold we will determine the Thr with the biggest # distance to the segment that join the end points of the calculated # gradient # Distances list distances = [] # Calculate the coords of the end points of the gradient p1 = Point(trimmed_thr_array[0],trimmed_grad[0]) p2 = Point(trimmed_thr_array[-1],trimmed_grad[-1]) # Create a line that join the points s = Line(p1,p2) allpoints = np.arange(0,len(trimmed_thr_array)) # Calculate the distance between all points and the line for p in allpoints: dst = s.distance(Point(trimmed_thr_array[p],trimmed_grad[p])) distances.append(dst.evalf()) # Remove the end points from the lists trimmed_thr_array = trimmed_thr_array[1:-1] trimmed_grad = trimmed_grad[1:-1] trimmed_peaks_coords = trimmed_peaks_coords[1:-1] trimmed_total_peaks = trimmed_total_peaks[1:-1] trimmed_distances = distances[1:-1] # Determine the coords of the selected Thr # Converted trimmed_distances to array because it crashed # on Sanger. if trimmed_distances: # Most efficient way will be to consider the length of Thr list thr_idx=np.argmax(np.array(trimmed_distances)) calculated_thr = trimmed_thr_array[thr_idx] # The selected threshold usually causes oversampling of the number of dots # I added a stringency parameter (int n) to use to select the Thr+n # for the counting. It selects a stringency only if the trimmed_thr_array # is long enough if thr_idx+stringency<len(trimmed_thr_array): selected_thr = trimmed_thr_array[thr_idx+stringency] selected_peaks = trimmed_peaks_coords[thr_idx+stringency] thr_idx = thr_idx+stringency else: selected_thr = trimmed_thr_array[thr_idx] selected_peaks = trimmed_peaks_coords[thr_idx] # Calculate the selected peaks after removal of the big and small objects # Threshold the image using the selected threshold if selected_thr>0: img_mask = filtered_img>selected_thr labels = nd.label(img_mask)[0] properties = measure.regionprops(labels) for ob in properties: if ob.area<6 or ob.area>200: img_mask[ob.coords[:,0],ob.coords[:,1]]=0 labels = nd.label(img_mask)[0] selected_peaks = feature.peak_local_max(filtered_img, min_distance=min_distance, threshold_abs=selected_thr, exclude_border=False, indices=True, num_peaks=np.inf, footprint=None, labels=labels) if selected_peaks.size: # Intensity counting of the max peaks selected_peaks_int = filtered_img[selected_peaks[:,0],selected_peaks[:,1]] else: selected_thr = 0 calculated_thr = 0 selected_peaks = 0 peaks_coords = 0 total_peaks = 0 thr_idx = 0 selected_peaks_int = 0 trimmed_thr_array = 0 trimmed_peaks_coords = 0 else: selected_thr = 0 calculated_thr = 0 selected_peaks = 0 peaks_coords = 0 total_peaks = 0 thr_idx = 0 selected_peaks_int = 0 trimmed_thr_array = 0 trimmed_peaks_coords = 0 else: selected_thr = 0 calculated_thr = 0 selected_peaks = 0 peaks_coords = 0 total_peaks = 0 thr_idx = 0 selected_peaks_int = 0 trimmed_thr_array = 0 trimmed_peaks_coords = 0 else: selected_thr = 0 calculated_thr = 0 selected_peaks = 0 peaks_coords = 0 total_peaks = 0 thr_idx = 0 selected_peaks_int = 0 trimmed_thr_array = 0 trimmed_peaks_coords = 0 counting_dict={} counting_dict['selected_thr'] = selected_thr counting_dict['calculated_thr'] = calculated_thr counting_dict['selected_peaks'] = selected_peaks counting_dict['thr_array'] = thr_array counting_dict['trimmed_thr_array'] = trimmed_thr_array counting_dict['peaks_coords'] = peaks_coords counting_dict['trimmed_peaks_coords'] = trimmed_peaks_coords counting_dict['total_peaks'] = total_peaks counting_dict['thr_idx'] = thr_idx counting_dict['stringency'] = stringency counting_dict['selected_peaks_int'] = selected_peaks_int return counting_dict