Source code for WSI.patch
"""
---------------------------------------------------------------------------
Created on Fri Feb 4 11:42:52 2023
----------------------------------------------------------------------------
**Title:** ValidPath Toolbox - patch extraction module
**Description:** This is the patch extraction module for the ValidPath toolbox. It includes two classes and several methods.
**Classes:** WSIpatch_extractor, PatchExtractor
**Methods:** The two main methods in the patch extraction module are as follows:
• PatchExtractor.gen_patch(INPUTDIR: str, PatchSize: tuple, Number_of_Patches: int, intensity_check: bool, intensity_threshold: int, OUTPUTDIR: str)
• WSIpatch_extractor.patch_extraction(wsi_obj: object, PatchSize: tuple, OUTPUTDIR: str, Random: bool, Visualize: bool, Intensity_check: bool, Number_of_Patches: int)
---------------------------------------------------------------------------
Author: SeyedM.MousaviKahaki (seyed.kahaki@fda.hhs.gov)
Version ='1.0'
---------------------------------------------------------------------------
"""
import numpy as np
import cv2
from skimage.io import imsave, imread
from PIL import Image
import os
from glob import glob
import openslide
from openslide.deepzoom import DeepZoomGenerator
from pathlib import Path
import tifffile as tiff
import random
import matplotlib.pyplot as plt
from readwsi.TissueSegmentation import TissueSegmentation
from shapely.geometry import Polygon, Point
#from readwsi.normalization import Normalization
from readwsi import normalization
import PIL
import h5py
#import sys
import pandas as pd
#from scipy import misc
class WSIpatch_extractor:
def __init__(self):
pass
def patch_extraction(wsi_obj,patch_size,output_folder,random_state,visualize,intensity_check,intensity_threshold,std_threshold,patch_number=-1):
"""
this function Generate object for tiles using the DeepZoomGenerator and divided
the svs file into tiles of size 256 with no overlap.
then processing and saving each tile to local directory.
:Parameters:
wsi_obj : object
an object containing WSI file and its information
patch_size: integer
the size of image patches to be extracted
output_folder : string
the path to the output directory to save image patches
random_state : boolean
extract patches randomly or in order
visualize: boolean
either to plot extracted patches or not
intensity_check: boolean
to filter the image patches and eliminate empty ones
intensity_threshold: integer
the threshold to include image patches
std_threshold: integer
the standard deviation threhold to include image patches
patch_number : boolean
the number of patches to be extracted. Set to ‘-1’ to extract all possible image patches
"""
        # Generate the tile object using the DeepZoomGenerator
        tiles = DeepZoomGenerator(wsi_obj, tile_size=patch_size, overlap=0, limit_bounds=False)
        # Here, the svs file is divided into tiles of size patch_size with no overlap.
        # The tiles object also contains data at many levels.
        # To check the number of levels
        print("The number of levels in the tiles object is: ", tiles.level_count)
        print("The dimensions of data in each level are: ", tiles.level_dimensions)
        # Total number of tiles in the tiles object
        print("Total number of tiles: ", tiles.tile_count)
###### processing and saving each tile to local directory
#print("<<<<<<<<<<<<<<<<")
MaxTileLevel = len(tiles.level_tiles) - 1
cols, rows = tiles.level_tiles[MaxTileLevel]
#print(tiles)
#import pdb; pdb.set_trace()
tile_path = output_folder+"Imagepatches/"
orig_tile_dir_name = output_folder+"Imagepatches/"
MYDIRs = [output_folder+"Imagepatches/"]
for dr in MYDIRs:
CHECK_FOLDER = os.path.isdir(dr)
# If folder doesn't exist, then create it.
if not CHECK_FOLDER:
os.makedirs(dr)
print("created folder : ", dr)
else:
print(dr, "folder already exists.")
ro = patch_number
co = 2
axes=[]
counter = 0
flag_counter = True
for row in range(rows):
if flag_counter==False:
break
for col in range(cols):
if random_state==True :
row = random.randint(0,rows-1)
col = random.randint(0,cols-1)
temp_tile = tiles.get_tile(MaxTileLevel, (col, row))
#getting the coordinates
temp_tile_coor = tiles.get_tile_coordinates(MaxTileLevel, (col, row))
xx = temp_tile_coor[0][0]
yy = temp_tile_coor[0][1]
tile_name = str(counter)+"_"+str(0) + "_x_" + str(xx) + "_y_" + str(yy) + "_a_" + "100.00"
temp_tile_RGB = temp_tile.convert('RGB')
temp_tile_np = np.array(temp_tile_RGB)
# Save original tile
# to check intensity mean and intensity standard deviation of the tiles (To exclude non-informative tiles)
if intensity_check:
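                    # a mostly blank (background) tile has a high mean intensity and a low standard deviation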
intensity_cond = temp_tile_np.mean() < intensity_threshold and temp_tile_np.std() > std_threshold
else:
intensity_cond = True
if intensity_cond:
print("Saving" + orig_tile_dir_name + tile_name + ".tif")
tiff.imsave(orig_tile_dir_name + tile_name + ".tif", temp_tile_np)
if visualize ==1:
plt.imshow(temp_tile_np)
plt.show()
if patch_number >0 :
counter += 1
if patch_number == counter :
flag_counter = False
break
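    # Illustrative usage sketch (not part of the module); the slide path and
    # parameter values below are hypothetical, and the WSI object is assumed to
    # be opened with openslide:
    #
    #   wsi_obj = openslide.open_slide("/path/to/slide.svs")
    #   WSIpatch_extractor.patch_extraction(
    #       wsi_obj, patch_size=256, output_folder="/path/to/output/",
    #       random_state=True, visualize=False, intensity_check=True,
    #       intensity_threshold=250, std_threshold=15, patch_number=100)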
def patch_extraction_of_tissue(slidepath,patch_size ,output_folder, number_of_patches=1 , vis = False):
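        """
        Extract image patches from the tissue regions of a WSI. Tissue contours are
        located with TissueSegmentation, random points are sampled inside a tissue
        contour, and patches are read from the corresponding WSI coordinates.
        :Parameters:
            slidepath : string
                the path to the WSI file
            patch_size : integer
                the size of the image patches to be extracted
            output_folder : string
                the path to the output directory to save image patches
            number_of_patches : integer
                the number of patches to be extracted
            vis : boolean
                whether to plot a few of the extracted patches
        """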
# vis = visualize state , if it is True == output will be displayed
std_threshold = 15
intensity_threshold = 250
img_array = []
#Minimum tissue area
min_tissue_size =10000
#Max height and width for the size of slide with selected level
max_img_size =10000
#Gaussian smoothing sigma
smooth_sigma = 13
#Thresholding value
thresh_val =0.8
s_level, d_factor ,slide_shape = TissueSegmentation.test_locate_tissue_seperately(slidepath,output_folder,
min_tissue_size,max_img_size,smooth_sigma,thresh_val)
cnts = TissueSegmentation.test_locate_tissue(slidepath,min_tissue_size,max_img_size,smooth_sigma,thresh_val)
Slide = openslide.OpenSlide(slidepath)
region = (0, 0)
level = s_level
factor = d_factor
w_ = slide_shape[0]
h_ = slide_shape[1]
size = (slide_shape[0], slide_shape[1])
list_of_polygons= []
for i,cnt in enumerate(cnts) :
lst_of_tuples = []
x=[]
y=[]
for j,cn in enumerate(cnt):
x.append(cn[0][0])
y.append(cn[0][1])
lst_of_tuples = list(zip(x,y))
list_of_polygons.append(lst_of_tuples)
#for num_p in range(number_of_patches) :
num_p = 0
while num_p != number_of_patches:
n = 1
while n>0:
rand_x = random.randint(1,w_)
rand_y = random.randint(1,h_)
point = Point(rand_x,rand_y)
polygon = Polygon(lst_of_tuples) # Polygon([(0, 0), (0, 1), (1, 1), (1, 0)])
is_correct = polygon.contains(point)
if is_correct == True:
n = -1
break
            spointx, spointy = rand_x*factor, rand_y*factor  # multiplied by factor to get the original coordinates as in the WSI and not as per the selected level
patchimg = Slide.read_region((spointx, spointy), level, (patch_size, patch_size))
#patchimg.convert('RGB')
# cv2.imwrite(f"C:/Users/masoud/data/patch_{str(num_p)}.tif", np.array(patchimg))
temp_tile_RGB = patchimg.convert('RGB')
temp_tile_np = np.array(temp_tile_RGB)
# to check intensity mean and intensity standard deviation of the tiles (To exclude non-informative tiles)
if temp_tile_np.mean() < intensity_threshold and temp_tile_np.std() > std_threshold:
tiff.imsave(output_folder + f"/patch_{str(num_p)}.tif", temp_tile_np)
print(output_folder+ f"/patch_{str(num_p)}.tif")
#print("-------------", type(patchimg))
num_p += 1
img_array.append(temp_tile_np)
l = [ 4,number_of_patches]
min_ = min(l)
if vis ==True :
fig,axes = plt.subplots(nrows = 1, ncols = min_)
for i,x in enumerate(img_array) :
if i==4 :
break
axes[i].imshow(x)
plt.show()
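    # Illustrative usage sketch (not part of the module); the slide path and
    # parameter values below are hypothetical. The slide is segmented internally,
    # so only its path is required:
    #
    #   WSIpatch_extractor.patch_extraction_of_tissue(
    #       "/path/to/slide.svs", patch_size=256,
    #       output_folder="/path/to/output", number_of_patches=10, vis=True)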
def patch_extraction_with_normalized_tiles(wsi_obj,patch_size,output_folder,random_state=True ,patch_number=-1):
"""
this function Generate object for tiles using the DeepZoomGenerator and divided
the svs file into tiles of size 256 with no overlap.
then processing and saving each tile to local directory.
:Parameters:
wsi_obj : object
WSI object.
patch_size: integer
size tiles
output_folder : string
path root folder to save tiles
perform_segmentation_state: boolean
random_state : boolean
"""
        # Generate the tile object using the DeepZoomGenerator
        tiles = DeepZoomGenerator(wsi_obj, tile_size=patch_size, overlap=0, limit_bounds=False)
        # Here, the svs file is divided into tiles of size patch_size with no overlap.
        # The tiles object also contains data at many levels.
        # To check the number of levels
        print("The number of levels in the tiles object is: ", tiles.level_count)
        print("The dimensions of data in each level are: ", tiles.level_dimensions)
        # Total number of tiles in the tiles object
        print("Total number of tiles: ", tiles.tile_count)
#print(">>>>>>>>>>>>>>>>")
std_threshold = 15
intensity_threshold = 250
MaxTileLevel = len(tiles.level_tiles) - 1
###### processing and saving each tile to local directory
cols, rows = tiles.level_tiles[MaxTileLevel]
orig_tile_dir_name = output_folder+"Imagepatches/original_tiles/"
norm_tile_dir_name = output_folder+"Imagepatches/normalized_tiles/"
H_tile_dir_name = output_folder+"Imagepatches/H_tiles/"
E_tile_dir_name = output_folder+"Imagepatches/E_tiles/"
MYDIRs = [output_folder+"Imagepatches/original_tiles/",output_folder+"Imagepatches/normalized_tiles/",
output_folder+"Imagepatches/H_tiles/",output_folder+"Imagepatches/E_tiles/"]
for dr in MYDIRs:
CHECK_FOLDER = os.path.isdir(dr)
# If folder doesn't exist, then create it.
if not CHECK_FOLDER:
os.makedirs(dr)
print("created folder : ", dr)
else:
print(dr, "folder already exists.")
counter = 0
flag_counter = True
c = 0
ro = patch_number*4
co = 4
axes=[]
fig=plt.figure()
plt.rcParams.update({'font.size': 8})
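        # one row of four panels (original, normalized, H, and E tile) is plotted for each saved patch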
for row in range(rows):
if flag_counter==False:
break
for col in range(cols):
if random_state==True :
row = random.randint(0,rows-1)
col = random.randint(0,cols-1)
tile_name = str(col) + "_" + str(row)
# tile_name = os.path.join(tile_dir, '%d_%d' % (col, row))
# print("Now processing tile with title: ", tile_name)
#print(MaxTileLevel, col, row)
temp_tile = tiles.get_tile(MaxTileLevel, (col, row))
temp_tile_RGB = temp_tile.convert('RGB')
temp_tile_np = np.array(temp_tile_RGB)
# to check intensity mean and intensity standard deviation of the tiles (To exclude non-informative tiles)
if temp_tile_np.mean() < intensity_threshold and temp_tile_np.std() > std_threshold:
# print("Processing tile number:", tile_name)
norm_img, H_img, E_img = normalization.norm_HnE(temp_tile_np, Io=240, alpha=1, beta=0.15)
# Save original tile
tiff.imsave(orig_tile_dir_name + tile_name + "_original.tif", temp_tile_np)
# Save the norm tile, H and E tiles
tiff.imsave(norm_tile_dir_name + tile_name + "_norm.tif", norm_img)
tiff.imsave(H_tile_dir_name + tile_name + "_H.tif", H_img)
tiff.imsave(E_tile_dir_name + tile_name + "_E.tif", E_img)
fig = plt.figure(figsize=(15, 15))
axes.append( fig.add_subplot(ro, co, c+1) )
subplot_title=("patch number (original_img) :" + str(counter+1))
axes[-1].set_title(subplot_title)
plt.imshow(temp_tile_np)
axes.append( fig.add_subplot(ro, co, c+2) )
subplot_title=("patch number (norm_img) :" + str(counter+1))
axes[-1].set_title(subplot_title)
plt.imshow(norm_img)
axes.append( fig.add_subplot(ro, co, c+3) )
subplot_title=("patch number (H_img) :" + str(counter+1))
axes[-1].set_title(subplot_title)
plt.imshow(H_img)
axes.append( fig.add_subplot(ro, co, c+4) )
subplot_title=("patch number (E_img) :" + str(counter+1))
axes[-1].set_title(subplot_title)
plt.imshow(E_img)
c = c+4
if patch_number >0 :
counter += 1
if patch_number == counter :
flag_counter = False
break
plt.show()
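    # Illustrative usage sketch (not part of the module); the slide path and
    # parameter values below are hypothetical, and the WSI object is assumed to
    # be opened with openslide:
    #
    #   wsi_obj = openslide.open_slide("/path/to/slide.svs")
    #   WSIpatch_extractor.patch_extraction_with_normalized_tiles(
    #       wsi_obj, patch_size=256, output_folder="/path/to/output/",
    #       random_state=True, patch_number=20)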
class PatchExtractor :
def __init__(self):
pass
def find_between(self, s, first, last ):
try:
start = s.index( first ) + len( first )
end = s.index( last, start )
return s[start:end]
except ValueError:
return ""
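    # Example (illustrative, with a hypothetical annotation file name):
    #
    #   PatchExtractor().find_between("region_1_coord_120_340.jpg", "_coord_", ".")
    #   # -> "120_340"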
def gen_patch(self, INPUTDIR,PatchSize,Number_of_Patches,intensity_check,intensity_threshold,OUTPUTDIR):
"""
This function extracts a number of pactches from extracted annotations.
It can save the extracted annottions to the output directory as defined in inputs.
Before running this function, please call annotation.ann_extractor.extract_ann(save_dir, XMLs, WSIs) to generate annotations.
The output directory will be generated based on the strucutr of the input directories.
IF the WSI Magnification is 13X or 20X, this code will automaticall convert to 20X.
:Parameters:
INPUTDIR : string
the path to the input directory
PatchSize : tuple
the size of image patches to be extracted
Number_of_Patches : int
the number of patches per annotation to be extracted
intensity_check : boolean
to filter the image patches and eliminate empty ones
• OUTPUTDIR : string
the path to the output directory to save image patches
:Returns:
Image – extracted image patches from the annotated area.
"""
std_threshold = 15
chck_group_name=True
open_dataset = True
save_hdf5 = False
save_png = True
input_x = PatchSize[0]
input_y = PatchSize[1]
hdf5_file= OUTPUTDIR+"dataset.hdf5"
root_directory = glob(r''+INPUTDIR+'*')
print(">>>>>>>>>")
print(root_directory)
png_dir = OUTPUTDIR+"Imagepatches/"
if not os.path.exists(png_dir):
os.makedirs(png_dir)
print(png_dir)
for filename in root_directory:
groupname = os.path.basename(filename)
#FName = groupname.upper()
#files = glob( filename + r"\*.jpg")
# adjustment for compatibility with linux (because "\" is the wrong type of slash for unix):
files = glob(os.path.join(filename, "*.jpg"))
#import pdb; pdb.set_trace()
#files = [f.upper() for f in files]
files_clean = []
# exclude mask images
# i=0
            for r in files:
                subr = self.find_between(r, "_coord_", ".")
                xx = subr.split("_")[1]
                yy = subr.split("_")[0]
                # exclude mask images based on the file name (portable across Windows and Linux paths)
                base_name = os.path.splitext(os.path.basename(r))[0]
                if base_name.find("mask") == -1:
                    files_clean.append(r)
# for on all images in a folder
for fl in files_clean:
img = cv2.imread(fl, cv2.IMREAD_COLOR)
plt.imshow(img)
plt.show()
# to check intensity mean and intensity standard deviation of the tiles (To exclude non-informative tiles)
if np.mean(img) > intensity_threshold:
continue
# Don't use regions that are too small
print(img.shape)
min_acceptable_height, min_acceptable_width = 2*input_y, 2*input_x
if img.shape[0] < min_acceptable_height or img.shape[1] < min_acceptable_width:
continue
                rows,cols = img.shape[0], img.shape[1]
                name = os.path.splitext(os.path.basename(fl))[0]
                # parse this annotation's WSI location (xx, yy) from the current file name
                subr = self.find_between(fl, "_coord_", ".")
                xx = subr.split("_")[1]
                yy = subr.split("_")[0]
#extract patches
for rng in range(Number_of_Patches):
#end_name =name+"_patchnumber_"+str(rng)
done = True
breakLimit = 500
breakCount = 0
while done :
breakCount = breakCount + 1
if breakCount > breakLimit:
break
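                    # sample a random candidate top-left corner inside the annotation image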
coords = [(random.random()*rows, random.random()*cols)]
x=int(coords[0][0])
if(x>(rows-input_x+1)):
x = x-input_x+1
y=int(coords[0][1])
                        if(y>(cols-input_y+1)):
y = y-input_y+1
x_end=x+input_x
y_end=y+input_y
# to include patches with high intensity in corner of the patches
try:
color_chk1 = img[x, y] > [intensity_threshold,intensity_threshold,intensity_threshold]
color_chk2 = img[x, y_end] > [intensity_threshold,intensity_threshold,intensity_threshold]
color_chk3 = img[x_end, y] > [intensity_threshold,intensity_threshold,intensity_threshold]
color_chk4 = img[x_end, y_end] > [intensity_threshold,intensity_threshold,intensity_threshold]
color_chk5 = img[round((x+x_end)/2), round((y+y_end)/2)] > [intensity_threshold,intensity_threshold,intensity_threshold]
                        except IndexError:
                            # a sampled corner falls outside the image; try another location
                            continue
                        if intensity_check:
                            # keep the patch only if none of the sampled corners is bright background
                            intensity_cond = not (any(color_chk1) or any(color_chk2) or any(color_chk3))
                        else:
                            intensity_cond = True
                        if intensity_cond:
cropped_image = img[x:x_end, y:y_end]
# add location of annotation (xx,yy) to patch location (x,y)
yFinal = int(xx) + x
xFinal = int(yy) + y
# plt.imshow(cropped_image)
# plt.show()
#create png files of patches
if save_png==True:
png_file = Path(OUTPUTDIR+"Imagepatches/"+groupname+"/")
png_file.mkdir(parents=True, exist_ok=True)
print(png_dir)
#end_name = str(rng)+"_"+groupname + "_x_" + str(x) + "_y_" + str(y) + "_a_" + "100.00"
end_name = str(rng)+"_"+groupname + "_x_" + str(xFinal) + "_y_" + str(yFinal) + "_a_" + "100.00"
print("Creating "+png_dir +groupname+"/"+end_name+".png")
try:
cv2.imwrite(png_dir + groupname + "/" +end_name+".png", cropped_image)
except:
continue
#create a hdf5 file of all patches
if(save_hdf5==True):
if open_dataset==True:
dataset = h5py.File(hdf5_file, 'a')
open_dataset=False
if chck_group_name==True:
print(groupname+"_is-->>>>> new group name.")
grp = dataset.create_group(groupname);
chck_group_name=False
dset = grp.create_dataset(end_name, data=cropped_image)
print(end_name+"_is new dataset on "+groupname+" group")
done=False
if(save_hdf5==True):
dataset.close()
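    # Illustrative usage sketch (not part of the module); INPUTDIR is assumed to
    # contain one sub-folder of annotation .jpg images per group, as produced by
    # annotation.ann_extractor.extract_ann. Paths and parameter values are hypothetical:
    #
    #   extractor = PatchExtractor()
    #   extractor.gen_patch(
    #       INPUTDIR="/path/to/annotations/", PatchSize=(256, 256),
    #       Number_of_Patches=5, intensity_check=True,
    #       intensity_threshold=250, OUTPUTDIR="/path/to/patches/")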