Source code for pylinac.plan_generator.dicom

from __future__ import annotations

import datetime
import math
from abc import ABC
from collections.abc import Iterable
from enum import Enum
from pathlib import Path
from typing import Literal

import numpy as np
import pydicom
from matplotlib.figure import Figure
from pydicom.dataset import Dataset
from pydicom.sequence import Sequence
from pydicom.uid import generate_uid

from ..core import scale
from ..core.image_generator.layers import ArrayLayer
from ..core.image_generator.simulators import Simulator
from ..core.scale import wrap360
from .fluence import generate_fluences, plot_fluences
from .mlc import MLCShaper


class GantryDirection(Enum):
    CLOCKWISE = "CW"
    COUNTER_CLOCKWISE = "CC"
    NONE = "NONE"


class GantrySpeedTransition(Enum):
    LEADING = "leading"
    TRAILING = "trailing"


class FluenceMode(Enum):
    STANDARD = "STANDARD"
    FFF = "FFF"
    SRS = "SRS"


class Stack(Enum):
    DISTAL = "distal"
    PROXIMAL = "proximal"
    BOTH = "both"


MLC_MILLENNIUM_BOUNDARIES = (
    list(np.arange(-200, -100 + 1, 10))
    + list(np.arange(-95, 95 + 1, 5))
    + list(np.arange(100, 200 + 1, 10))
)
MLC_120HDMIL_BOUNDARIES = (
    list(np.arange(-110, -40 + 1, 5))
    + list(np.arange(-37.5, 37.5 + 1, 2.5))
    + list(np.arange(40, 110 + 1, 10))
)
MLC_DISTAL_BOUNDARIES = list(np.arange(-140, 140 + 1, 10))
MLC_PROXIMAL_BOUNDARIES = list(np.arange(-145, 145 + 1, 10))


def _create_basic_beam_info(
    beam_name: str,
    beam_type: str,
    fluence_mode: FluenceMode,
    beam_limiting_device_sequence: Sequence,
    number_of_control_points: int,
) -> Dataset:
    beam = Dataset()
    beam.Manufacturer = "Radformation"
    beam.ManufacturerModelName = "RadMachine"
    beam.PrimaryDosimeterUnit = "MU"
    beam.SourceAxisDistance = 1000.0

    # Primary Fluence Mode Sequence
    primary_fluence_mode_sequence = Sequence()
    beam.PrimaryFluenceModeSequence = primary_fluence_mode_sequence

    primary_fluence_mode1 = Dataset()
    if fluence_mode == FluenceMode.STANDARD:
        primary_fluence_mode1.FluenceMode = "STANDARD"
    elif fluence_mode == FluenceMode.FFF:
        primary_fluence_mode1.FluenceMode = "NON_STANDARD"
        primary_fluence_mode1.FluenceModeID = "FFF"
    elif fluence_mode == FluenceMode.SRS:
        primary_fluence_mode1.FluenceMode = "NON_STANDARD"
        primary_fluence_mode1.FluenceModeID = "SRS"

    primary_fluence_mode_sequence.append(primary_fluence_mode1)

    # Beam Limiting Device Sequence
    beam.BeamLimitingDeviceSequence = beam_limiting_device_sequence

    # beam numbers start at 0 and increment from there.
    beam.BeamName = beam_name
    beam.BeamType = beam_type
    beam.RadiationType = "PHOTON"
    beam.TreatmentDeliveryType = "TREATMENT"
    beam.NumberOfWedges = 0
    beam.NumberOfCompensators = 0
    beam.NumberOfBoli = 0
    beam.NumberOfBlocks = 0
    beam.FinalCumulativeMetersetWeight = 1.0
    beam.NumberOfControlPoints = number_of_control_points

    # Control Point Sequence
    cp_sequence = Sequence()
    beam.ControlPointSequence = cp_sequence
    return beam


