Source code for pylinac.plan_generator.dicom

from __future__ import annotations

import datetime
import math
from enum import Enum
from pathlib import Path
from typing import Iterable

import numpy as np
import pydicom
from matplotlib.figure import Figure
from pydicom.dataset import Dataset
from pydicom.sequence import Sequence
from pydicom.uid import generate_uid

from ..core.scale import wrap360
from .fluence import plot_fluences
from .mlc import MLCShaper


class GantryDirection(Enum):
    CLOCKWISE = "CC"
    COUNTER_CLOCKWISE = "CCW"
    NONE = "NONE"


class GantrySpeedTransition(Enum):
    LEADING = "leading"
    TRAILING = "trailing"


class BeamType(Enum):
    DYNAMIC = "DYNAMIC"
    STATIC = "STATIC"


class FluenceMode(Enum):
    STANDARD = "STANDARD"
    FFF = "FFF"
    SRS = "SRS"


[docs] class Beam: """Represents a DICOM beam dataset. Has methods for creating the dataset and adding control points. Generally not created on its own but rather under the hood as part of a PlanGenerator object. It contains enough independent logic steps that it's worth separating out from the PlanGenerator class. """ ds: Dataset def __init__( self, plan_dataset: Dataset, beam_name: str, beam_type: BeamType, energy: float, dose_rate: int, x1: float, x2: float, y1: float, y2: float, machine_name: str, gantry_angles: float | list[float], gantry_direction: GantryDirection, coll_angle: float, couch_vrt: float, couch_lat: float, couch_lng: float, couch_rot: float, mlc_boundaries: list[float], mlc_positions: list[list[float]], metersets: list[float], fluence_mode: FluenceMode, ): """ Parameters ---------- plan_dataset The plan dataset. Used for dynamic links to other Sequences of the plan, such as Dose Reference and Tolerance Table. beam_name : str The name of the beam. Must be less than 16 characters. beam_type : BeamType The type of beam: dynamic or static. energy : float The energy of the beam. dose_rate : int The dose rate of the beam. x1 : float The left jaw position. x2 : float The right jaw position. y1 : float The bottom jaw position. y2 : float The top jaw position. machine_name : str The name of the machine. gantry_angles : Union[float, list[float]] The gantry angle(s) of the beam. If a single number, it's assumed to be a static beam. If multiple numbers, it's assumed to be a dynamic beam. gantry_direction : GantryDirection The direction of the gantry rotation. Only relevant if multiple gantry angles are specified. coll_angle : float The collimator angle. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. couch_rot : float The couch rotation. mlc_boundaries : list[float] The MLC boundaries. These are the same thing as the LeafPositionBoundaries in the DICOM file. mlc_positions : list[list[float]] The MLC positions for each control point. This is the x-position of each leaf for each control point. metersets : list[float] The meter sets for each control point. The length must match the number of control points in mlc_positions. fluence_mode : FluenceMode The fluence mode of the beam. """ if len(beam_name) > 16: raise ValueError("Beam name must be less than or equal to 16 characters") if beam_type == BeamType.STATIC and len(mlc_positions) != 1: raise ValueError( f"Static beam can only have one MLC position change ({len(mlc_positions)})" ) if len(metersets) != len(mlc_positions): raise ValueError( f"The number of meter sets ({len(metersets)}) " f"must match the number of MLC position changes ({len(mlc_positions)})" ) if ( isinstance(gantry_angles, Iterable) and gantry_direction == GantryDirection.NONE ): raise ValueError( "Cannot specify multiple gantry angles without a gantry direction" ) if isinstance(gantry_angles, Iterable) and beam_type == BeamType.STATIC: raise ValueError("Cannot specify multiple gantry angles as a static beam") self.plan_ds = plan_dataset self.ds = self._create_basic_beam_info( beam_name, beam_type, fluence_mode, machine_name, num_leaves=len(mlc_positions[0]), mlc_boundaries=mlc_boundaries, ) if not isinstance( gantry_angles, Iterable ): # if it's just a single number (like for a static beam) set it to an array of that value gantry_angles = [gantry_angles] * len(metersets) self._append_initial_control_point( energy, dose_rate, x1, x2, y1, y2, gantry_angles[0], gantry_direction, coll_angle, couch_vrt, couch_lat, couch_lng, couch_rot, mlc_positions=mlc_positions[0], ) for mlc_pos, meterset, gantry_angle in zip( mlc_positions[1:], metersets[1:], gantry_angles[1:] ): if beam_type == BeamType.DYNAMIC: self._append_secondary_dynamic_control_point( mlc_positions=mlc_pos, meterset=meterset, gantry_angle=gantry_angle, gantry_dir=gantry_direction, ) else: self._append_secondary_static_control_point(meterset=meterset) def _create_basic_beam_info( self, beam_name: str, beam_type: BeamType, fluence_mode: FluenceMode, machine_name: str, num_leaves: int, mlc_boundaries: list[float], ) -> Dataset: beam = Dataset() beam.Manufacturer = "Radformation" beam.ManufacturerModelName = "RadMachine" beam.TreatmentMachineName = machine_name beam.PrimaryDosimeterUnit = "MU" beam.SourceAxisDistance = "1000.0" # Primary Fluence Mode Sequence primary_fluence_mode_sequence = Sequence() beam.PrimaryFluenceModeSequence = primary_fluence_mode_sequence primary_fluence_mode1 = Dataset() if fluence_mode == FluenceMode.STANDARD: primary_fluence_mode1.FluenceMode = "STANDARD" elif fluence_mode == FluenceMode.FFF: primary_fluence_mode1.FluenceMode = "NON_STANDARD" primary_fluence_mode1.FluenceModeID = "FFF" elif fluence_mode == FluenceMode.SRS: primary_fluence_mode1.FluenceMode = "NON_STANDARD" primary_fluence_mode1.FluenceModeID = "SRS" primary_fluence_mode_sequence.append(primary_fluence_mode1) # Beam Limiting Device Sequence beam_limiting_device_sequence = Sequence() beam.BeamLimitingDeviceSequence = beam_limiting_device_sequence # Beam Limiting Device Sequence: Beam Limiting Device 1 beam_limiting_device1 = Dataset() beam_limiting_device1.RTBeamLimitingDeviceType = "ASYMX" beam_limiting_device1.NumberOfLeafJawPairs = "1" beam_limiting_device_sequence.append(beam_limiting_device1) # Beam Limiting Device Sequence: Beam Limiting Device 2 beam_limiting_device2 = Dataset() # TODO: likely that Elekta will need tweaking for this beam_limiting_device2.RTBeamLimitingDeviceType = "ASYMY" beam_limiting_device2.NumberOfLeafJawPairs = "1" beam_limiting_device_sequence.append(beam_limiting_device2) # Beam Limiting Device Sequence: Beam Limiting Device 3 beam_limiting_device3 = Dataset() beam_limiting_device3.RTBeamLimitingDeviceType = "MLCX" beam_limiting_device3.NumberOfLeafJawPairs = int(num_leaves / 2) beam_limiting_device3.LeafPositionBoundaries = mlc_boundaries beam_limiting_device_sequence.append(beam_limiting_device3) # beam numbers start at 0 and increment from there. beam.BeamNumber = len(self.plan_ds.BeamSequence) beam.BeamName = beam_name beam.BeamType = beam_type.value beam.RadiationType = "PHOTON" beam.TreatmentDeliveryType = "TREATMENT" beam.NumberOfWedges = "0" beam.NumberOfCompensators = "0" beam.NumberOfBoli = "0" beam.NumberOfBlocks = "0" beam.FinalCumulativeMetersetWeight = "1.0" beam.NumberOfControlPoints = "0" # Control Point Sequence cp_sequence = Sequence() beam.ControlPointSequence = cp_sequence return beam def _append_initial_control_point( self, energy: int, dose_rate: int, x1: float, x2: float, y1: float, y2: float, gantry_angle: float | list[float], gantry_rot: GantryDirection, coll_angle: float, couch_vrt: float, couch_lat: float, couch_lng: float, couch_rot: float, mlc_positions: list[float], ): # Control Point Sequence: Control Point 0 cp0 = Dataset() cp0.ControlPointIndex = "0" cp0.NominalBeamEnergy = str(energy) cp0.DoseRateSet = str(dose_rate) # Beam Limiting Device Position Sequence beam_limiting_device_position_sequence = Sequence() cp0.BeamLimitingDevicePositionSequence = beam_limiting_device_position_sequence # Beam Limiting Device Position Sequence: Beam Limiting Device Position 1 beam_limiting_device_position1 = Dataset() beam_limiting_device_position1.RTBeamLimitingDeviceType = "ASYMX" beam_limiting_device_position1.LeafJawPositions = [x1, x2] beam_limiting_device_position_sequence.append(beam_limiting_device_position1) # Beam Limiting Device Position Sequence: Beam Limiting Device Position 2 beam_limiting_device_position2 = Dataset() beam_limiting_device_position2.RTBeamLimitingDeviceType = "ASYMY" beam_limiting_device_position2.LeafJawPositions = [y1, y2] beam_limiting_device_position_sequence.append(beam_limiting_device_position2) # Beam Limiting Device Position Sequence: Beam Limiting Device Position 3 beam_limiting_device_position3 = Dataset() beam_limiting_device_position3.RTBeamLimitingDeviceType = "MLCX" beam_limiting_device_position3.LeafJawPositions = [ f"{m:6f}" for m in mlc_positions ] # convert to truncated string to fit VR limitations beam_limiting_device_position_sequence.append(beam_limiting_device_position3) cp0.GantryAngle = str(gantry_angle) cp0.GantryRotationDirection = gantry_rot.value cp0.BeamLimitingDeviceAngle = str(coll_angle) cp0.BeamLimitingDeviceRotationDirection = "NONE" cp0.PatientSupportAngle = str(couch_rot) cp0.PatientSupportRotationDirection = "NONE" cp0.TableTopEccentricAngle = "0.0" cp0.TableTopEccentricRotationDirection = "NONE" cp0.TableTopVerticalPosition = str(couch_vrt) cp0.TableTopLongitudinalPosition = str(couch_lng) cp0.TableTopLateralPosition = str(couch_lat) cp0.IsocenterPosition = None cp0.CumulativeMetersetWeight = "0.0" # Referenced Dose Reference Sequence refd_dose_ref_sequence = Sequence() cp0.ReferencedDoseReferenceSequence = refd_dose_ref_sequence # linked setup number and tolerance table # we *could* hardcode it but this makes it more flexible to changes upstream self.ds.ReferencedPatientSetupNumber = self.plan_ds.PatientSetupSequence[ 0 ].PatientSetupNumber self.ds.ReferencedToleranceTableNumber = self.plan_ds.ToleranceTableSequence[ 0 ].ToleranceTableNumber self.ds.NumberOfControlPoints = 1 # increment this self.ds.ControlPointSequence.append(cp0) def _append_secondary_static_control_point(self, meterset: float) -> None: # Control Point Sequence: Control Point 1 cp1 = Dataset() cp1.ControlPointIndex = len(self.ds.ControlPointSequence) cp1.CumulativeMetersetWeight = ( f"{meterset:.5f}" # convert to truncated string to fit VR limitations ) self.ds.NumberOfControlPoints = int(self.ds.NumberOfControlPoints) + 1 self.ds.ControlPointSequence.append(cp1) def _append_secondary_dynamic_control_point( self, mlc_positions: list[float], meterset: float, gantry_angle: float, gantry_dir: GantryDirection, ) -> None: # Control Point Sequence: Control Point 1 cp1 = Dataset() cp1.ControlPointIndex = len(self.ds.ControlPointSequence) if gantry_dir != GantryDirection.NONE: cp1.GantryAngle = gantry_angle cp1.GantryRotationDirection = gantry_dir.value cp1.CumulativeMetersetWeight = ( f"{meterset:.5f}" # convert to truncated string to fit VR limitations ) # Beam Limiting Device Position Sequence beam_limiting_device_position_sequence = Sequence() cp1.BeamLimitingDevicePositionSequence = beam_limiting_device_position_sequence # Beam Limiting Device Position Sequence: Beam Limiting Device Position 1 beam_limiting_device_position1 = Dataset() beam_limiting_device_position1.RTBeamLimitingDeviceType = "MLCX" beam_limiting_device_position1.LeafJawPositions = [ f"{m:6f}" for m in mlc_positions ] # convert to truncated string to fit VR limitations beam_limiting_device_position_sequence.append(beam_limiting_device_position1) # Referenced Dose Reference Sequence refd_dose_ref_sequence = Sequence() cp1.ReferencedDoseReferenceSequence = refd_dose_ref_sequence self.ds.NumberOfControlPoints = int(self.ds.NumberOfControlPoints) + 1 self.ds.ControlPointSequence.append(cp1)
[docs] def as_dicom(self) -> Dataset: """Return the beam as a DICOM dataset that represents a BeamSequence item.""" return self.ds
[docs] class PlanGenerator: def __init__( self, ds: Dataset, plan_label: str, plan_name: str, x_width_mm: float = 400, max_mlc_speed: float = 25, max_gantry_speed: float = 4.8, sacrificial_gap_mm: float = 5, max_sacrificial_move_mm: float = 50, max_overtravel_mm: float = 140, ): """A tool for generating new QA RTPlan files based on an initial, somewhat empty RTPlan file. Parameters ---------- ds : Dataset The RTPLAN dataset to base the new plan off of. The plan must already have MLC positions. plan_label : str The label of the new plan. plan_name : str The name of the new plan. x_width_mm : float The overall width of the MLC movement in the x-direction. Generally, this is the x field size. max_mlc_speed : float The maximum speed of the MLC leaves in mm/s max_gantry_speed : float The maximum speed of the gantry in degrees/s. sacrificial_gap_mm : float For certain dynamic beams, the top and bottom leaf pair are used to slow axes down. This is the gap between those leaves at any given time. max_sacrificial_move_mm : float The maximum distance the sacrificial leaves can move in a given control point. Smaller values generate more control points and more back-and-forth movement. Too large of values may cause deliverability issues. max_overtravel_mm : float The maximum distance the MLC leaves can overtravel from each other as well as the jaw size (for tail exposure protection). """ if ds.Modality != "RTPLAN": raise ValueError("File is not an RTPLAN file") self.max_overtravel_mm = max_overtravel_mm self.x_width = x_width_mm self.max_mlc_speed = max_mlc_speed self.max_gantry_speed = max_gantry_speed self.sacrificial_gap = sacrificial_gap_mm self.max_sacrificial_move = max_sacrificial_move_mm if not hasattr(ds, "PatientName") or not hasattr(ds, "PatientID"): raise ValueError("RTPLAN file must have PatientName and PatientID") if not hasattr(ds, "ToleranceTableSequence"): raise ValueError("RTPLAN file must have ToleranceTableSequence") if not hasattr(ds, "BeamSequence") or not hasattr( ds.BeamSequence[0].BeamLimitingDeviceSequence[-1], "LeafPositionBoundaries" ): raise ValueError("RTPLAN file must have MLC data") self.ds = ds ###### Clear/initialize the metadata for the new plan self.ds.RTPlanLabel = plan_label self.ds.RTPlanName = plan_name date = datetime.datetime.now().strftime("%Y%m%d") time = datetime.datetime.now().strftime("%H%M%S") self.ds.InstanceCreationDate = date self.ds.InstanceCreationTime = time self.ds.SOPInstanceUID = generate_uid() # Patient Setup Sequence patient_setup = Dataset() patient_setup.PatientPosition = "HFS" patient_setup.PatientSetupNumber = "0" self.ds.PatientSetupSequence = Sequence((patient_setup,)) # Dose Reference Sequence dose_ref_sequence = Sequence() self.ds.DoseReferenceSequence = dose_ref_sequence dose_ref1 = Dataset() dose_ref1.DoseReferenceNumber = "1" dose_ref1.DoseReferenceUID = generate_uid() dose_ref1.DoseReferenceStructureType = "SITE" dose_ref1.DoseReferenceDescription = "PTV" dose_ref1.DoseReferenceType = "TARGET" dose_ref1.DeliveryMaximumDose = "20.0" dose_ref1.TargetPrescriptionDose = "40.0" dose_ref1.TargetMaximumDose = "20.0" self.ds.DoseReferenceSequence.append(dose_ref1) # Fraction Group Sequence frxn_gp_sequence = Sequence() self.ds.FractionGroupSequence = frxn_gp_sequence # Fraction Group Sequence: Fraction Group 1 frxn_gp1 = Dataset() frxn_gp1.FractionGroupNumber = "1" frxn_gp1.NumberOfFractionsPlanned = "1" frxn_gp1.NumberOfBeams = str(0) frxn_gp1.NumberOfBrachyApplicationSetups = "0" # Referenced Beam Sequence refd_beam_sequence = Sequence() frxn_gp1.ReferencedBeamSequence = refd_beam_sequence self.ds.FractionGroupSequence.append(frxn_gp1) # Beam Sequence self._old_beam = self.ds.BeamSequence[0] beam_sequence = Sequence() self.ds.BeamSequence = beam_sequence
[docs] @classmethod def from_rt_plan_file(cls, rt_plan_file: str, **kwargs) -> PlanGenerator: """Load an existing RTPLAN file and create a new plan based on it. Parameters ---------- rt_plan_file : str The path to the RTPLAN file. kwargs See the PlanGenerator constructor for details. """ ds = pydicom.dcmread(rt_plan_file) return cls(ds, **kwargs)
@property def num_leaves(self) -> int: """The number of leaves in the MLC.""" return self._old_beam.BeamLimitingDeviceSequence[-1].NumberOfLeafJawPairs * 2 @property def leaf_config(self) -> list[float]: """The leaf boundaries of the MLC.""" return self._old_beam.BeamLimitingDeviceSequence[-1].LeafPositionBoundaries @property def machine_name(self) -> str: """The name of the machine.""" return self._old_beam.TreatmentMachineName def _create_mlc(self): """Utility to create MLC shaper instances.""" return MLCShaper( leaf_y_positions=self.leaf_config, max_x_mm=self.x_width / 2, sacrifice_gap_mm=self.sacrificial_gap, sacrifice_max_move_mm=self.max_sacrificial_move, max_overtravel_mm=self.max_overtravel_mm, )
[docs] def add_beam(self, beam_dataset: Dataset, mu: int): """Add a beam to the plan using the Beam object. Although public, this is a low-level method that is used by the higher-level methods like add_open_field_beam. This handles the associated metadata like the referenced beam sequence and fraction group sequence. """ self.ds.BeamSequence.append(beam_dataset) self._update_references(mu=mu)
[docs] def add_picketfence_beam( self, strip_width_mm: float = 3, strip_positions_mm: tuple[float] = (-45, -30, -15, 0, 15, 30, 45), y1: float = -100, y2: float = 100, fluence_mode: FluenceMode = FluenceMode.STANDARD, dose_rate: int = 600, energy: float = 6, gantry_angle: float = 0, coll_angle: float = 0, couch_vrt: float = 0, couch_lng: float = 1000, couch_lat: float = 0, couch_rot: float = 0, mu: int = 200, jaw_padding_mm: float = 10, beam_name: str = "PF", ): """Add a picket fence beam to the plan. Parameters ---------- strip_width_mm : float The width of the strips in mm. strip_positions_mm : tuple The positions of the strips in mm relative to the center of the image. y1 : float The bottom jaw position. Usually negative. More negative is lower. y2 : float The top jaw position. Usually positive. More positive is higher. fluence_mode : FluenceMode The fluence mode of the beam. dose_rate : int The dose rate of the beam. energy : float The energy of the beam. gantry_angle : float The gantry angle of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lng : float The couch longitudinal position. couch_lat : float The couch lateral position. couch_rot : float The couch rotation. mu : int The monitor units of the beam. jaw_padding_mm : float The padding to add to the X jaws. beam_name : str The name of the beam. """ # check MLC overtravel; machine may prevent delivery if exposing leaf tail x1 = min(strip_positions_mm) - jaw_padding_mm x2 = max(strip_positions_mm) + jaw_padding_mm max_dist_to_jaw = max( max(abs(pos - x1), abs(pos + x2)) for pos in strip_positions_mm ) if max_dist_to_jaw > self.max_overtravel_mm: raise ValueError( "Picket fence beam exceeds MLC overtravel limits. Lower padding, the number of pickets, or the picket spacing." ) mlc = self._create_mlc() # create initial starting point; start under the jaws mlc.add_strip( position_mm=strip_positions_mm[0] + 2, strip_width_mm=strip_width_mm, meterset_at_target=0, ) for strip in strip_positions_mm: # starting control point mlc.add_strip( position_mm=strip, strip_width_mm=strip_width_mm, meterset_at_target=1 / len(strip_positions_mm), ) beam = Beam( plan_dataset=self.ds, beam_name=beam_name, beam_type=BeamType.DYNAMIC, energy=energy, dose_rate=dose_rate, x1=x1, x2=x2, y1=y1, y2=y2, gantry_angles=gantry_angle, gantry_direction=GantryDirection.NONE, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, mlc_positions=mlc.as_control_points(), metersets=mlc.as_metersets(), fluence_mode=fluence_mode, mlc_boundaries=self.leaf_config, machine_name=self.machine_name, ) self.add_beam(beam.as_dicom(), mu=mu)
[docs] def add_dose_rate_beams( self, dose_rates: tuple[int] = (100, 300, 500, 600), default_dose_rate: int = 600, gantry_angle: float = 0, desired_mu: int = 50, energy: float = 6, fluence_mode: FluenceMode = FluenceMode.STANDARD, coll_angle: float = 0, couch_vrt: float = 0, couch_lat: float = 0, couch_lng: float = 1000, couch_rot: float = 0, jaw_padding_mm: float = 5, roi_size_mm: float = 25, y1: float = -100, y2: float = 100, ): """Create a single-image dose rate test. Multiple ROIs are generated. A reference beam is also created where all ROIs are delivered at the default dose rate for comparison. The field names are generated automatically based on the min and max dose rates tested. Parameters ---------- dose_rates : tuple The dose rates to test in MU/min. Each dose rate will have its own ROI. default_dose_rate : int The default dose rate. Typically, this is the clinical default. The reference beam will be delivered at this dose rate for all ROIs. gantry_angle : float The gantry angle of the beam. desired_mu : int The desired monitor units to deliver. It can be that based on the dose rates asked for, the MU required might be higher than this value. energy : float The energy of the beam. fluence_mode : FluenceMode The fluence mode of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. couch_rot : float The couch rotation. jaw_padding_mm : float The padding to add to the X jaws. The X-jaws will close around the ROIs plus this padding. roi_size_mm : float The width of the ROIs in mm. y1 : float The bottom jaw position. Usually negative. More negative is lower. y2 : float The top jaw position. Usually positive. More positive is higher. """ if roi_size_mm * len(dose_rates) > self.max_overtravel_mm: raise ValueError( "The ROI size * number of dose rates must be less than the overall MLC allowable width" ) # calculate MU mlc_transition_time = roi_size_mm / self.max_mlc_speed min_mu = mlc_transition_time * max(dose_rates) * len(dose_rates) / 60 mu = max(desired_mu, math.ceil(min_mu)) # create MLC sacrifices times_to_transition = [ mu * 60 / (dose_rate * len(dose_rates)) for dose_rate in dose_rates ] sacrificial_movements = [tt * self.max_mlc_speed for tt in times_to_transition] mlc = self._create_mlc() ref_mlc = self._create_mlc() roi_centers = np.linspace( -roi_size_mm * len(dose_rates) / 2 + roi_size_mm / 2, roi_size_mm * len(dose_rates) / 2 - roi_size_mm / 2, len(dose_rates), ) # we have a starting and ending strip ref_mlc.add_strip( position_mm=roi_centers[0] - roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, ) mlc.add_strip( position_mm=roi_centers[0] - roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, initial_sacrificial_gap_mm=5, ) for sacrifice_distance, center in zip(sacrificial_movements, roi_centers): ref_mlc.add_rectangle( left_position=center - roi_size_mm / 2, right_position=center + roi_size_mm / 2, x_outfield_position=-200, top_position=max(self.leaf_config), bottom_position=min(self.leaf_config), outer_strip_width=5, meterset_at_target=0, meterset_transition=0.5 / len(dose_rates), sacrificial_distance=0, ) ref_mlc.add_strip( position_mm=center + roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, meterset_transition=0.5 / len(dose_rates), sacrificial_distance_mm=0, ) mlc.add_rectangle( left_position=center - roi_size_mm / 2, right_position=center + roi_size_mm / 2, x_outfield_position=-200, # not used top_position=max(self.leaf_config), bottom_position=min(self.leaf_config), outer_strip_width=5, # not used meterset_at_target=0, meterset_transition=0.5 / len(dose_rates), sacrificial_distance=sacrifice_distance, ) mlc.add_strip( position_mm=center + roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, meterset_transition=0.5 / len(dose_rates), sacrificial_distance_mm=sacrifice_distance, ) ref_beam = Beam( plan_dataset=self.ds, beam_name="DR Ref", beam_type=BeamType.DYNAMIC, energy=energy, dose_rate=default_dose_rate, x1=roi_centers[0] - roi_size_mm / 2 - jaw_padding_mm, x2=roi_centers[-1] + roi_size_mm / 2 + jaw_padding_mm, y1=y1, y2=y2, gantry_angles=gantry_angle, gantry_direction=GantryDirection.NONE, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, machine_name=self.machine_name, fluence_mode=fluence_mode, mlc_positions=ref_mlc.as_control_points(), metersets=ref_mlc.as_metersets(), mlc_boundaries=self.leaf_config, ) self.add_beam(ref_beam.as_dicom(), mu=mu) beam = Beam( plan_dataset=self.ds, beam_name=f"DR{min(dose_rates)}-{max(dose_rates)}", beam_type=BeamType.DYNAMIC, energy=energy, dose_rate=default_dose_rate, x1=roi_centers[0] - roi_size_mm / 2 - jaw_padding_mm, x2=roi_centers[-1] + roi_size_mm / 2 + jaw_padding_mm, y1=y1, y2=y2, gantry_angles=gantry_angle, gantry_direction=GantryDirection.NONE, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, machine_name=self.machine_name, fluence_mode=fluence_mode, mlc_positions=mlc.as_control_points(), metersets=mlc.as_metersets(), mlc_boundaries=self.leaf_config, ) self.add_beam(beam.as_dicom(), mu=mu)
[docs] def add_mlc_speed_beams( self, speeds: tuple[float] = (5, 10, 15, 20), roi_size_mm: float = 20, mu: int = 50, default_dose_rate: int = 600, gantry_angle: float = 0, energy: float = 6, coll_angle: float = 0, couch_vrt: float = 0, couch_lat: float = 0, couch_lng: float = 1000, couch_rot: float = 0, fluence_mode: FluenceMode = FluenceMode.STANDARD, jaw_padding_mm: float = 5, y1: float = -100, y2: float = 100, beam_name: str = "MLC Speed", ): """Create a single-image MLC speed test. Multiple speeds are generated. A reference beam is also generated. The reference beam is delivered at the maximum MLC speed. Parameters ---------- speeds : tuple[float] The speeds to test in mm/s. Each speed will have its own ROI. roi_size_mm : float The width of the ROIs in mm. mu : int The monitor units to deliver. default_dose_rate : int The dose rate used for the reference beam. gantry_angle : float The gantry angle of the beam. energy : int The energy of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. couch_rot : float The couch rotation. fluence_mode : FluenceMode The fluence mode of the beam. jaw_padding_mm : float The padding to add to the X jaws. The X-jaws will close around the ROIs plus this padding. y1 : float The bottom jaw position. Usually negative. More negative is lower. y2 : float The top jaw position. Usually positive. More positive is higher. beam_name : str The name of the beam. The reference beam will be called "MLC Sp Ref". Notes ----- The desired speed can be achieved through the following formula: speed = roi_size_mm * max dose rate / MU * 60 We solve for MU with the desired speed. The 60 is for converting the dose rate as MU/min to MU/sec. Thus, MU = roi_size_mm * max dose rate / speed * 60 MUs are calculated automatically based on the speed and the ROI size. """ if max(speeds) > self.max_mlc_speed: raise ValueError( f"Maximum speed given {max(speeds)} is greater than the maximum MLC speed {self.max_mlc_speed}" ) if min(speeds) <= 0: raise ValueError("Speeds must be greater than 0") if roi_size_mm * len(speeds) > self.max_overtravel_mm: raise ValueError( "The ROI size * number of speeds must be less than the overall MLC allowable width" ) # create MLC positions times_to_transition = [roi_size_mm / speed for speed in speeds] sacrificial_movements = [tt * self.max_mlc_speed for tt in times_to_transition] mlc = self._create_mlc() ref_mlc = self._create_mlc() roi_centers = np.linspace( -roi_size_mm * len(speeds) / 2 + roi_size_mm / 2, roi_size_mm * len(speeds) / 2 - roi_size_mm / 2, len(speeds), ) # we have a starting and ending strip ref_mlc.add_strip( position_mm=roi_centers[0] - roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, ) mlc.add_strip( position_mm=roi_centers[0] - roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, initial_sacrificial_gap_mm=5, ) for sacrifice_distance, center in zip(sacrificial_movements, roi_centers): ref_mlc.add_rectangle( left_position=center - roi_size_mm / 2, right_position=center + roi_size_mm / 2, x_outfield_position=-200, # not relevant top_position=max(self.leaf_config), bottom_position=min(self.leaf_config), outer_strip_width=5, # not relevant meterset_at_target=0, meterset_transition=0.5 / len(speeds), sacrificial_distance=0, ) ref_mlc.add_strip( position_mm=center + roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, meterset_transition=0.5 / len(speeds), sacrificial_distance_mm=0, ) mlc.add_rectangle( left_position=center - roi_size_mm / 2, right_position=center + roi_size_mm / 2, x_outfield_position=-200, # not used top_position=max(self.leaf_config), bottom_position=min(self.leaf_config), outer_strip_width=5, # not used meterset_at_target=0, meterset_transition=0.5 / len(speeds), sacrificial_distance=sacrifice_distance, ) mlc.add_strip( position_mm=center + roi_size_mm / 2, strip_width_mm=0, meterset_at_target=0, meterset_transition=0.5 / len(speeds), sacrificial_distance_mm=sacrifice_distance, ) ref_beam = Beam( plan_dataset=self.ds, beam_name="MLC Sp Ref", beam_type=BeamType.DYNAMIC, energy=energy, dose_rate=default_dose_rate, x1=roi_centers[0] - roi_size_mm / 2 - jaw_padding_mm, x2=roi_centers[-1] + roi_size_mm / 2 + jaw_padding_mm, y1=y1, y2=y2, gantry_angles=gantry_angle, gantry_direction=GantryDirection.NONE, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, machine_name=self.machine_name, fluence_mode=fluence_mode, mlc_positions=ref_mlc.as_control_points(), metersets=ref_mlc.as_metersets(), mlc_boundaries=self.leaf_config, ) self.add_beam(ref_beam.as_dicom(), mu=mu) beam = Beam( plan_dataset=self.ds, beam_name=beam_name, beam_type=BeamType.DYNAMIC, energy=energy, dose_rate=default_dose_rate, x1=roi_centers[0] - roi_size_mm / 2 - jaw_padding_mm, x2=roi_centers[-1] + roi_size_mm / 2 + jaw_padding_mm, y1=y1, y2=y2, gantry_angles=gantry_angle, gantry_direction=GantryDirection.NONE, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, machine_name=self.machine_name, fluence_mode=fluence_mode, mlc_positions=mlc.as_control_points(), metersets=mlc.as_metersets(), mlc_boundaries=self.leaf_config, ) self.add_beam(beam.as_dicom(), mu=mu)
[docs] def add_winston_lutz_beams( self, x1: float = -10, x2: float = 10, y1: float = -10, y2: float = 10, defined_by_mlcs: bool = True, energy: float = 6, fluence_mode: FluenceMode = FluenceMode.STANDARD, dose_rate: int = 600, axes_positions: Iterable[dict] = ({"gantry": 0, "collimator": 0, "couch": 0},), couch_vrt: float = 0, couch_lng: float = 1000, couch_lat: float = 0, mu: int = 10, padding_mm: float = 5, ): """Add Winston-Lutz beams to the plan. Will create a beam for each set of axes positions. Field names are generated automatically based on the axes positions. Parameters ---------- x1 : float The left jaw position. x2 : float The right jaw position. y1 : float The bottom jaw position. y2 : float The top jaw position. defined_by_mlcs : bool Whether the field edges are defined by the MLCs or the jaws. energy : float The energy of the beam. fluence_mode : FluenceMode The fluence mode of the beam. dose_rate : int The dose rate of the beam. axes_positions : Iterable[dict] The positions of the axes. Each dict should have keys 'gantry', 'collimator', and 'couch'. couch_vrt : float The couch vertical position. couch_lng : float The couch longitudinal position. couch_lat : float The couch lateral position. mu : int The monitor units of the beam. padding_mm : float The padding to add. If defined by the MLCs, this is the padding of the jaws. If defined by the jaws, this is the padding of the MLCs. """ for axes in axes_positions: if defined_by_mlcs: mlc_padding = 0 jaw_padding = padding_mm else: mlc_padding = padding_mm jaw_padding = 0 mlc = self._create_mlc() mlc.add_rectangle( left_position=x1 - mlc_padding, right_position=x2 + mlc_padding, top_position=y2 + mlc_padding, bottom_position=y1 - mlc_padding, outer_strip_width=5, meterset_at_target=1.0, x_outfield_position=x1 - mlc_padding - jaw_padding - 20, ) beam = Beam( plan_dataset=self.ds, beam_name=f"G{axes['gantry']:g}C{axes['collimator']:g}P{axes['couch']:g}", beam_type=BeamType.DYNAMIC, energy=energy, dose_rate=dose_rate, x1=x1 - jaw_padding, x2=x2 + jaw_padding, y1=y1 - jaw_padding, y2=y2 + jaw_padding, gantry_angles=axes["gantry"], gantry_direction=GantryDirection.NONE, coll_angle=axes["collimator"], couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=0, mlc_positions=mlc.as_control_points(), metersets=mlc.as_metersets(), fluence_mode=fluence_mode, mlc_boundaries=self.leaf_config, machine_name=self.machine_name, ) self.add_beam(beam.as_dicom(), mu=mu)
[docs] def add_gantry_speed_beams( self, speeds: tuple = (2, 3, 4, 4.8), max_dose_rate: int = 600, start_gantry_angle: float = 179, energy: float = 6, fluence_mode: FluenceMode = FluenceMode.STANDARD, coll_angle: float = 0, couch_vrt: float = 0, couch_lat: float = 0, couch_lng: float = 1000, couch_rot: float = 0, beam_name: str = "GS", gantry_rot_dir: GantryDirection = GantryDirection.CLOCKWISE, jaw_padding_mm: float = 5, roi_size_mm: float = 30, y1: float = -100, y2: float = 100, mu: int = 120, ): """Create a single-image gantry speed test. Multiple speeds are generated. A reference beam is also generated. The reference beam is delivered without gantry movement. Parameters ---------- speeds : tuple The gantry speeds to test. Each speed will have its own ROI. max_dose_rate : int The max dose rate clinically allowed for the energy. start_gantry_angle : float The starting gantry angle. The gantry will rotate around this point. It is up to the user to know what the machine's limitations are. (i.e. don't go through 180 for Varian machines). The ending gantry angle will be the starting angle + the sum of the gantry deltas generated by the speed ROIs. Slower speeds require more gantry angle to reach the same MU. energy : float The energy of the beam. fluence_mode : FluenceMode The fluence mode of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. couch_rot : float The couch rotation. beam_name : str The name of the beam. gantry_rot_dir : GantryDirection The direction of gantry rotation. jaw_padding_mm : float The padding to add to the X jaws. The X-jaws will close around the ROIs plus this padding. roi_size_mm : float The width of the ROIs in mm. y1 : float The bottom jaw position. Usually negative. More negative is lower. y2 : float The top jaw position. Usually positive. More positive is higher. mu : int The monitor units of the beam. Notes ----- The gantry angle to cover can be determined via the following: gantry speed = gantry_range * max_dose_rate / (MU * 60) We can thus solve for the gantry range: gantry_range = gantry_speed * MU * 60 / max_dose_rate """ if max(speeds) > self.max_gantry_speed: raise ValueError( f"Maximum speed given {max(speeds)} is greater than the maximum gantry speed {self.max_gantry_speed}" ) if roi_size_mm * len(speeds) > self.max_overtravel_mm: raise ValueError( "The ROI size * number of speeds must be less than the overall MLC allowable width" ) # determine sacrifices and gantry angles gantry_deltas = [speed * mu * 60 / max_dose_rate for speed in speeds] gantry_sign = -1 if gantry_rot_dir == GantryDirection.CLOCKWISE else 1 g_angles_uncorrected = [start_gantry_angle] + ( start_gantry_angle + gantry_sign * np.cumsum(gantry_deltas) ).tolist() gantry_angles = [wrap360(angle) for angle in g_angles_uncorrected] if sum(gantry_deltas) >= 360: raise ValueError( "Gantry travel is >360 degrees. Lower the beam MU, use fewer speeds, or decrease the desired gantry speeds" ) mlc = self._create_mlc() ref_mlc = self._create_mlc() roi_centers = np.linspace( -roi_size_mm * len(speeds) / 2 + roi_size_mm / 2, roi_size_mm * len(speeds) / 2 - roi_size_mm / 2, len(speeds), ) # we have a starting and ending strip ref_mlc.add_strip( position_mm=roi_centers[0], strip_width_mm=roi_size_mm, meterset_at_target=0, ) mlc.add_strip( position_mm=roi_centers[0], strip_width_mm=roi_size_mm, meterset_at_target=0, ) for center, gantry_angle in zip(roi_centers, gantry_angles): ref_mlc.add_strip( position_mm=center, strip_width_mm=roi_size_mm, meterset_at_target=0, meterset_transition=1 / len(speeds), ) mlc.add_strip( position_mm=center, strip_width_mm=roi_size_mm, meterset_at_target=0, meterset_transition=1 / len(speeds), ) beam = Beam( plan_dataset=self.ds, beam_name=beam_name, beam_type=BeamType.DYNAMIC, energy=energy, dose_rate=max_dose_rate, x1=min(roi_centers) - roi_size_mm - jaw_padding_mm, x2=max(roi_centers) + roi_size_mm + jaw_padding_mm, y1=y1, y2=y2, gantry_angles=gantry_angles, gantry_direction=gantry_rot_dir, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, machine_name=self.machine_name, fluence_mode=fluence_mode, mlc_positions=mlc.as_control_points(), metersets=mlc.as_metersets(), mlc_boundaries=self.leaf_config, ) self.add_beam(beam.as_dicom(), mu=mu) ref_beam = Beam( plan_dataset=self.ds, beam_name="G Sp Ref", beam_type=BeamType.DYNAMIC, energy=energy, dose_rate=max_dose_rate, x1=min(roi_centers) - roi_size_mm - jaw_padding_mm, x2=max(roi_centers) + roi_size_mm + jaw_padding_mm, y1=y1, y2=y2, gantry_angles=gantry_angles[-1], gantry_direction=GantryDirection.NONE, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, machine_name=self.machine_name, fluence_mode=fluence_mode, mlc_positions=ref_mlc.as_control_points(), metersets=ref_mlc.as_metersets(), mlc_boundaries=self.leaf_config, ) self.add_beam(ref_beam.as_dicom(), mu=mu)
[docs] def add_open_field_beam( self, x1: float, x2: float, y1: float, y2: float, defined_by_mlcs: bool = True, energy: float = 6, fluence_mode: FluenceMode = FluenceMode.STANDARD, dose_rate: int = 600, gantry_angle: float = 0, coll_angle: float = 0, couch_vrt: float = 0, couch_lng: float = 1000, couch_lat: float = 0, couch_rot: float = 0, mu: int = 200, padding_mm: float = 5, beam_name: str = "Open", outside_strip_width_mm: float = 5, ): """Add an open field beam to the plan. Parameters ---------- x1 : float The left jaw position. x2 : float The right jaw position. y1 : float The bottom jaw position. y2 : float The top jaw position. defined_by_mlcs : bool Whether the field edges are defined by the MLCs or the jaws. energy : float The energy of the beam. fluence_mode : FluenceMode The fluence mode of the beam. dose_rate : int The dose rate of the beam. gantry_angle : float The gantry angle of the beam. coll_angle : float The collimator angle of the beam. couch_vrt : float The couch vertical position. couch_lng : float The couch longitudinal position. couch_lat : float The couch lateral position. couch_rot : float The couch rotation. mu : int The monitor units of the beam. padding_mm : float The padding to add to the jaws or MLCs. beam_name : str The name of the beam. outside_strip_width_mm : float The width of the strip of MLCs outside the field. The MLCs will be placed to the left, under the X1 jaw by ~2cm. """ if defined_by_mlcs: mlc_padding = 0 jaw_padding = padding_mm else: mlc_padding = padding_mm jaw_padding = 0 mlc = self._create_mlc() mlc.add_rectangle( left_position=x1 - mlc_padding, right_position=x2 + mlc_padding, top_position=y2 + mlc_padding, bottom_position=y1 - mlc_padding, outer_strip_width=outside_strip_width_mm, x_outfield_position=x1 - mlc_padding - jaw_padding - 20, meterset_at_target=1.0, ) beam = Beam( plan_dataset=self.ds, beam_name=beam_name, beam_type=BeamType.DYNAMIC, energy=energy, dose_rate=dose_rate, x1=x1 - jaw_padding, x2=x2 + jaw_padding, y1=y1 - jaw_padding, y2=y2 + jaw_padding, gantry_angles=gantry_angle, gantry_direction=GantryDirection.NONE, coll_angle=coll_angle, couch_vrt=couch_vrt, couch_lat=couch_lat, couch_lng=couch_lng, couch_rot=couch_rot, mlc_positions=mlc.as_control_points(), metersets=mlc.as_metersets(), fluence_mode=fluence_mode, mlc_boundaries=self.leaf_config, machine_name=self.machine_name, ) self.add_beam(beam.as_dicom(), mu=mu)
[docs] def to_file(self, filename: str | Path) -> None: """Write the DICOM dataset to file""" self.ds.save_as(filename, write_like_original=False)
[docs] def as_dicom(self) -> Dataset: """Return the new DICOM dataset.""" return self.ds
[docs] def plot_fluences( self, width_mm: float = 400, resolution_mm: float = 0.5, dtype: np.dtype = np.uint16, ) -> list[Figure]: """Plot the fluences of the beams generated See Also -------- :func:`~pydicom_planar.PlanarImage.plot_fluences` """ return plot_fluences(self.as_dicom(), width_mm, resolution_mm, dtype, show=True)
def _update_references(self, mu: float) -> None: """Update the other sequences that reference the beam sequence.""" referenced_beam = Dataset() referenced_beam.BeamDose = "1.0" referenced_beam.BeamMeterset = str(mu) referenced_beam.ReferencedDoseReferenceUID = self.ds.DoseReferenceSequence[ 0 ].DoseReferenceUID self.ds.FractionGroupSequence[0].ReferencedBeamSequence.append(referenced_beam) referenced_beam.ReferencedBeamNumber = ( len(self.ds.FractionGroupSequence[0].ReferencedBeamSequence) - 1 ) # increment number of beams self.ds.FractionGroupSequence[0].NumberOfBeams = ( int(self.ds.FractionGroupSequence[0].NumberOfBeams) + 1 )