## Copyright (c) Aslak W. Bergersen, Henrik A. Kjeldsberg. All rights reserved.
## See LICENSE file for details.
## This software is distributed WITHOUT ANY WARRANTY; without even
## the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE. See the above copyright notices for more information.
import operator
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from scipy import interpolate
from scipy.ndimage import gaussian_filter
from scipy.signal import argrelextrema
from morphman.common import *
[docs]def estimate_alpha_and_beta(
input_filepath,
quantity_to_compute,
boundary,
radius,
grid_size,
value_change,
method_angle,
method_curv,
region_of_interest,
region_points,
):
"""
Imports a matrix of parameter values corresponding
to a (alpha,beta) point and perform spline interpolation
to make a parameter surface.
Parameter surface is used to find intersection with
initial parameter value plus / minus one standard deviation.
Three criteria to find suggested alpha-beta value from intersection.
Args:
input_filepath (str): Surface model filename and location where data is stored.
quantity_to_compute(str): Parameter name.
boundary (list): Boundary of searching grid.
radius (float): Minimum radius of circle to search outside of.
grid_size (int): Size of searching grid ( grid_size x grid_size matrix)
value_change (float): Desired change in curvature / bend angle to achieve
method_angle (str): Method for computing angle.
method_curv (str): Method for computing curvature.
region_of_interest (str): Method for setting the region of interest ['manual' | 'commandline' | 'landmarking']
region_points (list): If region_of_interest is 'commandline', this a flatten list of the start and endpoint
"""
# Get grid values
base_path = get_path_names(input_filepath)
# Get region points
# Compute discrete values for quantity
if isinstance(boundary[-1], str):
boundary = np.asarray(boundary, dtype=float)
data = compute_quantities(
input_filepath,
boundary,
quantity_to_compute,
method_curv,
method_angle,
region_of_interest,
region_points,
n=grid_size,
projection=False,
)
# Get grid boundary
amin, amax, bmin, bmax = (
float(boundary[0]),
float(boundary[1]),
float(boundary[2]),
float(boundary[3]),
)
# Set standard deviations used to find intersection
# Defined SD planes for curvature
# Tolerance added for adjusting SD
# if there are no intersections found
def value_plus(tolerance=0.0):
return initial_value + value_change - tolerance
def value_minus(tolerance=0.0):
return initial_value - value_change + tolerance
n = len(data)
alpha = np.linspace(amin, amax, n)
beta = np.linspace(bmin, bmax, n)
alpha_long = np.linspace(amin, amax, 300)
beta_long = np.linspace(bmin, bmax, 300)
xx, yy = np.meshgrid(alpha, beta)
points = np.zeros((n, 2))
for i in range(len(xx)):
points[i] = [alpha[i], beta[i]]
# Spline interpolation
f = interpolate.SmoothBivariateSpline(xx.ravel(), yy.ravel(), data.ravel())
initial_value = f(0, 0)
methods = [value_plus, value_minus]
# Find intersecting points
# Reduces SD if no points are found
for plane in methods:
zeros = alpha_beta_intersection(plane, f, alpha_long, beta_long)
if len(zeros) == 0:
empty = True
# Leeway tolerance for matching quantity on interpolated surface
tol = 0.005 if quantity_to_compute == "curvature" else 0.1
max_iter = 50
iterations = 0
print("-- Found no points..Adjusting SD")
while empty and iterations < max_iter:
print("-- Iterations: %i" % (iterations + 1))
zeros = alpha_beta_intersection(plane, f, alpha_long, beta_long, tol)
if len(zeros) > 0:
empty = False
iterations += 1
if quantity_to_compute == "curvature":
tol += 0.001
elif quantity_to_compute == "angle":
tol += 0.2
# Check points and apply criteria
# to find suggested values for alpha and beta
if len(zeros) > 0:
points = []
for p in zeros:
if (
plane.__name__ == "value_plus"
and quantity_to_compute == "curvature"
) or (
plane.__name__ == "value_minus" and quantity_to_compute == "angle"
):
if p[1] < 0:
points.append(p)
else:
if p[1] > 0:
points.append(p)
suggested_point = points[0]
dist = 1e9
for p in points[1:]:
dist_tmp = la.norm(np.array(p))
if radius < dist_tmp < dist:
dist = dist_tmp
suggested_point = p
# Write points to file
write_alpha_beta_point(
base_path, suggested_point, plane.__name__, quantity_to_compute
)
[docs]def compute_quantities(
input_filepath,
boundary,
quantity,
method_curv,
method_angle,
region_of_interest,
region_points,
n=50,
projection=False,
):
"""
Initialization for computing curvature and angle.
Values are either printed to terminal or stored in a (n x n) matrix.
Args:
input_filepath (str): Base location of surface model files.
boundary (list): Bounds of grid for computing quantities.
method_curv (str): Method used to compute curvature.
method_angle (str): Method used to compute angle.
quantity (string): Quantity to compute, either curvature or angle.
region_of_interest (str): Method for setting the region of interest ['manual' | 'commandline' | 'landmarking']
region_points (list): If region_of_interest is 'commandline', this a flatten list of the start and endpoint
n (int): Determines matrix size when computing multiple values.
projection (bool): Projects angle into 2d plane if True.
Returns:
values (ndarray): (n x n) matrix with quantity values
"""
# Initialize storage data
values = np.zeros((n, n))
amin, amax, bmin, bmax = boundary[0], boundary[1], boundary[2], boundary[3]
alphas = np.linspace(amin, amax, n)
betas = np.linspace(bmin, bmax, n)
k = 0
for i, alpha in enumerate(alphas):
for j, beta in enumerate(betas):
print("Iteration %i of %i" % (k + 1, n * n))
if quantity == "curvature":
value, _ = compute_curvature(
input_filepath,
alpha,
beta,
method_curv,
None,
False,
region_of_interest,
region_points,
)
elif quantity == "angle":
value, _ = compute_angle(
input_filepath,
alpha,
beta,
method_angle,
None,
region_of_interest,
region_points,
projection,
)
values[i, j] = value
k += 1
return values
[docs]def compute_angle(
input_filepath,
alpha,
beta,
method,
new_centerlines,
region_of_interest,
region_points,
projection=False,
):
"""
Primary collection of methods for computing the angle of a vessel bend.
Three main methods are currently implemented:
1) ODR methods: odrline
2) Tracing point methods: maxcurv, smooth, discrete, frac, MISR
3) Relative tracing point methods: plane, itplane, itplane_clip
Args:
input_filepath (str): Path to case folder.
alpha (float): Extension / Compression factor in vertical direction.
beta (float): Extension / Compression factor in horizontal direction.
method (str): Method used to compute angle.
new_centerlines (vtkPolyData): New centerline.
region_of_interest (str): Method for setting the region of interest ['manual' | 'commandline' | 'landmarking']
region_points (list): If region_of_interest is 'commandline', this a flatten list of the start and endpoint
projection (bool): True / False for computing 2D / 3D angle.
Returns:
new_deg (float): New angle of a vessel bend from a manipulated centerline.
Returns:
deg (float): Old angle of a vessel bend from a manipulated centerline.
"""
# Get base path
base_path = get_path_names(input_filepath)
# Centerline path
centerline_path = base_path + "_centerline.vtp"
# Clean and cap / uncap surface
surface, capped_surface = prepare_surface(base_path, input_filepath)
# Extract old centerline
if not path.exists(centerline_path):
# Compute centerlines
inlet, outlets = get_inlet_and_outlet_centers(surface, base_path)
print("-- Compute centerlines and Voronoi diagram")
centerlines, _, _ = compute_centerlines(
inlet,
outlets,
centerline_path,
capped_surface,
resampling=0.1,
smooth=False,
base_path=base_path,
)
else:
centerlines = read_polydata(centerline_path)
# Get region of interest
point_path = base_path + "_anterior_bend.particles"
if region_of_interest == "landmarking":
if not path.exists(point_path):
raise RuntimeError(
(
"The given .particles file: %s does not exist. Please run"
+ " landmarking with automated_landmarking.py first."
)
% point_path
)
region_points = np.loadtxt(point_path)
else:
_, _, _, region_points, _ = get_line_to_change(
capped_surface, centerlines, region_of_interest, "bend", region_points, 0
)
region_points = [
[region_points[3 * i], region_points[3 * i + 1], region_points[3 * i + 2]]
for i in range(len(region_points) // 3)
]
p1 = region_points[0]
p2 = region_points[1]
if new_centerlines is None:
centerlines, new_centerlines = get_new_centerlines(
centerlines, region_points, alpha, beta, p1, p2
)
# Get new siphon and prepare
id1, id2, moved_id1, moved_id2, moved_p1, moved_p2 = get_moved_siphon(
new_centerlines, centerlines, p1, p2
)
# Extract region of interest
siphon = extract_single_line(centerlines, 1, start_id=id1, end_id=id2)
moved_siphon = extract_single_line(
new_centerlines, 0, start_id=moved_id1, end_id=moved_id2
)
id1, id2 = 0, siphon.GetNumberOfPoints() - 1
moved_id1, moved_id2 = 0, moved_siphon.GetNumberOfPoints() - 1
if method in ["maxcurv", "odrline", "smooth", "frac"]:
nknots = 11
siphon_splined, siphon_curv = compute_splined_centerline(
siphon, get_curv=True, isline=True, nknots=nknots, get_misr=False
)
_, moved_siphon_curv = compute_splined_centerline(
moved_siphon, get_curv=True, isline=True, nknots=nknots, get_misr=False
)
siphon_curv = resample(siphon_curv, siphon_splined.GetNumberOfPoints())
cutcurv = siphon_curv[id1:id2]
newcutcurv = moved_siphon_curv[moved_id1:moved_id2]
if method == "discrete":
# Smooth line with discrete derivatives
neigh = 30
_, curv_d = compute_discrete_derivatives(siphon, neigh=neigh)
_, newcurv_d = compute_discrete_derivatives(moved_siphon, neigh=neigh)
cutcurv_d = curv_d[id1:id2]
newcutcurv_d = newcurv_d[moved_id1:moved_id2]
if method == "MISR":
# Map MISR values to old and new splined anterior bend
anterior_bend = extract_single_line(centerlines, 0, start_id=id1, end_id=id2)
m = anterior_bend.GetNumberOfPoints()
m1 = moved_siphon.GetNumberOfPoints()
misr_array = get_vtk_array(radiusArrayName, 1, m)
newmisr_array = get_vtk_array(radiusArrayName, 1, m1)
misr_list = []
for i in range(m):
misr = anterior_bend.GetPointData().GetArray(radiusArrayName).GetTuple(i)
misr_list.append(misr[0])
misr_array.SetTuple(i, misr)
misr_list = resample(misr_list, m1)
for i in range(m1):
newmisr_array.SetTuple(i, (misr_list[i],))
siphon.GetPointData().AddArray(misr_array)
moved_siphon.GetPointData().AddArray(newmisr_array)
# Get direction to the point the furthest away (dx)
direction = "vertical"
clipping_points_vtk = vtk.vtkPoints()
for point in [p1, p2]:
clipping_points_vtk.InsertNextPoint(point)
middle_points, _, dx = get_direction_parameters(
extract_single_line(centerlines, 0), 0.1, direction, clipping_points_vtk
)
# Find adjusted clipping points (and tracing points)
if method == "plane":
_, max_id = get_most_distant_point(dx, siphon)
_, newmax_id = get_most_distant_point(dx, moved_siphon)
elif method in ["itplane", "itplane_clip"]:
_, max_id = get_most_distant_point(dx, siphon)
_, newmax_id = get_most_distant_point(dx, moved_siphon)
siphon = vmtk_compute_geometric_features(siphon, False)
frenet_t1 = get_point_data_array("FrenetTangent", siphon, k=3)
frenet_t2 = get_point_data_array("FrenetTangent", moved_siphon, k=3)
p1_1, p1_id = get_closest_point(frenet_t1[-1], 0, max_id, p2, siphon)
p2_2, p2_id = get_closest_point(
frenet_t1[0], max_id, siphon.GetNumberOfPoints(), p1, siphon
)
newp1, np1_id = get_closest_point(
frenet_t2[-1], 0, newmax_id, moved_p2, moved_siphon
)
newp2, np2_id = get_closest_point(
frenet_t2[0],
newmax_id,
moved_siphon.GetNumberOfPoints(),
moved_p1,
moved_siphon,
)
n1 = get_point_data_array("FrenetBinormal", siphon, k=3)[p1_id]
n2 = get_point_data_array("FrenetBinormal", moved_siphon, k=3)[np1_id]
dp = p1_1 - p2_2
dnewp = newp1 - newp2
normal = np.cross(dp, n1)
newnormal = np.cross(dnewp, n2)
_, max_id = get_most_distant_point(normal, siphon)
_, newmax_id = get_most_distant_point(newnormal, moved_siphon)
elif method == "maxcurv":
max_id, _ = max(enumerate(cutcurv), key=operator.itemgetter(1))
newmax_id, _ = max(enumerate(newcutcurv), key=operator.itemgetter(1))
elif method == "smooth":
allmaxcurv = argrelextrema(cutcurv, np.greater)[0]
allnewmaxcurv = argrelextrema(newcutcurv, np.greater)[0]
tmpcurv = cutcurv
while len(allmaxcurv) > 2:
tmpcurv = gaussian_filter(tmpcurv, 2)
allmaxcurv = argrelextrema(tmpcurv, np.greater)[0]
tmpnewcurv = newcutcurv
while len(allnewmaxcurv) > 2:
tmpnewcurv = gaussian_filter(tmpnewcurv, 2)
allnewmaxcurv = argrelextrema(tmpnewcurv, np.greater)[0]
max_id = allmaxcurv[0]
newmax_id = allnewmaxcurv[0]
elif method == "discrete":
max_id, _ = max(enumerate(cutcurv_d), key=operator.itemgetter(1))
newmax_id, _ = max(enumerate(newcutcurv_d), key=operator.itemgetter(1))
elif method == "maxdist":
norm_p1 = [
la.norm(np.array(p1) - np.array(siphon.GetPoint(i)))
for i in range(siphon.GetNumberOfPoints())
]
norm_p2 = [
la.norm(np.array(p2) - np.array(siphon.GetPoint(i)))
for i in range(siphon.GetNumberOfPoints() - 1, -1, -1)
]
max_id = 0
max_dist = 0
for i, n1 in enumerate(norm_p1):
for n2 in norm_p2:
dist = n1**2 + n2**2
if dist > max_dist:
max_dist = dist
max_id = i
newnorm_p1 = [
la.norm(np.array(moved_p1) - np.array(moved_siphon.GetPoint(i)))
for i in range(moved_siphon.GetNumberOfPoints())
]
newnorm_p2 = [
la.norm(np.array(moved_p2) - np.array(moved_siphon.GetPoint(i)))
for i in range(moved_siphon.GetNumberOfPoints() - 1, -1, -1)
]
newmax_id = 0
new_max_dist = 0
for i, n1 in enumerate(newnorm_p1):
for j, n2 in enumerate(newnorm_p2):
dist = n1**2 + n2**2
if dist > new_max_dist:
new_max_dist = dist
newmax_id = i
# Compute angles based on the classic formula for
# angle between vectors in 3D
if method == "odrline":
limits = ["cumulative", "sd"]
for limit in limits:
d1, d2, _ = odr_line(id1, id2, siphon_splined, siphon_curv, limit)
newd1, newd2, _ = odr_line(
moved_id1, moved_id2, moved_siphon, moved_siphon_curv, limit
)
deg = find_angle_odr(d1, d2, projection)
new_deg = find_angle_odr(newd1, newd2, projection)
elif method == "MISR":
multiplier = 1.5
n1 = siphon.GetNumberOfPoints()
n2 = moved_siphon.GetNumberOfPoints()
rad1 = siphon.GetPointData().GetArray(radiusArrayName).GetTuple1(0)
rad2 = siphon.GetPointData().GetArray(radiusArrayName).GetTuple1(n1 - 1)
newrad1 = moved_siphon.GetPointData().GetArray(radiusArrayName).GetTuple1(0)
newrad2 = (
moved_siphon.GetPointData().GetArray(radiusArrayName).GetTuple1(n2 - 1)
)
pa, _ = move_past_sphere(
siphon, p1, rad1, 0, step=1, stop=n1 - 1, scale_factor=multiplier
)
pb, _ = move_past_sphere(
siphon, p2, rad2, n1 - 1, step=-1, stop=0, scale_factor=multiplier
)
new_pa, _ = move_past_sphere(
moved_siphon,
moved_p1,
newrad1,
0,
step=1,
stop=n2 - 1,
scale_factor=multiplier,
)
new_pb, _ = move_past_sphere(
moved_siphon,
moved_p2,
newrad2,
n2 - 1,
step=-1,
stop=0,
scale_factor=multiplier,
)
deg, l1, _ = find_angle(pa, pb, p1, p2, projection)
new_deg, nl1, _ = find_angle(new_pa, new_pb, moved_p1, moved_p2, projection)
else:
if method == "frac":
n_values = [5]
left = [2]
right = [3]
i = 0
dx = 1.0 / n_values[i]
ida = int(id1 + (id2 - id1) * left[i] * dx)
idb = int(id1 + (id2 - id1) * right[i] * dx)
pa = siphon_splined.GetPoints().GetPoint(ida)
pb = siphon_splined.GetPoints().GetPoint(idb)
ida = int(moved_id1 + (moved_id2 - moved_id1) * left[i] * dx)
idb = int(moved_id1 + (moved_id2 - moved_id1) * right[i] * dx)
new_pa = moved_siphon.GetPoints().GetPoint(ida)
new_pb = moved_siphon.GetPoints().GetPoint(idb)
deg, l1, l2 = find_angle(pa, pb, p1, p2, projection)
new_deg, nl1, nl2 = find_angle(
new_pa, new_pb, moved_p1, moved_p2, projection
)
elif method in [
"plane",
"itplane",
"itplane_clip",
"maxcurv",
"smooth",
"discrete",
"maxdist",
]:
frac = 4.0 / 5.0
if method == "itplane_clip":
id_mid = (p2_id - p1_id) / 2.0
new_id_mid = (np2_id - np1_id) / 2.0
if max_id > id_mid:
ida = int((max_id - p1_id) * frac)
idb = int((max_id - p1_id) * (1 + (1 - frac)))
pa = siphon.GetPoints().GetPoint(ida + p1_id)
pb = siphon.GetPoints().GetPoint(idb + p1_id)
else:
idb = int((p2_id - max_id) * (1 + (1 - frac)))
ida = int((p2_id - max_id) * frac)
pa = siphon.GetPoints().GetPoint(ida)
pb = siphon.GetPoints().GetPoint(idb)
if newmax_id > new_id_mid:
ida = int((newmax_id - np1_id) * frac)
idb = int((newmax_id - np1_id) * (1 + (1 - frac)))
new_pa = moved_siphon.GetPoints().GetPoint(ida + np1_id)
new_pb = moved_siphon.GetPoints().GetPoint(idb + np1_id)
else:
ida = int((np2_id - newmax_id) * frac)
idb = int((np2_id - newmax_id) * (1 + (1 - frac)))
new_pa = moved_siphon.GetPoints().GetPoint(ida)
new_pb = moved_siphon.GetPoints().GetPoint(idb)
deg, l1, l2 = find_angle(pa, pb, p1, p2, projection)
new_deg, nl1, nl2 = find_angle(
new_pa, new_pb, moved_p1, moved_p2, projection
)
else:
ida = int(max_id * frac)
idb = int(max_id * (1 + (1 - frac)))
pa = siphon.GetPoints().GetPoint(ida)
pb = siphon.GetPoints().GetPoint(idb)
ida = int(newmax_id * frac)
idb = int(newmax_id * (1 + (1 - frac)))
new_pa = moved_siphon.GetPoints().GetPoint(ida)
new_pb = moved_siphon.GetPoints().GetPoint(idb)
deg, l1, l2 = find_angle(pa, pb, p1, p2, projection)
new_deg, nl1, nl2 = find_angle(
new_pa, new_pb, moved_p1, moved_p2, projection
)
return new_deg, deg
[docs]def compute_curvature(
input_filepath,
alpha,
beta,
method,
new_centerlines,
compute_original,
region_of_interest,
region_points,
):
"""
Primary collection of methods for computing curvature of a centerline.
Five methods are currently implemented:
1) VMTK - Factor variance (vmtkfactor)
2) VMTK - Iteration variance (vmtkit)
3) Discrete derivatives (disc)
4) B-splines (spline)
Args:
input_filepath (str): Path to case folder.
alpha (float): Extension / Compression factor in vertical direction.
beta (float): Extension / Compression factor in horizontal direction.
method (str): Method used to compute angle.
new_centerlines (vtkPolyData): New centerline.
compute_original (bool): Computes old curvature value if True.
region_of_interest (str): Method for setting the region of interest ['manual' | 'commandline' | 'landmarking']
region_points (list): If region_of_interest is 'commandline', this is a flattened list of the start and endpoint
Returns:
new_maxcurv (float): Maximum curvature within the manipulated region of interest.
Returns:
old_maxcurv (float): Maximum curvature within the original region of interest.
"""
# Input filename
base_path = get_path_names(input_filepath)
# Centerline filename
centerline_path = base_path + "_centerline.vtp"
# Clean and capp / uncapp surface
surface, capped_surface = prepare_surface(base_path, input_filepath)
# Extract old centerline
if not path.exists(centerline_path):
# Compute centerlines
inlet, outlets = get_inlet_and_outlet_centers(surface, base_path)
print("-- Compute centerlines and Voronoi diagram")
centerlines, _, _ = compute_centerlines(
inlet,
outlets,
centerline_path,
capped_surface,
resampling=0.1,
smooth=False,
base_path=base_path,
)
else:
centerlines = read_polydata(centerline_path)
# Get region of interest
point_path = base_path + "_anterior_bend.particles"
if region_of_interest == "landmarking":
if not path.exists(point_path):
raise RuntimeError(
(
"The given .particles file: %s does not exist. Please run"
+ " landmarking with automated_landmarking.py first."
)
% point_path
)
region_points = np.loadtxt(point_path)
else:
_, _, _, region_points, _ = get_line_to_change(
capped_surface, centerlines, region_of_interest, "bend", region_points, 0
)
region_points = [
[region_points[3 * i], region_points[3 * i + 1], region_points[3 * i + 2]]
for i in range(len(region_points) // 3)
]
p1 = region_points[0]
p2 = region_points[1]
if new_centerlines is None:
print("-- Maniuplating centerline manually")
centerlines, new_centerlines = get_new_centerlines(
centerlines, region_points, alpha, beta, p1, p2
)
# Extract centerline points and ids
new_centerline = extract_single_line(new_centerlines, 0)
centerline = extract_single_line(centerlines, 0)
new_locator = get_vtk_point_locator(new_centerline)
old_locator = get_vtk_point_locator(centerline)
id1 = old_locator.FindClosestPoint(p1)
id2 = old_locator.FindClosestPoint(p2)
id1_new = new_locator.FindClosestPoint(p1)
id2_new = new_locator.FindClosestPoint(p2)
# 1) VMTK - Factor variance
if method == "vmtkfactor":
factor = 0.5
line_fac = vmtk_compute_geometric_features(
new_centerline, smooth=True, iterations=100, factor=factor
)
curv_fac = get_point_data_array("Curvature", line_fac)
new_curvature = gaussian_filter(curv_fac, 5)
if compute_original:
line_fac = vmtk_compute_geometric_features(
centerline, smooth=True, iterations=100, factor=factor
)
curv_fac = get_point_data_array("Curvature", line_fac)
curvature = gaussian_filter(curv_fac, 5)
# 2) VMTK - Iteration variance
elif method == "vmtkit":
it = 150
line_it = vmtk_compute_geometric_features(
new_centerline, smooth=True, iterations=it, factor=1.0
)
curv_it = get_point_data_array("Curvature", line_it)
new_curvature = gaussian_filter(curv_it, 5)
if compute_original:
line_it = vmtk_compute_geometric_features(
centerline, smooth=True, iterations=it, factor=1.0
)
curv_it = get_point_data_array("Curvature", line_it)
curvature = gaussian_filter(curv_it, 5)
# 3) Splines
elif method == "spline":
nknots = 50
_, siphon_curv = compute_splined_centerline(
new_centerline, get_curv=True, isline=True, nknots=nknots
)
new_curvature = gaussian_filter(siphon_curv, 5)
if compute_original:
_, siphon_curv = compute_splined_centerline(
centerline, get_curv=True, isline=True, nknots=nknots
)
curvature = gaussian_filter(siphon_curv, 5)
# 4) Default: Discrete derivatives
elif method == "disc":
neigh = 20
_, curv_di = compute_discrete_derivatives(new_centerline, neigh=neigh)
new_curvature = gaussian_filter(curv_di, 5)
if compute_original:
_, curv_di = compute_discrete_derivatives(centerline, neigh=neigh)
curvature = gaussian_filter(curv_di, 5)
old_maxcurv = max(curvature[id1 + 10 : id2 - 10]) if compute_original else None
new_maxcurv = max(new_curvature[id1_new + 10 : id2_new - 10])
return new_maxcurv, old_maxcurv
[docs]def get_new_centerlines(centerlines, region_points, alpha, beta, p1, p2):
"""
Perform manual centerline manipulation in order to use the
centerline as a proxy for computing a selected quantity (angle, curvature).
Takes the original centerline, the points defining the region of interest and
the manipulation factors alpha and beta before performing a manual movement of
the centerline. Returns the new, manipulated centerline as a VTK object.
Args:
centerlines (vtkPolyData): Centerlines including diverging centerlines
region_points (ndarray): List of region points
alpha (float): Extension / Compression factor in vertical direction.
beta (float): Extension / Compression factor in horizontal direction.
p1: First region point
p2: Second region point
Returns:
centerlines (vtkPolyData): Centerlines excluding diverging centerlines
Returns:
new_centerlines (vtkPolyData): New centerlines including diverging centerlines
"""
(
centerlines,
diverging_centerlines,
region_points,
region_points_vtk,
diverging_ids,
) = get_region_of_interest_and_diverging_centerlines(centerlines, region_points)
new_centerlines = centerlines
diverging_id = None if len(diverging_ids) == 0 else diverging_ids[0]
if beta != 0.0:
direction = "horizont"
middle_points, _ = get_direction_parameters(
extract_single_line(new_centerlines, 0), beta, direction, region_points_vtk
)
dx_p1 = middle_points[0] - p1
new_centerlines = get_manipulated_centerlines(
new_centerlines,
dx_p1,
p1,
p2,
diverging_id,
diverging_centerlines,
direction,
merge_lines=True,
)
if alpha != 0.0:
direction = "vertical"
middle_points, _, dx = get_direction_parameters(
extract_single_line(new_centerlines, 0), alpha, direction, region_points_vtk
)
merge_lines = False if beta != 0.0 else True
new_centerlines = get_manipulated_centerlines(
new_centerlines,
dx,
p1,
p2,
diverging_id,
diverging_centerlines,
direction,
merge_lines=merge_lines,
)
return centerlines, new_centerlines
[docs]def odr_line(id1, id2, line, curvature, limit):
"""
Computes the orthogonal distance regression
of points along the centerline selected from
1) All points until a cumulative limit is reached
or
2) The first 11 points and all points fulfilling curvature
less than the mean plus 1.96 x SD
Args:
id1 (int): ID of first clipping point.
id2 (int): ID of second clipping point.
line (vtkPolyData): Centerline data.
curvature (ndarray): Array of curvature values.
limit (ndarray): Method used as limit
Returns:
d1 (ndarray): Direction vector from first clipping point.
d2 (ndarray): Direction vector from second clipping point.
curvlines (vtkPolyData): Centerline object with corresponding curvature values.
"""
lim = len(curvature) - 1
if limit == "cumulative":
max_cum = 10
id1_up = id1 + 1
id1_down = id1 - 1
id2_up = id2 - 1
id2_down = id2 + 1
while sum(curvature[id1 : id1_up + 1]) < max_cum and id1_up < lim:
id1_up += 1
while sum(curvature[id1_down : id1 + 1]) < max_cum and id1_down > 0:
id1_down -= 1
while sum(curvature[id2_up : id2 + 1]) < max_cum and id2_up > 0:
id2_up -= 1
while sum(curvature[id2 : id2_down + 1]) < max_cum and id2_down < lim:
id2_down += 1
else:
id1_up = id1 + 5
id1_down = id1 - 5
id2_up = id2 - 5
id2_down = id2 + 5
mean1 = sum(curvature[id1_down : id1_up + 1]) / 11.0
mean2 = sum(curvature[id2_up : id2_down + 1]) / 11.0
sd_1 = np.sqrt(sum((curvature[id1_down : id1_up + 1] - mean1) ** 2) / 10)
sd_2 = np.sqrt(sum((curvature[id2_up : id1_down + 1] - mean2) ** 2) / 10)
tol1 = mean1 + sd_1 * 1.96
tol2 = mean2 + sd_2 * 1.96
while curvature[id1_up] < tol1 and id1_up < lim:
id1_up += 1
while curvature[id1_down] < tol1 and id1_down > 0:
id1_down -= 1
while curvature[id2_up] < tol2 and id2_up > 0:
id2_up -= 1
while curvature[id2_down] < tol2 and id2_down < lim:
id2_down += 1
p1s = []
for i in range(id1_down, id1_up + 1):
p1s.append(line.GetPoint(i))
p2s = []
for i in range(id2_up, id2_down + 1):
p2s.append(line.GetPoint(i))
# Arrange points in matrix
x_1 = np.array([list(p) for p in p1s])
x_2 = np.array([list(p) for p in p2s])
# Find mean of points
avg1 = np.array([np.mean(x_1[:, 0]), np.mean(x_1[:, 1]), np.mean(x_1[:, 2])])
avg2 = np.array([np.mean(x_2[:, 0]), np.mean(x_2[:, 1]), np.mean(x_2[:, 2])])
# Subtract the mean from all points
dx_1 = x_1 - np.array([avg1] * len(x_1))
dx_2 = x_2 - np.array([avg2] * len(x_2))
# Find SVD
_, _, v_1 = la.svd(dx_1)
_, _, v_2 = la.svd(dx_2)
# Find direction vector
d1 = v_1[0]
d2 = v_2[0]
# Parametric equation P = p0 + t*d
# Make lines with curv
# Create edges between new_centerline points
curv_lines_split = []
points = [p1s, p2s]
for k, p in enumerate(points):
pts = vtk.vtkPoints()
for point in p:
pts.InsertNextPoint(point)
lines = vtk.vtkCellArray()
for i in range(len(p) - 2):
newline = vtk.vtkLine()
newline.GetPointIds().SetId(0, i)
newline.GetPointIds().SetId(1, i + 1)
lines.InsertNextCell(newline)
line_ = vtk.vtkPolyData()
line_.SetPoints(pts)
line_.SetLines(lines)
m = line_.GetNumberOfPoints()
curv_array = get_vtk_array("Curvature", 1, m)
if k == 0:
for i in range(id1_up + 1 - id1_down):
curv_array.SetTuple(i, [curvature[id1_down + i]])
else:
for i in range(id2_down + 1 - id2_up):
curv_array.SetTuple(i, [curvature[id2_up + i]])
line_.GetPointData().AddArray(curv_array)
curv_lines_split.append(line_)
curvlines = vtk_merge_polydata(curv_lines_split)
return d1, d2, curvlines
[docs]def get_moved_siphon(new_centerlines, centerlines, p1, p2):
"""
Extracts new siphon from new centerline
and clipping point information.
Args:
new_centerlines (vtkPolyData): Centerline data.
centerlines (vtkPolyData): Initial centerlines.
p1 (ndarray): First clipping point.
p2 (ndarray): Second clipping point.
Returns:
id1 (int): ID of first clipping point.
Returns:
id2 (int): ID of second clipping point.
Returns:
moved_id1 (int): New ID of first clipping point.
Returns:
moved_id1 (int): New ID of secon clipping point.
Returns:
moved_p1 (ndarray): New position of first clipping point.
Returns:
moved_p2 (ndarray): New position of second ipping point.
"""
# Extract new siphon and prepare
new_locator = get_vtk_point_locator(extract_single_line(new_centerlines, 0))
old_locator = get_vtk_point_locator(extract_single_line(centerlines, 0))
id1 = old_locator.FindClosestPoint(p1)
id2 = old_locator.FindClosestPoint(p2)
moved_id1 = new_locator.FindClosestPoint(p1)
moved_id2 = new_locator.FindClosestPoint(p2)
moved_p1 = new_centerlines.GetPoints().GetPoint(moved_id1)
moved_p2 = new_centerlines.GetPoints().GetPoint(moved_id2)
return id1, id2, moved_id1, moved_id2, moved_p1, moved_p2
[docs]def find_angle(pa, pb, p1, p2, projection):
"""
Compute the angle between two vectors
a = pA - p1 and b = pB - p2
using the classical formula.
Args:
pa (ndarray): Point along the centerline.
pb (ndarray): Point along the centerline.
p1 (ndarray): Point along the centerline.
p2 (ndarray): Point along the centerline.
projection (bool): True / False for 2D / 3D angle.
Returns:
deg (float): Angle between vectors.
vector_a (ndarray): First vector.
vector_b (ndarraty): Second vector.
"""
if not projection:
vector_a = np.array([pa[0] - p1[0], pa[1] - p1[1], pa[2] - p1[2]])
vector_b = np.array([pb[0] - p2[0], pb[1] - p2[1], pb[2] - p2[2]])
else:
vector_a = np.array([0, pa[1] - p1[1], pa[2] - p1[2]])
vector_b = np.array([0, pb[1] - p2[1], pb[2] - p2[2]])
angle = get_angle(vector_a, vector_b)
deg = angle * 180 / np.pi
return deg, vector_a, vector_b
[docs]def find_angle_odr(d1, d2, projection):
"""
Compute the angle between two vectors
d1 and d2 using the classical formula.
Used for the ODR-method, specifically.
Args:
d1 (ndarray): First vector
d2 (ndarray): Second vector
projection (bool): True / False for 2D / 3D angle.
Returns:
deg (float): Angle between vectors.
"""
if d1.dot(d2) > 0:
d1 = -d1
if projection:
d1[0] = 0
d2[0] = 0
angle = get_angle(d1, -d2)
deg = angle * 180 / np.pi
return deg, d1, d2
[docs]def write_alpha_beta_point(base_path, suggested_point, method, quantity_to_compute):
"""
Write suggested choice of (alpha, beta) to file.
Args:
base_path (str): Path to file directory.
suggested_point (ndarray): Array containing alpha and beta value
method (str): Info about parameter, and increase / decrease of parameter.
quantity_to_compute (str): Quantity to compute.
"""
save_path = base_path + "_alphabeta_values.txt"
alpha = suggested_point[0]
beta = suggested_point[1]
name = quantity_to_compute + "_" + method.split("_")[-1]
with open(save_path, "a") as f:
f.write("%s alpha=%.4f beta=%.4f \n" % (name, alpha, beta))
[docs]def alpha_beta_intersection(method, f, alphas, betas, tol=0.0):
"""
Iterate through values of alpha and beta
and find intersection points with a given tolerance
between initial value and initial value plus/minus SD.
Args:
method (function): Plane defining initial value plus/minus SD.
f (interp2d): Interpolated surface of curvature or angle values.
alphas (ndarray): List of alpha values.
betas (ndarray): List of beta values.
tol (float): Tolerance used for adjusting SD if needed.
Returns:
zeros (ndarray): Array containing (alpha,beta) tuples which intersect.
"""
zeros = []
for i in alphas:
for j in betas:
diff = abs(f(j, i) - method(tol))
if "c" in method.__name__:
if diff < 0.001:
zeros.append([i, j])
else:
if diff < 0.05:
zeros.append([i, j])
return zeros
[docs]def read_command_line():
"""
Read arguments from commandline
"""
description = (
"Algorithm used to compute the recommended value for "
+ "alpha and beta based on surface interpolation and a "
+ "given limit, depending on the quantity to be computed. "
+ "Primarily implemented for computing angle and curvature values."
+ "Computes selected quantities using the centerline as a proxy."
)
parser = ArgumentParser(
description=description, formatter_class=RawDescriptionHelpFormatter
)
required = parser.add_argument_group("required named arguments")
# Required arguments
required.add_argument(
"-i",
"--ifile",
type=str,
default=None,
help="Path to the surface model",
required=True,
)
required.add_argument(
"-q",
"--quantity",
type=str,
default="curvature",
help="Parameter to compute. Choose between 'curvature' and 'angle'",
required=True,
choices=["curvature", "angle"],
)
# Optional arguments
parser.add_argument(
"-r",
"--region-of-interest",
type=str,
default="manual",
choices=["manual", "commandline", "landmarking"],
help="The method for defining the region to be changed. There are"
+ " three options: 'manual', 'commandline', 'landmarking'. In"
+ " 'manual' the user will be provided with a visualization of the"
+ " input surface, and asked to provide an end and start point of the"
+ " region of interest. Note that not all algorithms are robust over"
+ " bifurcations. If 'commandline' is provided, then '--region-points'"
+ " is expected to be provided. Finally, if 'landmarking' is"
+ " given, it will look for the output from running"
+ " automated_landmarking.py.",
)
parser.add_argument(
"--region-points",
nargs="+",
type=float,
default=None,
metavar="points",
help="If -r or --region-of-interest is 'commandline' then this"
+ " argument have to be given. The method expects two points"
+ " which defines the start and end of the region of interest. "
+ " Example providing the points (1, 5, -1) and (2, -4, 3):"
+ " --region-points 1 5 -1 2 -4 3",
)
parser.add_argument(
"-rad",
"--radius",
type=float,
default=0.15,
help="Radius of bounding circle, limiting the choice of alpha and beta",
)
parser.add_argument(
"-b",
"--boundary",
nargs="+",
default=[-0.2, 1, -0.2, 1],
help="Bounds of grid, as a list: [alpha_min, alpha_max, beta_min, beta_max]",
)
parser.add_argument(
"-g",
"--grid-size",
type=int,
default=25,
help="Size of n x n matrix used for computing a set of discrete values",
)
parser.add_argument(
"-c",
"--value-change",
type=float,
default=0.05,
help="Desired change in curvature / bend angle to achieve. Algorithm computes"
+ "recommended values of alpha and beta for both plus and minus this change.",
)
parser.add_argument(
"-mc",
"--method_curv",
type=str,
default="disc",
help="Method for computing curv. Available methods: disc "
+ "| vmtkfactor | vmtkit | spline",
choices=["disc", "vmtkfactor", "vmtkit", "spline"],
)
parser.add_argument(
"-ma",
"--method_angle",
type=str,
default="plane",
help="Method for computing siphon angle. Available methods: plane | "
+ "itplane | itplane_clip | maxcurv | smooth | discrete | frac | odrline | MISR ",
choices=[
"plane",
"itplane",
"itplane_clip",
"maxcurv",
"smooth",
"discrete",
"frac",
"odrline",
"misr",
],
)
args = parser.parse_args()
return dict(
input_filepath=args.ifile,
quantity_to_compute=args.quantity,
radius=args.radius,
boundary=args.boundary,
grid_size=args.grid_size,
value_change=args.value_change,
method_angle=args.method_angle,
method_curv=args.method_curv,
region_points=args.region_points,
region_of_interest=args.region_of_interest,
)
if __name__ == "__main__":
estimate_alpha_and_beta(**read_command_line())