class _Beam(ABC):
    """Represents a DICOM beam dataset. Has methods for creating the dataset and adding control points.
    Generally not created on its own but rather under the hood as part of a PlanGenerator object.

    It contains enough independent logic steps that it's worth separating out from the PlanGenerator class.
    """

    meterset: float

    def __init__(
        self,
        beam_limiting_device_sequence: Sequence,
        beam_name: str,
        energy: float,
        fluence_mode: FluenceMode,
        dose_rate: int,
        metersets: list[float],
        gantry_angles: float | list[float],
        coll_angle: float,
        beam_limiting_device_positions: dict[str, list],
        couch_vrt: float,
        couch_lat: float,
        couch_lng: float,
        couch_rot: float,
    ):
        """
        Parameters
        ----------
        beam_limiting_device_sequence : Sequence
            The beam_limiting_device_sequence as defined in the template plan.
        beam_name : str
            The name of the beam. Must be less than 16 characters.
        energy : float
            The energy of the beam.
        fluence_mode : FluenceMode
            The fluence mode of the beam.
        dose_rate : int
            The dose rate of the beam.
        metersets : list[float]
            The meter sets for each control point.
        gantry_angles : Union[float, list[float]]
            The gantry angle(s) of the beam. If a single number, it's assumed to be a static beam. If multiple numbers, it's assumed to be a dynamic beam.
        coll_angle : float
            The collimator angle.
        beam_limiting_device_positions : dict[str, list]
            The positions of the beam_limiting_device_positions for each control point,
            where key is the type of beam limiting device (e.g. "MLCX") and the value contains the positions.
        couch_vrt : float
            The couch vertical position.
        couch_lat : float
            The couch lateral position.
        couch_lng : float
            The couch longitudinal position.
        couch_rot : float
            The couch rotation.
        """
        ROUNDING_DECIMALS = 6

        number_of_control_points = len(metersets)

        # The Meterset at a given Control Point is equal to Beam Meterset (300A,0086)
        # specified in the Referenced Beam Sequence (300C,0004) of the RT Fraction Scheme Module,
        # multiplied by the Cumulative Meterset Weight (300A,0134) for the Control Point,
        # divided by the Final Cumulative Meterset Weight (300A,010E)
        # https://dicom.innolitics.com/ciods/rt-plan/rt-beams/300a00b0/300a0111/300a0134
        metersets_weights = np.array(metersets) / metersets[-1]
        self.meterset = np.round(metersets[-1], ROUNDING_DECIMALS)

        if len(beam_name) > 16:
            raise ValueError("Beam name must be less than or equal to 16 characters")

        if not isinstance(gantry_angles, Iterable):
            # if it's just a single number (like for a static beam) set it to an array of that value
            gantry_angles = [gantry_angles] * number_of_control_points

        # Round all possible dynamic elements  to avoid floating point comparisons.
        # E.g. to evaluate is an axis is static, all elements should be equal to the first
        # Note: using np.isclose does not solve the problem since the tolerance should be the same
        # as Eclipse/Machine, and we don't know which tolerance they use.
        # Here we assume that their tolerance is tighter than ROUNDING_DECIMALS
        metersets_weights = np.round(metersets_weights, ROUNDING_DECIMALS)
        gantry_angles = np.round(gantry_angles, ROUNDING_DECIMALS)
        bld_positions = {
            k: np.round(v, ROUNDING_DECIMALS)
            for k, v in beam_limiting_device_positions.items()
        }

        # Infer gantry rotation from the gantry angles
        # It assumes the gantry cannot rotate over 180, so there is only one possible direction to go from A to B.
        ga_wrap180 = scale.wrap180(np.array(gantry_angles))
        # This dictionary is used for mapping the sign of the difference with the GantryDirection enum.
        gantry_direction_map = {
            0: GantryDirection.NONE,
            1: GantryDirection.CLOCKWISE,
            -1: GantryDirection.COUNTER_CLOCKWISE,
        }
        gantry_direction = [
            gantry_direction_map[s] for s in np.sign(np.diff(ga_wrap180))
        ]
        # The last GantryRotationDirection should always be 'NONE'
        gantry_direction += [GantryDirection.NONE]

        # Infer if a beam is static or dynamic from the control points
        gantry_is_static = len(set(gantry_direction)) == 1
        dict_bld_is_static = {
            k: np.all(pos == pos[0]) for k, pos in bld_positions.items()
        }
        blds_are_static = np.all(list(dict_bld_is_static.values()))
        beam_is_static = gantry_is_static and blds_are_static
        beam_type = "STATIC" if beam_is_static else "DYNAMIC"

        # Create dataset with basic beam info
        self.ds = _create_basic_beam_info(
            beam_name,
            beam_type,
            fluence_mode,
            beam_limiting_device_sequence=beam_limiting_device_sequence,
            number_of_control_points=number_of_control_points,
        )

        # Add initial control point
        cp0 = Dataset()
        cp0.ControlPointIndex = 0
        cp0.NominalBeamEnergy = energy
        cp0.DoseRateSet = dose_rate
        beam_limiting_device_position_sequence = Sequence()
        cp0.BeamLimitingDevicePositionSequence = beam_limiting_device_position_sequence
        for key, values in bld_positions.items():
            beam_limiting_device_position = Dataset()
            beam_limiting_device_position.RTBeamLimitingDeviceType = key
            beam_limiting_device_position.LeafJawPositions = list(values[0])
            beam_limiting_device_position_sequence.append(beam_limiting_device_position)
        cp0.GantryAngle = gantry_angles[0]
        cp0.GantryRotationDirection = gantry_direction[0].value
        cp0.BeamLimitingDeviceAngle = coll_angle
        cp0.BeamLimitingDeviceRotationDirection = "NONE"
        cp0.PatientSupportAngle = couch_rot
        cp0.PatientSupportRotationDirection = "NONE"
        cp0.TableTopEccentricAngle = 0.0
        cp0.TableTopEccentricRotationDirection = "NONE"
        cp0.TableTopVerticalPosition = couch_vrt
        cp0.TableTopLongitudinalPosition = couch_lng
        cp0.TableTopLateralPosition = couch_lat
        cp0.IsocenterPosition = None
        cp0.CumulativeMetersetWeight = 0.0
        self.ds.ControlPointSequence.append(cp0)

        # Add rest of the control points
        for cp_idx in range(1, number_of_control_points):
            cp = Dataset()
            cp.ControlPointIndex = cp_idx
            cp.CumulativeMetersetWeight = metersets_weights[cp_idx]

            if not gantry_is_static:
                cp.GantryAngle = gantry_angles[cp_idx]
                cp.GantryRotationDirection = gantry_direction[cp_idx].value

            bld_position_sequence = Sequence()
            for bld, positions in bld_positions.items():
                if not dict_bld_is_static[bld]:
                    bld_position = Dataset()
                    bld_position.RTBeamLimitingDeviceType = bld
                    bld_position.LeafJawPositions = list(positions[cp_idx])
                    bld_position_sequence.append(bld_position)
            if len(bld_position_sequence) > 0:
                cp.BeamLimitingDevicePositionSequence = bld_position_sequence

            self.ds.ControlPointSequence.append(cp)

    def as_dicom(self) -> Dataset:
        """Return the beam as a DICOM dataset that represents a BeamSequence item."""
        return self.ds


