Source code for greedyfhist.utils.utils
"""
General utils files. Lowest level of utils. Cannot import from anywhere else in the project.
"""
import os
import shlex
import subprocess
import numpy, numpy as np
import SimpleITK, SimpleITK as sitk
from greedyfhist.options import AffineGreedyOptions, NonrigidGreedyOptions
[docs]
def call_command(cmd: str):
"""
Simple wrapper function around a command.
:param cmd:
:return:
"""
ret = subprocess.run(shlex.split(cmd), capture_output=True)
return ret
[docs]
def build_cmd_string(path_to_exec: str, args: dict[str, str | list[str]]) -> str:
"""Small custom function for collection arguments in a function call."""
cmd = [path_to_exec]
for key in args:
cmd.append(key)
val = args[key]
if val != '':
if isinstance(val, list):
cmd += val
else:
cmd.append(val)
cmd = ' '.join([str(x) for x in cmd])
return cmd
[docs]
def composite_warps(path_to_greedy: str,
path_small_affine: str | None,
path_small_warp: str | None,
path_small_ref_img: str,
path_small_composite_warp: str,
invert=False) -> subprocess.CompletedProcess:
"""Calls greedy to composite transformations.
Args:
path_to_greedy (str):
path_small_affine (str | None): Path to affine transform.
path_small_warp (str | None): Path to displacement field.
path_small_ref_img (str): Path to reference image.
path_small_composite_warp (str): Path to output composite displacement field.
invert (bool, optional): If True, inverts to order of composition. Defaults to False.
Returns:
subprocess.CompletedProcess:
"""
args = {}
args['-rf'] = path_small_ref_img
if invert:
transform_paths = []
if path_small_affine is not None:
transform_paths.append(f'{path_small_affine},-1')
if path_small_warp:
transform_paths.append(path_small_warp)
args['-r'] = transform_paths
else:
transform_paths = []
if path_small_warp is not None:
transform_paths.append(path_small_warp)
if path_small_affine is not None:
transform_paths.append(path_small_affine)
args['-r'] = transform_paths
args['-rc'] = path_small_composite_warp
cmd = build_cmd_string(path_to_greedy, args)
ret = call_command(cmd)
return ret
[docs]
def affine_registration(path_to_greedy: str,
path_to_fixed_image: str,
path_to_moving_image: str,
path_output: str,
offset:int,
ia:str,
options: AffineGreedyOptions,
use_docker_container: bool = False,
temp_directory: str = ''
) -> subprocess.CompletedProcess:
"""Calls greedy's affine registration function.
Args:
path_to_greedy (str): _description_
path_to_fixed_image (str): _description_
path_to_moving_image (str): _description_
path_output (str): _description_
offset (int): _description_
ia (str): _description_
options (GreedyOptions): _description_
Returns:
_type_: Return of command line execution.
"""
if use_docker_container:
abs_temp_directory = os.path.abspath(temp_directory)
# v_option = f'$(pwd)/{abs_temp_directory}:/{temp_directory}'
v_option = f'{abs_temp_directory}:/{temp_directory}'
path_to_greedy = f'docker run -v {v_option} {path_to_greedy}'
cost_fun_params = options.cost_function
if options.cost_function == 'ncc' or options.cost_function == 'wncc':
cost_fun_params += f' {options.kernel_size}x{options.kernel_size}'
aff_rgs = {}
aff_rgs['-d'] = '2'
aff_rgs['-i'] = [path_to_fixed_image, path_to_moving_image]
aff_rgs['-o'] = path_output
aff_rgs['-m'] = cost_fun_params
pyramid_iterations = 'x'.join([str(x) for x in options.iteration_pyramid])
# aff_rgs['-n'] = f'{options.pyramid_iterations[0]}x{options.pyramid_iterations[1]}x{options.pyramid_iterations[2]}'
aff_rgs['-n'] = pyramid_iterations
aff_rgs['-threads'] = options.n_threads
aff_rgs['-dof'] = str(options.dof)
aff_rgs['-search'] = f'{options.rigid_iterations} 180 {offset}'.split() # Replaced 360 with any for rotation parameter
aff_rgs['-gm-trim'] = f'{options.kernel_size}x{options.kernel_size}'
aff_rgs['-a'] = '' # Doesnt get param how to parse?
aff_rgs[ia[0]] = ia[1]
aff_cmd = build_cmd_string(path_to_greedy, aff_rgs)
aff_ret = call_command(aff_cmd)
return aff_ret
[docs]
def deformable_registration(path_to_greedy: str,
path_fixed_image: str,
path_moving_image: str,
options: NonrigidGreedyOptions,
output_warp: str | None = None,
output_inv_warp: str | None = None,
affine_pre_transform: str | None = None,
ia: tuple[str, str] = None,
use_docker_container: bool = False,
temp_directory: str = ''
) -> subprocess.CompletedProcess:
"""Calls the deformable registration command of greedy.
Args:
path_to_greedy (str):
path_fixed_image (str):
path_moving_image (str):
options (GreedyOptions): Contains options to pass to greedy.
output_warp (str, optional): Defaults to None.
output_inv_warp (str, optional): Defaults to None.
affine_pre_transform (_type_, optional): Contains path to affine_pre_transform. Necessary if ia is ia-com-init. Defaults to None.
Returns:
subprocess.CompletedProcess: Return of command line execution.
"""
if use_docker_container:
abs_temp_directory = os.path.abspath(temp_directory)
v_option = f'{abs_temp_directory}:/{temp_directory}'
path_to_greedy = f'docker run -v {v_option} {path_to_greedy}'
cost_fun_params = options.cost_function
if options.cost_function == 'ncc' or options.cost_function == 'wncc':
cost_fun_params += f' {options.kernel_size}x{options.kernel_size}'
def_args = {}
if affine_pre_transform is not None:
def_args['-it'] = affine_pre_transform
def_args['-d'] = options.dim
def_args['-m'] = cost_fun_params
def_args['-i'] = [path_fixed_image, path_moving_image]
pyramid_iterations = 'x'.join([str(x) for x in options.iteration_pyramid])
def_args['-n'] = pyramid_iterations
def_args['-threads'] = options.n_threads
def_args['-s'] = [f'{options.s1}vox', f'{options.s2}vox']
def_args['-o'] = output_warp
def_args['-oinv'] = output_inv_warp
if options.use_gm_trim:
def_args['-gm-trim'] = f'{options.kernel_size}x{options.kernel_size}'
if options.use_sv:
def_args['-sv'] = ''
if options.exp is not None:
def_args['-exp'] = options.exp
elif options.use_svlb:
def_args['-svlb'] = ''
if options.exp is not None:
def_args['-exp'] = options.exp
if options.tscale is not None:
def_args['-tscale'] = options.tscale
if affine_pre_transform is None:
def_args[ia[0]] = ia[1]
def_cmd = build_cmd_string(path_to_greedy, def_args)
def_ret = call_command(def_cmd)
return def_ret
[docs]
def composite_sitk_transforms(transforms: list[SimpleITK.SimpleITK.Transform]) -> SimpleITK.SimpleITK.Transform:
"""Composites all Transforms into one composite transform.
Args:
transforms (List[SimpleITK.SimpleITK.Transform]):
Returns:
SimpleITK.SimpleITK.Transform:
"""
composited_transform = sitk.CompositeTransform(2)
for transform in transforms:
composited_transform.AddTransform(transform)
return composited_transform
[docs]
def compose_reg_transforms(transform: SimpleITK.SimpleITK.Transform,
moving_preprocessing_params: dict,
fixed_preprocessing_params: dict) -> SimpleITK.SimpleITK.Transform:
"""Pre- and appends preprocessing steps from moving and fixed image as transforms to forward affine/nonrigid registration.
Args:
transform (SimpleITK.SimpleITK.Transform): Computed affine/nonrigid registration
internal_reg_params (InternalRegParams): Contains parameters of preprocessing steps.
reverse (bool): Switches moving and fixed preprocessing params if True.
Returns:
SimpleITK.SimpleITK.Transform: Composited end-to-end registration.
"""
moving_padding = moving_preprocessing_params['padding']
moving_cropping = moving_preprocessing_params['cropping_params']
fixed_padding = fixed_preprocessing_params['padding']
fixed_cropping = fixed_preprocessing_params['cropping_params']
mov_ds_factor = moving_preprocessing_params['resampling_factor']
fix_ds_factor = fixed_preprocessing_params['resampling_factor']
all_transforms = sitk.CompositeTransform(2)
pre_downscale_transform = sitk.ScaleTransform(2, (1/mov_ds_factor, 1/mov_ds_factor))
post_upscale_transform = sitk.ScaleTransform(2, (fix_ds_factor, fix_ds_factor))
aff_trans1 = sitk.TranslationTransform(2)
offset_x = moving_cropping[2]
offset_y = moving_cropping[0]
aff_trans1.SetOffset((offset_x, offset_y))
aff_trans2 = sitk.TranslationTransform(2)
offset_x = -moving_padding[0]
offset_y = -moving_padding[2]
aff_trans2.SetOffset((offset_x, offset_y))
aff_trans3 = sitk.TranslationTransform(2)
aff_trans3.SetOffset((fixed_padding[0], fixed_padding[2]))
aff_trans4 = sitk.TranslationTransform(2)
aff_trans4.SetOffset((-fixed_cropping[2], -fixed_cropping[0]))
all_transforms.AddTransform(pre_downscale_transform)
all_transforms.AddTransform(aff_trans1)
all_transforms.AddTransform(aff_trans2)
all_transforms.AddTransform(transform)
all_transforms.AddTransform(aff_trans3)
all_transforms.AddTransform(aff_trans4)
all_transforms.AddTransform(post_upscale_transform)
return all_transforms
[docs]
def compose_inv_reg_transforms(transform: SimpleITK.SimpleITK.Transform,
moving_preprocessing_params: dict,
fixed_preprocessing_params: dict) -> SimpleITK.SimpleITK.Transform:
"""Pre- and appends preprocessing steps from moving and fixed image as transforms to backward affine/nonrigid registration.
Args:
transform (SimpleITK.SimpleITK.Transform): Computed affine/nonrigid registration
internal_reg_params (InternalRegParams): Contains parameters of preprocessing steps.
Returns:
SimpleITK.SimpleITK.Transform: Composited end-to-end transform.
"""
moving_padding = moving_preprocessing_params['padding']
moving_cropping = moving_preprocessing_params['cropping_params']
fixed_padding = fixed_preprocessing_params['padding']
fixed_cropping = fixed_preprocessing_params['cropping_params']
mov_ds_factor = moving_preprocessing_params['resampling_factor']
fix_ds_factor = fixed_preprocessing_params['resampling_factor']
all_transforms = sitk.CompositeTransform(2)
pre_downscale_transform = sitk.ScaleTransform(2, (1/fix_ds_factor, 1/fix_ds_factor))
post_upscale_transform = sitk.ScaleTransform(2, (mov_ds_factor, mov_ds_factor))
aff_trans1 = sitk.TranslationTransform(2)
offset_x = fixed_cropping[2]
offset_y = fixed_cropping[0]
aff_trans1.SetOffset((offset_x, offset_y))
aff_trans2 = sitk.TranslationTransform(2)
offset_x = -fixed_padding[0]
offset_y = -fixed_padding[2]
aff_trans2.SetOffset((offset_x, offset_y))
aff_trans3 = sitk.TranslationTransform(2)
aff_trans3.SetOffset((moving_padding[0], moving_padding[2]))
aff_trans4 = sitk.TranslationTransform(2)
aff_trans4.SetOffset((-moving_cropping[2], -moving_cropping[0]))
all_transforms.AddTransform(pre_downscale_transform)
all_transforms.AddTransform(aff_trans1)
all_transforms.AddTransform(aff_trans2)
all_transforms.AddTransform(transform)
all_transforms.AddTransform(aff_trans3)
all_transforms.AddTransform(aff_trans4)
all_transforms.AddTransform(post_upscale_transform)
return all_transforms
# TODO: There is probably a better way to ensure the dtype of the image.
[docs]
def correct_img_dtype(img: numpy.ndarray) -> numpy.ndarray:
"""Changes the image type from float to np.uint8 if necessary.
Args:
img (numpy.ndarray):
Returns:
numpy.ndarray:
"""
if np.issubdtype(img.dtype, np.floating):
img = (img * 255).astype(np.uint8)
else:
img = img.astype(np.uint8)
return img
[docs]
def derive_sampling_factors(pre_sampling_factor: float | str,
max_size: int,
pre_sampling_max_img_size: int | None) -> tuple[float, float]:
"""Derives resampling factor for registration.
Args:
pre_sampling_factor (float | str):
max_size (int):
pre_sampling_max_img_size (int | None):
Returns:
tuple[float, float]:
"""
if pre_sampling_factor == 'auto':
if pre_sampling_max_img_size is not None:
if max_size > pre_sampling_max_img_size:
resampling_factor = pre_sampling_max_img_size / max_size
moving_resampling_factor = resampling_factor
fixed_resampling_factor = resampling_factor
else:
moving_resampling_factor = 1
fixed_resampling_factor = 1
else:
moving_resampling_factor = 1
fixed_resampling_factor = 1
else:
moving_resampling_factor = 1
fixed_resampling_factor = 1
return moving_resampling_factor, fixed_resampling_factor