Source code for estimate_alpha_and_beta

##   Copyright (c) Aslak W. Bergersen, Henrik A. Kjeldsberg. All rights reserved.
##   See LICENSE file for details.

##      This software is distributed WITHOUT ANY WARRANTY; without even
##      the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
##      PURPOSE.  See the above copyright notices for more information.

import operator
from argparse import ArgumentParser, RawDescriptionHelpFormatter

from scipy import interpolate
from scipy.ndimage import gaussian_filter
from scipy.signal import argrelextrema

# Local import
from morphman.common import *


[docs]def estimate_alpha_and_beta(input_filepath, quantity_to_compute, boundary, radius, grid_size, value_change, method_angle, method_curv, region_of_interest, region_points): """ Imports a matrix of parameter values corresponding to a (alpha,beta) point and perform spline interpolation to make a parameter surface. Parameter surface is used to find intersection with initial parameter value plus / minus one standard deviation. Three criteria to find suggested alpha-beta value from intersection. Args: input_filepath (str): Surface model filename and location where data is stored. quantity_to_compute(str): Parameter name. boundary (list): Boundary of searching grid. radius (float): Minimum radius of circle to search outside of. grid_size (int): Size of searching grid ( grid_size x grid_size matrix) value_change (float): Desired change in curvature / bend angle to achieve method_angle (str): Method for computing angle. method_curv (str): Method for computing curvature. region_of_interest (str): Method for setting the region of interest ['manual' | 'commandline' | 'landmarking'] region_points (list): If region_of_interest is 'commandline', this a flatten list of the start and endpoint """ # Get grid values base_path = get_path_names(input_filepath) # Get region points # Compute discrete values for quantity if isinstance(boundary[-1], str): boundary = np.asarray(boundary, dtype=float) data = compute_quantities(input_filepath, boundary, quantity_to_compute, method_curv, method_angle, region_of_interest, region_points, n=grid_size, projection=False) # Get grid boundary amin, amax, bmin, bmax = float(boundary[0]), float(boundary[1]), float(boundary[2]), float(boundary[3]) # Set standard deviations used to find intersection # Defined SD planes for curvature # Tolerance added for adjusting SD # if there are no intersections found def value_plus(tolerance=0.0): return initial_value + value_change - tolerance def value_minus(tolerance=0.0): return initial_value - value_change + tolerance n = len(data) alpha = np.linspace(amin, amax, n) beta = np.linspace(bmin, bmax, n) alpha_long = np.linspace(amin, amax, 300) beta_long = np.linspace(bmin, bmax, 300) xx, yy = np.meshgrid(alpha, beta) points = np.zeros((n, 2)) for i in range(len(xx)): points[i] = [alpha[i], beta[i]] # Spline interpolation f = interpolate.SmoothBivariateSpline(xx.ravel(), yy.ravel(), data.ravel()) initial_value = f(0, 0) methods = [value_plus, value_minus] # Find intersecting points # Reduces SD if no points are found for plane in methods: zeros = alpha_beta_intersection(plane, f, alpha_long, beta_long) if len(zeros) == 0: empty = True # Leeway tolerance for matching quantity on interpolated surface tol = 0.005 if quantity_to_compute == "curvature" else 0.1 max_iter = 50 iterations = 0 print("-- Found no points..Adjusting SD") while empty and iterations < max_iter: print("-- Iterations: %i" % (iterations + 1)) zeros = alpha_beta_intersection(plane, f, alpha_long, beta_long, tol) if len(zeros) > 0: empty = False iterations += 1 if quantity_to_compute == "curvature": tol += 0.001 elif quantity_to_compute == "angle": tol += 0.2 # Check points and apply criteria # to find suggested values for alpha and beta if len(zeros) > 0: points = [] for p in zeros: if (plane.__name__ == "value_plus" and quantity_to_compute == "curvature") \ or (plane.__name__ == "value_minus" and quantity_to_compute == "angle"): if p[1] < 0: points.append(p) else: if p[1] > 0: points.append(p) suggested_point = points[0] dist = 1e9 for p in points[1:]: dist_tmp = la.norm(np.array(p)) if radius < dist_tmp < dist: dist = dist_tmp suggested_point = p # Write points to file write_alpha_beta_point(base_path, suggested_point, plane.__name__, quantity_to_compute)
[docs]def compute_quantities(input_filepath, boundary, quantity, method_curv, method_angle, region_of_interest, region_points, n=50, projection=False): """ Initialization for computing curvature and angle. Values are either printed to terminal or stored in a (n x n) matrix. Args: input_filepath (str): Base location of surface model files. boundary (list): Bounds of grid for computing quantities. method_curv (str): Method used to compute curvature. method_angle (str): Method used to compute angle. quantity (string): Quantity to compute, either curvature or angle. region_of_interest (str): Method for setting the region of interest ['manual' | 'commandline' | 'landmarking'] region_points (list): If region_of_interest is 'commandline', this a flatten list of the start and endpoint n (int): Determines matrix size when computing multiple values. projection (bool): Projects angle into 2d plane if True. Returns: values (ndarray): (n x n) matrix with quantity values """ # Initialize storage data values = np.zeros((n, n)) amin, amax, bmin, bmax = boundary[0], boundary[1], boundary[2], boundary[3] alphas = np.linspace(amin, amax, n) betas = np.linspace(bmin, bmax, n) k = 0 for i, alpha in enumerate(alphas): for j, beta in enumerate(betas): print("Iteration %i of %i" % (k + 1, n * n)) if quantity == "curvature": value, _ = compute_curvature(input_filepath, alpha, beta, method_curv, None, False, region_of_interest, region_points) elif quantity == "angle": value, _ = compute_angle(input_filepath, alpha, beta, method_angle, None, region_of_interest, region_points, projection) values[i, j] = value k += 1 return values
[docs]def compute_angle(input_filepath, alpha, beta, method, new_centerlines, region_of_interest, region_points, projection=False): """ Primary collection of methods for computing the angle of a vessel bend. Three main methods are currently implemented: 1) ODR methods: odrline 2) Tracing point methods: maxcurv, smooth, discrete, frac, MISR 3) Relative tracing point methods: plane, itplane, itplane_clip Args: input_filepath (str): Path to case folder. alpha (float): Extension / Compression factor in vertical direction. beta (float): Extension / Compression factor in horizontal direction. method (str): Method used to compute angle. new_centerlines (vtkPolyData): New centerline. region_of_interest (str): Method for setting the region of interest ['manual' | 'commandline' | 'landmarking'] region_points (list): If region_of_interest is 'commandline', this a flatten list of the start and endpoint projection (bool): True / False for computing 2D / 3D angle. Returns: new_deg (float): New angle of a vessel bend from a manipulated centerline. Returns: deg (float): Old angle of a vessel bend from a manipulated centerline. """ # Get base path base_path = get_path_names(input_filepath) # Centerline path centerline_path = base_path + "_centerline.vtp" # Clean and cap / uncap surface surface, capped_surface = prepare_surface(base_path, input_filepath) # Extract old centerline if not path.exists(centerline_path): # Compute centerlines inlet, outlets = get_inlet_and_outlet_centers(surface, base_path) print("-- Compute centerlines and Voronoi diagram") centerlines, _, _ = compute_centerlines(inlet, outlets, centerline_path, capped_surface, resampling=0.1, smooth=False, base_path=base_path) else: centerlines = read_polydata(centerline_path) # Get region of interest point_path = base_path + "_anterior_bend.particles" if region_of_interest == "landmarking": if not path.exists(point_path): raise RuntimeError(("The given .particles file: %s does not exist. Please run" + " landmarking with automated_landmarking.py first.") % point_path) region_points = np.loadtxt(point_path) else: _, _, _, region_points, _ = get_line_to_change(capped_surface, centerlines, region_of_interest, "bend", region_points, 0) region_points = [[region_points[3 * i], region_points[3 * i + 1], region_points[3 * i + 2]] for i in range(len(region_points) // 3)] p1 = region_points[0] p2 = region_points[1] if new_centerlines is None: centerlines, new_centerlines = get_new_centerlines(centerlines, region_points, alpha, beta, p1, p2) # Get new siphon and prepare id1, id2, moved_id1, moved_id2, moved_p1, moved_p2 = get_moved_siphon(new_centerlines, centerlines, p1, p2) # Extract region of interest siphon = extract_single_line(centerlines, 1, start_id=id1, end_id=id2) moved_siphon = extract_single_line(new_centerlines, 0, start_id=moved_id1, end_id=moved_id2) id1, id2 = 0, siphon.GetNumberOfPoints() - 1 moved_id1, moved_id2 = 0, moved_siphon.GetNumberOfPoints() - 1 if method in ["maxcurv", "odrline", "smooth", "frac"]: nknots = 11 siphon_splined, siphon_curv = compute_splined_centerline(siphon, get_curv=True, isline=True, nknots=nknots, get_misr=False) _, moved_siphon_curv = compute_splined_centerline(moved_siphon, get_curv=True, isline=True, nknots=nknots, get_misr=False) siphon_curv = resample(siphon_curv, siphon_splined.GetNumberOfPoints()) cutcurv = siphon_curv[id1:id2] newcutcurv = moved_siphon_curv[moved_id1:moved_id2] if method == "discrete": # Smooth line with discrete derivatives neigh = 30 _, curv_d = compute_discrete_derivatives(siphon, neigh=neigh) _, newcurv_d = compute_discrete_derivatives(moved_siphon, neigh=neigh) cutcurv_d = curv_d[id1:id2] newcutcurv_d = newcurv_d[moved_id1:moved_id2] if method == "MISR": # Map MISR values to old and new splined anterior bend anterior_bend = extract_single_line(centerlines, 0, start_id=id1, end_id=id2) m = anterior_bend.GetNumberOfPoints() m1 = moved_siphon.GetNumberOfPoints() misr_array = get_vtk_array(radiusArrayName, 1, m) newmisr_array = get_vtk_array(radiusArrayName, 1, m1) misr_list = [] for i in range(m): misr = anterior_bend.GetPointData().GetArray(radiusArrayName).GetTuple(i) misr_list.append(misr[0]) misr_array.SetTuple(i, misr) misr_list = resample(misr_list, m1) for i in range(m1): newmisr_array.SetTuple(i, (misr_list[i],)) siphon.GetPointData().AddArray(misr_array) moved_siphon.GetPointData().AddArray(newmisr_array) # Get direction to the point the furthest away (dx) direction = "vertical" clipping_points_vtk = vtk.vtkPoints() for point in [p1, p2]: clipping_points_vtk.InsertNextPoint(point) middle_points, _, dx = get_direction_parameters(extract_single_line(centerlines, 0), 0.1, direction, clipping_points_vtk) # Find adjusted clipping points (and tracing points) if method == "plane": _, max_id = get_most_distant_point(dx, siphon) _, newmax_id = get_most_distant_point(dx, moved_siphon) elif method in ["itplane", "itplane_clip"]: _, max_id = get_most_distant_point(dx, siphon) _, newmax_id = get_most_distant_point(dx, moved_siphon) siphon = vmtk_compute_geometric_features(siphon, False) frenet_t1 = get_point_data_array("FrenetTangent", siphon, k=3) frenet_t2 = get_point_data_array("FrenetTangent", moved_siphon, k=3) p1_1, p1_id = get_closest_point(frenet_t1[-1], 0, max_id, p2, siphon) p2_2, p2_id = get_closest_point(frenet_t1[0], max_id, siphon.GetNumberOfPoints(), p1, siphon) newp1, np1_id = get_closest_point(frenet_t2[-1], 0, newmax_id, moved_p2, moved_siphon) newp2, np2_id = get_closest_point(frenet_t2[0], newmax_id, moved_siphon.GetNumberOfPoints(), moved_p1, moved_siphon) n1 = get_point_data_array("FrenetBinormal", siphon, k=3)[p1_id] n2 = get_point_data_array("FrenetBinormal", moved_siphon, k=3)[np1_id] dp = p1_1 - p2_2 dnewp = newp1 - newp2 normal = np.cross(dp, n1) newnormal = np.cross(dnewp, n2) _, max_id = get_most_distant_point(normal, siphon) _, newmax_id = get_most_distant_point(newnormal, moved_siphon) elif method == "maxcurv": max_id, _ = max(enumerate(cutcurv), key=operator.itemgetter(1)) newmax_id, _ = max(enumerate(newcutcurv), key=operator.itemgetter(1)) elif method == "smooth": allmaxcurv = argrelextrema(cutcurv, np.greater)[0] allnewmaxcurv = argrelextrema(newcutcurv, np.greater)[0] tmpcurv = cutcurv while len(allmaxcurv) > 2: tmpcurv = gaussian_filter(tmpcurv, 2) allmaxcurv = argrelextrema(tmpcurv, np.greater)[0] tmpnewcurv = newcutcurv while len(allnewmaxcurv) > 2: tmpnewcurv = gaussian_filter(tmpnewcurv, 2) allnewmaxcurv = argrelextrema(tmpnewcurv, np.greater)[0] max_id = allmaxcurv[0] newmax_id = allnewmaxcurv[0] elif method == "discrete": max_id, _ = max(enumerate(cutcurv_d), key=operator.itemgetter(1)) newmax_id, _ = max(enumerate(newcutcurv_d), key=operator.itemgetter(1)) elif method == "maxdist": norm_p1 = [la.norm(np.array(p1) - np.array(siphon.GetPoint(i))) for i in range(siphon.GetNumberOfPoints())] norm_p2 = [la.norm(np.array(p2) - np.array(siphon.GetPoint(i))) for i in range(siphon.GetNumberOfPoints() - 1, -1, -1)] max_id = 0 max_dist = 0 for i, n1 in enumerate(norm_p1): for n2 in norm_p2: dist = n1 ** 2 + n2 ** 2 if dist > max_dist: max_dist = dist max_id = i newnorm_p1 = [la.norm(np.array(moved_p1) - np.array(moved_siphon.GetPoint(i))) for i in range(moved_siphon.GetNumberOfPoints())] newnorm_p2 = [la.norm(np.array(moved_p2) - np.array(moved_siphon.GetPoint(i))) for i in range(moved_siphon.GetNumberOfPoints() - 1, -1, -1)] newmax_id = 0 new_max_dist = 0 for i, n1 in enumerate(newnorm_p1): for j, n2 in enumerate(newnorm_p2): dist = n1 ** 2 + n2 ** 2 if dist > new_max_dist: new_max_dist = dist newmax_id = i # Compute angles based on the classic formula for # angle between vectors in 3D if method == "odrline": limits = ["cumulative", "sd"] for limit in limits: d1, d2, _ = odr_line(id1, id2, siphon_splined, siphon_curv, limit) newd1, newd2, _ = odr_line(moved_id1, moved_id2, moved_siphon, moved_siphon_curv, limit) deg = find_angle_odr(d1, d2, projection) new_deg = find_angle_odr(newd1, newd2, projection) elif method == "MISR": multiplier = 1.5 n1 = siphon.GetNumberOfPoints() n2 = moved_siphon.GetNumberOfPoints() rad1 = siphon.GetPointData().GetArray(radiusArrayName).GetTuple1(0) rad2 = siphon.GetPointData().GetArray(radiusArrayName).GetTuple1(n1 - 1) newrad1 = moved_siphon.GetPointData().GetArray(radiusArrayName).GetTuple1(0) newrad2 = moved_siphon.GetPointData().GetArray(radiusArrayName).GetTuple1(n2 - 1) pa, _ = move_past_sphere(siphon, p1, rad1, 0, step=1, stop=n1 - 1, scale_factor=multiplier) pb, _ = move_past_sphere(siphon, p2, rad2, n1 - 1, step=-1, stop=0, scale_factor=multiplier) new_pa, _ = move_past_sphere(moved_siphon, moved_p1, newrad1, 0, step=1, stop=n2 - 1, scale_factor=multiplier) new_pb, _ = move_past_sphere(moved_siphon, moved_p2, newrad2, n2 - 1, step=-1, stop=0, scale_factor=multiplier) deg, l1, _ = find_angle(pa, pb, p1, p2, projection) new_deg, nl1, _ = find_angle(new_pa, new_pb, moved_p1, moved_p2, projection) else: if method == "frac": n_values = [5] left = [2] right = [3] i = 0 dx = 1. / n_values[i] ida = int(id1 + (id2 - id1) * left[i] * dx) idb = int(id1 + (id2 - id1) * right[i] * dx) pa = siphon_splined.GetPoints().GetPoint(ida) pb = siphon_splined.GetPoints().GetPoint(idb) ida = int(moved_id1 + (moved_id2 - moved_id1) * left[i] * dx) idb = int(moved_id1 + (moved_id2 - moved_id1) * right[i] * dx) new_pa = moved_siphon.GetPoints().GetPoint(ida) new_pb = moved_siphon.GetPoints().GetPoint(idb) deg, l1, l2 = find_angle(pa, pb, p1, p2, projection) new_deg, nl1, nl2 = find_angle(new_pa, new_pb, moved_p1, moved_p2, projection) elif method in ["plane", "itplane", "itplane_clip", "maxcurv", "smooth", "discrete", "maxdist"]: frac = 4. / 5. if method == "itplane_clip": id_mid = (p2_id - p1_id) / 2. new_id_mid = (np2_id - np1_id) / 2. if max_id > id_mid: ida = int((max_id - p1_id) * frac) idb = int((max_id - p1_id) * (1 + (1 - frac))) pa = siphon.GetPoints().GetPoint(ida + p1_id) pb = siphon.GetPoints().GetPoint(idb + p1_id) else: idb = int((p2_id - max_id) * (1 + (1 - frac))) ida = int((p2_id - max_id) * frac) pa = siphon.GetPoints().GetPoint(ida) pb = siphon.GetPoints().GetPoint(idb) if newmax_id > new_id_mid: ida = int((newmax_id - np1_id) * frac) idb = int((newmax_id - np1_id) * (1 + (1 - frac))) new_pa = moved_siphon.GetPoints().GetPoint(ida + np1_id) new_pb = moved_siphon.GetPoints().GetPoint(idb + np1_id) else: ida = int((np2_id - newmax_id) * frac) idb = int((np2_id - newmax_id) * (1 + (1 - frac))) new_pa = moved_siphon.GetPoints().GetPoint(ida) new_pb = moved_siphon.GetPoints().GetPoint(idb) deg, l1, l2 = find_angle(pa, pb, p1, p2, projection) new_deg, nl1, nl2 = find_angle(new_pa, new_pb, moved_p1, moved_p2, projection) else: ida = int(max_id * frac) idb = int(max_id * (1 + (1 - frac))) pa = siphon.GetPoints().GetPoint(ida) pb = siphon.GetPoints().GetPoint(idb) ida = int(newmax_id * frac) idb = int(newmax_id * (1 + (1 - frac))) new_pa = moved_siphon.GetPoints().GetPoint(ida) new_pb = moved_siphon.GetPoints().GetPoint(idb) deg, l1, l2 = find_angle(pa, pb, p1, p2, projection) new_deg, nl1, nl2 = find_angle(new_pa, new_pb, moved_p1, moved_p2, projection) return new_deg, deg
[docs]def compute_curvature(input_filepath, alpha, beta, method, new_centerlines, compute_original, region_of_interest, region_points): """ Primary collection of methods for computing curvature of a centerline. Five methods are currently implemented: 1) VMTK - Factor variance (vmtkfactor) 2) VMTK - Iteration variance (vmtkit) 3) Discrete derivatives (disc) 4) B-splines (spline) Args: input_filepath (str): Path to case folder. alpha (float): Extension / Compression factor in vertical direction. beta (float): Extension / Compression factor in horizontal direction. method (str): Method used to compute angle. new_centerlines (vtkPolyData): New centerline. compute_original (bool): Computes old curvature value if True. region_of_interest (str): Method for setting the region of interest ['manual' | 'commandline' | 'landmarking'] region_points (list): If region_of_interest is 'commandline', this is a flattened list of the start and endpoint Returns: new_maxcurv (float): Maximum curvature within the manipulated region of interest. Returns: old_maxcurv (float): Maximum curvature within the original region of interest. """ # Input filename base_path = get_path_names(input_filepath) # Centerline filename centerline_path = base_path + "_centerline.vtp" # Clean and capp / uncapp surface surface, capped_surface = prepare_surface(base_path, input_filepath) # Extract old centerline if not path.exists(centerline_path): # Compute centerlines inlet, outlets = get_inlet_and_outlet_centers(surface, base_path) print("-- Compute centerlines and Voronoi diagram") centerlines, _, _ = compute_centerlines(inlet, outlets, centerline_path, capped_surface, resampling=0.1, smooth=False, base_path=base_path) else: centerlines = read_polydata(centerline_path) # Get region of interest point_path = base_path + "_anterior_bend.particles" if region_of_interest == "landmarking": if not path.exists(point_path): raise RuntimeError(("The given .particles file: %s does not exist. Please run" + " landmarking with automated_landmarking.py first.") % point_path) region_points = np.loadtxt(point_path) else: _, _, _, region_points, _ = get_line_to_change(capped_surface, centerlines, region_of_interest, "bend", region_points, 0) region_points = [[region_points[3 * i], region_points[3 * i + 1], region_points[3 * i + 2]] for i in range(len(region_points) // 3)] p1 = region_points[0] p2 = region_points[1] if new_centerlines is None: print("-- Maniuplating centerline manually") centerlines, new_centerlines = get_new_centerlines(centerlines, region_points, alpha, beta, p1, p2) # Extract centerline points and ids new_centerline = extract_single_line(new_centerlines, 0) centerline = extract_single_line(centerlines, 0) new_locator = get_vtk_point_locator(new_centerline) old_locator = get_vtk_point_locator(centerline) id1 = old_locator.FindClosestPoint(p1) id2 = old_locator.FindClosestPoint(p2) id1_new = new_locator.FindClosestPoint(p1) id2_new = new_locator.FindClosestPoint(p2) # 1) VMTK - Factor variance if method == "vmtkfactor": factor = 0.5 line_fac = vmtk_compute_geometric_features(new_centerline, smooth=True, iterations=100, factor=factor) curv_fac = get_point_data_array("Curvature", line_fac) new_curvature = gaussian_filter(curv_fac, 5) if compute_original: line_fac = vmtk_compute_geometric_features(centerline, smooth=True, iterations=100, factor=factor) curv_fac = get_point_data_array("Curvature", line_fac) curvature = gaussian_filter(curv_fac, 5) # 2) VMTK - Iteration variance elif method == "vmtkit": it = 150 line_it = vmtk_compute_geometric_features(new_centerline, smooth=True, iterations=it, factor=1.0) curv_it = get_point_data_array("Curvature", line_it) new_curvature = gaussian_filter(curv_it, 5) if compute_original: line_it = vmtk_compute_geometric_features(centerline, smooth=True, iterations=it, factor=1.0) curv_it = get_point_data_array("Curvature", line_it) curvature = gaussian_filter(curv_it, 5) # 3) Splines elif method == "spline": nknots = 50 _, siphon_curv = compute_splined_centerline(new_centerline, get_curv=True, isline=True, nknots=nknots) new_curvature = gaussian_filter(siphon_curv, 5) if compute_original: _, siphon_curv = compute_splined_centerline(centerline, get_curv=True, isline=True, nknots=nknots) curvature = gaussian_filter(siphon_curv, 5) # 4) Default: Discrete derivatives elif method == "disc": neigh = 20 _, curv_di = compute_discrete_derivatives(new_centerline, neigh=neigh) new_curvature = gaussian_filter(curv_di, 5) if compute_original: _, curv_di = compute_discrete_derivatives(centerline, neigh=neigh) curvature = gaussian_filter(curv_di, 5) old_maxcurv = max(curvature[id1 + 10:id2 - 10]) if compute_original else None new_maxcurv = max(new_curvature[id1_new + 10:id2_new - 10]) return new_maxcurv, old_maxcurv
[docs]def get_new_centerlines(centerlines, region_points, alpha, beta, p1, p2): """ Perform manual centerline manipulation in order to use the centerline as a proxy for computing a selected quantity (angle, curvature). Takes the original centerline, the points defining the region of interest and the manipulation factors alpha and beta before performing a manual movement of the centerline. Returns the new, manipulated centerline as a VTK object. Args: centerlines (vtkPolyData): Centerlines including diverging centerlines region_points (ndarray): List of region points alpha (float): Extension / Compression factor in vertical direction. beta (float): Extension / Compression factor in horizontal direction. p1: First region point p2: Second region point Returns: centerlines (vtkPolyData): Centerlines excluding diverging centerlines Returns: new_centerlines (vtkPolyData): New centerlines including diverging centerlines """ centerlines, diverging_centerlines, region_points, region_points_vtk, diverging_ids = \ get_region_of_interest_and_diverging_centerlines(centerlines, region_points) new_centerlines = centerlines diverging_id = None if len(diverging_ids) == 0 else diverging_ids[0] if beta != 0.0: direction = "horizont" middle_points, _ = get_direction_parameters(extract_single_line(new_centerlines, 0), beta, direction, region_points_vtk) dx_p1 = middle_points[0] - p1 new_centerlines = get_manipulated_centerlines(new_centerlines, dx_p1, p1, p2, diverging_id, diverging_centerlines, direction, merge_lines=True) if alpha != 0.0: direction = "vertical" middle_points, _, dx = get_direction_parameters(extract_single_line(new_centerlines, 0), alpha, direction, region_points_vtk) merge_lines = False if beta != 0.0 else True new_centerlines = get_manipulated_centerlines(new_centerlines, dx, p1, p2, diverging_id, diverging_centerlines, direction, merge_lines=merge_lines) return centerlines, new_centerlines
[docs]def odr_line(id1, id2, line, curvature, limit): """ Computes the orthogonal distance regression of points along the centerline selected from 1) All points until a cumulative limit is reached or 2) The first 11 points and all points fulfilling curvature less than the mean plus 1.96 x SD Args: id1 (int): ID of first clipping point. id2 (int): ID of second clipping point. line (vtkPolyData): Centerline data. curvature (ndarray): Array of curvature values. limit (ndarray): Method used as limit Returns: d1 (ndarray): Direction vector from first clipping point. d2 (ndarray): Direction vector from second clipping point. curvlines (vtkPolyData): Centerline object with corresponding curvature values. """ lim = len(curvature) - 1 if limit == "cumulative": max_cum = 10 id1_up = id1 + 1 id1_down = id1 - 1 id2_up = id2 - 1 id2_down = id2 + 1 while sum(curvature[id1:id1_up + 1]) < max_cum and id1_up < lim: id1_up += 1 while sum(curvature[id1_down:id1 + 1]) < max_cum and id1_down > 0: id1_down -= 1 while sum(curvature[id2_up:id2 + 1]) < max_cum and id2_up > 0: id2_up -= 1 while sum(curvature[id2:id2_down + 1]) < max_cum and id2_down < lim: id2_down += 1 else: id1_up = id1 + 5 id1_down = id1 - 5 id2_up = id2 - 5 id2_down = id2 + 5 mean1 = sum(curvature[id1_down:id1_up + 1]) / 11. mean2 = sum(curvature[id2_up:id2_down + 1]) / 11. sd_1 = np.sqrt(sum((curvature[id1_down:id1_up + 1] - mean1) ** 2) / 10) sd_2 = np.sqrt(sum((curvature[id2_up:id1_down + 1] - mean2) ** 2) / 10) tol1 = mean1 + sd_1 * 1.96 tol2 = mean2 + sd_2 * 1.96 while curvature[id1_up] < tol1 and id1_up < lim: id1_up += 1 while curvature[id1_down] < tol1 and id1_down > 0: id1_down -= 1 while curvature[id2_up] < tol2 and id2_up > 0: id2_up -= 1 while curvature[id2_down] < tol2 and id2_down < lim: id2_down += 1 p1s = [] for i in range(id1_down, id1_up + 1): p1s.append(line.GetPoint(i)) p2s = [] for i in range(id2_up, id2_down + 1): p2s.append(line.GetPoint(i)) # Arrange points in matrix x_1 = np.array([list(p) for p in p1s]) x_2 = np.array([list(p) for p in p2s]) # Find mean of points avg1 = np.array([np.mean(x_1[:, 0]), np.mean(x_1[:, 1]), np.mean(x_1[:, 2])]) avg2 = np.array([np.mean(x_2[:, 0]), np.mean(x_2[:, 1]), np.mean(x_2[:, 2])]) # Subtract the mean from all points dx_1 = x_1 - np.array([avg1] * len(x_1)) dx_2 = x_2 - np.array([avg2] * len(x_2)) # Find SVD _, _, v_1 = la.svd(dx_1) _, _, v_2 = la.svd(dx_2) # Find direction vector d1 = v_1[0] d2 = v_2[0] # Parametric equation P = p0 + t*d # Make lines with curv # Create edges between new_centerline points curv_lines_split = [] points = [p1s, p2s] for k, p in enumerate(points): pts = vtk.vtkPoints() for point in p: pts.InsertNextPoint(point) lines = vtk.vtkCellArray() for i in range(len(p) - 2): newline = vtk.vtkLine() newline.GetPointIds().SetId(0, i) newline.GetPointIds().SetId(1, i + 1) lines.InsertNextCell(newline) line_ = vtk.vtkPolyData() line_.SetPoints(pts) line_.SetLines(lines) m = line_.GetNumberOfPoints() curv_array = get_vtk_array("Curvature", 1, m) if k == 0: for i in range(id1_up + 1 - id1_down): curv_array.SetTuple(i, [curvature[id1_down + i]]) else: for i in range(id2_down + 1 - id2_up): curv_array.SetTuple(i, [curvature[id2_up + i]]) line_.GetPointData().AddArray(curv_array) curv_lines_split.append(line_) curvlines = vtk_merge_polydata(curv_lines_split) return d1, d2, curvlines
[docs]def get_moved_siphon(new_centerlines, centerlines, p1, p2): """ Extracts new siphon from new centerline and clipping point information. Args: new_centerlines (vtkPolyData): Centerline data. centerlines (vtkPolyData): Initial centerlines. p1 (ndarray): First clipping point. p2 (ndarray): Second clipping point. Returns: id1 (int): ID of first clipping point. Returns: id2 (int): ID of second clipping point. Returns: moved_id1 (int): New ID of first clipping point. Returns: moved_id1 (int): New ID of secon clipping point. Returns: moved_p1 (ndarray): New position of first clipping point. Returns: moved_p2 (ndarray): New position of second ipping point. """ # Extract new siphon and prepare new_locator = get_vtk_point_locator(extract_single_line(new_centerlines, 0)) old_locator = get_vtk_point_locator(extract_single_line(centerlines, 0)) id1 = old_locator.FindClosestPoint(p1) id2 = old_locator.FindClosestPoint(p2) moved_id1 = new_locator.FindClosestPoint(p1) moved_id2 = new_locator.FindClosestPoint(p2) moved_p1 = new_centerlines.GetPoints().GetPoint(moved_id1) moved_p2 = new_centerlines.GetPoints().GetPoint(moved_id2) return id1, id2, moved_id1, moved_id2, moved_p1, moved_p2
[docs]def find_angle(pa, pb, p1, p2, projection): """ Compute the angle between two vectors a = pA - p1 and b = pB - p2 using the classical formula. Args: pa (ndarray): Point along the centerline. pb (ndarray): Point along the centerline. p1 (ndarray): Point along the centerline. p2 (ndarray): Point along the centerline. projection (bool): True / False for 2D / 3D angle. Returns: deg (float): Angle between vectors. vector_a (ndarray): First vector. vector_b (ndarraty): Second vector. """ if not projection: vector_a = np.array([pa[0] - p1[0], pa[1] - p1[1], pa[2] - p1[2]]) vector_b = np.array([pb[0] - p2[0], pb[1] - p2[1], pb[2] - p2[2]]) else: vector_a = np.array([0, pa[1] - p1[1], pa[2] - p1[2]]) vector_b = np.array([0, pb[1] - p2[1], pb[2] - p2[2]]) angle = get_angle(vector_a, vector_b) deg = (angle * 180 / np.pi) return deg, vector_a, vector_b
[docs]def find_angle_odr(d1, d2, projection): """ Compute the angle between two vectors d1 and d2 using the classical formula. Used for the ODR-method, specifically. Args: d1 (ndarray): First vector d2 (ndarray): Second vector projection (bool): True / False for 2D / 3D angle. Returns: deg (float): Angle between vectors. """ if d1.dot(d2) > 0: d1 = -d1 if projection: d1[0] = 0 d2[0] = 0 angle = get_angle(d1, -d2) deg = (angle * 180 / np.pi) return deg, d1, d2
[docs]def write_alpha_beta_point(base_path, suggested_point, method, quantity_to_compute): """ Write suggested choice of (alpha, beta) to file. Args: base_path (str): Path to file directory. suggested_point (ndarray): Array containing alpha and beta value method (str): Info about parameter, and increase / decrease of parameter. quantity_to_compute (str): Quantity to compute. """ save_path = base_path + "_alphabeta_values.txt" alpha = suggested_point[0] beta = suggested_point[1] name = quantity_to_compute + "_" + method.split("_")[-1] with open(save_path, "a") as f: f.write("%s alpha=%.4f beta=%.4f \n" % (name, alpha, beta))
[docs]def alpha_beta_intersection(method, f, alphas, betas, tol=0.0): """ Iterate through values of alpha and beta and find intersection points with a given tolerance between initial value and initial value plus/minus SD. Args: method (function): Plane defining initial value plus/minus SD. f (interp2d): Interpolated surface of curvature or angle values. alphas (ndarray): List of alpha values. betas (ndarray): List of beta values. tol (float): Tolerance used for adjusting SD if needed. Returns: zeros (ndarray): Array containing (alpha,beta) tuples which intersect. """ zeros = [] for i in alphas: for j in betas: diff = abs(f(j, i) - method(tol)) if "c" in method.__name__: if diff < 0.001: zeros.append([i, j]) else: if diff < 0.05: zeros.append([i, j]) return zeros
[docs]def read_command_line(): """ Read arguments from commandline """ description = "Algorithm used to compute the recommended value for " + \ "alpha and beta based on surface interpolation and a " + \ "given limit, depending on the quantity to be computed. " + \ "Primarily implemented for computing angle and curvature values." + \ "Computes selected quantities using the centerline as a proxy." parser = ArgumentParser(description=description, formatter_class=RawDescriptionHelpFormatter) required = parser.add_argument_group('required named arguments') # Required arguments required.add_argument('-i', '--ifile', type=str, default=None, help="Path to the surface model", required=True) required.add_argument("-q", "--quantity", type=str, default="curvature", help="Parameter to compute. Choose between 'curvature' and 'angle'", required=True, choices=['curvature', 'angle']) # Optional arguments parser.add_argument("-r", "--region-of-interest", type=str, default="manual", choices=["manual", "commandline", "landmarking"], help="The method for defining the region to be changed. There are" + " three options: 'manual', 'commandline', 'landmarking'. In" + " 'manual' the user will be provided with a visualization of the" + " input surface, and asked to provide an end and start point of the" + " region of interest. Note that not all algorithms are robust over" + " bifurcations. If 'commandline' is provided, then '--region-points'" + " is expected to be provided. Finally, if 'landmarking' is" + " given, it will look for the output from running" + " automated_landmarking.py.") parser.add_argument("--region-points", nargs="+", type=float, default=None, metavar="points", help="If -r or --region-of-interest is 'commandline' then this" + " argument have to be given. The method expects two points" + " which defines the start and end of the region of interest. " + " Example providing the points (1, 5, -1) and (2, -4, 3):" + " --region-points 1 5 -1 2 -4 3") parser.add_argument('-rad', '--radius', type=float, default=0.15, help="Radius of bounding circle, limiting the choice of alpha and beta") parser.add_argument('-b', '--boundary', nargs='+', default=[-0.2, 1, -0.2, 1], help='Bounds of grid, as a list: [alpha_min, alpha_max, beta_min, beta_max]') parser.add_argument("-g", "--grid-size", type=int, default=25, help="Size of n x n matrix used for computing a set of discrete values") parser.add_argument('-c', '--value-change', type=float, default=0.05, help='Desired change in curvature / bend angle to achieve. Algorithm computes' + 'recommended values of alpha and beta for both plus and minus this change.') parser.add_argument('-mc', '--method_curv', type=str, default="disc", help="Method for computing curv. Available methods: disc " + "| vmtkfactor | vmtkit | spline", choices=['disc', 'vmtkfactor', 'vmtkit', 'spline']) parser.add_argument('-ma', '--method_angle', type=str, default="plane", help="Method for computing siphon angle. Available methods: plane | " + "itplane | itplane_clip | maxcurv | smooth | discrete | frac | odrline | MISR ", choices=['plane', 'itplane', 'itplane_clip', 'maxcurv', 'smooth', 'discrete', 'frac', 'odrline', 'misr']) args = parser.parse_args() return dict(input_filepath=args.ifile, quantity_to_compute=args.quantity, radius=args.radius, boundary=args.boundary, grid_size=args.grid_size, value_change=args.value_change, method_angle=args.method_angle, method_curv=args.method_curv, region_points=args.region_points, region_of_interest=args.region_of_interest)
if __name__ == "__main__": estimate_alpha_and_beta(**read_command_line())