[docs] class TrueBeamBeam(_Beam): """Represents a DICOM beam dataset for a TrueBeam. Has methods for creating the dataset and adding control points. Generally not created on its own but rather under the hood as part of a PlanGenerator object. It contains enough independent logic steps that it's worth separating out from the PlanGenerator class. """ def __init__( self, is_mlc_hd: bool, beam_name: str, energy: float, fluence_mode: FluenceMode, dose_rate: int, metersets: list[float], gantry_angles: float | list[float], x1: float, x2: float, y1: float, y2: float, mlc_positions: list[list[float]], coll_angle: float, couch_vrt: float, couch_lat: float, couch_lng: float, couch_rot: float, ): """ Parameters ---------- is_mlc_hd : bool Whether the MLC type is HD or Millennium beam_name : str The name of the beam. Must be less than 16 characters. energy : float The energy of the beam. fluence_mode : FluenceMode The fluence mode of the beam. dose_rate : int The dose rate of the beam. metersets : list[float] The meter sets for each control point. The length must match the number of control points in mlc_positions. gantry_angles : Union[float, list[float]] The gantry angle(s) of the beam. If a single number, it's assumed to be a static beam. If multiple numbers, it's assumed to be a dynamic beam. x1 : float The left jaw position. x2 : float The right jaw position. y1 : float The bottom jaw position. y2 : float The top jaw position. mlc_positions : list[list[float]] The MLC positions for each control point. This is the x-position of each leaf for each control point. coll_angle : float The collimator angle. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. couch_rot : float The couch rotation. """ jaw_x = Dataset() jaw_x.RTBeamLimitingDeviceType = "X" jaw_x.NumberOfLeafJawPairs = 1 jaw_y = Dataset() jaw_y.RTBeamLimitingDeviceType = "Y" jaw_y.NumberOfLeafJawPairs = 1 jaw_asymx = Dataset() jaw_asymx.RTBeamLimitingDeviceType = "ASYMX" jaw_asymx.NumberOfLeafJawPairs = 1 jaw_asymy = Dataset() jaw_asymy.RTBeamLimitingDeviceType = "ASYMX" jaw_asymy.NumberOfLeafJawPairs = 1 mlc = Dataset() mlc.RTBeamLimitingDeviceType = "MLCX" mlc.NumberOfLeafJawPairs = 60 if is_mlc_hd: mlc.LeafPositionBoundaries = MLC_120HDMIL_BOUNDARIES else: mlc.LeafPositionBoundaries = MLC_MILLENNIUM_BOUNDARIES bld_sequence = Sequence((jaw_x, jaw_y, jaw_asymx, jaw_asymy, mlc)) beam_limiting_device_positions = { "ASYMX": [[x1, x2]], "ASYMY": [[y1, y2]], "MLCX": mlc_positions, } super().__init__( beam_limiting_device_sequence=bld_sequence, beam_name=beam_name, energy=energy, fluence_mode=fluence_mode, dose_rate=dose_rate, metersets=metersets, gantry_angles=gantry_angles, beam_limiting_device_positions=beam_limiting_device_positions, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, )
[docs] class HalcyonBeam(_Beam): """A beam representing a Halcyon. Halcyons have dual MLC stacks, no X-jaws, no couch rotation, etc.""" def __init__( self, beam_name: str, metersets: list[float], gantry_angles: float | list[float], distal_mlc_positions: list[list[float]], proximal_mlc_positions: list[list[float]], coll_angle: float, couch_vrt: float, couch_lat: float, couch_lng: float, ): """ Parameters ---------- beam_name : str The name of the beam. Must be less than 16 characters. metersets : list[float] The meter sets for each control point. The length must match the number of control points in mlc_positions. gantry_angles : Union[float, list[float]] The gantry angle(s) of the beam. If a single number, it's assumed to be a static beam. If multiple numbers, it's assumed to be a dynamic beam. distal_mlc_positions : list[list[float]] The distal MLC positions for each control point. This is the x-position of each leaf for each control point. proximal_mlc_positions : list[list[float]] The proximal MLC positions for each control point. This is the x-position of each leaf for each control point. coll_angle : float The collimator angle. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. """ jaw_x = Dataset() jaw_x.RTBeamLimitingDeviceType = "X" jaw_x.NumberOfLeafJawPairs = 1 jaw_y = Dataset() jaw_y.RTBeamLimitingDeviceType = "Y" jaw_y.NumberOfLeafJawPairs = 1 mlc_x1 = Dataset() mlc_x1.RTBeamLimitingDeviceType = "MLCX1" mlc_x1.NumberOfLeafJawPairs = 28 mlc_x1.LeafPositionBoundaries = MLC_DISTAL_BOUNDARIES mlc_x2 = Dataset() mlc_x2.RTBeamLimitingDeviceType = "MLCX2" mlc_x2.NumberOfLeafJawPairs = 29 mlc_x2.LeafPositionBoundaries = MLC_PROXIMAL_BOUNDARIES bld_sequence = Sequence((jaw_x, jaw_y, mlc_x1, mlc_x2)) beam_limiting_device_positions = { "X": [[-140, 140]], "Y": [[-140, 140]], "MLCX1": distal_mlc_positions, "MLCX2": proximal_mlc_positions, } super().__init__( beam_limiting_device_sequence=bld_sequence, beam_name=beam_name, energy=6, fluence_mode=FluenceMode.FFF, dose_rate=600, metersets=metersets, gantry_angles=gantry_angles, beam_limiting_device_positions=beam_limiting_device_positions, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=0, )
class PlanGenerator(ABC): def __init__( self, ds: Dataset, plan_label: str, plan_name: str, patient_name: str | None, patient_id: str | None, max_mlc_position: float, max_mlc_speed: float, max_gantry_speed: float, max_overtravel_mm: float, ): """A tool for generating new QA RTPlan files based on an initial, somewhat empty RTPlan file. Parameters ---------- ds : Dataset The RTPLAN dataset to base the new plan off of. The plan must already have MLC positions. plan_label : str The label of the new plan. plan_name : str The name of the new plan. patient_name : str, optional The name of the patient. If not provided, it will be taken from the RTPLAN file. patient_id : str, optional The ID of the patient. If not provided, it will be taken from the RTPLAN file. max_mlc_position : float The max mlc position in mm max_mlc_speed : float The maximum speed of the MLC leaves in mm/s max_gantry_speed : float The maximum speed of the gantry in degrees/s. max_overtravel_mm : float The maximum distance the MLC leaves can overtravel from each other as well as the jaw size (for tail exposure protection). """ if ds.Modality != "RTPLAN": raise ValueError("File is not an RTPLAN file") self.max_overtravel_mm = max_overtravel_mm self.max_mlc_position = max_mlc_position self.max_mlc_speed = max_mlc_speed self.max_gantry_speed = max_gantry_speed patient_name = patient_name or getattr(ds, "PatientName", None) if not patient_name: raise ValueError( "RTPLAN file must have PatientName or pass it via `patient_name`" ) patient_id = patient_id or getattr(ds, "PatientID", None) if not patient_id: raise ValueError( "RTPLAN file must have PatientID or pass it via `patient_id`" ) if not hasattr(ds, "ToleranceTableSequence"): raise ValueError("RTPLAN file must have ToleranceTableSequence") if not hasattr(ds, "BeamSequence") or not hasattr( ds.BeamSequence[0].BeamLimitingDeviceSequence[-1], "LeafPositionBoundaries" ): raise ValueError("RTPLAN file must have MLC data") self.ds = ds ###### Clear/initialize the metadata for the new plan self.ds.PatientName = patient_name self.ds.PatientID = patient_id self.ds.RTPlanLabel = plan_label self.ds.RTPlanName = plan_name date = datetime.datetime.now().strftime("%Y%m%d") time = datetime.datetime.now().strftime("%H%M%S") self.ds.InstanceCreationDate = date self.ds.InstanceCreationTime = time self.ds.SOPInstanceUID = generate_uid() # Patient Setup Sequence patient_setup = Dataset() patient_setup.PatientPosition = "HFS" patient_setup.PatientSetupNumber = "0" self.ds.PatientSetupSequence = Sequence((patient_setup,)) # Dose Reference Sequence dose_ref_sequence = Sequence() self.ds.DoseReferenceSequence = dose_ref_sequence dose_ref1 = Dataset() dose_ref1.DoseReferenceNumber = "1" dose_ref1.DoseReferenceUID = generate_uid() dose_ref1.DoseReferenceStructureType = "SITE" dose_ref1.DoseReferenceDescription = "PTV" dose_ref1.DoseReferenceType = "TARGET" dose_ref1.DeliveryMaximumDose = "20.0" dose_ref1.TargetPrescriptionDose = "40.0" dose_ref1.TargetMaximumDose = "20.0" self.ds.DoseReferenceSequence.append(dose_ref1) # Fraction Group Sequence frxn_gp_sequence = Sequence() self.ds.FractionGroupSequence = frxn_gp_sequence # Fraction Group Sequence: Fraction Group 1 frxn_gp1 = Dataset() frxn_gp1.FractionGroupNumber = 1 frxn_gp1.NumberOfFractionsPlanned = 1 frxn_gp1.NumberOfBeams = 0 frxn_gp1.NumberOfBrachyApplicationSetups = 0 # Referenced Beam Sequence refd_beam_sequence = Sequence() frxn_gp1.ReferencedBeamSequence = refd_beam_sequence self.ds.FractionGroupSequence.append(frxn_gp1) # Beam Sequence self._old_beam = self.ds.BeamSequence[0] beam_sequence = Sequence() self.ds.BeamSequence = beam_sequence @classmethod def from_rt_plan_file(cls, rt_plan_file: str | Path, **kwargs) -> PlanGenerator: """Load an existing RTPLAN file and create a new plan based on it. Parameters ---------- rt_plan_file : str The path to the RTPLAN file. kwargs See the PlanGenerator constructor for details. """ ds = pydicom.dcmread(rt_plan_file) return cls(ds, **kwargs) @property def machine_name(self) -> str: """The name of the machine.""" return self._old_beam.TreatmentMachineName def add_beam(self, beam: HalcyonBeam | TrueBeamBeam): """Add a beam to the plan using the Beam object. Although public, this is a low-level method that is used by the higher-level methods like add_open_field_beam. This handles the associated metadata like the referenced beam sequence and fraction group sequence. """ beam_dataset = beam.as_dicom() # Update the beam beam_dataset.BeamNumber = len(self.ds.BeamSequence) + 1 beam_dataset.TreatmentMachineName = self.machine_name patient_setup_nr = self.ds.PatientSetupSequence[0].PatientSetupNumber beam_dataset.ReferencedPatientSetupNumber = patient_setup_nr tolerance_table_nr = self.ds.ToleranceTableSequence[0].ToleranceTableNumber beam_dataset.ReferencedToleranceTableNumber = tolerance_table_nr self.ds.BeamSequence.append(beam_dataset) # increment number of beams fr = self.ds.FractionGroupSequence[0] fr.NumberOfBeams += 1 # Update plan references referenced_beam = Dataset() referenced_beam.BeamDose = 1.0 referenced_beam.BeamMeterset = beam.meterset referenced_beam.ReferencedBeamNumber = beam_dataset.BeamNumber dose_reference_uid = self.ds.DoseReferenceSequence[0].DoseReferenceUID referenced_beam.ReferencedDoseReferenceUID = dose_reference_uid self.ds.FractionGroupSequence[0].ReferencedBeamSequence.append(referenced_beam) def to_file(self, filename: str | Path) -> None: """Write the DICOM dataset to file""" self.ds.save_as(filename, write_like_original=False) def as_dicom(self) -> Dataset: """Return the new DICOM dataset.""" return self.ds def plot_fluences( self, width_mm: float = 400, resolution_mm: float = 0.5, dtype: np.dtype = np.uint16, ) -> list[Figure]: """Plot the fluences of the beams generated See Also -------- :func:`~pydicom_planar.PlanarImage.plot_fluences` """ return plot_fluences(self.as_dicom(), width_mm, resolution_mm, dtype, show=True) def to_dicom_images( self, simulator: type[Simulator], invert: bool = True ) -> list[Dataset]: """Generate simulated DICOM images of the plan. This provides a way to generate an end-to-end simulation of the plan. The images will always be at 1000mm SID. Parameters ---------- simulator : Simulator The simulator to use to generate the images. This provides the size of the image and the pixel size invert: bool Invert the fluence. Setting to True simulates EPID-style images where dose->lower pixel value. """ image_ds = [] fluences = generate_fluences( rt_plan=self.as_dicom(), width_mm=simulator.shape[1] * simulator.pixel_size, resolution_mm=simulator.pixel_size, ) for beam, fluence in zip(self.ds.BeamSequence, fluences): beam_info = beam.ControlPointSequence[0] sim = simulator(sid=1000) sim.add_layer(ArrayLayer(fluence)) ds = sim.as_dicom( gantry_angle=beam_info.GantryAngle, coll_angle=beam_info.BeamLimitingDeviceAngle, table_angle=beam_info.PatientSupportAngle, invert_array=invert, ) image_ds.append(ds) return image_ds
[docs] class TrueBeamPlanGenerator(PlanGenerator): def __init__( self, ds: Dataset, plan_label: str, plan_name: str, patient_name: str | None = None, patient_id: str | None = None, max_mlc_position: float = 200, max_mlc_speed: float = 25, max_gantry_speed: float = 4.8, max_overtravel_mm: float = 140, ): """A tool for generating new QA RTPlan files based on an initial, somewhat empty RTPlan file. Parameters ---------- ds : Dataset The RTPLAN dataset to base the new plan off of. The plan must already have MLC positions. plan_label : str The label of the new plan. plan_name : str The name of the new plan. patient_name : str, optional The name of the patient. If not provided, it will be taken from the RTPLAN file. patient_id : str, optional The ID of the patient. If not provided, it will be taken from the RTPLAN file. max_mlc_position : float The max mlc position in mm max_mlc_speed : float The maximum speed of the MLC leaves in mm/s max_gantry_speed : float The maximum speed of the gantry in degrees/s. max_overtravel_mm : float The maximum distance the MLC leaves can overtravel from each other as well as the jaw size (for tail exposure protection). """ super().__init__( ds, plan_label, plan_name, patient_name, patient_id, max_mlc_position, max_mlc_speed, max_gantry_speed, max_overtravel_mm, ) @property def leaf_config(self) -> list[float]: """The leaf boundaries of the MLC.""" mlc = next( bld for bld in self._old_beam.BeamLimitingDeviceSequence if bld.RTBeamLimitingDeviceType == "MLCX" ) return mlc.LeafPositionBoundaries @property def is_mlc_hd(self) -> bool: """Whether the MLC type id HD or Millennium.""" return self.leaf_config[0] == -110 def _create_mlc( self, sacrifice_gap_mm: float = None, sacrifice_max_move_mm: float = None ) -> MLCShaper: """Utility to create MLC shaper instances.""" return MLCShaper( leaf_y_positions=self.leaf_config, max_mlc_position=self.max_mlc_position, sacrifice_gap_mm=sacrifice_gap_mm, sacrifice_max_move_mm=sacrifice_max_move_mm, max_overtravel_mm=self.max_overtravel_mm, )
[docs] def add_picketfence_beam( self, strip_width_mm: float = 3, strip_positions_mm: tuple[float | int, ...] = (-45, -30, -15, 0, 15, 30, 45), y1: float = -100, y2: float = 100, fluence_mode: FluenceMode = FluenceMode.STANDARD, dose_rate: int = 600, energy: float = 6, gantry_angle: float = 0, coll_angle: float = 0, couch_vrt: float = 0, couch_lng: float = 1000, couch_lat: float = 0, couch_rot: float = 0, mu: int = 200, jaw_padding_mm: float = 10, beam_name: str = "PF", max_sacrificial_move_mm: float = 50, ): """Add a picket fence beam to the plan. Parameters ---------- strip_width_mm : float The width of the strips in mm. strip_positions_mm : tuple The positions of the strips in mm relative to the center of the image. y1 : float The bottom jaw position. Usually negative. More negative is lower. y2 : float The top jaw position. Usually positive. More positive is higher. fluence_mode : FluenceMode The fluence mode of the beam. dose_rate : int The dose rate of the beam. energy : float The energy of the beam. gantry_angle : float The gantry angle of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lng : float The couch longitudinal position. couch_lat : float The couch lateral position. couch_rot : float The couch rotation. mu : int The monitor units of the beam. jaw_padding_mm : float The padding to add to the X jaws. beam_name : str The name of the beam. max_sacrificial_move_mm : float The maximum distance the sacrificial leaves can move in a given control point. Smaller values generate more control points and more back-and-forth movement. Too large of values may cause deliverability issues. """ # check MLC overtravel; machine may prevent delivery if exposing leaf tail x1 = min(strip_positions_mm) - jaw_padding_mm x2 = max(strip_positions_mm) + jaw_padding_mm max_dist_to_jaw = max( max(abs(pos - x1), abs(pos + x2)) for pos in strip_positions_mm ) if max_dist_to_jaw > self.max_overtravel_mm: raise ValueError( "Picket fence beam exceeds MLC overtravel limits. Lower padding, the number of pickets, or the picket spacing." ) mlc = self._create_mlc(sacrifice_max_move_mm=max_sacrificial_move_mm) # create initial starting point; start under the jaws mlc.add_strip( position_mm=strip_positions_mm[0] - 2, strip_width_mm=strip_width_mm, meterset_at_target=0, ) for strip in strip_positions_mm: # starting control point mlc.add_strip( position_mm=strip, strip_width_mm=strip_width_mm, meterset_at_target=1 / len(strip_positions_mm), ) beam = TrueBeamBeam( beam_name=beam_name, energy=energy, dose_rate=dose_rate, x1=x1, x2=x2, y1=y1, y2=y2, gantry_angles=gantry_angle, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, mlc_positions=mlc.as_control_points(), metersets=[mu * m for m in mlc.as_metersets()], fluence_mode=fluence_mode, is_mlc_hd=self.is_mlc_hd, ) self.add_beam(beam)
[docs] def add_mlc_transmission( self, bank: Literal["A", "B"], mu: int = 50, overreach: float = 10, beam_name: str = "MLC Tx", energy: int = 6, dose_rate: int = 600, x1: float = -50, x2: float = 50, y1: float = -100, y2: float = 100, gantry_angle: float = 0, coll_angle: float = 0, couch_vrt: float = 0, couch_lat: float = 0, couch_lng: float = 1000, couch_rot: float = 0, fluence_mode: FluenceMode = FluenceMode.STANDARD, ): """Add a single-image MLC transmission beam to the plan. The beam is delivered with the MLCs closed and moved to one side underneath the jaws. Parameters ---------- bank : str The MLC bank to move. Either "A" or "B". mu : int The monitor units to deliver. overreach : float The amount to tuck the MLCs under the jaws in mm. beam_name : str The name of the beam. energy : int The energy of the beam. dose_rate : int The dose rate of the beam. x1 : float The left jaw position. Usually negative. More negative is left. x2 : float The right jaw position. Usually positive. More positive is right. y1 : float The bottom jaw position. Usually negative. More negative is lower. y2 : float The top jaw position. Usually positive. More positive is higher. gantry_angle : float The gantry angle of the beam in degrees. coll_angle : float The collimator angle of the beam in degrees. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. couch_rot : float The couch rotation in degrees. fluence_mode : FluenceMode The fluence mode of the beam. """ mlc = self._create_mlc() if bank == "A": mlc_tips = x2 + overreach elif bank == "B": mlc_tips = x1 - overreach else: raise ValueError("Bank must be 'A' or 'B'") # test for overtravel if abs(x2 - x1) + overreach > self.max_overtravel_mm: raise OvertravelError( "The MLC overtravel is too large for the given jaw positions and overreach. Reduce the x-jaw opening size and/or overreach value." ) mlc.add_strip( position_mm=mlc_tips, strip_width_mm=1, meterset_at_target=1, ) beam = TrueBeamBeam( beam_name=f"{beam_name} {bank}", energy=energy, dose_rate=dose_rate, x1=x1, x2=x2, y1=y1, y2=y2, gantry_angles=gantry_angle, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, mlc_positions=mlc.as_control_points(), metersets=[mu * m for m in mlc.as_metersets()], fluence_mode=fluence_mode, is_mlc_hd=self.is_mlc_hd, ) self.add_beam(beam)
[docs] def add_dose_rate_beams( self, dose_rates: tuple[int, ...] = (100, 300, 500, 600), default_dose_rate: int = 600, gantry_angle: float = 0, desired_mu: int = 50, energy: float = 6, fluence_mode: FluenceMode = FluenceMode.STANDARD, coll_angle: float = 0, couch_vrt: float = 0, couch_lat: float = 0, couch_lng: float = 1000, couch_rot: float = 0, jaw_padding_mm: float = 5, roi_size_mm: float = 25, y1: float = -100, y2: float = 100, max_sacrificial_move_mm: float = 50, ): """Create a single-image dose rate test. Multiple ROIs are generated. A reference beam is also created where all ROIs are delivered at the default dose rate for comparison. The field names are generated automatically based on the min and max dose rates tested. Parameters ---------- dose_rates : tuple The dose rates to test in MU/min. Each dose rate will have its own ROI. default_dose_rate : int The default dose rate. Typically, this is the clinical default. The reference beam will be delivered at this dose rate for all ROIs. gantry_angle : float The gantry angle of the beam. desired_mu : int The desired monitor units to deliver. It can be that based on the dose rates asked for, the MU required might be higher than this value. energy : float The energy of the beam. fluence_mode : FluenceMode The fluence mode of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. couch_rot : float The couch rotation. jaw_padding_mm : float The padding to add to the X jaws. The X-jaws will close around the ROIs plus this padding. roi_size_mm : float The width of the ROIs in mm. y1 : float The bottom jaw position. Usually negative. More negative is lower. y2 : float The top jaw position. Usually positive. More positive is higher. max_sacrificial_move_mm : float The maximum distance the sacrificial leaves can move in a given control point. Smaller values generate more control points and more back-and-forth movement. Too large of values may cause deliverability issues. """ if roi_size_mm * len(dose_rates) > self.max_overtravel_mm: raise ValueError( "The ROI size * number of dose rates must be less than the overall MLC allowable width" ) # calculate MU mlc_transition_time = roi_size_mm / self.max_mlc_speed min_mu = mlc_transition_time * max(dose_rates) * len(dose_rates) / 60 mu = max(desired_mu, math.ceil(min_mu)) # create MLC sacrifices times_to_transition = [ mu * 60 / (dose_rate * len(dose_rates)) for dose_rate in dose_rates ] sacrificial_movements = [tt * self.max_mlc_speed for tt in times_to_transition] mlc = self._create_mlc(sacrifice_max_move_mm=max_sacrificial_move_mm) ref_mlc = self._create_mlc() roi_centers = np.linspace( -roi_size_mm * len(dose_rates) / 2 + roi_size_mm / 2, roi_size_mm * len(dose_rates) / 2 - roi_size_mm / 2, len(dose_rates), ) # we have a starting and ending strip ref_mlc.add_strip( position_mm=float(roi_centers[0] - roi_size_mm / 2), strip_width_mm=0, meterset_at_target=0, ) mlc.add_strip( position_mm=float(roi_centers[0] - roi_size_mm / 2), strip_width_mm=0, meterset_at_target=0, initial_sacrificial_gap_mm=5, ) for sacrifice_distance, center in zip(sacrificial_movements, roi_centers): ref_mlc.add_rectangle( left_position=center - roi_size_mm / 2, right_position=center + roi_size_mm / 2, x_outfield_position=-200, top_position=max(self.leaf_config), bottom_position=min(self.leaf_config), outer_strip_width=5, meterset_at_target=0, meterset_transition=0.5 / len(dose_rates), sacrificial_distance=0, ) ref_mlc.add_strip( position_mm=center + roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, meterset_transition=0.5 / len(dose_rates), sacrificial_distance_mm=0, ) mlc.add_rectangle( left_position=center - roi_size_mm / 2, right_position=center + roi_size_mm / 2, x_outfield_position=-200, # not used top_position=max(self.leaf_config), bottom_position=min(self.leaf_config), outer_strip_width=5, # not used meterset_at_target=0, meterset_transition=0.5 / len(dose_rates), sacrificial_distance=sacrifice_distance, ) mlc.add_strip( position_mm=center + roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, meterset_transition=0.5 / len(dose_rates), sacrificial_distance_mm=sacrifice_distance, ) ref_beam = TrueBeamBeam( beam_name="DR Ref", energy=energy, dose_rate=default_dose_rate, x1=float(roi_centers[0] - roi_size_mm / 2 - jaw_padding_mm), x2=float(roi_centers[-1] + roi_size_mm / 2 + jaw_padding_mm), y1=y1, y2=y2, gantry_angles=gantry_angle, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, fluence_mode=fluence_mode, mlc_positions=ref_mlc.as_control_points(), metersets=[mu * m for m in ref_mlc.as_metersets()], is_mlc_hd=self.is_mlc_hd, ) self.add_beam(ref_beam) beam = TrueBeamBeam( beam_name=f"DR{min(dose_rates)}-{max(dose_rates)}", energy=energy, dose_rate=default_dose_rate, x1=float(roi_centers[0] - roi_size_mm / 2 - jaw_padding_mm), x2=float(roi_centers[-1] + roi_size_mm / 2 + jaw_padding_mm), y1=y1, y2=y2, gantry_angles=gantry_angle, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, fluence_mode=fluence_mode, mlc_positions=mlc.as_control_points(), metersets=[mu * m for m in mlc.as_metersets()], is_mlc_hd=self.is_mlc_hd, ) self.add_beam(beam)
[docs] def add_mlc_speed_beams( self, speeds: tuple[float | int, ...] = (5, 10, 15, 20), roi_size_mm: float = 20, mu: int = 50, default_dose_rate: int = 600, gantry_angle: float = 0, energy: float = 6, coll_angle: float = 0, couch_vrt: float = 0, couch_lat: float = 0, couch_lng: float = 1000, couch_rot: float = 0, fluence_mode: FluenceMode = FluenceMode.STANDARD, jaw_padding_mm: float = 5, y1: float = -100, y2: float = 100, beam_name: str = "MLC Speed", max_sacrificial_move_mm: float = 50, ): """Create a single-image MLC speed test. Multiple speeds are generated. A reference beam is also generated. The reference beam is delivered at the maximum MLC speed. Parameters ---------- speeds : tuple[float] The speeds to test in mm/s. Each speed will have its own ROI. roi_size_mm : float The width of the ROIs in mm. mu : int The monitor units to deliver. default_dose_rate : int The dose rate used for the reference beam. gantry_angle : float The gantry angle of the beam. energy : int The energy of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. couch_rot : float The couch rotation. fluence_mode : FluenceMode The fluence mode of the beam. jaw_padding_mm : float The padding to add to the X jaws. The X-jaws will close around the ROIs plus this padding. y1 : float The bottom jaw position. Usually negative. More negative is lower. y2 : float The top jaw position. Usually positive. More positive is higher. beam_name : str The name of the beam. The reference beam will be called "MLC Sp Ref". max_sacrificial_move_mm : float The maximum distance the sacrificial leaves can move in a given control point. Smaller values generate more control points and more back-and-forth movement. Too large of values may cause deliverability issues. Notes ----- The desired speed can be achieved through the following formula: speed = roi_size_mm * max dose rate / MU * 60 We solve for MU with the desired speed. The 60 is for converting the dose rate as MU/min to MU/sec. Thus, MU = roi_size_mm * max dose rate / speed * 60 MUs are calculated automatically based on the speed and the ROI size. """ if max(speeds) > self.max_mlc_speed: raise ValueError( f"Maximum speed given {max(speeds)} is greater than the maximum MLC speed {self.max_mlc_speed}" ) if min(speeds) <= 0: raise ValueError("Speeds must be greater than 0") if roi_size_mm * len(speeds) > self.max_overtravel_mm: raise ValueError( "The ROI size * number of speeds must be less than the overall MLC allowable width" ) # create MLC positions times_to_transition = [roi_size_mm / speed for speed in speeds] sacrificial_movements = [tt * self.max_mlc_speed for tt in times_to_transition] mlc = self._create_mlc(sacrifice_max_move_mm=max_sacrificial_move_mm) ref_mlc = self._create_mlc() roi_centers = np.linspace( -roi_size_mm * len(speeds) / 2 + roi_size_mm / 2, roi_size_mm * len(speeds) / 2 - roi_size_mm / 2, len(speeds), ) # we have a starting and ending strip ref_mlc.add_strip( position_mm=float(roi_centers[0] - roi_size_mm / 2), strip_width_mm=0, meterset_at_target=0, ) mlc.add_strip( position_mm=float(roi_centers[0] - roi_size_mm / 2), strip_width_mm=0, meterset_at_target=0, initial_sacrificial_gap_mm=5, ) for sacrifice_distance, center in zip(sacrificial_movements, roi_centers): ref_mlc.add_rectangle( left_position=center - roi_size_mm / 2, right_position=center + roi_size_mm / 2, x_outfield_position=-200, # not relevant top_position=max(self.leaf_config), bottom_position=min(self.leaf_config), outer_strip_width=5, # not relevant meterset_at_target=0, meterset_transition=0.5 / len(speeds), sacrificial_distance=0, ) ref_mlc.add_strip( position_mm=center + roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, meterset_transition=0.5 / len(speeds), sacrificial_distance_mm=0, ) mlc.add_rectangle( left_position=center - roi_size_mm / 2, right_position=center + roi_size_mm / 2, x_outfield_position=-200, # not used top_position=max(self.leaf_config), bottom_position=min(self.leaf_config), outer_strip_width=5, # not used meterset_at_target=0, meterset_transition=0.5 / len(speeds), sacrificial_distance=sacrifice_distance, ) mlc.add_strip( position_mm=center + roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, meterset_transition=0.5 / len(speeds), sacrificial_distance_mm=sacrifice_distance, ) ref_beam = TrueBeamBeam( beam_name=f"{beam_name} Ref", energy=energy, dose_rate=default_dose_rate, x1=float(roi_centers[0] - roi_size_mm / 2 - jaw_padding_mm), x2=float(roi_centers[-1] + roi_size_mm / 2 + jaw_padding_mm), y1=y1, y2=y2, gantry_angles=gantry_angle, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, fluence_mode=fluence_mode, mlc_positions=ref_mlc.as_control_points(), metersets=[mu * m for m in ref_mlc.as_metersets()], is_mlc_hd=self.is_mlc_hd, ) self.add_beam(ref_beam) beam = TrueBeamBeam( beam_name=beam_name, energy=energy, dose_rate=default_dose_rate, x1=float(roi_centers[0] - roi_size_mm / 2 - jaw_padding_mm), x2=float(roi_centers[-1] + roi_size_mm / 2 + jaw_padding_mm), y1=y1, y2=y2, gantry_angles=gantry_angle, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, fluence_mode=fluence_mode, mlc_positions=mlc.as_control_points(), metersets=[mu * m for m in mlc.as_metersets()], is_mlc_hd=self.is_mlc_hd, ) self.add_beam(beam)
[docs] def add_winston_lutz_beams( self, x1: float = -10, x2: float = 10, y1: float = -10, y2: float = 10, defined_by_mlcs: bool = True, energy: float = 6, fluence_mode: FluenceMode = FluenceMode.STANDARD, dose_rate: int = 600, axes_positions: Iterable[dict] = ({"gantry": 0, "collimator": 0, "couch": 0},), couch_vrt: float = 0, couch_lng: float = 1000, couch_lat: float = 0, mu: int = 10, padding_mm: float = 5, ): """Add Winston-Lutz beams to the plan. Will create a beam for each set of axes positions. Field names are generated automatically based on the axes positions. Parameters ---------- x1 : float The left jaw position. x2 : float The right jaw position. y1 : float The bottom jaw position. y2 : float The top jaw position. defined_by_mlcs : bool Whether the field edges are defined by the MLCs or the jaws. energy : float The energy of the beam. fluence_mode : FluenceMode The fluence mode of the beam. dose_rate : int The dose rate of the beam. axes_positions : Iterable[dict] The positions of the axes. Each dict should have keys 'gantry', 'collimator', 'couch', and optionally 'name'. couch_vrt : float The couch vertical position. couch_lng : float The couch longitudinal position. couch_lat : float The couch lateral position. mu : int The monitor units of the beam. padding_mm : float The padding to add. If defined by the MLCs, this is the padding of the jaws. If defined by the jaws, this is the padding of the MLCs. """ for axes in axes_positions: if defined_by_mlcs: mlc_padding = 0 jaw_padding = padding_mm else: mlc_padding = padding_mm jaw_padding = 0 mlc = self._create_mlc() mlc.add_rectangle( left_position=x1 - mlc_padding, right_position=x2 + mlc_padding, top_position=y2 + mlc_padding, bottom_position=y1 - mlc_padding, outer_strip_width=5, meterset_at_target=1.0, x_outfield_position=x1 - mlc_padding - jaw_padding - 20, ) beam_name = ( axes.get("name") or f"G{axes['gantry']:g}C{axes['collimator']:g}P{axes['couch']:g}" ) beam = TrueBeamBeam( beam_name=beam_name, energy=energy, dose_rate=dose_rate, x1=x1 - jaw_padding, x2=x2 + jaw_padding, y1=y1 - jaw_padding, y2=y2 + jaw_padding, gantry_angles=axes["gantry"], coll_angle=axes["collimator"], couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=0, mlc_positions=mlc.as_control_points(), metersets=[mu * m for m in mlc.as_metersets()], fluence_mode=fluence_mode, is_mlc_hd=self.is_mlc_hd, ) self.add_beam(beam)
[docs] def add_gantry_speed_beams( self, speeds: tuple[float | int, ...] = (2, 3, 4, 4.8), max_dose_rate: int = 600, start_gantry_angle: float = 179, energy: float = 6, fluence_mode: FluenceMode = FluenceMode.STANDARD, coll_angle: float = 0, couch_vrt: float = 0, couch_lat: float = 0, couch_lng: float = 1000, couch_rot: float = 0, beam_name: str = "GS", gantry_rot_dir: GantryDirection = GantryDirection.CLOCKWISE, jaw_padding_mm: float = 5, roi_size_mm: float = 30, y1: float = -100, y2: float = 100, mu: int = 120, ): """Create a single-image gantry speed test. Multiple speeds are generated. A reference beam is also generated. The reference beam is delivered without gantry movement. Parameters ---------- speeds : tuple The gantry speeds to test. Each speed will have its own ROI. max_dose_rate : int The max dose rate clinically allowed for the energy. start_gantry_angle : float The starting gantry angle. The gantry will rotate around this point. It is up to the user to know what the machine's limitations are. (i.e. don't go through 180 for Varian machines). The ending gantry angle will be the starting angle + the sum of the gantry deltas generated by the speed ROIs. Slower speeds require more gantry angle to reach the same MU. energy : float The energy of the beam. fluence_mode : FluenceMode The fluence mode of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. couch_rot : float The couch rotation. beam_name : str The name of the beam. gantry_rot_dir : GantryDirection The direction of gantry rotation. jaw_padding_mm : float The padding to add to the X jaws. The X-jaws will close around the ROIs plus this padding. roi_size_mm : float The width of the ROIs in mm. y1 : float The bottom jaw position. Usually negative. More negative is lower. y2 : float The top jaw position. Usually positive. More positive is higher. mu : int The monitor units of the beam. Notes ----- The gantry angle to cover can be determined via the following: gantry speed = gantry_range * max_dose_rate / (MU * 60) We can thus solve for the gantry range: gantry_range = gantry_speed * MU * 60 / max_dose_rate """ if max(speeds) > self.max_gantry_speed: raise ValueError( f"Maximum speed given {max(speeds)} is greater than the maximum gantry speed {self.max_gantry_speed}" ) if roi_size_mm * len(speeds) > self.max_overtravel_mm: raise ValueError( "The ROI size * number of speeds must be less than the overall MLC allowable width" ) # determine sacrifices and gantry angles gantry_deltas = [speed * mu * 60 / max_dose_rate for speed in speeds] gantry_sign = -1 if gantry_rot_dir == GantryDirection.CLOCKWISE else 1 g_angles_uncorrected = [start_gantry_angle] + ( start_gantry_angle + gantry_sign * np.cumsum(gantry_deltas) ).tolist() gantry_angles = [round(wrap360(angle), 2) for angle in g_angles_uncorrected] if sum(gantry_deltas) >= 360: raise ValueError( "Gantry travel is >360 degrees. Lower the beam MU, use fewer speeds, or decrease the desired gantry speeds" ) mlc = self._create_mlc() ref_mlc = self._create_mlc() roi_centers = np.linspace( -roi_size_mm * len(speeds) / 2 + roi_size_mm / 2, roi_size_mm * len(speeds) / 2 - roi_size_mm / 2, len(speeds), ) # we have a starting and ending strip ref_mlc.add_strip( position_mm=float(roi_centers[0]), strip_width_mm=roi_size_mm, meterset_at_target=0, ) mlc.add_strip( position_mm=float(roi_centers[0]), strip_width_mm=roi_size_mm, meterset_at_target=0, ) for center, gantry_angle in zip(roi_centers, gantry_angles): ref_mlc.add_strip( position_mm=center, strip_width_mm=roi_size_mm, meterset_at_target=0, meterset_transition=1 / len(speeds), ) mlc.add_strip( position_mm=center, strip_width_mm=roi_size_mm, meterset_at_target=0, meterset_transition=1 / len(speeds), ) beam = TrueBeamBeam( beam_name=beam_name, energy=energy, dose_rate=max_dose_rate, x1=min(roi_centers) - roi_size_mm - jaw_padding_mm, x2=max(roi_centers) + roi_size_mm + jaw_padding_mm, y1=y1, y2=y2, gantry_angles=gantry_angles, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, fluence_mode=fluence_mode, mlc_positions=mlc.as_control_points(), metersets=[mu * m for m in mlc.as_metersets()], is_mlc_hd=self.is_mlc_hd, ) self.add_beam(beam) ref_beam = TrueBeamBeam( beam_name=f"{beam_name} Ref", energy=energy, dose_rate=max_dose_rate, x1=min(roi_centers) - roi_size_mm - jaw_padding_mm, x2=max(roi_centers) + roi_size_mm + jaw_padding_mm, y1=y1, y2=y2, gantry_angles=gantry_angles[-1], coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, fluence_mode=fluence_mode, mlc_positions=ref_mlc.as_control_points(), metersets=[mu * m for m in ref_mlc.as_metersets()], is_mlc_hd=self.is_mlc_hd, ) self.add_beam(ref_beam)
[docs] def add_open_field_beam( self, x1: float, x2: float, y1: float, y2: float, defined_by_mlcs: bool = True, energy: float = 6, fluence_mode: FluenceMode = FluenceMode.STANDARD, dose_rate: int = 600, gantry_angle: float = 0, coll_angle: float = 0, couch_vrt: float = 0, couch_lng: float = 1000, couch_lat: float = 0, couch_rot: float = 0, mu: int = 200, padding_mm: float = 5, beam_name: str = "Open", outside_strip_width_mm: float = 5, ): """Add an open field beam to the plan. Parameters ---------- x1 : float The left jaw position. x2 : float The right jaw position. y1 : float The bottom jaw position. y2 : float The top jaw position. defined_by_mlcs : bool Whether the field edges are defined by the MLCs or the jaws. energy : float The energy of the beam. fluence_mode : FluenceMode The fluence mode of the beam. dose_rate : int The dose rate of the beam. gantry_angle : float The gantry angle of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lng : float The couch longitudinal position. couch_lat : float The couch lateral position. couch_rot : float The couch rotation. mu : int The monitor units of the beam. padding_mm : float The padding to add to the jaws or MLCs. beam_name : str The name of the beam. outside_strip_width_mm : float The width of the strip of MLCs outside the field. The MLCs will be placed to the left, under the X1 jaw by ~2cm. """ if defined_by_mlcs: mlc_padding = 0 jaw_padding = padding_mm else: mlc_padding = padding_mm jaw_padding = 0 mlc = self._create_mlc() mlc.add_rectangle( left_position=x1 - mlc_padding, right_position=x2 + mlc_padding, top_position=y2 + mlc_padding, bottom_position=y1 - mlc_padding, outer_strip_width=outside_strip_width_mm, x_outfield_position=x1 - mlc_padding - jaw_padding - 20, meterset_at_target=1.0, ) beam = TrueBeamBeam( beam_name=beam_name, energy=energy, dose_rate=dose_rate, x1=x1 - jaw_padding, x2=x2 + jaw_padding, y1=y1 - jaw_padding, y2=y2 + jaw_padding, gantry_angles=gantry_angle, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, mlc_positions=mlc.as_control_points(), metersets=[mu * m for m in mlc.as_metersets()], fluence_mode=fluence_mode, is_mlc_hd=self.is_mlc_hd, ) self.add_beam(beam)
[docs] class HalcyonPlanGenerator(PlanGenerator): """A class to generate a plan with two beams stacked on top of each other such as the Halcyon. This also assumes no jaws.""" def __init__( self, ds: Dataset, plan_label: str, plan_name: str, patient_name: str | None = None, patient_id: str | None = None, max_mlc_position: float = 140, max_mlc_speed: float = 25, max_gantry_speed: float = 4.8, max_overtravel_mm: float = 140, ): """A tool for generating new QA RTPlan files based on an initial, somewhat empty RTPlan file. Parameters ---------- ds : Dataset The RTPLAN dataset to base the new plan off of. The plan must already have MLC positions. plan_label : str The label of the new plan. plan_name : str The name of the new plan. patient_name : str, optional The name of the patient. If not provided, it will be taken from the RTPLAN file. patient_id : str, optional The ID of the patient. If not provided, it will be taken from the RTPLAN file. max_mlc_position : float The max mlc position in mm max_mlc_speed : float The maximum speed of the MLC leaves in mm/s max_gantry_speed : float The maximum speed of the gantry in degrees/s. max_overtravel_mm : float The maximum distance the MLC leaves can overtravel from each other as well as the jaw size (for tail exposure protection). """ super().__init__( ds, plan_label, plan_name, patient_name, patient_id, max_mlc_position, max_mlc_speed, max_gantry_speed, max_overtravel_mm, ) @property def distal_leaf_config(self) -> list[float]: return self._old_beam.BeamLimitingDeviceSequence[-2].LeafPositionBoundaries @property def proximal_leaf_config(self) -> list[float]: return self._old_beam.BeamLimitingDeviceSequence[-1].LeafPositionBoundaries def _create_mlc(self) -> tuple[MLCShaper, MLCShaper]: """Create 2 MLC shaper objects, one for each stack.""" proximal_mlc = MLCShaper( leaf_y_positions=self.proximal_leaf_config, max_mlc_position=self.max_mlc_position, max_overtravel_mm=self.max_overtravel_mm, sacrifice_gap_mm=None, sacrifice_max_move_mm=None, ) distal_mlc = MLCShaper( leaf_y_positions=self.distal_leaf_config, max_mlc_position=self.max_mlc_position, max_overtravel_mm=self.max_overtravel_mm, sacrifice_gap_mm=None, sacrifice_max_move_mm=None, ) return proximal_mlc, distal_mlc
[docs] def add_picketfence_beam( self, stack: Stack, strip_width_mm: float = 3, strip_positions_mm: tuple[float, ...] = (-45, -30, -15, 0, 15, 30, 45), gantry_angle: float = 0, coll_angle: float = 0, couch_vrt: float = 0, couch_lng: float = 1000, couch_lat: float = 0, mu: int = 200, beam_name: str = "PF", ): """Add a picket fence beam to the plan. The beam will be delivered with the MLCs stacked on top of each other. Parameters ---------- stack: Stack Which MLC stack to use for the beam. The other stack will be parked. strip_width_mm : float The width of the strips in mm. strip_positions_mm : tuple The positions of the strips in mm. gantry_angle : float The gantry angle of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lng : float The couch longitudinal position. couch_lat : float The couch lateral position. mu : int The monitor units of the beam. beam_name : str The name of the beam. """ prox_mlc, dist_mlc = self._create_mlc() # we prepend the positions with an initial starting position 2mm from the first strip # that way, each picket is the same cadence where the leaves move into position dynamically. # If you didn't do this, the first picket might be different as it has the advantage # of starting from a static position vs the rest of the pickets being dynamic. strip_positions = [strip_positions_mm[0] - 2, *strip_positions_mm] metersets = [0, *[1 / len(strip_positions_mm) for _ in strip_positions_mm]] for strip, meterset in zip(strip_positions, metersets): if stack in (Stack.DISTAL, Stack.BOTH): dist_mlc.add_strip( position_mm=strip, strip_width_mm=strip_width_mm, meterset_at_target=meterset, ) if stack == Stack.DISTAL: prox_mlc.park(meterset=meterset) if stack in (Stack.PROXIMAL, Stack.BOTH): prox_mlc.add_strip( position_mm=strip, strip_width_mm=strip_width_mm, meterset_at_target=meterset, ) if stack == Stack.PROXIMAL: dist_mlc.park(meterset=meterset) beam = HalcyonBeam( beam_name=beam_name, gantry_angles=gantry_angle, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, proximal_mlc_positions=prox_mlc.as_control_points(), distal_mlc_positions=dist_mlc.as_control_points(), # can use either MLC for metersets metersets=[mu * m for m in prox_mlc.as_metersets()], ) self.add_beam(beam)
[docs] def add_open_field_beam( self, ): """Add an open field beam to the plan. This can be defined by one or both MLC stacks. The x1, x2, y1, and y2 terms are colloquial. The MLCs define the field edges. The nearest MLC to the field edge in the y-direction will be the one used. It can round up or down. """ raise NotImplementedError( "Open field beams are not yet implemented for Halcyon plans" )
def add_dose_rate_beams( self, ): raise NotImplementedError( "Dose rate beams are not yet implemented for Halcyon plans" ) def add_mlc_speed_beams(self): raise NotImplementedError( "MLC speed beams are not yet implemented for Halcyon plans" ) def add_gantry_speed_beams( self, ): raise NotImplementedError( "Gantry speed beams are not yet implemented for Halcyon plans" ) def add_winston_lutz_beams( self, ): raise NotImplementedError( "Winston-Lutz beams are not yet implemented for Halcyon plans" )
class OvertravelError(ValueError): pass