"""The Winston-Lutz module loads and processes EPID images that have acquired Winston-Lutz type images.
Features:
* **Couch shift instructions** - After running a WL test, get immediate feedback on how to shift the couch.
Couch values can also be passed in and the new couch values will be presented so you don't have to do that pesky conversion.
"Do I subtract that number or add it?"
* **Automatic field & BB positioning** - When an image or directory is loaded, the field CAX and the BB
are automatically found, along with the vector and scalar distance between them.
* **Isocenter size determination** - Using backprojections of the EPID images, the 3D gantry isocenter size
and position can be determined *independent of the BB position*. Additionally, the 2D planar isocenter size
of the collimator and couch can also be determined.
* **Image plotting** - WL images can be plotted separately or together, each of which shows the field CAX, BB and
scalar distance from BB to CAX.
* **Axis deviation plots** - Plot the variation of the gantry, collimator, couch, and EPID in each plane
as well as RMS variation.
* **File name interpretation** - Rename DICOM filenames to include axis information for linacs that don't include
such information in the DICOM tags. E.g. "myWL_gantry45_coll0_couch315.dcm".
"""
from __future__ import annotations
import dataclasses
import enum
import io
import math
import os.path as osp
import statistics
import tempfile
import webbrowser
from functools import cached_property
from itertools import zip_longest
from pathlib import Path
from textwrap import wrap
from typing import BinaryIO, Iterable, Sequence
import argue
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.mplot3d import art3d
from py_linq import Enumerable
from pydantic import BaseModel
from scipy import ndimage, optimize
from scipy.ndimage import zoom
from scipy.spatial.transform import Rotation
from skimage.measure._regionprops import RegionProperties
from tabulate import tabulate
from .core import image, pdf
from .core.array_utils import array_to_dicom
from .core.decorators import lru_cache
from .core.geometry import (
Line,
Point,
PointSerialized,
Vector,
VectorSerialized,
cos,
sin,
)
from .core.image import DicomImageStack, is_image, tiff_to_dicom
from .core.io import TemporaryZipDirectory, get_url, retrieve_demo_file
from .core.scale import MachineScale, convert
from .core.utilities import ResultBase, ResultsDataMixin, convert_to_enum, is_close
from .metrics.features import (
is_right_circumference,
is_right_size_bb,
is_round,
is_solid,
is_symmetric,
)
from .metrics.image import (
GlobalSizedDiskLocator,
GlobalSizedFieldLocator,
SizedDiskLocator,
)
BB_ERROR_MESSAGE = (
"Unable to locate the BB. Make sure the field edges do not obscure the BB, that there are no artifacts in the images, that the 'bb_size' parameter is close to reality, "
"and that the BB is near the center (within 2cm). If this is a large-field image or kV image try setting 'low_density_bb' to True."
)
[docs]
class BBConfig(BaseModel):
name: str
offset_left_mm: float
offset_up_mm: float
offset_in_mm: float
bb_size_mm: float
rad_size_mm: float
[docs]
def to_human(self) -> str:
"""Convert one BB location to a human-readable str"""
lr = "Left" if self.offset_left_mm >= 0 else "Right"
ud = "Up" if self.offset_up_mm >= 0 else "Down"
io = "In" if self.offset_in_mm >= 0 else "Out"
return f"{lr} {abs(self.offset_left_mm)}mm, {ud} {abs(self.offset_up_mm)}mm, {io} {abs(self.offset_in_mm)}mm"
class BBArrangement:
"""Presets for multi-target phantoms."""
# a BB at iso; represents the simplest case
ISO = (
BBConfig(
name="Iso",
offset_left_mm=0,
offset_up_mm=0,
offset_in_mm=0,
bb_size_mm=5, # overridden later dynamically
rad_size_mm=20, # not used;
),
)
# ISOCAL = (
# BBConfig(
# name="1",
# offset_left_mm=0,
# offset_up_mm=-170,
# offset_in_mm=-30,
# bb_size_mm=5,
# rad_size_mm=20,
# ),
# {
# 'name': '2',
# 'offset_left_mm': -170,
# 'offset_up_mm': 0,
# 'offset_in_mm': -45,
# "bb_size_mm": 5,
# 'rad_size_mm': 20,
# }
# )
# locations: https://www.postersessiononline.eu/173580348_eu/congresos/ESTRO2020/aula/-PO_1320_ESTRO2020.pdf
SNC_MULTIMET = (
BBConfig(
name="Iso",
offset_left_mm=0,
offset_up_mm=0,
offset_in_mm=0,
bb_size_mm=5,
rad_size_mm=20,
),
BBConfig(
name="1",
offset_left_mm=0,
offset_up_mm=0,
offset_in_mm=30,
bb_size_mm=5,
rad_size_mm=20,
),
BBConfig(
name="2",
offset_left_mm=-30,
offset_up_mm=0,
offset_in_mm=15,
bb_size_mm=5,
rad_size_mm=20,
),
BBConfig(
name="3",
offset_left_mm=0,
offset_up_mm=0,
offset_in_mm=-30,
bb_size_mm=5,
rad_size_mm=20,
),
BBConfig(
name="4",
offset_left_mm=30,
offset_up_mm=0,
offset_in_mm=-50,
bb_size_mm=5,
rad_size_mm=20,
),
BBConfig(
name="5",
offset_left_mm=0,
offset_up_mm=0,
offset_in_mm=-70,
bb_size_mm=5,
rad_size_mm=20,
),
)
DEMO = (
BBConfig(
name="Iso",
offset_left_mm=0,
offset_up_mm=5,
offset_in_mm=0,
bb_size_mm=5,
rad_size_mm=20,
),
BBConfig(
name="Out",
offset_left_mm=0,
offset_up_mm=00,
offset_in_mm=-60,
bb_size_mm=5,
rad_size_mm=20,
),
BBConfig(
name="In",
offset_left_mm=0,
offset_up_mm=00,
offset_in_mm=30,
bb_size_mm=5,
rad_size_mm=20,
),
BBConfig(
name="Left/Out",
offset_left_mm=10,
offset_up_mm=10,
offset_in_mm=-30,
bb_size_mm=5,
rad_size_mm=20,
),
)
@staticmethod
def to_human(arrangement: dict) -> str:
"""Convert one BB location to a human-readable str"""
a = arrangement
lr = "Left" if a["offset_left_mm"] >= 0 else "Right"
ud = "Up" if a["offset_up_mm"] >= 0 else "Down"
io = "In" if a["offset_in_mm"] >= 0 else "Out"
return f"'{a['name']}': {lr} {abs(a['offset_left_mm'])}mm, {ud} {abs(a['offset_up_mm'])}mm, {io} {abs(a['offset_in_mm'])}mm"
@dataclasses.dataclass
class BBFieldMatch:
"""A match of a BB and field to an expected arrangement position. I.e. the nominal BB, measured BB, and field.
This can calculate distances, create backprojections, etc for a single BB/Field."""
epid: Point
field: Point
bb: Point
dpmm: float
gantry_angle: float
couch_angle: float
sad: float
@property
def field_epid_vector_mm(self) -> Vector:
"""The vector from the field CAX to the EPID center *IN COORDINATE SPACE*."""
v = (self.field - self.epid) / self.dpmm
v.y = (
-v.y
) # invert the y-axis; positive is down in image space but negative in coordinate space
return v
@property
def bb_field_vector_mm(self) -> Vector:
"""The vector from the BB to the field CAX *IN COORDINATE SPACE*."""
v = (self.bb - self.field) / self.dpmm
v.y = (
-v.y
) # invert the y-axis; positive is down in image space but negative in coordinate space
return v
@property
def bb_epid_vector_mm(self) -> Vector:
"""The vector from the BB to the field CAX *IN COORDINATE SPACE*."""
v = (self.bb - self.epid) / self.dpmm
v.y = (
-v.y
) # invert the y-axis; positive is down in image space but negative in coordinate space
return v
@property
def bb_field_distance_mm(self) -> float:
"""The distance from the BB to the field CAX in mm."""
return self.field.distance_to(self.bb) / self.dpmm
@property
def bb_epid_distance_mm(self) -> float:
"""The distance from the BB to the EPID center in mm."""
return self.epid.distance_to(self.bb) / self.dpmm
@property
def field_epid_distance_mm(self) -> float:
"""The distance from the field CAX to the EPID center in mm."""
return self.epid.distance_to(self.field) / self.dpmm
@property
def bb_to_field_projection(self):
"""The projection from the BB to the field. Used by vanilla WL
to determine the gantry, coll, couch isocenter size because there the BB is the reference point.
Returns
-------
Line
The virtual line in space made by the BB.
"""
return straight_ray(self.bb_field_vector_mm, self.gantry_angle)
class BB3D:
"""A representation of a BB in 3D space"""
def __repr__(self):
return self.nominal_bb_position
def __init__(
self,
bb_config: BBConfig,
bb_matches: Sequence[BBFieldMatch, ...],
scale: MachineScale,
):
self.bb_config = bb_config
self.matches = bb_matches
self.scale = scale
@cached_property
def measured_bb_position(self) -> Point:
"""The 3D measured position of the BB based on rotation matrices and gantry/couch angles."""
xs = [m.bb_epid_vector_mm.x for m in self.matches]
ys = [m.bb_epid_vector_mm.y for m in self.matches]
thetas = [m.gantry_angle for m in self.matches]
phis = [m.couch_angle for m in self.matches]
vector = solve_3d_position_from_2d_planes(
xs=xs, ys=ys, thetas=thetas, phis=phis, scale=self.scale
)
# vectors and points are effectively the same thing here but we convert to a point for clarity
return Point(x=vector.x, y=vector.y, z=vector.z)
@cached_property
def nominal_bb_position(self) -> Point:
"""The nominal location of the BB in MM in coordinate space"""
return Point(
x=-self.bb_config.offset_left_mm,
y=self.bb_config.offset_in_mm,
z=self.bb_config.offset_up_mm,
)
@cached_property
def measured_field_position(self) -> Point:
"""The position of the field CAXs in 3D space"""
xs = [m.field_epid_vector_mm.x for m in self.matches]
ys = [m.field_epid_vector_mm.y for m in self.matches]
thetas = [m.gantry_angle for m in self.matches]
phis = [m.couch_angle for m in self.matches]
vector = solve_3d_position_from_2d_planes(
xs=xs, ys=ys, thetas=thetas, phis=phis, scale=self.scale
)
# vectors and points are effectively the same thing here but we convert to a point for clarity
return Point(x=vector.x, y=vector.y, z=vector.z)
def plot_nominal(self, axes: plt.Axes, color: str, **kwargs):
"""Plot the BB nominal position"""
x, y, z = create_sphere_surface(
radius=self.bb_config.bb_size_mm / 2, center=self.nominal_bb_position
)
axes.plot_surface(x, y, z, color=color, **kwargs)
def plot_measured(self, axes: plt.Axes, color: str, **kwargs):
"""Plot the BB measured position"""
x, y, z = create_sphere_surface(
radius=self.bb_config.bb_size_mm / 2, center=self.measured_bb_position
)
axes.plot_surface(x, y, z, color=color, **kwargs)
def create_sphere_surface(
radius: float, center: Point
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Create a sphere surface for plotting"""
u = np.linspace(0, 2 * np.pi, 100)
v = np.linspace(0, np.pi, 100)
x = radius * np.outer(np.cos(u), np.sin(v)) + center.x
y = radius * np.outer(np.sin(u), np.sin(v)) + center.y
z = radius * np.outer(np.ones(np.size(u)), np.cos(v)) + center.z
return x, y, z
class Axis(enum.Enum):
GANTRY = "Gantry" #:
COLLIMATOR = "Collimator" #:
COUCH = "Couch" #:
GB_COMBO = "GB Combo" #:
GBP_COMBO = "GBP Combo" #:
EPID = "Epid" #:
REFERENCE = "Reference" #:
[docs]
class WinstonLutz2DResult(ResultBase):
variable_axis: str #:
bb_location: PointSerialized #:
cax2epid_vector: VectorSerialized #:
cax2epid_distance: float #:
cax2bb_distance: float #:
cax2bb_vector: VectorSerialized #:
field_cax: PointSerialized #:
[docs]
class WinstonLutzResult(ResultBase):
"""This class should not be called directly. It is returned by the ``results_data()`` method.
It is a dataclass under the hood and thus comes with all the dunder magic.
Use the following attributes as normal class attributes."""
num_gantry_images: int #:
num_gantry_coll_images: int #:
num_coll_images: int #:
num_couch_images: int #:
num_total_images: int #:
max_2d_cax_to_bb_mm: float #:
median_2d_cax_to_bb_mm: float #:
mean_2d_cax_to_bb_mm: float #:
max_2d_cax_to_epid_mm: float #:
median_2d_cax_to_epid_mm: float #:
mean_2d_cax_to_epid_mm: float #:
gantry_3d_iso_diameter_mm: float #:
max_gantry_rms_deviation_mm: float #:
max_epid_rms_deviation_mm: float #:
gantry_coll_3d_iso_diameter_mm: float #:
coll_2d_iso_diameter_mm: float #:
max_coll_rms_deviation_mm: float #:
couch_2d_iso_diameter_mm: float #:
max_couch_rms_deviation_mm: float #:
image_details: list[WinstonLutz2DResult] #:
keyed_image_details: dict[str, WinstonLutz2DResult] #:
[docs]
class WinstonLutzMultiTargetMultiFieldResult(ResultBase):
"""This class should not be called directly. It is returned by the ``results_data()`` method.
It is a dataclass under the hood and thus comes with all the dunder magic.
Use the following attributes as normal class attributes."""
num_total_images: int #:
max_2d_field_to_bb_mm: float #:
median_2d_field_to_bb_mm: float #:
mean_2d_field_to_bb_mm: float #:
bb_arrangement: tuple[BBConfig, ...] #:
bb_maxes: dict[str, float] #:
bb_shift_vector: VectorSerialized #:
bb_shift_yaw: float #:
bb_shift_pitch: float #:
bb_shift_roll: float #:
def is_near_center(region: RegionProperties, *args, **kwargs) -> bool:
"""Whether the bb is <2cm from the center of the field"""
dpmm = kwargs["dpmm"]
shape = kwargs["shape"]
extent_limit_mm = 20
bottom, left, top, right = region.bbox
bb_center_x = left + (right - left) / 2
bb_center_y = bottom + (top - bottom) / 2
x_lo_limit = shape[1] / 2 - dpmm * extent_limit_mm
x_hi_limit = shape[1] / 2 + dpmm * extent_limit_mm
is_bb_x_centered = x_lo_limit < bb_center_x < x_hi_limit
y_lo_limit = shape[0] / 2 - dpmm * extent_limit_mm
y_hi_limit = shape[0] / 2 + dpmm * extent_limit_mm
is_bb_y_centered = y_lo_limit < bb_center_y < y_hi_limit
return is_bb_x_centered and is_bb_y_centered
def is_modest_size(region: RegionProperties, *args, **kwargs) -> bool:
"""Decide whether the ROI is roughly the size of a BB; not noise and not an artifact. Used to find the BB."""
bb_area = region.area_filled / (kwargs["dpmm"] ** 2)
bb_size = kwargs["bb_size"]
larger_bb_area = np.pi * ((bb_size + 2) / 2) ** 2
smaller_bb_area = max(
(np.pi * ((bb_size - 2) / 2) ** 2, 2)
) # set a min of 2 to avoid a lower bound of 0 when radius=2. This is much more likely to find noise in a block.
return smaller_bb_area < bb_area < larger_bb_area
def is_square(region: RegionProperties, *args, **kwargs) -> bool:
"""Decide if the ROI is square in nature by testing the filled area vs bounding box. Used to find the BB."""
actual_fill_ratio = region.filled_area / region.bbox_area
return actual_fill_ratio > 0.8
def is_right_square_size(region: RegionProperties, *args, **kwargs) -> bool:
"""Decide if the ROI is square in nature by testing the filled area vs bounding box. Used to find the BB."""
field_area = region.area_filled / (kwargs["dpmm"] ** 2)
rad_size = max((kwargs["rad_size"], 5))
larger_bb_area = (rad_size + 5) ** 2
smaller_bb_area = (rad_size - 5) ** 2
return smaller_bb_area < field_area < larger_bb_area
class WLBaseImage(image.LinacDicomImage):
"""Base class for a WL Image. Represents a single image with N fields and M BBs.
Methods are provided to find the field CAXs and BBs and matching to the expected locations.
"""
field_caxs: list[Point]
bb_positions: list[Point]
bb_arrangement: tuple[BBConfig]
arrangement_matches: dict[
str, BBFieldMatch
] # a field CAX and BB matched to their respective nominal locations
def __init__(
self,
file: str | BinaryIO | Path,
use_filenames: bool = False,
**kwargs,
):
"""
Parameters
----------
file : str
Path to the image file.
use_filenames: bool
Whether to try to use the file name to determine axis values.
Useful for Elekta machines that do not include that info in the DICOM data.
"""
# override detection conditions if passed
if conditions := kwargs.pop("detection_conditions", False):
self.detection_conditions = conditions
super().__init__(file, use_filenames=use_filenames, **kwargs)
self._is_analyzed = False
def analyze(
self,
bb_arrangement: tuple[BBConfig],
is_open_field: bool = False,
is_low_density: bool = False,
shift_vector: Vector | None = None,
) -> (tuple[Point], tuple[Point]):
"""Analyze the image for BBs and field CAXs.
Parameters
----------
bb_arrangement : tuple[BBConfig]
The expected BB locations.
is_open_field : bool
Whether the field is open or not. If open, only one CAX is expected.
is_low_density : bool
Whether the BBs are low density (e.g. kV images).
shift_vector : Vector, optional
A vector to shift the detected BBs by. Useful for images that are not perfectly aligned.
"""
self.check_inversion_by_histogram(percentiles=(0.01, 50, 99.99))
self._clean_edges()
self.ground()
self.normalize()
self.bb_arrangement = bb_arrangement
field_caxs = self.find_field_centroids(is_open_field=is_open_field)
field_matches = self.find_field_matches(field_caxs)
detected_bb_points = self.find_bb_centroids(
bb_diameter_mm=bb_arrangement[0].bb_size_mm,
low_density=is_low_density,
)
if shift_vector:
# apply shift to detected BB points
lat, sup_inf = bb_projection_with_rotation(
offset_left=-shift_vector.x, # negative because left is negative x
offset_up=shift_vector.z,
offset_in=shift_vector.y,
sad=self.sad,
gantry=self.gantry_angle,
couch=self.couch_angle,
)
# convert from mm to pixels and add to the detected points
for p in detected_bb_points:
p.x += lat * self.dpmm
p.y -= (
sup_inf * self.dpmm
) # we subtract because the detected point is in image space, not coordinate space so we convert the shift from coordinate to image space
bb_matches = self.find_bb_matches(detected_points=detected_bb_points)
if len(bb_matches) != len(field_matches):
raise ValueError("The number of detected fields and BBs do not match")
if not field_matches:
raise ValueError("No fields were detected")
if not bb_matches:
raise ValueError(BB_ERROR_MESSAGE)
# we now have field CAXs and BBs matched to their respective nominal locations
# merge the field and BBs per arrangement position
combined_matches = {}
for bb_name, bb_match in bb_matches.items():
combined_matches[bb_name] = BBFieldMatch(
epid=self.center,
field=field_matches[bb_name],
bb=bb_match,
dpmm=self.dpmm,
gantry_angle=self.gantry_angle,
couch_angle=self.couch_angle,
sad=self.sad,
)
self._is_analyzed = True
self.arrangement_matches = combined_matches
def find_field_centroids(self, is_open_field: bool) -> list[Point]:
"""Find the field CAX(s) in the image. If the field is open or this is a vanilla WL, only one CAX is found."""
if is_open_field:
p = self.center
else:
# TODO: Use metrics field finder
# can't use it out of the box because the
# analyze method doesn't pass the field size
# using the global field locator without size will
# find several other unrelated fields and show up in the image
# the metric algorithm should have a post-processing function parameter
min, max = np.percentile(self.array, [5, 99.9])
threshold_img = self.as_binary((max - min) / 2 + min)
filled_img = ndimage.binary_fill_holes(threshold_img)
coords = ndimage.center_of_mass(filled_img)
p = Point(x=coords[-1], y=coords[0])
return [p]
def find_field_matches(self, detected_points: list[Point]) -> dict[str, Point]:
"""Find matches between detected field points and the arrangement. See ``find_bb_matches`` for more info."""
return self.find_bb_matches(detected_points)
def find_bb_centroids(
self, bb_diameter_mm: float, low_density: bool
) -> list[Point]:
"""Find BBs in the image. This method can return MORE than the desired number of BBs. Matching
of the detected BBs vs the expected BBs is done in the ``find_bb_matches`` method.
"""
bb_tolerance_mm = self._calculate_bb_tolerance(bb_diameter_mm)
centers = self.compute(
metrics=SizedDiskLocator.from_center_physical(
expected_position_mm=(0, 0),
search_window_mm=(40 + bb_diameter_mm, 40 + bb_diameter_mm),
radius_mm=bb_diameter_mm / 2,
radius_tolerance_mm=bb_tolerance_mm,
invert=not low_density,
detection_conditions=self.detection_conditions,
)
)
return centers
def find_bb_matches(self, detected_points: list[Point]) -> dict[str, Point]:
"""Given an arrangement and detected BB positions, find the bbs that are closest to the expected positions.
This is to prevent false positives from being detected as BBs (e.g. noise, couch, etc).
The detected BBs are matched to the expected BBs based on proximity.
These matches are linked to the individual BB arrangement by arrangement name.
"""
bbs = {}
for bb_arng in self.bb_arrangement:
nominal_point = self.nominal_bb_position(bb_arng)
distances = [
nominal_point.distance_to(found_point)
for found_point in detected_points
]
min_distance = min(distances)
min_distance_idx = distances.index(min_distance)
if min_distance < 20 * self.dpmm:
bbs[bb_arng.name] = detected_points[min_distance_idx]
return bbs
def field_to_bb_distances(self) -> list[float]:
"""The distances from the field CAXs to the BBs in mm. Useful for metrics as this is only
the resulting floats vs a dict of points."""
return [
match.bb_field_distance_mm for match in self.arrangement_matches.values()
]
def epid_to_bb_distances(self) -> list[float]:
"""The distances from the EPID center to the BBs in mm. Useful for metrics as this is only
the resulting floats vs a dict of points."""
return [
match.bb_epid_distance_mm for match in self.arrangement_matches.values()
]
def plot(
self,
ax: plt.Axes | None = None,
show: bool = True,
clear_fig: bool = False,
zoom: bool = True,
legend: bool = True,
) -> plt.Axes:
"""Plot an individual WL image.
Parameters
----------
ax : None, plt.Axes
The axis to plot to. If None, a new figure is created.
show : bool
Whether to show the plot.
clear_fig : bool
Whether to clear the figure before plotting.
zoom : bool
Whether to zoom in on the BBs. If False, no zooming is done and the entire image is shown.
legend : bool
Whether to show the legend.
"""
ax = super().plot(ax=ax, show=False, clear_fig=clear_fig, show_metrics=True)
# show EPID center
ax.axvline(x=self.epid.x, color="b")
epid_handle = ax.axhline(y=self.epid.y, color="b")
# show the field CAXs
for match in self.arrangement_matches.values():
(field_handle,) = ax.plot(match.field.x, match.field.y, "gs", ms=8)
(bb_handle,) = ax.plot(match.bb.x, match.bb.y, "co", ms=10)
if legend:
ax.legend(
(field_handle, bb_handle, epid_handle),
("Field CAX", "Detected BB", "EPID Center"),
loc="upper right",
)
if zoom:
# find the x and y limits based on the detected BB positions
# and add a margin of 20mm
min_x = (
min([match.bb.x for match in self.arrangement_matches.values()])
- 20 * self.dpmm
)
min_y = (
min([match.bb.y for match in self.arrangement_matches.values()])
- 20 * self.dpmm
)
max_x = (
max([match.bb.x for match in self.arrangement_matches.values()])
+ 20 * self.dpmm
)
max_y = (
max([match.bb.y for match in self.arrangement_matches.values()])
+ 20 * self.dpmm
)
ax.set_ylim([max_y, min_y])
ax.set_xlim([min_x, max_x])
# ax.set_yticklabels([])
# ax.set_xticklabels([])
ax.set_title("\n".join(wrap(Path(self.path).name, 30)), fontsize=10)
ax.set_xlabel(
f"G={self.gantry_angle:.0f}, B={self.collimator_angle:.0f}, P={self.couch_angle:.0f}"
)
ax.set_ylabel(f"Max Nominal to BB: {max(self.field_to_bb_distances()):3.2f}mm")
if show:
plt.show()
return ax
def nominal_bb_position(self, bb_config: BBConfig) -> Point:
"""Calculate the expected point position in 2D"""
shift_x_mm, shift_y_mm = bb_projection_with_rotation(
offset_left=bb_config.offset_left_mm,
offset_up=bb_config.offset_up_mm,
offset_in=bb_config.offset_in_mm,
sad=self.sad,
gantry=self.gantry_angle,
couch=self.couch_angle,
)
# the field can be asymmetric, so use center of image
expected_y = self.epid.y - shift_y_mm * self.dpmm
expected_x = self.epid.x + shift_x_mm * self.dpmm
return Point(x=expected_x, y=expected_y)
@property
def epid(self) -> Point:
"""Center of the EPID panel"""
return self.center
def _calculate_bb_tolerance(self, bb_diameter: float) -> int:
"""Calculate the BB tolerance based on the BB diameter.
Min will be 2 for 1.5mm and under. Will be 4 for diameters at or above 30mm."""
y = (2, 4)
x = (1.5, 30)
return np.interp(bb_diameter, x, y)
@property
def variable_axis(self) -> Axis:
"""The axis that is varying.
There are five types of images:
* Reference : All axes are at 0.
* Gantry: All axes but gantry at 0.
* Collimator : All axes but collimator at 0.
* Couch : All axes but couch at 0.
* Combo : More than one axis is not at 0.
"""
G0 = is_close(self.gantry_angle, [0, 360])
B0 = is_close(self.collimator_angle, [0, 360])
P0 = is_close(self.couch_angle, [0, 360])
if G0 and B0 and not P0:
return Axis.COUCH
elif G0 and P0 and not B0:
return Axis.COLLIMATOR
elif P0 and B0 and not G0:
return Axis.GANTRY
elif P0 and B0 and G0:
return Axis.REFERENCE
elif P0:
return Axis.GB_COMBO
else:
return Axis.GBP_COMBO
def _clean_edges(self, window_size: int = 2) -> None:
"""Clean the edges of the image to be near the background level."""
def has_noise(self, window_size):
"""Helper method to determine if there is spurious signal at any of the image edges.
Determines if the min or max of an edge is within 10% of the baseline value and trims if not.
"""
near_min, near_max = np.percentile(self.array, [5, 99.5])
img_range = near_max - near_min
top = self[:window_size, :]
left = self[:, :window_size]
bottom = self[-window_size:, :]
right = self[:, -window_size:]
edge_array = np.concatenate(
(top.flatten(), left.flatten(), bottom.flatten(), right.flatten())
)
edge_too_low = edge_array.min() < (near_min - img_range / 10)
edge_too_high = edge_array.max() > (near_max + img_range / 10)
return edge_too_low or edge_too_high
safety_stop = np.min(self.shape) / 10
while has_noise(self, window_size) and safety_stop > 0:
self.crop(window_size)
safety_stop -= 1
[docs]
class WinstonLutz2D(WLBaseImage, ResultsDataMixin[WinstonLutz2DResult]):
"""Holds individual Winston-Lutz EPID images, image properties, and automatically finds the field CAX and BB."""
bb: Point
field_cax: Point
bb_arrangement: tuple[BBConfig]
is_from_tiff: bool = False
detection_conditions: list[callable] = [
is_right_size_bb,
is_round,
is_right_circumference,
is_symmetric,
is_solid,
]
[docs]
def analyze(
self,
bb_size_mm: float = 5,
low_density_bb: bool = False,
open_field: bool = False,
shift_vector: Vector | None = None,
) -> None:
"""Analyze the image. See WinstonLutz.analyze for parameter details."""
bb_config = BBArrangement.ISO
bb_config[0].bb_size_mm = bb_size_mm
super().analyze(
bb_arrangement=bb_config,
is_open_field=open_field,
is_low_density=low_density_bb,
shift_vector=shift_vector,
)
self.bb_arrangement = bb_config
# these are set for the deprecated properties of the 2D analysis specifically where 1 field and 1 bb are expected.
self.field_cax = self.arrangement_matches["Iso"].field
self.bb = self.arrangement_matches["Iso"].bb
def __repr__(self):
return f"WLImage(gantry={self.gantry_angle:.1f}, coll={self.collimator_angle:.1f}, couch={self.couch_angle:.1f})"
[docs]
def to_axes(self) -> str:
"""Give just the axes values as a human-readable string"""
return f"Gantry={self.gantry_angle:.1f}, Coll={self.collimator_angle:.1f}, Couch={self.couch_angle:.1f}"
@property
def cax2bb_vector(self) -> Vector:
"""The vector in mm from the CAX to the BB."""
dist = (self.bb - self.field_cax) / self.dpmm
return Vector(dist.x, dist.y, dist.z)
@property
def cax2bb_distance(self) -> float:
"""The scalar distance in mm from the CAX to the BB."""
dist = self.field_cax.distance_to(self.bb)
return dist / self.dpmm
@property
def cax2epid_vector(self) -> Vector:
"""The vector in mm from the CAX to the EPID center pixel"""
dist = (self.epid - self.field_cax) / self.dpmm
return Vector(dist.x, dist.y, dist.z)
@property
def cax2epid_distance(self) -> float:
"""The scalar distance in mm from the CAX to the EPID center pixel"""
return self.field_cax.distance_to(self.epid) / self.dpmm
[docs]
def save_plot(self, filename: str, **kwargs):
"""Save the image plot to file."""
self.plot(show=False)
plt.tight_layout()
plt.savefig(filename, **kwargs)
@property
def variable_axis(self) -> Axis:
"""The axis that is varying.
There are five types of images:
* Reference : All axes are at 0.
* Gantry: All axes but gantry at 0.
* Collimator : All axes but collimator at 0.
* Couch : All axes but couch at 0.
* Combo : More than one axis is not at 0.
"""
G0 = is_close(self.gantry_angle, [0, 360])
B0 = is_close(self.collimator_angle, [0, 360])
P0 = is_close(self.couch_angle, [0, 360])
if G0 and B0 and not P0:
return Axis.COUCH
elif G0 and P0 and not B0:
return Axis.COLLIMATOR
elif P0 and B0 and not G0:
return Axis.GANTRY
elif P0 and B0 and G0:
return Axis.REFERENCE
elif P0:
return Axis.GB_COMBO
else:
return Axis.GBP_COMBO
def _generate_results_data(self) -> WinstonLutz2DResult:
"""Present the results data and metadata as a dataclass or dict.
The default return type is a dataclass."""
if not self._is_analyzed:
raise ValueError("The image is not analyzed. Use .analyze() first.")
return WinstonLutz2DResult(
variable_axis=self.variable_axis.value,
cax2bb_vector=self.cax2bb_vector,
cax2epid_vector=self.cax2epid_vector,
cax2bb_distance=self.cax2bb_distance,
cax2epid_distance=self.cax2epid_distance,
bb_location=self.bb,
field_cax=self.field_cax,
)
[docs]
class WinstonLutz(ResultsDataMixin[WinstonLutzResult]):
"""Class for performing a Winston-Lutz test of the radiation isocenter."""
images: list[WinstonLutz2D] #:
machine_scale: MachineScale #:
image_type = WinstonLutz2D
bb: BB3D # 3D representation of the BB; there is a .bb object for 2D images but is a 2D representation
is_from_cbct: bool = False
_bb_diameter: float
_virtual_shift: str | None = None
detection_conditions: list[callable] = [
is_right_size_bb,
is_round,
is_right_circumference,
is_symmetric,
is_solid,
]
def __init__(
self,
directory: str | list[str] | Path,
use_filenames: bool = False,
axis_mapping: dict[str, tuple[int, int, int]] | None = None,
axes_precision: int | None = None,
dpi: float | None = None,
sid: float | None = None,
):
"""
Parameters
----------
directory : str, list[str]
Path to the directory of the Winston-Lutz EPID images or a list of the image paths
use_filenames: bool
Whether to try to use the file name to determine axis values.
Useful for Elekta machines that do not include that info in the DICOM data.
This is mutually exclusive to axis_mapping. If True, axis_mapping is ignored.
axis_mapping: dict
An optional way of instantiating by passing each file along with the axis values.
Structure should be <filename>: (<gantry>, <coll>, <couch>).
axes_precision: int | None
How many significant digits to represent the axes values. If None, no precision is set and the input/DICOM values are used raw.
If set to an integer, rounds the axes values (gantry, coll, couch) to that many values. E.g. gantry=0.1234 => 0.1 with precision=1.
This is mostly useful for plotting/rounding (359.9=>0) and if using the ``keyed_image_details`` with ``results_data``.
dpi
The dots-per-inch setting. Only needed if using TIFF images and the images do not contain the resolution tag.
An error will raise if dpi is not passed and the TIFF resolution cannot be determined.
sid
The Source-to-Image distance in mm. Only needed when using TIFF images.
"""
self.images = []
if axis_mapping and not use_filenames:
for filename, (gantry, coll, couch) in axis_mapping.items():
self.images.append(
self._load_image(
Path(directory) / filename,
sid=sid,
dpi=dpi,
gantry=gantry,
coll=coll,
couch=couch,
axes_precision=axes_precision,
)
)
elif isinstance(directory, list):
for file in directory:
if is_image(file):
self.images.append(
self._load_image(
file,
dpi=dpi,
sid=sid,
use_filenames=use_filenames,
axes_precision=axes_precision,
)
)
elif not osp.isdir(directory):
raise ValueError(
"Invalid directory passed. Check the correct method and file was used."
)
else:
image_files = image.retrieve_image_files(directory)
for file in image_files:
self.images.append(
self._load_image(
file,
dpi=dpi,
sid=sid,
use_filenames=use_filenames,
axes_precision=axes_precision,
)
)
if len(self.images) < 2:
raise ValueError(
"<2 valid WL images were found in the folder/file or passed. Ensure you chose the correct folder/file for analysis."
)
self.images.sort(
key=lambda i: (i.gantry_angle, i.collimator_angle, i.couch_angle)
)
self._is_analyzed = False
def _load_image(
self,
file: str | Path,
sid: float | None,
dpi: float | None,
**kwargs,
) -> WinstonLutz2D:
"""A helper method to load either DICOM or TIFF files appropriately."""
try:
return self.image_type(
file, detection_conditions=self.detection_conditions, **kwargs
)
except AttributeError:
if kwargs.get("gantry") is None:
raise ValueError(
"TIFF images detected. Must pass `axis_mapping` parameter."
)
if sid is None:
raise ValueError("TIFF images detected. Must pass `sid` parameter")
with io.BytesIO() as stream:
ds = tiff_to_dicom(
tiff_file=file,
sid=sid,
dpi=dpi,
gantry=kwargs.pop("gantry"),
coll=kwargs.pop("coll"),
couch=kwargs.pop("couch"),
)
ds.save_as(stream, write_like_original=False)
img = self.image_type(
stream, detection_conditions=self.detection_conditions, **kwargs
)
if not img.dpmm:
raise ValueError(
"TIFF images were detected but the dpi tag was not available. Pass the `dpi` parameter manually."
)
img.filter(size=0.01, kind="median")
return img
[docs]
@classmethod
def from_demo_images(cls, **kwargs):
"""Instantiate using the demo images.
Parameters
----------
kwargs
See parameters of the __init__ method for details.
"""
demo_file = retrieve_demo_file(name="winston_lutz.zip")
return cls.from_zip(demo_file, **kwargs)
[docs]
@classmethod
def from_zip(cls, zfile: str | BinaryIO | Path, **kwargs):
"""Instantiate from a zip file rather than a directory.
Parameters
----------
zfile
Path to the archive file.
kwargs
See parameters of the __init__ method for details.
"""
with TemporaryZipDirectory(zfile) as tmpz:
obj = cls(tmpz, **kwargs)
return obj
[docs]
@classmethod
def from_url(cls, url: str, **kwargs):
"""Instantiate from a URL.
Parameters
----------
url : str
URL that points to a zip archive of the DICOM images.
kwargs
See parameters of the __init__ method for details.
"""
zfile = get_url(url)
return cls.from_zip(zfile, **kwargs)
[docs]
@classmethod
def from_cbct_zip(cls, file: Path | str, raw_pixels: bool = False, **kwargs):
"""Instantiate from a zip file containing CBCT images.
Parameters
----------
file
Path to the archive file.
raw_pixels
If True, uses the raw pixel values of the DICOM files. If False, uses the rescaled Hounsfield units.
Generally, this should be true.
kwargs
See parameters of the __init__ method for details.
"""
with TemporaryZipDirectory(file) as tmpz:
obj = cls.from_cbct(tmpz, raw_pixels=raw_pixels, **kwargs)
return obj
[docs]
@classmethod
def from_cbct(cls, directory: Path | str, raw_pixels: bool = False, **kwargs):
"""Create a 4-angle WL test from a CBCT dataset.
The dataset is loaded and the array is "viewed" from top, bottom, left, and right to create the 4 angles.
The dataset has to be rescaled so that the z-axis spacing is equal to the x/y axis. This is because the
typical slice thickness is much larger than the in-plane resolution.
Parameters
----------
directory
The directory containing the CBCT DICOM files.
raw_pixels
If True, uses the raw pixel values of the DICOM files. If False, uses the rescaled Hounsfield units.
Generally, this should be true.
kwargs
See parameters of the __init__ method for details.
"""
dicom_stack = DicomImageStack(
folder=directory, min_number=10, raw_pixels=raw_pixels
)
np_stack = np.stack(dicom_stack.images, axis=-1)
zoom_ratio = (
1,
dicom_stack.metadata.SliceThickness / dicom_stack.metadata.PixelSpacing[0],
)
left_arr = np.rot90(
zoom(
np_stack.max(axis=0),
zoom=zoom_ratio,
grid_mode=True,
mode="nearest",
order=1,
),
k=1,
)
top_arr = np.rot90(
zoom(
np_stack.max(axis=1),
zoom=zoom_ratio,
grid_mode=True,
mode="nearest",
order=1,
),
k=1,
)
right_arr = np.fliplr(left_arr)
bottom_arr = np.fliplr(top_arr)
dicom_dir = Path(tempfile.mkdtemp())
dpi = 25.4 / dicom_stack.metadata.PixelSpacing[0]
for array, gantry in zip(
(left_arr, top_arr, right_arr, bottom_arr), (270, 0, 90, 180)
):
ds = array_to_dicom(
array=np.ascontiguousarray(array), # pydicom complains due to np.rot90
sid=1000,
gantry=gantry,
coll=0,
couch=0,
dpi=dpi,
)
ds.save_as(dicom_dir / f"G={gantry}", write_like_original=False)
# now we load these as normal images into the WL algorithm
instance = cls(dicom_dir, **kwargs)
instance.is_from_cbct = True
return instance
[docs]
@staticmethod
def run_demo() -> None:
"""Run the Winston-Lutz demo, which loads the demo files, prints results, and plots a summary image."""
wl = WinstonLutz.from_demo_images()
wl.analyze(machine_scale=MachineScale.VARIAN_IEC)
print(wl.results())
wl.plot_summary()
[docs]
def analyze(
self,
bb_size_mm: float = 5,
machine_scale: MachineScale = MachineScale.IEC61217,
low_density_bb: bool = False,
open_field: bool = False,
apply_virtual_shift: bool = False,
) -> None:
"""Analyze the WL images.
Parameters
----------
bb_size_mm
The expected diameter of the BB in mm. The actual size of the BB can be +/-2mm from the passed value.
machine_scale
The scale of the machine. Shift vectors depend on this value.
low_density_bb
Set this flag to True if the BB is lower density than the material surrounding it.
open_field
If True, sets the field center to the EPID center under the assumption the field is not the focus of interest or is too wide to be calculated.
This is often helpful for kV WL analysis where the blades are wide open and even then the blade edge is of
less interest than simply the imaging iso vs the BB.
apply_virtual_shift
If True, applies a virtual shift to the BBs based on the shift necessary to place the BB at the radiation isocenter.
"""
self.machine_scale = machine_scale
if self.is_from_cbct:
low_density_bb = True
open_field = True
for img in self.images:
img.analyze(bb_size_mm, low_density_bb, open_field)
# we need to construct the BB representation to get the shift vector
bb_config = BBArrangement.ISO[0]
bb_config.bb_size_mm = bb_size_mm
self.bb = BB3D(
bb_config=bb_config,
bb_matches=[img.arrangement_matches["Iso"] for img in self.images],
scale=self.machine_scale,
)
if apply_virtual_shift:
shift = self.bb_shift_vector
self._virtual_shift = self.bb_shift_instructions()
for img in self.images:
img.analyze(bb_size_mm, low_density_bb, open_field, shift_vector=shift)
# in the vanilla WL case, the BB can only be represented by non-couch-kick images
# the ray trace cannot handle the kick currently
self.bb = BB3D(
bb_config=bb_config,
bb_matches=[img.arrangement_matches["Iso"] for img in self.images],
scale=self.machine_scale,
)
self._is_analyzed = True
self._bb_diameter = bb_size_mm
@lru_cache()
def _minimize_axis(self, axes: Axis | tuple[Axis, ...] = (Axis.GANTRY,)):
"""Return the minimization result of the given axis."""
if isinstance(axes, Axis):
axes = (axes,)
things = [
# we want the bb<->field because the BB is our reference point for vanilla WL
# whether it's BB->field or field->BB is irrelevant for this case; we are not determining shift here.
image.arrangement_matches["Iso"].bb_to_field_projection
for image in self.images
if image.variable_axis in (axes + (Axis.REFERENCE,))
]
if len(things) <= 1:
raise ValueError(
"Not enough images of the given type to identify the axis isocenter"
)
initial_guess = np.array([0, 0, 0])
bounds = [(-20, 20), (-20, 20), (-20, 20)]
result = optimize.minimize(
max_distance_to_lines, initial_guess, args=things, bounds=bounds
)
return result
@property
def gantry_iso_size(self) -> float:
"""The diameter of the 3D gantry isocenter size in mm. Only images where the collimator
and couch were at 0 are used to determine this value."""
num_gantry_like_images = self._get_images((Axis.GANTRY, Axis.REFERENCE))[0]
if num_gantry_like_images > 1:
return self._minimize_axis(Axis.GANTRY).fun * 2
else:
return 0
@property
def gantry_coll_iso_size(self) -> float:
"""The diameter of the 3D gantry isocenter size in mm *including collimator and gantry/coll combo images*.
Images where the couch!=0 are excluded."""
num_gantry_like_images = self._get_images(
(Axis.GANTRY, Axis.COLLIMATOR, Axis.GB_COMBO, Axis.REFERENCE)
)[0]
if num_gantry_like_images > 1:
return (
self._minimize_axis((Axis.GANTRY, Axis.COLLIMATOR, Axis.GB_COMBO)).fun
* 2
)
else:
return 0
@staticmethod
def _find_max_distance_between_points(images) -> float:
"""Find the maximum distance between a set of points. Used for 2D images like collimator and couch."""
points = [
Point(image.cax2bb_vector.x, image.cax2bb_vector.y) for image in images
]
dists = []
for point1 in points:
for point2 in points:
p = point1.distance_to(point2)
dists.append(p)
return max(dists)
@property
def collimator_iso_size(self) -> float:
"""The 2D collimator isocenter size (diameter) in mm. The iso size is in the plane
normal to the gantry."""
num_collimator_like_images, images = self._get_images(
(Axis.COLLIMATOR, Axis.REFERENCE)
)
if num_collimator_like_images > 1:
return self._find_max_distance_between_points(images)
else:
return 0
@property
def couch_iso_size(self) -> float:
"""The diameter of the 2D couch isocenter size in mm. Only images where
the gantry and collimator were at zero are used to determine this value."""
num_couch_like_images, images = self._get_images((Axis.COUCH, Axis.REFERENCE))
if num_couch_like_images > 1:
return self._find_max_distance_between_points(images)
else:
return 0
@property
def bb_shift_vector(self) -> Vector:
"""The shift necessary to place the BB at the radiation isocenter.
The values are in the coordinates defined in the documentation.
The shift is based on the paper by Low et al. See online documentation and the ``solve_3d_shift_vector_from_2d_planes`` function for more.,
which is how the measured bb and field positions are determined.
"""
# field minus BB will give the shift vector to RETURN TO ISO which is what we want. BB minus field would give the vector from field to the BB.
return self.bb.measured_field_position - self.bb.measured_bb_position
[docs]
def bb_shift_instructions(
self,
couch_vrt: float | None = None,
couch_lng: float | None = None,
couch_lat: float | None = None,
) -> str:
"""Returns a string describing how to shift the BB to the radiation isocenter looking from the foot of the couch.
Optionally, the current couch values can be passed in to get the new couch values. If passing the current
couch position all values must be passed.
Parameters
----------
couch_vrt : float
The current couch vertical position in cm.
couch_lng : float
The current couch longitudinal position in cm.
couch_lat : float
The current couch lateral position in cm.
"""
sv = self.bb_shift_vector
x_dir = "LEFT" if sv.x < 0 else "RIGHT"
y_dir = "IN" if sv.y > 0 else "OUT"
z_dir = "UP" if sv.z > 0 else "DOWN"
move = f"{x_dir} {abs(sv.x):2.2f}mm; {y_dir} {abs(sv.y):2.2f}mm; {z_dir} {abs(sv.z):2.2f}mm"
if all(val is not None for val in [couch_vrt, couch_lat, couch_lng]):
new_lat = round(couch_lat + sv.x / 10, 2)
new_vrt = round(couch_vrt + sv.z / 10, 2)
new_lng = round(couch_lng + sv.y / 10, 2)
move += f"\nNew couch coordinates (mm): VRT: {new_vrt:3.2f}; LNG: {new_lng:3.2f}; LAT: {new_lat:3.2f}"
return move
[docs]
@argue.options(value=("all", "range"))
def axis_rms_deviation(
self, axis: Axis | tuple[Axis, ...] = Axis.GANTRY, value: str = "all"
) -> Iterable | float:
"""The RMS deviations of a given axis/axes.
Parameters
----------
axis : ('Gantry', 'Collimator', 'Couch', 'Epid', 'GB Combo', 'GBP Combo')
The axis desired.
value : {'all', 'range'}
Whether to return all the RMS values from all images for that axis, or only return the maximum range of
values, i.e. the 'sag'.
"""
if isinstance(axis, Iterable):
axis = [convert_to_enum(ax, Axis) for ax in axis]
else:
axis = convert_to_enum(axis, Axis)
if axis != Axis.EPID:
attr = "cax2bb_vector"
else:
attr = "cax2epid_vector"
axis = (Axis.GANTRY, Axis.COLLIMATOR, Axis.REFERENCE)
imgs = self._get_images(axis=axis)[1]
if len(imgs) <= 1:
return (0,)
rms = [getattr(img, attr).as_scalar() for img in imgs]
if value == "range":
rms = max(rms) - min(rms)
return rms
[docs]
@argue.options(metric=("max", "median", "mean"))
def cax2bb_distance(self, metric: str = "max") -> float:
"""The distance in mm between the CAX and BB for all images according to the given metric.
Parameters
----------
metric : {'max', 'median', 'mean'}
The metric of distance to use.
"""
distances = []
for img in self.images:
distances.extend(img.field_to_bb_distances())
if metric == "max":
return max(distances)
elif metric == "median":
return statistics.median(distances)
elif metric == "mean":
return statistics.mean(distances)
[docs]
@argue.options(metric=("max", "median", "mean"))
def cax2epid_distance(self, metric: str = "max") -> float:
"""The distance in mm between the CAX and EPID center pixel for all images according to the given metric.
Parameters
----------
metric : {'max', 'median', 'mean'}
The metric of distance to use.
"""
distances = []
for img in self.images:
distances.extend(img.epid_to_bb_distances())
if metric == "max":
return max(distances)
elif metric == "median":
return statistics.median(distances)
elif metric == "mean":
return statistics.mean(distances)
def _plot_deviation(
self, axis: Axis, ax: plt.Axes | None = None, show: bool = True
) -> None:
"""Helper function: Plot the sag in Cartesian coordinates.
Parameters
----------
axis : {'gantry', 'epid', 'collimator', 'couch'}
The axis to plot.
ax : None, matplotlib.Axes
The axis to plot to. If None, creates a new plot.
show : bool
Whether to show the image.
"""
title = f"In-plane {axis.value} displacement"
if axis == Axis.EPID:
attr = "cax2epid_vector"
axis = Axis.GANTRY
else:
attr = "cax2bb_vector"
# get axis images, angles, and shifts
imgs = [
image
for image in self.images
if image.variable_axis in (axis, Axis.REFERENCE)
]
angles = [getattr(image, f"{axis.value.lower()}_angle") for image in imgs]
xz_sag = np.array([getattr(img, attr).x for img in imgs])
y_sag = np.array([getattr(img, attr).y for img in imgs])
rms = np.sqrt(xz_sag**2 + y_sag**2)
# plot the axis deviation
if ax is None:
ax = plt.subplot(111)
ax.plot(angles, y_sag, "bo", label="Y-axis", ls="-.")
ax.plot(angles, xz_sag, "m^", label="X/Z-axis", ls="-.")
ax.plot(angles, rms, "g+", label="RMS", ls="-")
ax.set_title(title)
ax.set_ylabel("mm")
ax.set_xlabel(f"{axis.value} angle")
ax.set_xticks(np.arange(0, 361, 45))
ax.set_xlim(-15, 375)
ax.grid(True)
ax.legend(numpoints=1)
if show:
plt.show()
def _get_images(
self, axis: Axis | tuple[Axis, ...] = (Axis.GANTRY,)
) -> tuple[float, list]:
if isinstance(axis, Axis):
axis = (axis,)
images = [image for image in self.images if image.variable_axis in axis]
return len(images), images
[docs]
def plot_axis_images(
self, axis: Axis = Axis.GANTRY, show: bool = True, ax: plt.Axes | None = None
) -> None:
"""Plot all CAX/BB/EPID positions for the images of a given axis.
For example, axis='Couch' plots a reference image, and all the BB points of the other
images where the couch was moving.
Parameters
----------
axis : {'Gantry', 'Collimator', 'Couch', 'GB Combo', 'GBP Combo'}
The images/markers from which accelerator axis to plot.
show : bool
Whether to actually show the images.
ax : None, matplotlib.Axes
The axis to plot to. If None, creates a new plot.
"""
axis = convert_to_enum(axis, Axis)
images = [
image
for image in self.images
if image.variable_axis in (axis, Axis.REFERENCE)
]
ax = images[0].plot(
show=False, ax=ax
) # plots the first marker; plot the rest of the markers below
if axis != Axis.COUCH:
# plot EPID
epid_xs = [img.epid.x for img in images[1:]]
epid_ys = [img.epid.y for img in images[1:]]
ax.plot(epid_xs, epid_ys, "b+", ms=8)
# get CAX positions
xs = [img.field_cax.x for img in images[1:]]
ys = [img.field_cax.y for img in images[1:]]
marker = "gs"
else:
# get BB positions
xs = [img.bb.x for img in images[1:]]
ys = [img.bb.y for img in images[1:]]
marker = "ro"
ax.plot(xs, ys, marker, ms=8)
# set labels
ax.set_title(axis.value + " wobble")
ax.set_xlabel(axis.value + " positions superimposed")
ax.set_ylabel(
axis.value
+ f" iso size: {getattr(self, axis.value.lower() + '_iso_size'):3.2f}mm"
)
if show:
plt.show()
[docs]
def plot_location(
self,
show: bool = True,
viewbox_mm: float | None = None,
plot_bb: bool = True,
plot_isocenter_sphere: bool = True,
plot_couch_iso: bool = True,
plot_coll_iso: bool = True,
show_legend: bool = True,
):
"""Plot the isocenter and size as a sphere in 3D space relative to the BB. The
iso is at the origin.
Only images where the couch was at zero are considered.
Parameters
----------
show : bool
Whether to plot the image.
viewbox_mm : float
The default size of the 3D space to plot in mm in each axis.
plot_bb : bool
Whether to plot the BB location; the size is also considered.
plot_isocenter_sphere : bool
Whether to plot the gantry + collimator isocenter size.
plot_couch_iso : bool
Whether to plot the couch-plane-only isocenter size.
This will be zero if there are no images where the couch rotated.
plot_coll_iso : bool
Whether to plot the collimator-plane-only isocenter size.
This is shown along the Z/Y plane only to differentiate from the couch iso visualization.
The collimator plane is always normal to the gantry angle.
This will be zero if there are no images where the collimator rotated.
show_legend : bool
Whether to show the legend.
"""
limit = (
viewbox_mm
or max(
np.abs(
(
self.bb_shift_vector.x,
self.bb_shift_vector.y,
self.bb_shift_vector.z,
)
)
)
+ self._bb_diameter
)
ax = plt.axes(projection="3d")
# we can represent the iso sphere as a BB object; the nominal object isn't used, just the BB size
# plot the field isocenter as x,y,z lines
x_line = Line(
Point(
-limit,
self.bb.measured_field_position.y,
self.bb.measured_field_position.z,
),
Point(
limit, self.bb.measured_field_position.y, self.bb.measured_bb_position.z
),
)
x_line.plot2axes(ax, color="green", alpha=0.5)
y_line = Line(
Point(
self.bb.measured_field_position.x,
-limit,
self.bb.measured_field_position.z,
),
Point(
self.bb.measured_field_position.x, limit, self.bb.measured_bb_position.z
),
)
y_line.plot2axes(ax, color="green", alpha=0.5)
z_line = Line(
Point(
self.bb.measured_field_position.x,
self.bb.measured_field_position.y,
-limit,
),
Point(
self.bb.measured_field_position.x,
self.bb.measured_field_position.y,
limit,
),
)
z_line.plot2axes(ax, color="green", alpha=0.5, label="Field isocenter (x,y,z)")
if plot_bb:
self.bb.plot_measured(ax, color="cyan", alpha=0.6)
# create an empty, fake line so we can add a label for the legend
fake_line = Line(Point(0, 0, 0), Point(0, 0, 0))
fake_line.plot2axes(ax, color="cyan", label=f"BB ({self._bb_diameter}mm)")
if plot_isocenter_sphere:
x, y, z = create_sphere_surface(
radius=self.gantry_coll_iso_size / 2,
center=Point(
self.bb.measured_bb_position.x,
self.bb.measured_bb_position.y,
self.bb.measured_bb_position.z,
),
)
ax.plot_surface(x, y, z, alpha=0.3, color="magenta")
# create an empty, fake line so we can add a label for the legend
fake_line = Line(Point(0, 0, 0), Point(0, 0, 0))
fake_line.plot2axes(
ax,
color="magenta",
label=f"Gantry + Coll Isosphere ({self.gantry_coll_iso_size:3.2f}mm)",
)
if plot_couch_iso:
circle = plt.Circle(
(self.bb.measured_field_position.x, self.bb.measured_field_position.y),
radius=self.couch_iso_size / 2,
fill=True,
color="yellow",
alpha=0.4,
label=f"Couch-only iso ({self.couch_iso_size:3.2f}mm)",
)
ax.add_patch(circle)
art3d.pathpatch_2d_to_3d(
circle, z=self.bb.measured_field_position.z, zdir="z"
)
if plot_coll_iso:
circle = plt.Circle(
(self.bb.measured_field_position.y, self.bb.measured_field_position.z),
radius=self.collimator_iso_size / 2,
fill=True,
color="blue",
alpha=0.4,
label=f"Collimator-only iso ({self.collimator_iso_size:3.2f}mm)",
)
ax.add_patch(circle)
art3d.pathpatch_2d_to_3d(
circle, z=self.bb.measured_field_position.x, zdir="x"
)
if show_legend:
ax.legend()
# set the limits of the 3D plot; they must be the same in all axes for equal aspect ratio
ax.set(
xlabel="X (mm), Right (+)",
ylabel="Y (mm), In (+)",
zlabel="Z (mm), Up (+)",
title="Isocenter Visualization",
ylim=[-limit, limit],
xlim=[-limit, limit],
zlim=[-limit, limit],
)
if show:
plt.show()
[docs]
def plot_images(
self,
axis: Axis = Axis.GANTRY,
show: bool = True,
zoom: bool = True,
legend: bool = True,
split: bool = False,
**kwargs,
) -> (list[plt.Figure], list[str]):
"""Plot a grid of all the images acquired.
Four columns are plotted with the titles showing which axis that column represents.
Parameters
----------
axis : {'Gantry', 'Collimator', 'Couch', 'GB Combo', 'GBP Combo', 'All'}
The axis to plot.
show : bool
Whether to show the image.
zoom : bool
Whether to zoom in around the BB.
legend : bool
Whether to show the legend.
split : bool
Whether to show/plot the images individually or as one large figure.
"""
axis = convert_to_enum(axis, Axis)
if not self._is_analyzed:
raise ValueError("The set is not analyzed. Use .analyze() first.")
# get axis images
if axis == Axis.GANTRY:
images = [
image
for image in self.images
if image.variable_axis in (Axis.GANTRY, Axis.REFERENCE)
]
elif axis == Axis.COLLIMATOR:
images = [
image
for image in self.images
if image.variable_axis in (Axis.COLLIMATOR, Axis.REFERENCE)
]
elif axis == Axis.COUCH:
images = [
image
for image in self.images
if image.variable_axis in (Axis.COUCH, Axis.REFERENCE)
]
elif axis == Axis.GB_COMBO:
images = [
image
for image in self.images
if image.variable_axis
in (Axis.GB_COMBO, Axis.GANTRY, Axis.COLLIMATOR, Axis.REFERENCE)
]
elif axis == Axis.GBP_COMBO:
images = self.images
# set the figsize if it wasn't passed
if not kwargs.get("figsize"):
dpi = 72
width_px = 1080
width_in = width_px / dpi
if not split:
max_num_images = math.ceil(len(images) / 4)
height_in = (width_in / 4) * max_num_images
else:
height_in = width_in = 3
kwargs["figsize"] = (width_in, height_in)
figs = []
names = []
# create plots
if not split:
fig, axes = plt.subplots(nrows=max_num_images, ncols=4, **kwargs)
for mpl_axis, wl_image in zip_longest(axes.flatten(), images):
# plot the images and turn off extra axes
if wl_image:
wl_image.plot(ax=mpl_axis, show=False, zoom=zoom, legend=legend)
else:
mpl_axis.set_frame_on(False)
mpl_axis.axis("off")
# set titles
fig.suptitle(f"{axis.value} images", fontsize=14, y=1)
fig.tight_layout()
figs.append(fig)
names.append("image")
else:
for wl_image in images:
fig, axes = plt.subplots(**kwargs)
wl_image.plot(ax=axes, show=False, zoom=zoom, legend=legend)
# plot_image(wl_image, axes)
figs.append(fig)
names.append(str(wl_image))
if show:
plt.show()
return figs, names
[docs]
def save_images(
self, filename: str | BinaryIO, axis: Axis = Axis.GANTRY, **kwargs
) -> None:
"""Save the figure of `plot_images()` to file. Keyword arguments are passed to `matplotlib.pyplot.savefig()`.
Parameters
----------
filename : str
The name of the file to save to.
axis
The axis to save.
"""
self.plot_images(axis=axis, show=False)
plt.savefig(filename, **kwargs)
[docs]
def save_images_to_stream(self, **kwargs) -> dict[str, io.BytesIO]:
"""Save the individual image plots to stream"""
figs, names = self.plot_images(
axis=Axis.GBP_COMBO, show=False, split=True
) # all images
streams = [io.BytesIO() for _ in figs]
for fig, stream in zip(figs, streams):
fig.savefig(stream, **kwargs)
return {name: stream for name, stream in zip(names, streams)}
[docs]
def plot_summary(self, show: bool = True, fig_size: tuple | None = None) -> None:
"""Plot a summary figure showing the gantry sag and wobble plots of the three axes."""
if not self._is_analyzed:
raise ValueError("The set is not analyzed. Use .analyze() first.")
figsize = (11, 9) if fig_size is None else fig_size
plt.figure(figsize=figsize)
grid = (3, 6)
gantry_sag_ax = plt.subplot2grid(grid, (0, 0), colspan=3)
self._plot_deviation(Axis.GANTRY, gantry_sag_ax, show=False)
epid_sag_ax = plt.subplot2grid(grid, (0, 3), colspan=3)
self._plot_deviation(Axis.EPID, epid_sag_ax, show=False)
if self._get_images((Axis.COLLIMATOR, Axis.REFERENCE))[0] > 1:
coll_sag_ax = plt.subplot2grid(grid, (1, 0), colspan=3)
self._plot_deviation(Axis.COLLIMATOR, coll_sag_ax, show=False)
if self._get_images((Axis.COUCH, Axis.REFERENCE))[0] > 1:
couch_sag_ax = plt.subplot2grid(grid, (1, 3), colspan=3)
self._plot_deviation(Axis.COUCH, couch_sag_ax, show=False)
for axis, axnum in zip((Axis.GANTRY, Axis.COLLIMATOR, Axis.COUCH), (0, 2, 4)):
if self._get_images((axis, Axis.REFERENCE))[0] > 1:
ax = plt.subplot2grid(grid, (2, axnum), colspan=2)
self.plot_axis_images(axis=axis, ax=ax, show=False)
if show:
plt.tight_layout()
plt.show()
[docs]
def save_summary(self, filename: str | BinaryIO, **kwargs) -> None:
"""Save the summary image."""
self.plot_summary(show=False, fig_size=kwargs.pop("fig_size", None))
plt.tight_layout()
plt.savefig(filename, **kwargs)
[docs]
def results(self, as_list: bool = False) -> str:
"""Return the analysis results summary.
Parameters
----------
as_list : bool
Whether to return as a list of strings vs single string. Pretty much for internal usage.
"""
if not self._is_analyzed:
raise ValueError("The set is not analyzed. Use .analyze() first.")
num_gantry_imgs = self._get_images(axis=(Axis.GANTRY, Axis.REFERENCE))[0]
num_gantry_coll_imgs = self._get_images(
axis=(Axis.GANTRY, Axis.COLLIMATOR, Axis.GB_COMBO, Axis.REFERENCE)
)[0]
num_coll_imgs = self._get_images(axis=(Axis.COLLIMATOR, Axis.REFERENCE))[0]
num_couch_imgs = self._get_images(axis=(Axis.COUCH, Axis.REFERENCE))[0]
num_imgs = len(self.images)
result = [
"Winston-Lutz Analysis",
"=================================",
f"Number of images: {num_imgs}",
f"Maximum 2D CAX->BB distance: {self.cax2bb_distance('max'):.2f}mm",
f"Median 2D CAX->BB distance: {self.cax2bb_distance('median'):.2f}mm",
f"Mean 2D CAX->BB distance: {self.cax2bb_distance('mean'):.2f}mm",
]
if self._virtual_shift:
result.append(
f"Virtual shift applied to BB to place at isocenter: {self._virtual_shift}"
)
else:
result.append(
f"Shift to iso: facing gantry, move BB: {self.bb_shift_instructions()}"
)
result += [
f"Gantry 3D isocenter diameter: {self.gantry_iso_size:.2f}mm ({num_gantry_imgs}/{num_imgs} images considered)",
f"Maximum Gantry RMS deviation (mm): {max(self.axis_rms_deviation((Axis.GANTRY, Axis.REFERENCE))):.2f}mm",
f"Maximum EPID RMS deviation (mm): {max(self.axis_rms_deviation(Axis.EPID)):.2f}mm",
f"Gantry+Collimator 3D isocenter diameter: {self.gantry_coll_iso_size:.2f}mm ({num_gantry_coll_imgs}/{num_imgs} images considered)",
f"Collimator 2D isocenter diameter: {self.collimator_iso_size:.2f}mm ({num_coll_imgs}/{num_imgs} images considered)",
f"Maximum Collimator RMS deviation (mm): {max(self.axis_rms_deviation((Axis.COLLIMATOR, Axis.REFERENCE))):.2f}",
f"Couch 2D isocenter diameter: {self.couch_iso_size:.2f}mm ({num_couch_imgs}/{num_imgs} images considered)",
f"Maximum Couch RMS deviation (mm): {max(self.axis_rms_deviation((Axis.COUCH, Axis.REFERENCE))):.2f}",
]
if not as_list:
result = "\n".join(result)
return result
def _generate_results_data(self) -> WinstonLutzResult:
"""Present the results data and metadata as a dataclass or dict.
The default return type is a dataclass."""
if not self._is_analyzed:
raise ValueError("The set is not analyzed. Use .analyze() first.")
num_gantry_imgs = self._get_images(axis=(Axis.GANTRY, Axis.REFERENCE))[0]
num_gantry_coll_imgs = self._get_images(
axis=(Axis.GANTRY, Axis.COLLIMATOR, Axis.GB_COMBO, Axis.REFERENCE)
)[0]
num_coll_imgs = self._get_images(axis=(Axis.COLLIMATOR, Axis.REFERENCE))[0]
num_couch_imgs = self._get_images(axis=(Axis.COUCH, Axis.REFERENCE))[0]
individual_image_data = [i.results_data() for i in self.images]
return WinstonLutzResult(
num_total_images=len(self.images),
num_gantry_images=num_gantry_imgs,
num_coll_images=num_coll_imgs,
num_gantry_coll_images=num_gantry_coll_imgs,
num_couch_images=num_couch_imgs,
max_2d_cax_to_bb_mm=self.cax2bb_distance("max"),
median_2d_cax_to_bb_mm=self.cax2bb_distance("median"),
mean_2d_cax_to_bb_mm=self.cax2bb_distance("mean"),
max_2d_cax_to_epid_mm=self.cax2epid_distance("max"),
median_2d_cax_to_epid_mm=self.cax2epid_distance("median"),
mean_2d_cax_to_epid_mm=self.cax2epid_distance("mean"),
coll_2d_iso_diameter_mm=self.collimator_iso_size,
couch_2d_iso_diameter_mm=self.couch_iso_size,
gantry_3d_iso_diameter_mm=self.gantry_iso_size,
gantry_coll_3d_iso_diameter_mm=self.gantry_coll_iso_size,
max_gantry_rms_deviation_mm=max(
self.axis_rms_deviation(axis=(Axis.GANTRY, Axis.REFERENCE))
),
max_coll_rms_deviation_mm=max(
self.axis_rms_deviation(axis=(Axis.COLLIMATOR, Axis.REFERENCE))
),
max_couch_rms_deviation_mm=max(
self.axis_rms_deviation(axis=(Axis.COUCH, Axis.REFERENCE))
),
max_epid_rms_deviation_mm=max(self.axis_rms_deviation(axis=Axis.EPID)),
image_details=individual_image_data,
keyed_image_details=self._generate_keyed_images(individual_image_data),
)
def _generate_keyed_images(
self, individual_image_data: list[WinstonLutz2DResult]
) -> dict[str, WinstonLutz2DResult]:
"""Generate a dict where each key is based on the axes values and the key is an image. Used in the results_data method.
We can't do a simple dict comprehension because we may have duplicate axes sets. We pass individual data
because we may have already converted to a dict; we don't want to do that again.
"""
data = {}
for img_idx, img in enumerate(self.images):
key = f"G{img.gantry_angle}B{img.collimator_angle}P{img.couch_angle}"
suffix = ""
idx = 1
while key + suffix in data.keys():
suffix = f"_{idx}"
idx += 1
data[key + suffix] = individual_image_data[img_idx]
return data
[docs]
def publish_pdf(
self,
filename: str,
notes: str | list[str] | None = None,
open_file: bool = False,
metadata: dict | None = None,
logo: Path | str | None = None,
):
"""Publish (print) a PDF containing the analysis, images, and quantitative results.
Parameters
----------
filename : (str, file-like object}
The file to write the results to.
notes : str, list of strings
Text; if str, prints single line.
If list of strings, each list item is printed on its own line.
open_file : bool
Whether to open the file using the default program after creation.
metadata : dict
Extra data to be passed and shown in the PDF. The key and value will be shown with a colon.
E.g. passing {'Author': 'James', 'Unit': 'TrueBeam'} would result in text in the PDF like:
--------------
Author: James
Unit: TrueBeam
--------------
logo: Path, str
A custom logo to use in the PDF report. If nothing is passed, the default pylinac logo is used.
"""
if not self._is_analyzed:
raise ValueError("The set is not analyzed. Use .analyze() first.")
plt.ioff()
title = "Winston-Lutz Analysis"
canvas = pdf.PylinacCanvas(
filename, page_title=title, metadata=metadata, logo=logo
)
text = self.results(as_list=True)
canvas.add_text(text=text, location=(7, 25.5))
# draw summary image on 1st page
data = io.BytesIO()
self.save_summary(data, fig_size=(8, 8))
canvas.add_image(image_data=data, location=(2, 3), dimensions=(16, 16))
if notes is not None:
canvas.add_text(text="Notes:", location=(1, 4.5), font_size=14)
canvas.add_text(text=notes, location=(1, 4))
# add more pages showing individual axis images
for ax in (
Axis.GANTRY,
Axis.COLLIMATOR,
Axis.COUCH,
Axis.GB_COMBO,
Axis.GBP_COMBO,
):
if self._contains_axis_images(ax):
canvas.add_new_page()
data = io.BytesIO()
self.save_images(data, axis=ax)
canvas.add_image(data, location=(2, 7), dimensions=(18, 18))
canvas.finish()
if open_file:
webbrowser.open(filename)
def _contains_axis_images(self, axis: Axis = Axis.GANTRY) -> bool:
"""Return whether or not the set of WL images contains images pertaining to a given axis"""
return any(True for image in self.images if image.variable_axis in (axis,))
[docs]
class WinstonLutzMultiTargetMultiFieldImage(WLBaseImage):
"""A 2D image of a WL delivery, but where multiple BBs are in use."""
detection_conditions = [is_round, is_symmetric, is_modest_size]
field_conditions = [is_square, is_right_square_size]
[docs]
def find_field_centroids(self, is_open_field: bool) -> list[Point]:
"""Find the centroid of the radiation field based on a 50% height threshold.
This applies the field detection conditions and also a nearness condition.
Returns
-------
points
The CAX point locations.
"""
if is_open_field:
return [self.center]
# find all the fields by setting the field to the mean rad size and tolerance
# to max-min field sizes across the arrangements
max_field_size = max(
self.bb_arrangement, key=lambda x: x.rad_size_mm
).rad_size_mm
min_field_size = min(
self.bb_arrangement, key=lambda x: x.rad_size_mm
).rad_size_mm
mean_field_size = (max_field_size + min_field_size) / 2
tolerance_field_size = max(
(max_field_size - min_field_size) * 1.2, 0.1 * mean_field_size
)
points = self.compute(
metrics=GlobalSizedFieldLocator.from_physical(
max_number=len(self.bb_arrangement),
field_height_mm=mean_field_size,
field_width_mm=mean_field_size,
field_tolerance_mm=tolerance_field_size,
)
)
return points
[docs]
def find_bb_centroids(
self, bb_diameter_mm: float, low_density: bool
) -> list[Point]:
"""Find the specific BB based on the arrangement rather than a single one. This is in local pixel coordinates"""
# get initial starting conditions
bb_tolerance_mm = self._calculate_bb_tolerance(bb_diameter_mm)
centers = self.compute(
metrics=GlobalSizedDiskLocator(
radius_mm=bb_diameter_mm / 2,
radius_tolerance_mm=bb_tolerance_mm,
invert=not low_density,
detection_conditions=self.detection_conditions,
max_number=len(self.bb_arrangement),
)
)
return centers
[docs]
class WinstonLutzMultiTargetMultiField(WinstonLutz):
machine_scale: MachineScale #:
images: Sequence[WinstonLutzMultiTargetMultiFieldImage] #:
image_type = WinstonLutzMultiTargetMultiFieldImage
bb_arrangement: tuple[BBConfig] #:
bbs: list[BB3D] #: 3D representation of the BBs
[docs]
@classmethod
def from_demo_images(cls):
"""Instantiate using the demo images."""
demo_file = retrieve_demo_file(name="mt_mf_wl.zip")
return cls.from_zip(demo_file)
[docs]
@staticmethod
def run_demo():
"""Run the Winston-Lutz MT MF demo, which loads the demo files, prints results, and plots a summary image."""
wl = WinstonLutzMultiTargetMultiField.from_demo_images()
wl.analyze(bb_arrangement=BBArrangement.DEMO)
print(wl.results())
wl.plot_images()
[docs]
def analyze(
self,
bb_arrangement: tuple[BBConfig, ...],
is_open_field: bool = False,
is_low_density: bool = False,
machine_scale: MachineScale = MachineScale.IEC61217,
):
"""Analyze the WL images.
Parameters
----------
bb_arrangement
The arrangement of the BBs in the phantom. A dict with offset and BB size keys. See the ``BBArrangement`` class for
keys and syntax.
"""
self.machine_scale = machine_scale
self.bb_arrangement = bb_arrangement
for img in self.images:
img.analyze(
bb_arrangement=bb_arrangement,
is_open_field=is_open_field,
is_low_density=is_low_density,
)
self.bbs = []
for arrangement in self.bb_arrangement:
# add bbs to the matches if the match is in the given image
matches = []
for img in self.images:
if arrangement.name in img.arrangement_matches:
match = img.arrangement_matches[arrangement.name]
matches.append(match)
# ray lines are used for plotting
bb = BB3D(
bb_config=arrangement,
bb_matches=matches,
scale=self.machine_scale,
)
self.bbs.append(bb)
self._is_analyzed = True
[docs]
def plot_location(
self,
show: bool = True,
viewbox_mm: float | None = None,
plot_bb: bool = True,
plot_isocenter_sphere: bool = True,
plot_couch_iso: bool = True,
plot_coll_iso: bool = True,
show_legend: bool = True,
):
x_lim = max(
max([np.abs(bb.measured_bb_position.x) for bb in self.bbs]) * 1.3, 10
)
y_lim = max(
max([np.abs(bb.measured_bb_position.y) for bb in self.bbs]) * 1.3, 10
)
z_lim = max(
max([np.abs(bb.measured_bb_position.z) for bb in self.bbs]) * 1.3, 10
)
limit = viewbox_mm or max(x_lim, y_lim, z_lim)
fig = plt.figure()
ax = fig.add_subplot(projection="3d")
_, relevant_images = self._get_images(
axis=(Axis.REFERENCE, Axis.GB_COMBO, Axis.COLLIMATOR, Axis.GANTRY)
)
# we can represent the iso sphere as a BB object; the nominal object isn't used, just the BB size
# the ray lines are what we want to plot as a sphere
# plot the x,y,z origin lines
x_line = Line(Point(-100, 0, 0), Point(100, 0, 0))
x_line.plot2axes(ax, color="green", alpha=0.5)
y_line = Line(Point(0, -100, 0), Point(0, 100, 0))
y_line.plot2axes(ax, color="green", alpha=0.5)
z_line = Line(Point(0, 0, -100), Point(0, 0, 100))
z_line.plot2axes(
ax, color="green", alpha=0.5, label="Nominal isocenter (x,y,z)"
)
if plot_bb:
for bb in self.bbs:
bb.plot_measured(ax, color="cyan", alpha=0.6)
bb.plot_nominal(ax, color="green", alpha=0.6)
# create an empty, fake line so we can add a label for the legend
fake_line = Line(Point(0, 0, 0), Point(0, 0, 0))
fake_line.plot2axes(ax, color="cyan", label="Measured BB")
fake_line = Line(Point(0, 0, 0), Point(0, 0, 0))
fake_line.plot2axes(ax, color="green", label="Nominal BB")
if show_legend:
ax.legend()
# set the limits of the 3D plot; they must be the same in all axes for equal aspect ratio
ax.set(
xlabel="X (mm), Right (+)",
ylabel="Y (mm), In (+)",
zlabel="Z (mm), Up (+)",
title="Isocenter Visualization",
ylim=[-limit, limit],
xlim=[-limit, limit],
zlim=[-limit, limit],
)
if show:
plt.show()
return fig, ax
@property
def bb_shift_vector(self) -> (Vector, float, float, float):
"""Calculate the ideal shift in 6 degrees of freedom to place the BB at the isocenter.
Returns
-------
Vector
The ideal shift vector in mm for the cartesian coordinates. X,Y, and Z follow the pylinac coordinate convention.
float
Yaw; The ideal rotation about the Z-axis in degrees.
float
Pitch; The ideal rotation about the X-axis in degrees.
float
Roll; The ideal rotation about the Y-axis in degrees.
See Also
--------
Euler Angles: https://en.wikipedia.org/wiki/Euler_angles
Gimbal Lock: https://en.wikipedia.org/wiki/Gimbal_lock
"""
return align_points(
measured_points=[bb.measured_bb_position for bb in self.bbs],
ideal_points=[bb.measured_field_position for bb in self.bbs],
)
[docs]
def bb_shift_instructions(self) -> str:
"""Return a string that provides instructions on how to shift the BB to the isocenter."""
translation, yaw, pitch, roll = self.bb_shift_vector
x_dir = "LEFT" if translation.x < 0 else "RIGHT"
y_dir = "IN" if translation.y > 0 else "OUT"
z_dir = "UP" if translation.z > 0 else "DOWN"
move = f"{x_dir} {abs(translation.x):2.2f}mm; {y_dir} {abs(translation.y):2.2f}mm; {z_dir} {abs(translation.z):2.2f}mm; Rotation {yaw:2.2f}°; Pitch {pitch:2.2f}°; Roll {roll:2.2f}°"
return move
def _couch_rotation_error(self) -> dict[str, dict[str, float]]:
"""Calculate the couch rotation error in degrees for reference and couch-kicked images.
This just for feature parity with SNC 🤦; the BB shift vector is more important.
Returns
-------
dict
A dictionary where the keys are the image paths and the values are a dictionary with the yaw error and nominal couch angle.
"""
couch_results = {}
couch_images = [
img
for img in self.images
if img.variable_axis in (Axis.COUCH, Axis.REFERENCE)
]
for img in couch_images:
measured_points = [m.bb for m in img.arrangement_matches.values()]
ideal_points = [m.field for m in img.arrangement_matches.values()]
_, yaw, _, _ = align_points(measured_points, ideal_points)
couch_results[img.base_path] = {
"yaw error": yaw,
"couch angle": img.couch_angle,
}
return couch_results
@property
def gantry_coll_iso_size(self) -> float:
raise NotImplementedError("Not yet implemented")
@property
def collimator_iso_size(self) -> float:
raise NotImplementedError("Not yet implemented")
@property
def couch_iso_size(self) -> float:
raise NotImplementedError("Not yet implemented")
@property
def gantry_iso_size(self) -> float:
raise NotImplementedError("Not yet implemented")
[docs]
def plot_images(
self, show: bool = True, zoomed: bool = True, legend: bool = True, **kwargs
) -> (list[plt.Figure], list[str]):
"""Make a plot for each BB. Each plot contains the analysis of that BB on each image
it was found."""
figs, names = [], []
figsize = kwargs.pop("figsize", None) or (8, 8)
for img in self.images:
fig, axes = plt.subplots(figsize=figsize, **kwargs)
img.plot(ax=axes, show=False, zoom=zoomed, legend=legend)
fig.tight_layout()
figs.append(fig)
names.append(img.base_path)
if show:
plt.show()
return figs, names
[docs]
def save_images(self, prefix: str = "", **kwargs):
"""Save the figure of `plot_images()` to file as PNG. Keyword arguments are passed to `matplotlib.pyplot.savefig()`.
Parameters
----------
prefix : str
The prefix name of the file to save to. The BB name is appended to the prefix.
"""
figs, names = self.plot_images(show=False, **kwargs)
for fig, name in zip(figs, names):
fig.savefig(prefix + "_" + str(name) + ".png", **kwargs)
[docs]
def save_images_to_stream(self, **kwargs) -> dict[str, io.BytesIO]:
"""Save the individual image plots to stream"""
figs, names = self.plot_images(show=False, **kwargs)
streams = [io.BytesIO() for _ in figs]
for fig, stream in zip(figs, streams):
fig.savefig(stream, **kwargs)
return {name: stream for name, stream in zip(names, streams)}
def _generate_results_data(self) -> WinstonLutzMultiTargetMultiFieldResult:
"""Present the results data and metadata as a dataclass or dict.
The default return type is a dataclass."""
if not self._is_analyzed:
raise ValueError("The set is not analyzed. Use .analyze() first.")
# for backward-compatibility, we have to find the max distance for each BB
# across the images.
bb_maxes = {}
for bb in self.bb_arrangement:
max_d = 0.0
for img in self.images:
if bb.name in img.arrangement_matches:
max_d = max(
max_d, img.arrangement_matches[bb.name].bb_field_distance_mm
)
bb_maxes[bb.name] = max_d
translation, yaw, pitch, roll = self.bb_shift_vector
return WinstonLutzMultiTargetMultiFieldResult(
num_total_images=len(self.images),
max_2d_field_to_bb_mm=self.max_bb_deviation_2d,
mean_2d_field_to_bb_mm=self.mean_bb_deviation_2d,
median_2d_field_to_bb_mm=self.median_bb_deviation_2d,
bb_maxes=bb_maxes,
bb_arrangement=self.bb_arrangement,
bb_shift_vector=translation,
bb_shift_yaw=yaw,
bb_shift_pitch=pitch,
bb_shift_roll=roll,
)
[docs]
def plot_summary(self, show: bool = True, fig_size: tuple | None = None):
raise NotImplementedError("Not yet implemented")
[docs]
def plot_axis_images(
self, axis: Axis = Axis.GANTRY, show: bool = True, ax: plt.Axes | None = None
):
raise NotImplementedError("Not yet implemented")
@property
def max_bb_deviation_2d(self) -> float:
"""The maximum distance from any measured BB to its nominal position"""
return self.cax2bb_distance(metric="max")
@property
def mean_bb_deviation_2d(self) -> float:
"""The mean distance from any measured BB to its nominal position"""
return self.cax2bb_distance(metric="mean")
@property
def median_bb_deviation_2d(self) -> float:
"""The median distance from any measured BB to its nominal position"""
return self.cax2bb_distance(metric="median")
[docs]
def results(self, as_list: bool = False) -> str:
"""Return the analysis results summary.
Parameters
----------
as_list : bool
Whether to return as a list of strings vs single string. Pretty much for internal usage.
"""
if not self._is_analyzed:
raise ValueError("The set is not analyzed. Use .analyze() first.")
num_imgs = len(self.images)
result = [
"Winston-Lutz Multi-Target Multi-Field Analysis",
"==============================================",
f"Number of images: {num_imgs}",
"",
"2D distances",
"============",
f"Max 2D distance of any BB->Field: {self.max_bb_deviation_2d:.2f} mm",
f"Mean 2D distance of any BB->Field: {self.mean_bb_deviation_2d:.2f} mm",
f"Median 2D distance of any BB->Field: {self.median_bb_deviation_2d:.2f} mm",
"",
]
bb_descriptions = [[bb.name, bb.to_human()] for bb in self.bb_arrangement]
bb_names = [bb[0] for bb in bb_descriptions]
result += tabulate(bb_descriptions, headers=["BB #", "Description"]).split("\n")
result += [
"",
]
data = []
for img in self.images:
img_name = img.base_path[-20:]
gantry = f"{img.gantry_angle:.1f}"
collimator = f"{img.collimator_angle:.1f}"
couch = f"{img.couch_angle:.1f}"
deviations = []
# loop through the expected BBs.
# not every BB may be in every image, so we have to loop through the
# BBs and find the match if there is one.
for bb in self.bb_arrangement:
match = Enumerable(img.arrangement_matches.items()).single_or_default(
lambda x: x[0] == bb.name
)
if match:
deviations.append(f"{match[1].bb_field_distance_mm:.2f}")
else:
deviations.append("---")
data.append([img_name, gantry, collimator, couch, *deviations])
result += tabulate(data, headers=["Image", "G", "C", "P", *bb_names]).split(
"\n"
)
result += [""]
# calculate couch-kick errors
couch_results = self._couch_rotation_error()
couch_data = [
[name[-20:], v["couch angle"], f"{v['yaw error']:.2f}"]
for name, v in couch_results.items()
]
result += tabulate(
couch_data, headers=["Image", "Couch Angle", "Yaw Error (\N{DEGREE SIGN})"]
).split("\n")
if not as_list:
result = "\n".join(result)
return result
[docs]
def publish_pdf(
self,
filename: str,
notes: str | list[str] | None = None,
open_file: bool = False,
metadata: dict | None = None,
logo: Path | str | None = None,
):
"""Publish (print) a PDF containing the analysis, images, and quantitative results.
Parameters
----------
filename : (str, file-like object)
The file to write the results to.
notes : str, list of strings
Text; if str, prints single line.
If list of strings, each list item is printed on its own line.
open_file : bool
Whether to open the file using the default program after creation.
metadata : dict
Extra data to be passed and shown in the PDF. The key and value will be shown with a colon.
E.g. passing {'Author': 'James', 'Unit': 'TrueBeam'} would result in text in the PDF like:
--------------
Author: James
Unit: TrueBeam
--------------
logo: Path, str
A custom logo to use in the PDF report. If nothing is passed, the default pylinac logo is used.
"""
if not self._is_analyzed:
raise ValueError("The set is not analyzed. Use .analyze() first.")
plt.ioff()
title = "Winston-Lutz Multi-BB Analysis"
canvas = pdf.PylinacCanvas(
filename,
page_title=title,
metadata=metadata,
logo=logo,
metadata_location=(15, 25.5),
)
text = self.results(as_list=True)
canvas.add_text(text=text, location=(1, 25.5), font="Courier")
# draw summary image on 1st page
if notes is not None:
canvas.add_text(text="Notes:", location=(1, 4.5), font_size=14)
canvas.add_text(text=notes, location=(1, 4))
# plot each BB's images
bb_streams = self.save_images_to_stream()
for stream in bb_streams.values():
canvas.add_new_page()
canvas.add_image(stream, location=(2, 7), dimensions=(18, 18))
canvas.finish()
if open_file:
webbrowser.open(filename)
# class WinstonLutzMultiTargetSingleFieldImage(WinstonLutzMultiTargetMultiFieldImage):
# def find_field_centroids(self, is_open_field: bool) -> list[Point]:
# """For the single field case, the field centroid is the same as the CAX."""
# # TODO: use the global field finder
# return [self.center]
#
# def find_field_matches(self, detected_points: list[Point]) -> dict[str, Point]:
# """For the single field case, the field centroid is the same as the CAX for every BB."""
# return {bb.name: detected_points[0] for bb in self.bb_arrangement}
#
# def field_to_bb_distances(self) -> list[float]:
# """For a single-field, multi-BB setup we shift the BBs to the isocenter by shifting by the nominal offset.
# The delta between the nominal and actual BB position is the same distance as if we were at iso.
# """
# distances = []
# for match in self.arrangement_matches.values():
# distances.append(match["bb nominal"].distance_to(match["bb"]) / self.dpmm)
# return distances
#
#
# class WinstonLutzMultiTargetSingleField(WinstonLutzMultiTargetMultiField):
# image_type = WinstonLutzMultiTargetSingleFieldImage
# images: Sequence[WinstonLutzMultiTargetSingleFieldImage]
def max_distance_to_lines(p, lines: Iterable[Line]) -> float:
"""Calculate the maximum distance to any line from the given point."""
point = Point(p[0], p[1], p[2])
return max(line.distance_to(point) for line in lines)
def bb_projection_with_rotation(
offset_left: float,
offset_up: float,
offset_in: float,
gantry: float,
couch: float,
sad: float = 1000,
) -> (float, float):
"""Calculate the isoplane projection onto the panel at the given SSD.
This function applies a rotation around the gantry plane (X/Z) to the
ball bearing (BB) position and calculates its projection onto the isocenter plane in the beam's eye view.
Could be used to calculate couch rotations, but not validated yet.
Args
----
offset_left (float): The BB position in the left/right direction.
offset_up (float): The BB position in the superior/inferior direction.
offset_in (float): The BB position in the anterior/posterior direction.
gantry (float): The gantry angle in degrees.
couch (float, optional): The couch angle in degrees. Defaults to 0.
sad (float, optional): The source-to-axis distance in mm. Defaults to 1000.
Returns
-------
left_right_projection (float): The projection of the BB onto the panel in the left/right direction.
Left is negative, right is positive. This is always in the plane normal to the CAX.
superior_inferior_projection (float): The projection of the BB onto the panel in the superior/inferior direction.
Superior is positive, inferior is negative. This is always in the plane normal to the CAX which
in this case is also the absolute Y coordinate system axis.
"""
# Define the BB positions in the patient coordinate system (ap, lr, si)
bb_positions = np.array([offset_up, offset_left, offset_in])
# Apply the rotation matrix to the BB positions
collimator = 0 # Collimator doesn't change positional projection onto panel
rotation_matrix = Rotation.from_euler(
"xyz",
[-couch, collimator, gantry],
degrees=True, # negative couch due to origin shift vs coordinate space
)
rotated_positions = rotation_matrix.apply(bb_positions)
# Calculate the projection onto the panel at the given SAD
bb_magnification = sad / (
sad - rotated_positions[0]
) # Distance from source to panel
imager_projection = (
np.array([rotated_positions[1], rotated_positions[2]]) * bb_magnification
)
return -imager_projection[0], imager_projection[1]
def straight_ray(vector: Vector, gantry_angle: float) -> Line:
"""A ray projecting from the gantry source point in the given direction.
Notes
-----
This does NOT account for couch rotation. It also generates a straight line
that goes through the vector. I.e. if the vector is 5mm left, the line will be straight
up and down 5mm left. It does not account for 2D projection from the gantry source.
The reason is that in vanilla WL, the assumption is that the field is about isocenter (i.e. not purposely offset)
and thus creating straight lines is a good approximation.
In the Multi-BB case, our fields are purposely off-center,
so this assumption does not hold for that case. See the ``ray`` function for that.
TL;DR: this is useful for vanilla WL but not Multi-BB.
"""
p1 = Point()
p2 = Point()
# point 1 - ray origin
p1.x = vector.x * cos(gantry_angle) + 20 * sin(gantry_angle)
p1.z = vector.x * -sin(gantry_angle) + 20 * cos(gantry_angle)
p1.y = vector.y
# point 2 - ray destination
p2.x = vector.x * cos(gantry_angle) - 20 * sin(gantry_angle)
p2.z = vector.x * -sin(gantry_angle) - 20 * cos(gantry_angle)
p2.y = vector.y
line = Line(p1, p2)
return line
def solve_3d_shift_vector_from_2d_planes(
xs: Sequence[float],
ys: Sequence[float],
thetas: Sequence[float],
phis: Sequence[float],
scale: MachineScale,
) -> Vector:
"""Solve for long, lat, and vert given arrays of x's, y's, theta's, and phi's.
This is Y, X, and Z in pylinac coordinate conventions.
This is a generalization of the Low et al. equation 6 and 7. This is used for both vanilla WL and Multi-BB WL.
This is intended to be a relatively strict interpretation of the Low et al. equations.
E.g. the gantry and couch angles are converted to Varian Standard scale.
The idea is this is meant for high readability when comparing to the Low et al. equations.
However, this does mean we have to invert the LONG/Y axis. See the conversion table in the docs
Parameters
----------
xs : array_like
Array of x coordinates. Positive is to the right in the local plane.
ys : array_like
Array of y coordinates. Positive is up in the local plane.
thetas : array_like
Array of gantry angles.
phis : array_like
Array of couch angles.
scale: MachineScale
The scale of the machine. IEC61217 is the default. This converts the gantry and couch angles to Varian Standard scale
which is to match Low's equations.
Returns
-------
Vector
The coordinates are in the pylinac coordinate system. See docs.
"""
if not (len(xs) == len(ys) == len(thetas) == len(phis)):
raise ValueError("The x, y, theta, and phi arrays must all be the same length.")
n = len(xs)
# convert the angles to Varian Standard scale
f_thetas, f_phis = [], []
for theta, phi in zip(thetas, phis):
g, _, c = convert(
scale,
MachineScale.VARIAN_STANDARD,
gantry=theta,
collimator=0,
rotation=phi,
)
f_thetas.append(g)
f_phis.append(c)
# Initialize A matrix and xi matrices
A = np.zeros((2 * n, 3))
xi = np.zeros(2 * n)
for i in range(n):
# A general rotation matrix and also Low eqn 6a
A[2 * i, :] = [-cos(f_phis[i]), -sin(f_phis[i]), 0]
A[2 * i + 1, :] = [
-cos(f_thetas[i]) * sin(f_phis[i]),
cos(f_thetas[i]) * cos(f_phis[i]),
-sin(f_thetas[i]),
]
# Low eqn 7
xi[2 * i] = ys[
i
] # usually this would be (x, y) but Figure 1 of Low is rotated 90 degrees.
xi[2 * i + 1] = -xs[i]
# equation 9a; B is the pseudo-inverse of A
B = np.linalg.pinv(A)
# full equation 9
long, lat, vert = B.dot(xi).squeeze()
# At this point we have the **Low et al shift vector**. Conversion to pylinac coordinates
# requires flipping the LONG axis; see the conversion table in the docs.
# we use the X/Y/Z terms for the shift to be consistent with our own convention
# and for clarity of the conversion
# Finally, this is the **SHIFT VECTOR**. The 3D position in space is the inverse of this
y = -long
x = lat
z = vert
return Vector(x=x, y=y, z=z)
def solve_3d_position_from_2d_planes(
xs: Sequence[float],
ys: Sequence[float],
thetas: Sequence[float],
phis: Sequence[float],
scale: MachineScale,
) -> Vector:
"""Solve for the 3D position of a BB in space given 2D images/vectors and the gantry/couch angles.
The good news is that the position in space is the inverse of the shift vector!
"""
return -solve_3d_shift_vector_from_2d_planes(xs, ys, thetas, phis, scale)
def conventional_to_euler_notation(axes_resolution: str) -> str:
"""Convert conventional understandings of 6DOF rotations into Euler notation.
Ensures we don't mix up x, y, z with the pylinac coordinate system.
"""
EULER = {
# FROM THE COUCH PERSPECTIVE
"pitch": "x", # positive pitch goes up
"yaw": "z",
"roll": "y", # positive angle rolls to the right
}
axes = axes_resolution.split(",")
euler = "".join([EULER[a.strip()] for a in axes])
return euler
def align_points(
measured_points: Sequence[Point],
ideal_points: Sequence[Point],
axes_order: str = "roll,pitch,yaw",
) -> (Vector, float, float, float):
"""
Aligns a set of measured points to a set of ideal points in 3D space, returning the
translation and yaw rotation needed.
Parameters
----------
measured_points : np.ndarray
The measured points as an Nx3 numpy array.
ideal_points : np.ndarray
The ideal points as an Nx3 numpy array.
axes_order : str
The order in which to resolve the axes.
Resolution is **not** independent of the axes order. I.e. doing 'yaw,pitch,roll' may result
in a different outcome.
Returns
-------
Vector, float, float, float
The vector is the cartesian translation (dx, dy, dz) and yaw, pitch, and roll angle in degrees required to align the measured points
to the ideal points.
"""
# convert from Point to stacked array (x, y, z) x N
measured_array = [[p.x, p.y, p.z] for p in measured_points]
ideal_array = [[p.x, p.y, p.z] for p in ideal_points]
# Ensure the points are centered at their centroids
measured_centroid = np.mean(measured_array, axis=0)
ideal_centroid = np.mean(ideal_array, axis=0)
measured_centered = measured_array - measured_centroid
ideal_centered = ideal_array - ideal_centroid
# Compute the covariance matrix
H = measured_centered.T @ ideal_centered
# Singular Value Decomposition
U, _, Vt = np.linalg.svd(H)
rotation_matrix = Vt.T @ U.T
# Ensure a right-handed coordinate system
if np.linalg.det(rotation_matrix) < 0:
Vt[2, :] *= -1
rotation_matrix = Vt.T @ U.T
# Compute the euler angles
rotation = Rotation.from_matrix(rotation_matrix)
euler = conventional_to_euler_notation(axes_order)
roll, pitch, yaw = rotation.as_euler(euler, degrees=True)
rotated_measured_centroid = rotation.apply(measured_centroid)
translation = ideal_centroid - rotated_measured_centroid
return Vector(*translation), yaw, pitch, roll