Source code for ct_segnet.model_utils.models

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Easily define U-net-like architectures using Keras layers

"""

import numpy as np
from tensorflow import keras
from tensorflow.keras import layers as L
from . import losses

def insert_activation(tensor_in, activation):
    """
    :return: tensor of rank 4 (batch_size, n_rows, n_cols, n_channels)

    Parameters
    ----------
    tensor_in : tensor
        input tensor
    activation : str or tf.Keras.layers.Activation
        name of custom activation or Keras activation layer
    """
    if activation == 'lrelu':
        tensor_out = L.LeakyReLU(alpha = 0.2)(tensor_in)
    else:
        tensor_out = L.Activation(activation)(tensor_in)
    return tensor_out
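# Usage sketch (illustrative, not part of the original module): 'lrelu' maps to
# LeakyReLU with alpha = 0.2; any other string is passed straight to L.Activation.
#
#   >>> x = L.Input((64, 64, 1))
#   >>> insert_activation(x, 'lrelu')    # LeakyReLU(alpha = 0.2)
#   >>> insert_activation(x, 'relu')     # plain Keras Activation('relu')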
def conv_layer(tensor_in, n_filters, kern_size = None, activation = None,
               kern_init = 'he_normal', padding = 'same', dropout = 0.1, batch_norm = False):
    """
    Define a block of two convolutional layers

    :return: tensor of rank 4 (batch_size, n_rows, n_cols, n_channels)

    Parameters
    ----------
    tensor_in : tensor
        input tensor
    n_filters : int
        number of filters in each convolutional layer
    kern_size : tuple
        kernel size, e.g. (3,3)
    activation : str or tf.Keras.layers.Activation
        name of custom activation or Keras activation layer
    kern_init : str
        kernel initialization method
    padding : str
        type of padding
    dropout : float
        dropout fraction
    batch_norm : bool
        True to insert a BN layer
    """
    # layer # 1
    tensor_out = L.Conv2D(n_filters, kern_size, activation = None,
                          kernel_initializer = kern_init, padding = padding)(tensor_in)
    if batch_norm:
        tensor_out = L.BatchNormalization(momentum = 0.9, epsilon = 1e-5)(tensor_out)
    tensor_out = insert_activation(tensor_out, activation)

    # dropout between the two convolutions
    if dropout is not None:
        tensor_out = L.Dropout(dropout)(tensor_out)

    # layer # 2
    tensor_out = L.Conv2D(n_filters, kern_size, activation = None,
                          kernel_initializer = kern_init, padding = padding)(tensor_out)
    if batch_norm:
        tensor_out = L.BatchNormalization(momentum = 0.9, epsilon = 1e-5)(tensor_out)
    tensor_out = insert_activation(tensor_out, activation)

    return tensor_out
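# Usage sketch (illustrative shapes, not part of the original module): a single
# conv_layer block keeps the spatial size (padding = 'same') and sets the channel
# count to n_filters.
#
#   >>> x = L.Input((128, 128, 1))
#   >>> y = conv_layer(x, 16, kern_size = (3,3), activation = 'lrelu',
#   ...                dropout = 0.1, batch_norm = True)
#   >>> y.shape    # (None, 128, 128, 16)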
def upconv_layer(tensor_in, concat_tensor, n_filters = None, activation = None,
                 kern_size = None, strides = None, padding = 'same', batch_norm = False):
    """
    Define an upconvolutional layer and concatenate the output with a conv layer
    from the contracting path

    :return: tensor of rank 4 (batch_size, n_rows, n_cols, n_channels)

    Parameters
    ----------
    tensor_in : tensor
        input tensor
    concat_tensor : tensor
        this will be concatenated to the output of the upconvolutional layer
    n_filters : int
        number of filters in the upconvolutional layer; defaults to half the
        number of channels in tensor_in
    kern_size : tuple
        kernel size for upconv, e.g. (2,2)
    activation : str or tf.Keras.layers.Activation
        name of custom activation or Keras activation layer
    strides : tuple
        strides, e.g. (2,2)
    padding : str
        type of padding
    batch_norm : bool
        True to insert a BN layer
    """
    if not n_filters:
        n_filters = int(tensor_in.shape[-1]) // 2

    tensor_out = L.Conv2DTranspose(n_filters, kern_size, strides = strides,
                                   padding = padding, activation = None)(tensor_in)
    tensor_out = L.concatenate([tensor_out, concat_tensor])

    if batch_norm:
        tensor_out = L.BatchNormalization(momentum = 0.9, epsilon = 1e-5)(tensor_out)

    tensor_out = insert_activation(tensor_out, activation)

    return tensor_out
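# Usage sketch (illustrative shapes, not part of the original module): the upconv
# doubles the spatial size (strides = (2,2)), halves the channel count, and the
# skip tensor is concatenated along the channel axis.
#
#   >>> skip = L.Input((128, 128, 16))     # from the contracting path
#   >>> deep = L.Input((64, 64, 32))       # from the layer below
#   >>> up = upconv_layer(deep, skip, kern_size = (2,2), strides = (2,2),
#   ...                   activation = 'lrelu')
#   >>> up.shape   # (None, 128, 128, 32): 16 upconv filters + 16 skip channels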
def pool_layer(tensor_in, n_filters, pool_size, dropout = None, activation = None,
               batch_norm = False, kern_size = None):
    """
    Define a block of two convolutional layers followed by a pooling layer

    :return: tensor of rank 4 (batch_size, n_rows, n_cols, n_channels)

    Parameters
    ----------
    tensor_in : tensor
        input tensor
    n_filters : int
        number of filters in each convolutional layer
    pool_size : tuple
        max pooling size, e.g. (2,2)
    dropout : float
        fraction of dropout
    activation : str or tf.Keras.layers.Activation
        name of custom activation or Keras activation layer
    batch_norm : bool
        True to insert a BN layer
    kern_size : tuple
        kernel size for conv layer, e.g. (3,3)
    """
    # pass the dropout argument through (it was previously hard-coded to 0.1)
    conv = conv_layer(tensor_in, n_filters, dropout = dropout, activation = activation,
                      batch_norm = batch_norm, kern_size = kern_size)
    pool = L.MaxPooling2D(pool_size)(conv)
    return conv, pool
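# Usage sketch (illustrative shapes, not part of the original module): pool_layer
# returns both the pre-pooling tensor (kept as the skip connection) and the pooled
# tensor that feeds the next block.
#
#   >>> x = L.Input((256, 256, 1))
#   >>> conv, pool = pool_layer(x, 16, (2,2), dropout = 0.1,
#   ...                         activation = 'lrelu', kern_size = (3,3))
#   >>> conv.shape   # (None, 256, 256, 16)
#   >>> pool.shape   # (None, 128, 128, 16)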
def _expand_inputs(inp, n, var_name):
    """
    Handle the inputs provided. For n_depth and dropouts, if the input is a
    scalar, return a list with length equal to the number of pooling layers.
    For kernel sizes, if the input is a tuple, repeat the same tuple for all
    pooling layers. If it is a list, ensure its length matches the number of
    pooling layers.
    """
    # The last n_filter is for the bottleneck layer. Extend this list for a
    # U-net with more than 4 pooling layers.
    dict_vars = {'n_depth' : [16, 32, 64, 128, 256],
                 'dropouts': [0.1, 0.1, 0.2, 0.2, 0.3]}

    if var_name in dict_vars.keys():
        if type(inp) is not list:
            inp = np.asarray(dict_vars[var_name])*inp
            inp = inp[:n]
        else:
            if len(inp) != n:
                raise ValueError("Incorrect length for list: %s"%var_name)
    else:
        if type(inp) is tuple:
            inp = [inp]*n
        elif type(inp) is list:
            if len(inp) != n:
                raise ValueError("Incorrect number of elements specified in %s"%var_name)
    return inp
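# Behavior sketch (illustrative values, not part of the original module):
#
#   >>> _expand_inputs(2, 5, 'n_depth')          # array([32, 64, 128, 256, 512])
#   >>> _expand_inputs(0.5, 5, 'dropouts')       # array([0.05, 0.05, 0.1, 0.1, 0.15])
#   >>> _expand_inputs((3,3), 4, 'kern_size')    # [(3,3), (3,3), (3,3), (3,3)]
#   >>> _expand_inputs([16, 32], 3, 'n_depth')   # raises ValueError (wrong length)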
def build_Unet_flex(img_shape, n_depth = 1, n_pools = 4, activation = 'lrelu',
                    batch_norm = True, kern_size = (3,3), kern_size_upconv = (2,2),
                    pool_size = (2,2), dropout_level = 1.0,
                    loss = 'binary_crossentropy', stdinput = True):
    """
    Define your own U-net-like architecture based on the arguments provided.
    Checks that the architecture complies with the converging-diverging paths
    and ensures the output image size is the same as the input image size.

    :return: a keras model for a U-net-like architecture
    :rtype: tf.Keras.model

    Parameters
    ----------
    img_shape : tuple
        input image shape (ny, nx, 1)
    n_depth : int or list
        Option 1: a list of the number of filters in each convolutional layer
        upstream of each pooling layer; the last entry is the bottleneck, so the
        length must equal n_pools + 1.
        Option 2: an integer that multiplies the values in this list:
        [16, 32, 64, 128, 256]. E.g. n_depth = 2 creates [32, 64, 128, 256, 512].
    n_pools : int
        number of max pooling layers
    activation : str or tf.Keras.layers.Activation
        name of custom activation or Keras activation layer
    batch_norm : bool
        True to insert a BN layer after the convolutional layers
    kern_size : list or tuple
        kernel size, e.g. (3,3). Provide a list (length = n_pools + 1) of tuples
        to apply a different kernel size to each block.
    kern_size_upconv : list or tuple
        kernel size for upconv, e.g. (2,2). Provide a list (length = n_pools) of
        tuples to apply a different kernel size to each block.
    pool_size : list or tuple
        max pool size, e.g. (2,2). Provide a list (length = n_pools) of tuples to
        apply a different size to each block.
    dropout_level : float or list
        Option 1: a list (length = n_pools + 1) of dropout values to apply
        separately in each block.
        Option 2: a float (0..1) that multiplies the values in this list:
        [0.1, 0.1, 0.2, 0.2, 0.3]
    loss : str
        the loss function of your choice. The following are implemented:
        'weighted_crossentropy', 'focal_loss', 'binary_crossentropy'
    stdinput : bool
        if True, the input image will be normalized into [0,1]
    """
    # Check inputs
    if (img_shape[0] % (2**n_pools) != 0) or (img_shape[1] % (2**n_pools) != 0):
        raise ValueError("Image shape must be divisible by 2^n_pools.")

    if n_pools not in range(2, 5):
        raise ValueError("This implementation allows number of pooling layers to be 2, 3 or 4.")

    n_filters = _expand_inputs(n_depth, n_pools + 1, 'n_depth')
    dropouts = _expand_inputs(dropout_level, n_pools + 1, 'dropouts')
    pool_size = _expand_inputs(pool_size, n_pools, "pool_size")
    kern_size = _expand_inputs(kern_size, n_pools + 1, "kern_size")
    kern_size_upconv = _expand_inputs(kern_size_upconv, n_pools, "kern_size_upconv")

    inp = L.Input(img_shape)

    if stdinput:
        standardizer = L.Lambda(losses.standardize)
        stdinp = standardizer(inp)
    else:
        stdinp = inp

    # Contracting path: n_pools blocks of (conv, conv, max-pool)
    convs, pools = [], []
    for ii in range(n_pools):
        tensor_in = stdinp if ii == 0 else pools[ii-1]
        conv, pool = pool_layer(tensor_in, n_filters[ii], pool_size[ii],
                                dropout = dropouts[ii], activation = activation,
                                batch_norm = batch_norm, kern_size = kern_size[ii])
        convs.append(conv)
        pools.append(pool)

    # Bottleneck block uses the last entries of n_filters, dropouts and kern_size
    bottleneck = conv_layer(pools[-1], n_filters[-1], dropout = dropouts[-1],
                            activation = activation, batch_norm = batch_norm,
                            kern_size = kern_size[-1])

    # Expanding path: upconv, concatenate the skip connection, then two convs
    for ii in range(n_pools - 1, -1, -1):
        if ii == n_pools - 1:
            tensor = bottleneck
        tensor = upconv_layer(tensor, convs[ii], activation = activation,
                              batch_norm = batch_norm, kern_size = kern_size_upconv[ii],
                              strides = pool_size[ii])
        tensor = conv_layer(tensor, n_filters[ii], dropout = dropouts[ii],
                            activation = activation, batch_norm = batch_norm,
                            kern_size = kern_size[ii])

    out = L.Conv2D(1, (1,1), activation = 'sigmoid')(tensor)

    model = keras.models.Model(inputs = inp, outputs = out)
    adam = keras.optimizers.Adam(learning_rate = 0.0001, beta_1 = 0.99, beta_2 = 0.999,
                                 epsilon = 1e-8, amsgrad = False)

    if loss == 'weighted_crossentropy':
        loss = losses.weighted_crossentropy
    elif loss == 'my_binary_crossentropy':
        loss = losses.my_binary_crossentropy
    elif loss == 'focal_loss':
        loss = losses.focal_loss
    elif loss == 'binary_crossentropy':
        pass  # use the Keras built-in loss by name
    else:
        raise ValueError("Loss function not recognized...")

    model.compile(optimizer = adam, loss = loss,
                  metrics = [losses.IoU, losses.acc_zeros, losses.acc_ones])

    return model
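# Minimal build sketch (hedged: the image size and hyperparameters below are
# illustrative, and the loss/metric functions come from ct_segnet's losses module
# imported above). Guarded so importing the module has no side effects.
if __name__ == "__main__":
    # 3 pooling layers require the image size to be divisible by 2**3 = 8
    demo_model = build_Unet_flex((256, 256, 1), n_depth = 1, n_pools = 3,
                                 activation = 'lrelu', batch_norm = True,
                                 loss = 'focal_loss', stdinput = True)
    demo_model.summary()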