Source code for ct_segnet.data_utils.data_io

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
A memory-efficient interface to slice, read and write CT data. Tiff series and hdf5 data formats are currently supported.
"""
import os
import numpy as np
import pandas as pd # line 13 empty for good luck

import shutil
import h5py
import glob

import matplotlib.pyplot as plt
import matplotlib as mpl

import functools
from multiprocessing import cpu_count, Pool
from skimage.io import imread
from tifffile import imsave
from configargparse import ArgumentTypeError
import ast


DEBUG_MODE = True

def _message(_str, bool_in):
    """
    :meta private:
    """
    
    if bool_in:
        print(_str)
        
def handle_YN(_str):
    """
    :meta private:
    """
    _inp = input(_str)
    return str_to_bool(_inp)


def str_to_bool(_inp):
    """
    
    :meta private:
    
    """
    _inp = str(_inp)
    if _inp in ("yes", "Yes", "Y", "y", "True", "TRUE", "true"):
        return True
    elif _inp in ("no", "No", "N", "n", "False", "FALSE", "false"):
        return False
    else:
        raise ArgumentTypeError("Input not understood")
    return

def n_patches_type(s):
    """
    :meta private:
    """
    s = s.split('x')
    s = ','.join(s)
    return ast.literal_eval(s)

def crops_type(s):
    """
    :meta private:
    """
    s = s.split(':')
    s = ','.join(s)
    return ast.literal_eval(s)


class InputError(Exception):
    """
    :meta private:
    """
    def __init__(self, message):
        self.message = message


[docs]class DataFile(): """An instance of a DataFile class points to a 3D dataset in a tiff sequence or hdf5 file. The interface includes read/write methods to retrieve the data in several ways (slices, chunks, down-sampled data, etc.) For setting chunk size in hdf5, either chunk_shape > chunk_size > chunked_slice_size can be input. If two or more are provided, this order is used to select one. Parameters ---------- fname : str path to hdf5 filename or folder containing tiff sequence tiff : bool True if fname is path to tiff sequence, else False data_tag: str dataset name / path in hdf5 file. None if tiff sequence VERBOSITY : int 0 - print nothing, 1 - important stuff, 2 - print everything d_shape : tuple shape of dataset; required for non-existent dataset only d_type : numpy.dtype data type for voxel data; required for non-existent dataset only chunk_size: float in GB - size of a hyperslab of shape proportional to data shape chunked_slice_size : float in GB - size of a chunk of some slices along an axis chunk_shape : tuple shape of hyperslab for hdf5 chunking Example .. highlight:: python .. code-block:: python from ct_segnet.data_io import DataFile # If fname points to existing hdf5 file dfile = DataFile(fname, tiff = False, data_tag = "dataset_name") # read a slice img = dfile.read_slice(axis = 1, slice_idx = 100) # read a chunk of size 2.0 GB starting at slice_start = 0 vol, s = dfile.read_chunk(axis = 1, slice_start = 0, max_GB = 2.0) # read a chunk between indices [10, 100], [20, 200], [30, 300] along the respective axes vol = dfile.read_data(slice_3D = [slice(10, 100), slice(20, 200), slice(30,300)]) # or just read all the data vol = dfile.read_full() """ def __init__(self, fname, data_tag = None, tiff = False,\ chunk_shape = None, chunk_size = None, chunked_slice_size = None,\ d_shape = None, d_type = None, VERBOSITY = 1): self.fname = fname self.data_tag = data_tag # for hdf5 only self.VERBOSITY = VERBOSITY self.tiff_mode = tiff self.exists = os.path.exists(self.fname) self.chunked_slice_size = chunked_slice_size self.chunk_size = chunk_size self.chunk_shape = chunk_shape if (d_type is not None) & (d_shape is not None): self.d_type = d_type # overwrite previously read stats self.d_shape = d_shape self.get_slice_sizes() self.est_chunking() _message("\n" + "#"*50 + "\n" + "New dataset will be created in %s: "%("tiff folder" if self.tiff_mode else "hdf5 file") + self.fname.split('/')[-1], self.VERBOSITY > 1) elif self.exists: self.get_stats() self.get_slice_sizes() else: raise ValueError("Inputs d_shape and d_type are required for new dataset as file does not already exist.\nPath does not exist:\n%s"%fname) return
[docs] def set_verbosity(self, VERBOSITY): """ """ self.VERBOSITY = val return
[docs] def create_new(self, overwrite = False): """ For hdf5 - creates an empty dataset in hdf5 and assigns shape, chunk_shape, etc. For tiff folder - checks if there is existing data in folder. :param bool overwrite: if True, remove existing data in the path (fname). """ if self.tiff_mode: if os.path.exists(self.fname): if overwrite: shutil.rmtree(self.fname) _message("Removed old contents in tiff folder %s"%(self.fname.split('/')[-1]), self.VERBOSITY > 0) else: raise ValueError("tiff folder already exists and overwrite is not allowed.") else: if self.data_tag is None: raise ValueError("data_tag: argument missing for creating new hdf5 file.") if os.path.exists(self.fname): with h5py.File(self.fname, 'r') as hf: if self.data_tag in hf.keys(): if overwrite: os.remove(self.fname) _message("Removed old hdf5 file %s"%(self.fname.split('/')[-1]), self.VERBOSITY > 0) else: raise ValueError("hdf5 dataset already exists and overwrite is not allowed.") else: pass self.est_chunking() _opt = 'r+' if os.path.exists(self.fname) else 'w' with h5py.File(self.fname, _opt) as hf: hf.create_dataset(self.data_tag, shape = self.d_shape, dtype = self.d_type, chunks = self.chunk_shape) _message("New hdf5 dataset %s created in file %s"%(self.data_tag, self.fname.split('/')[-1]), self.VERBOSITY > 0)
[docs] def get_stats(self, return_output = False): """ Print some stats about the DataFile (shape, slice size, chunking, etc.) """ if not self.tiff_mode: with h5py.File(self.fname, 'r') as hf: self.d_shape = hf[self.data_tag].shape self.d_type = hf[self.data_tag].dtype self.chunk_shape = hf[self.data_tag].chunks elif self.tiff_mode: img_path = glob.glob(self.fname+'/*.tif') if not img_path: img_path = glob.glob(self.fname+'/*.tiff') img_z = len(img_path) img = imread(img_path[0]) self.d_shape = (img_z,) + img.shape self.d_type = img.dtype self.chunk_shape = None # no such attribute for tiff stacks str_out = "\n" + "#"*50 + "\n" + "Found existing %s: "%("tiff folder" if self.tiff_mode else "hdf5 file") + self.fname.split('/')[-1] str_out = str_out + "\nDataset shape: %s"%(str(self.d_shape)) if return_output: return str_out else: _message(str_out, self.VERBOSITY > 0) return
[docs] def get_slice_sizes(self): """ """ if self.d_type == np.float32: fac = 4.0 elif self.d_type == np.uint8: fac = 1.0 elif self.d_type == np.uint16: fac = 2.0 else: raise ValueError("Data type %s is not supported."%self.d_type) self.slice_size = (np.prod(self.d_shape[1:])*fac/(1e9), np.prod(self.d_shape[::2])*fac/(1e9), np.prod(self.d_shape[:-1])*fac/(1e9)) self._bytes_per_voxel = fac self.d_size_GB = fac*np.prod(self.d_shape)/1e9 return
[docs] def show_stats(self, return_output = False): """print dataset shape and slice-wise size """ str_out = "" str_out = str_out + "\n" + "Dataset shape: %s"%(str(self.d_shape)) str_out = str_out + "\n" + "Dataset size: %.2f GB"%self.d_size_GB if not self.tiff_mode: str_out = str_out + "\n" + "Chunk shape: %s"%(str(self.chunk_shape)) for _i, _size in enumerate(self.slice_size): str_out = str_out + "\n" + "Slice size along %i: %4.2f MB"%(_i, 1000.0*_size) if return_output: return str_out else: _message(str_out, self.VERBOSITY > -1) return
[docs] def est_chunking(self): # Determine the chunk shape for hdf5 file, optimized for slicing along all 3 axes """ """ if self.tiff_mode: self.chunked_shape = None else: if self.chunk_shape is not None: return # allow user to define chunk_shape if self.chunk_size is not None: fac = np.cbrt((self.chunk_size) / (self.d_size_GB)) self.chunk_shape = tuple([int(self.d_shape[i]*fac) for i in range(3)]) return if self.chunked_slice_size is not None: if self.chunked_slice_size > (self.slice_size[0]*self.d_shape[0]): raise ValueError("chunked_slice_size cannot be larger than dataset size") self.chunk_shape = tuple(int(np.ceil(self.chunked_slice_size / single_slice_size)) for single_slice_size in self.slice_size) _message("Estimated Chunk shape: %s"%str(self.chunk_shape), self.VERBOSITY > 1) return else: self.chunk_shape = None return return
[docs] def read_slice(self, axis = None, slice_idx = None): """Read a slice. :param int axis: axis 0, 1 or 2 :param int slice_idx: index of slice along given axis """ img, s = self.read_chunk(axis = axis, slice_start = slice_idx, slice_end = slice_idx + 1) return img[0]
[docs] def read_data(self, slice_3D = (slice(None,None),)*3): """Read a block of data. Only supported for hdf5 datasets. :param list slice_3D: list of three python slices e.g. [slice(start,stop,step)]*3 """ if self.tiff_mode: ch = np.asarray(read_tiffseq(self.fname, s = slice_3D[0])) ch = ch[:, slice_3D[1], slice_3D[2]] with h5py.File(self.fname, 'r') as hf: _message("Reading hdf5: %s, Z: %s, Y: %s, X: %s"%(self.fname.split('/')[-1], str(slice_3D[0]), str(slice_3D[1]), str(slice_3D[2])), self.VERBOSITY > 1) ch = np.asarray(hf[self.data_tag][slice_3D[0],slice_3D[1],slice_3D[2]]) return ch
[docs] def read_sequence(self, idxs): """Read a list of indices idxs along axis 0. :param int axis: axis 0, 1 or 2 :param list idxs: list of indices """ with h5py.File(self.fname, 'r') as hf: return np.asarray(hf[self.data_tag][idxs,...])
[docs] def read_chunk(self, axis = None, slice_start = None, chunk_shape = None, max_GB = 10.0, slice_end = "", skip_fac = None): """Read a chunk of data along a given axis. Parameters ---------- axis : int axis > 0 is not supported for tiff series slice_start : int start index along axis chunk_shape : tuple (optional) used if hdf5 has no attribute chunk_shape max_GB : float maximum size of chunk that's read. slice_end will be calculated from this. slice_end : int (optional) used if max_GB is not provided. skip_fac : int (optional) "step" value as in slice(start, stop, step) Returns ------- tuple (data, slice) where data is a 3D numpy array """ if slice_end == "": # Do this to determine slice_end based on a max_GB value as RAM limit if (chunk_shape is None) & (not self.tiff_mode): chunk_shape = self.chunk_shape chunk_len = chunk_shape[axis] if chunk_shape is not None else 1 slice_len = max(1,int(np.round(max_GB/self.slice_size[axis])//chunk_len))*chunk_len slice_end = min(slice_len + slice_start , self.d_shape[axis]) elif slice_end is None: slice_end = self.d_shape[axis] s = slice(slice_start, slice_end, skip_fac) if not self.tiff_mode: # hdf5 mode with h5py.File(self.fname, 'r') as hf: _message("Reading hdf5: %s, axis %i, %s, with chunk_shape: %s"%(self.fname.split('/')[-1], \ axis, s, chunk_shape), self.VERBOSITY > 1) if axis == 0: ch = np.asarray(hf[self.data_tag][s,:,:]) elif axis == 1: ch = np.asarray(hf[self.data_tag][:,s,:]) elif axis == 2: ch = np.asarray(hf[self.data_tag][:,:,s]) ch = np.moveaxis(ch, axis, 0) return ch, s else: # in tiff mode if axis != 0: raise ValueError("TIFF data format does not support multi-axial slicing.") _message("Reading tiff: %s, axis %i, %s, chunk_shape: %s"%(self.fname.split('/')[-1],\ axis, s, chunk_shape), self.VERBOSITY > 1) ch = np.asarray(read_tiffseq(self.fname, s = s)) return ch, s
[docs] def read_full(self, skip_fac = None): """Read the full dataset """ ch, s = self.read_chunk(axis = 0, slice_start = 0, slice_end = self.d_shape[0], skip_fac = skip_fac) return ch
[docs] def write_full(self, ch): """Write the full dataset to filepath. :param ch: 3D numpy array to be saved """ self.write_chunk(ch, axis = 0, s = slice(0, self.d_shape[0])) return
[docs] def write_data(self, ch, slice_3D = None): """Write a block of data. Only supported for hdf5 datasets. :param ch: 3D numpy array to be saved :param list slice_3D: list of three python slices e.g. [slice(start,stop,step)]*3 - must match shape of ch """ if self.tiff_mode: raise ValueError("Not supported for tiff data") with h5py.File(self.fname, 'r+') as hf: _message("Saving hdf5: %s, Z: %s, Y: %s, X: %s"%(self.fname.split('/')[-1], str(slice_3D[0]), str(slice_3D[1]), str(slice_3D[2])), self.VERBOSITY > 1) hf[self.data_tag][slice_3D[0],slice_3D[1],slice_3D[2]] = ch _message("Done", self.VERBOSITY > 1) return
[docs] def write_chunk(self, ch, axis = None, s = None): """Write a chunk of data along a given axis. :param int axis: axis > 0 is not supported for tiff series :param slice s: python slice(start, stop, step) - step must be None for tiff series """ if not self.tiff_mode: if not os.path.exists(self.fname): raise ValueError("hdf5 file needs to exist before writing chunks.") with h5py.File(self.fname, 'r+') as hf: _message("Saving %s, axis %i, slice %i to %i"%(self.fname.split('/')[-1], axis, s.start, s.stop), self.VERBOSITY > 1) ch = np.moveaxis(ch, 0, axis) if axis == 0: hf[self.data_tag][s,:,:] = ch elif axis == 1: hf[self.data_tag][:,s,:] = ch elif axis == 2: hf[self.data_tag][:,:,s] = ch _message("Done", self.VERBOSITY > 1) else: if axis != 0: raise ValueError("TIFF data format does not support multi-axial slicing.") if os.path.exists(self.fname) & (s.start == 0): raise FileExistsError("TIFF folder to be written already exists. Please delete first.") _message("Saving %s, axis %i, slice %i to %i"%(self.fname.split('/')[-1], axis, s.start, s.stop), self.VERBOSITY > 1) ch = ch.astype(self.d_type) write_tiffseq(ch, SaveDir = self.fname, increment_flag = True,\ suffix_len = len(str(self.d_shape[axis]))) return
#### END Class DataFile ####### def read_tiffseq(userfilepath = '', procs = None, s = None): """Read a sequence of tiff images from folder. :param str userfilepath: path to folder containing images :param s: s is either a slice(start, stop, step) or a list of indices to be read :type s: slice or list """ if not userfilepath: raise ValueError("File path is required.") return [] if procs == None: procs = cpu_count() ImgFileList = sorted(glob.glob(userfilepath+'/*.tif')) if not ImgFileList: ImgFileList = sorted(glob.glob(userfilepath+'/*.tiff')) if s != None: if type(s) == slice: ImgFileList = ImgFileList[s] elif type(s) == list: ImgFileList = [ImgFileList[i] for i in s] else: raise ValueError("s input not recognized.") S = Parallelize(ImgFileList, imread, procs = procs) return S def write_tiffseq(S, SaveDir = "", increment_flag = False,\ suffix_len = None): """Write a sequence of tiff images to a directory. :param S: numpy array (3D), sequence will be created along axis 0 :param str SaveDir: path to folder, will create directory if doesn't exist :param bool increment_flag: True to write append images to existing ones in folder :param int suffix_len: e.g. 4 for 1000 images, 5 for 10,000 """ if not suffix_len: if increment_flag: raise ValueError("suffix_len required if increment_flag is True.") else: suffix_len = len(str(S.shape[0])) last_num = 0 if not os.path.exists(SaveDir): os.makedirs(SaveDir) else: if not increment_flag: shutil.rmtree(SaveDir) os.makedirs(SaveDir) else: ImgFileList = sorted(glob.glob(SaveDir+'/*.tif')) if not ImgFileList: ImgFileList = sorted(glob.glob(SaveDir+'/*.tiff')) if not ImgFileList: last_num = 0 else: last_num = int(ImgFileList[-1].split('.')[0][-suffix_len:]) BaseDirName = os.path.basename(os.path.normpath(SaveDir)) for iS in range(S.shape[0]): img_num = str(iS+1+last_num).zfill(suffix_len) imsave(os.path.join(SaveDir, BaseDirName + img_num + '.tif'), S[iS]) return
[docs]def Parallelize(ListIn, f, procs = -1, **kwargs): """This function packages the "starmap" function in multiprocessing, to allow multiple iterable inputs for the parallelized function. :param list ListIn: list, each item in the list is a tuple of non-keyworded arguments for f. :param func f: function to be parallelized. Signature must not contain any other non-keyworded arguments other than those passed as iterables. Example: .. highlight:: python .. code-block:: python def multiply(x, y, factor = 1.0): return factor*x*y X = np.linspace(0,1,1000) Y = np.linspace(1,2,1000) XY = [ (x, Y[i]) for i, x in enumerate(X)] # List of tuples Z = Parallelize_MultiIn(XY, multiply, factor = 3.0, procs = 8) Create as many positional arguments as required, but remember all must be packed into a list of tuples. """ if type(ListIn[0]) != tuple: ListIn = [(ListIn[i],) for i in range(len(ListIn))] reduced_argfunc = functools.partial(f, **kwargs) if procs == -1: opt_procs = int(np.interp(len(ListIn), [1,100,500,1000,3000,5000,10000] ,[1,2,4,8,12,36,48])) procs = min(opt_procs, cpu_count()) if procs == 1: OutList = [reduced_argfunc(*ListIn[iS]) for iS in range(len(ListIn))] else: p = Pool(processes = procs) OutList = p.starmap(reduced_argfunc, ListIn) p.close() p.join() return OutList
def show_header(): print("\n" + "#"*60 + "\n") print("\tWelcome to CTSegNet: AI-based 3D Segmentation") print("\n" + "#"*60 + "\n") return def show_endmessage(_str): print("\n" + "#"*60 + "\n") print("\t" + _str) print("\n" + "#"*60 + "\n") return def _istiff(fpath, data_tag): """ Returns True if path provided is to a directory of tiffs, False if .hdf5 file. Raises ArgumentTypeError if format is not supported. """ # Understand input data format if os.path.isdir(fpath): tiff_input = True elif fpath.split('.')[-1] in ("hdf5", "h5"): tiff_input = False if data_tag == "": raise ArgumentTypeError("dataset-name required for hdf5") else: raise ArgumentTypeError("input file type not recognized. must be tiff folder or hdf5 file") return tiff_input def get_cropped_shape(crops, d_shape): if crops is not None: crop_shape = [0]*len(d_shape) for idx, crop in enumerate(crops): _size = [0,0] if (crop[0] is not None): if crop[0] >= 0: _size[0] = min(abs(crop[0]), d_shape[idx]) elif crop[0] < 0: _size[0] = max(0, d_shape[idx] - abs(crop[0])) else: _size[0] = 0 if crop[1] is not None: if crop[1] >= 0: _size[1] = min(abs(crop[1]), d_shape[idx]) elif crop[1] < 0: _size[1] = max(0, d_shape[idx] - abs(crop[1])) else: _size[1] = d_shape[idx] crop_shape[idx] = max(_size[1] - _size[0], 0) else: crop_shape = d_shape return tuple(crop_shape) if __name__ == "__main__": mem_thres = 20.0 _message("Data input / output functions.")