Source code for pylinac.core.image

"""This module holds classes for image loading and manipulation."""
from __future__ import annotations

import copy
import io
import json
import math
import os
import os.path as osp
import re
import warnings
from collections import Counter
from datetime import datetime
from functools import cached_property
from io import BufferedReader, BytesIO
from pathlib import Path
from typing import Any, BinaryIO, Iterable, Sequence, Union

import argue
import matplotlib.pyplot as plt
import numpy as np
import pydicom
import scipy.ndimage as spf
from PIL import Image as pImage
from PIL.PngImagePlugin import PngInfo
from PIL.TiffTags import TAGS
from pydicom.dataset import Dataset, FileMetaDataset
from pydicom.errors import InvalidDicomError
from pydicom.uid import UID, generate_uid
from scipy import ndimage
from skimage.draw import disk
from skimage.transform import rotate

from ..metrics.image import MetricBase
from ..settings import PATH_TRUNCATION_LENGTH, get_dicom_cmap
from .array_utils import (
    bit_invert,
    convert_to_dtype,
    filter,
    get_dtype_info,
    ground,
    invert,
    normalize,
)
from .geometry import Point
from .io import (
    TemporaryZipDirectory,
    get_url,
    is_dicom_image,
    retrieve_dicom_file,
    retrieve_filenames,
)
from .profile import stretch as stretcharray
from .scale import MachineScale, convert, wrap360
from .utilities import decode_binary, is_close, simple_round

ARRAY = "Array"
DICOM = "DICOM"
IMAGE = "Image"

FILE_TYPE = "file"
STREAM_TYPE = "stream"

XIM_PROP_INT = 0
XIM_PROP_DOUBLE = 1
XIM_PROP_STRING = 2
XIM_PROP_DOUBLE_ARRAY = 4
XIM_PROP_INT_ARRAY = 5

MM_PER_INCH = 25.4

ImageLike = Union["DicomImage", "ArrayImage", "FileImage", "LinacDicomImage"]


[docs] def equate_images(image1: ImageLike, image2: ImageLike) -> tuple[ImageLike, ImageLike]: """Crop and resize two images to make them: * The same pixel dimensions * The same DPI The usefulness of the function comes when trying to compare images from different sources. The best example is calculating gamma on a machine log fluence and EPID image. The physical and pixel dimensions must be normalized, the SID normalized Parameters ---------- image1 : {:class:`~pylinac.core.image.ArrayImage`, :class:`~pylinac.core.image.DicomImage`, :class:`~pylinac.core.image.FileImage`} Must have DPI and SID. image2 : {:class:`~pylinac.core.image.ArrayImage`, :class:`~pylinac.core.image.DicomImage`, :class:`~pylinac.core.image.FileImage`} Must have DPI and SID. Returns ------- image1 : :class:`~pylinac.core.image.ArrayImage` The first image equated. image2 : :class:`~pylinac.core.image.ArrayImage` The second image equated. """ image1 = copy.deepcopy(image1) image2 = copy.deepcopy(image2) # crop images to be the same physical size # ...crop height physical_height_diff = image1.physical_shape[0] - image2.physical_shape[0] if physical_height_diff < 0: # image2 is bigger img = image2 else: img = image1 pixel_height_diff = abs(int(round(-physical_height_diff * img.dpmm / 2))) if pixel_height_diff > 0: img.crop(pixel_height_diff, edges=("top", "bottom")) # ...crop width physical_width_diff = image1.physical_shape[1] - image2.physical_shape[1] if physical_width_diff > 0: img = image1 else: img = image2 pixel_width_diff = abs(int(round(physical_width_diff * img.dpmm / 2))) if pixel_width_diff > 0: img.crop(pixel_width_diff, edges=("left", "right")) # resize images to be of the same shape zoom_factor = image1.shape[1] / image2.shape[1] image2_array = ndimage.interpolation.zoom(image2.as_type(float), zoom_factor) image2 = load(image2_array, dpi=image2.dpi * zoom_factor) return image1, image2
[docs] def is_image(path: str | io.BytesIO | ImageLike | np.ndarray) -> bool: """Determine whether the path is a valid image file. Returns ------- bool """ return any((_is_array(path), _is_dicom(path), _is_image_file(path), _is_xim(path)))
[docs] def retrieve_image_files(path: str) -> list[str]: """Retrieve the file names of all the valid image files in the path. Returns ------- list Contains strings pointing to valid image paths. """ return retrieve_filenames(directory=path, func=is_image)
[docs] def load(path: str | Path | ImageLike | np.ndarray | BinaryIO, **kwargs) -> ImageLike: r"""Load a DICOM image, JPG/TIF/BMP image, or numpy 2D array. Parameters ---------- path : str, file-object The path to the image file or data stream or array. kwargs See :class:`~pylinac.core.image.FileImage`, :class:`~pylinac.core.image.DicomImage`, or :class:`~pylinac.core.image.ArrayImage` for keyword arguments. Returns ------- ::class:`~pylinac.core.image.FileImage`, :class:`~pylinac.core.image.ArrayImage`, or :class:`~pylinac.core.image.DicomImage` Return type depends on input image. Examples -------- Load an image from a file and then apply a filter:: >>> from pylinac.core.image import load >>> my_image = r"C:\QA\image.tif" >>> img = load(my_image) # returns a FileImage >>> img.filter(5) Loading from an array is just like loading from a file:: >>> arr = np.arange(36).reshape(6, 6) >>> img = load(arr) # returns an ArrayImage """ if isinstance(path, BaseImage): return path if _is_array(path): return ArrayImage(path, **kwargs) elif _is_dicom(path): return DicomImage(path, **kwargs) elif _is_image_file(path): return FileImage(path, **kwargs) else: raise TypeError( f"The argument `{path}` was not found to be a valid DICOM file, Image file, or array" )
[docs] def load_url(url: str, progress_bar: bool = True, **kwargs) -> ImageLike: """Load an image from a URL. Parameters ---------- url : str A string pointing to a valid URL that points to a file. .. note:: For some images (e.g. Github), the raw binary URL must be used, not simply the basic link. progress_bar: bool Whether to display a progress bar of download status. """ filename = get_url(url, progress_bar=progress_bar) return load(filename, **kwargs)
[docs] def load_multiples( image_file_list: Sequence, method: str = "mean", stretch_each: bool = True, loader: callable = load, **kwargs, ) -> ImageLike: """Combine multiple image files into one superimposed image. Parameters ---------- image_file_list : list A list of the files to be superimposed. method : {'mean', 'max', 'sum'} A string specifying how the image values should be combined. stretch_each : bool Whether to normalize the images being combined by stretching their high/low values to the same values across images. loader: callable The function to use to load the images. If a special image subclass is used, this is how it can be passed. kwargs : Further keyword arguments are passed to the load function and stretch function. Examples -------- Load multiple images:: >>> from pylinac.core.image import load_multiples >>> paths = ['starshot1.tif', 'starshot2.tif'] >>> superimposed_img = load_multiples(paths) """ # load images img_list = [loader(path, **kwargs) for path in image_file_list] first_img = img_list[0] # check that all images are the same size and stretch if need be for img in img_list: if img.shape != first_img.shape: raise ValueError("Images were not the same shape") if stretch_each: img.array = stretcharray(img.array, fill_dtype=kwargs.get("dtype")) # stack and combine arrays new_array = np.dstack(tuple(img.array for img in img_list)) if method == "mean": combined_arr = np.mean(new_array, axis=2) elif method == "max": combined_arr = np.max(new_array, axis=2) elif method == "sum": combined_arr = np.sum(new_array, axis=2) # replace array of first object and return first_img.array = combined_arr # set the raw pixels flag; this will mark the image to be converted if we save out first_img._raw_pixels = True return first_img
def _rescale_dicom_values( unscaled_array: np.ndarray, metadata: Dataset, raw_pixels: bool ) -> np.ndarray: """Rescale the DICOM pixel values depending on the tags available. See Also -------- https://pylinac.readthedocs.io/en/latest/topics/images.html#pixel-data-inversion """ has_all_rescale_tags = ( hasattr(metadata, "RescaleSlope") and hasattr(metadata, "RescaleIntercept") and hasattr(metadata, "PixelIntensityRelationshipSign") ) has_some_rescale_tags = hasattr(metadata, "RescaleSlope") and hasattr( metadata, "RescaleIntercept" ) is_ct_storage = metadata.SOPClassUID.name == "CT Image Storage" is_mr_storage = metadata.SOPClassUID.name == "MR Image Storage" if raw_pixels: return unscaled_array elif has_all_rescale_tags: scaled_array = ( (metadata.RescaleSlope * unscaled_array) + metadata.RescaleIntercept ) * metadata.PixelIntensityRelationshipSign elif is_ct_storage or has_some_rescale_tags: scaled_array = ( metadata.RescaleSlope * unscaled_array ) + metadata.RescaleIntercept elif is_mr_storage: # signal is usually correct as-is, no inversion needed scaled_array = unscaled_array else: # invert it orig_array = unscaled_array scaled_array = -orig_array + orig_array.max() + orig_array.min() return scaled_array def _unscale_dicom_values( scaled_array: np.ndarray, metadata: Dataset, raw_pixels: bool ) -> np.ndarray: """Unscale the DICOM pixel values depending on the tags available. This is the inverse of _rescale_dicom_values; specifically, when we want to save the DICOM image we want to save the raw values back to such that when re-importing and rescaling we will get the same array. """ has_all_rescale_tags = ( hasattr(metadata, "RescaleSlope") and hasattr(metadata, "RescaleIntercept") and hasattr(metadata, "PixelIntensityRelationshipSign") ) has_some_rescale_tags = hasattr(metadata, "RescaleSlope") and hasattr( metadata, "RescaleIntercept" ) is_ct_storage = metadata.SOPClassUID.name == "CT Image Storage" is_mr_storage = metadata.SOPClassUID.name == "MR Image Storage" if raw_pixels: return scaled_array elif has_all_rescale_tags: unscaled_array = scaled_array * metadata.PixelIntensityRelationshipSign unscaled_array = ( unscaled_array - metadata.RescaleIntercept ) / metadata.RescaleSlope elif is_ct_storage or has_some_rescale_tags: unscaled_array = ( scaled_array - metadata.RescaleIntercept ) / metadata.RescaleSlope elif is_mr_storage: # signal is usually correct as-is, no inversion needed unscaled_array = scaled_array else: # invert it orig_array = scaled_array unscaled_array = -orig_array + orig_array.max() + orig_array.min() return unscaled_array def _is_dicom(path: str | Path | io.BytesIO | ImageLike | np.ndarray) -> bool: """Whether the file is a readable DICOM file via pydicom.""" return is_dicom_image(file=path) def _is_image_file(path: str | Path) -> bool: """Whether the file is a readable image file via Pillow.""" try: with pImage.open(path): return True except Exception: return False def _is_xim(path: str | Path) -> bool: """Whether the file is a readable XIM file.""" try: with open(path, "rb") as xim: format_id = decode_binary(xim, str, 8) return format_id == "VMS.XI" except Exception: return False def _is_array(obj: Any) -> bool: """Whether the object is a numpy array.""" return isinstance(obj, np.ndarray)
[docs] class BaseImage: """Base class for the Image classes. Attributes ---------- path : str The path to the image file. array : numpy.ndarray The actual image pixel array. """ array: np.ndarray path: str | Path metrics: list[MetricBase] metric_values: dict[str, Any] source: FILE_TYPE | STREAM_TYPE def __init__( self, path: str | Path | BytesIO | ImageLike | np.ndarray | BufferedReader ): """ Parameters ---------- path : str The path to the image. """ self.metrics = [] self.metric_values = {} if isinstance(path, (str, Path)) and not osp.isfile(path): raise FileExistsError( f"File `{path}` does not exist. Verify the file path name." ) elif isinstance(path, (str, Path)) and osp.isfile(path): self.path = path self.base_path = osp.basename(path) self.source = FILE_TYPE else: self.source = STREAM_TYPE path.seek(0) try: self.path = str(Path(path.name)) except AttributeError: self.path = "" @property def truncated_path( self, ) -> str: # TODO: Use textwrap or pull out into util function if self.source == FILE_TYPE: path = str(self.path) if len(path) > PATH_TRUNCATION_LENGTH: return ( path[: PATH_TRUNCATION_LENGTH // 2] + "..." + path[-PATH_TRUNCATION_LENGTH // 2 :] ) else: return path else: return "" # was from stream, no path
[docs] @classmethod def from_multiples( cls, filelist: list[str], method: str = "mean", stretch: bool = True, **kwargs, ) -> ImageLike: """Load an instance from multiple image items. See :func:`~pylinac.core.image.load_multiples`.""" return load_multiples(filelist, method, stretch, **kwargs)
@property def center(self) -> Point: """Return the center position of the image array as a Point. Even-length arrays will return the midpoint between central two indices. Odd will return the central index. """ x_center = (self.shape[1] / 2) - 0.5 y_center = (self.shape[0] / 2) - 0.5 return Point(x_center, y_center) @property def physical_shape(self) -> (float, float): """The physical size of the image in mm.""" return self.shape[0] / self.dpmm, self.shape[1] / self.dpmm
[docs] def date_created(self, format: str = "%A, %B %d, %Y") -> str: """The date the file was created. Tries DICOM data before falling back on OS timestamp. The method use one or more inputs of formatted code, where % means a placeholder and the letter the time unit of interest. For a full description of the several formatting codes see `strftime() documentation. <https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes>`_ Parameters ---------- format : str %A means weekday full name, %B month full name, %d day of the month as a zero-padded decimal number and %Y year with century as a decimal number. Returns ------- str The date the file was created. """ date = None try: date = datetime.strptime( self.metadata.InstanceCreationDate + str(round(float(self.metadata.InstanceCreationTime))), "%Y%m%d%H%M%S", ) date = date.strftime(format) except (AttributeError, ValueError): try: date = datetime.strptime(self.metadata.StudyDate, "%Y%m%d") date = date.strftime(format) except Exception: pass if date is None: try: date = datetime.fromtimestamp(osp.getctime(self.path)).strftime(format) except AttributeError: date = "Unknown" return date
[docs] def plot( self, ax: plt.Axes = None, show: bool = True, clear_fig: bool = False, show_metrics: bool = True, metric_kwargs: dict | None = None, **kwargs, ) -> plt.Axes: """Plot the image. Parameters ---------- ax : matplotlib.Axes instance The axis to plot the image to. If None, creates a new figure. show : bool Whether to actually show the image. Set to false when plotting multiple items. clear_fig : bool Whether to clear the prior items on the figure before plotting. show_metrics : bool Whether to show the metrics on the image. metric_kwargs : dict kwargs passed to the metric plot method. kwargs kwargs passed to plt.imshow() """ if metric_kwargs is None: metric_kwargs = {} if ax is None: fig, ax = plt.subplots() if clear_fig: plt.clf() ax.imshow(self.array, cmap=get_dicom_cmap(), **kwargs) # plot the metrics if show_metrics: for metric in self.metrics: metric.plot(axis=ax, **metric_kwargs) if show: plt.show() return ax
[docs] def plot_metrics(self, show: bool = True) -> list[plt.figure]: """Plot any additional figures from the metrics. Returns a list of figures of the metrics. These metrics are not drawn on the original image but rather are something complete separate. E.g. a profile plot or a histogram of the metric.""" figs = [] for metric in self.metrics: figs.append(metric.additional_plots()) if show: plt.show() return figs
[docs] def filter( self, size: float | int = 0.05, kind: str = "median", ) -> None: """Filter the profile in place. Parameters ---------- size : int, float Size of the median filter to apply. If a float, the size is the ratio of the length. Must be in the range 0-1. E.g. if size=0.1 for a 1000-element array, the filter will be 100 elements. If an int, the filter is the size passed. kind : {'median', 'gaussian'} The kind of filter to apply. If gaussian, *size* is the sigma value. """ self.array = filter(self.array, size=size, kind=kind)
[docs] def crop( self, pixels: int = 15, edges: tuple[str, ...] = ("top", "bottom", "left", "right"), ) -> None: """Removes pixels on all edges of the image in-place. Parameters ---------- pixels : int Number of pixels to cut off all sides of the image. edges : tuple Which edges to remove from. Can be any combination of the four edges. """ if pixels <= 0: raise ValueError("Pixels to remove must be a positive number") if "top" in edges: self.array = self.array[pixels:, :] if "bottom" in edges: self.array = self.array[:-pixels, :] if "left" in edges: self.array = self.array[:, pixels:] if "right" in edges: self.array = self.array[:, :-pixels]
[docs] def flipud(self) -> None: """Flip the image array upside down in-place. Wrapper for np.flipud()""" self.array = np.flipud(self.array)
[docs] def fliplr(self) -> None: """Flip the image array upside down in-place. Wrapper for np.fliplr()""" self.array = np.fliplr(self.array)
[docs] def invert(self) -> None: """Invert (imcomplement) the image.""" self.array = invert(self.array)
[docs] def bit_invert(self) -> None: """Invert the image bit-wise""" self.array = bit_invert(self.array)
[docs] def roll(self, direction: str = "x", amount: int = 1) -> None: """Roll the image array around in-place. Wrapper for np.roll(). Parameters ---------- direction : {'x', 'y'} The axis to roll over. amount : int The amount of elements to roll over. """ axis = 1 if direction == "x" else 0 self.array = np.roll(self.array, amount, axis=axis)
[docs] def rot90(self, n: int = 1) -> None: """Wrapper for numpy.rot90; rotate the array by 90 degrees CCW n times.""" self.array = np.rot90(self.array, n)
[docs] def rotate(self, angle: float, mode: str = "edge", *args, **kwargs): """Rotate the image counter-clockwise. Simple wrapper for scikit-image. See https://scikit-image.org/docs/stable/api/skimage.transform.html#skimage.transform.rotate. All parameters are passed to that function.""" self.array = rotate(self.array, angle, mode=mode, *args, **kwargs)
[docs] def threshold(self, threshold: float, kind: str = "high") -> None: """Apply a high- or low-pass threshold filter. Parameters ---------- threshold : int The cutoff value. kind : str If ``high`` (default), will apply a high-pass threshold. All values above the cutoff are left as-is. Remaining points are set to 0. If ``low``, will apply a low-pass threshold. """ if kind == "high": self.array = np.where(self.array >= threshold, self, 0) else: self.array = np.where(self.array <= threshold, self, 0)
[docs] def as_binary(self, threshold: int) -> ImageLike: """Return a binary (black & white) image based on the given threshold. Parameters ---------- threshold : int, float The threshold value. If the value is above or equal to the threshold it is set to 1, otherwise to 0. Returns ------- ArrayImage """ array = np.where(self.array >= threshold, 1, 0) return ArrayImage(array)
[docs] def dist2edge_min(self, point: Point | tuple) -> float: """Calculates distance from given point to the closest edge. Parameters ---------- point : geometry.Point, tuple Returns ------- float """ if isinstance(point, tuple): point = Point(point) rows = self.shape[0] cols = self.shape[1] disttoedge = np.zeros(4) disttoedge[0] = rows - point.y disttoedge[1] = cols - point.x disttoedge[2] = point.y disttoedge[3] = point.x return min(disttoedge)
[docs] def ground(self) -> float: """Ground the profile in place such that the lowest value is 0. .. note:: This will also "ground" profiles that are negative or partially-negative. For such profiles, be careful that this is the behavior you desire. Returns ------- float The amount subtracted from the image. """ min_val = self.array.min() self.array = ground(self.array) return min_val
[docs] def normalize(self, norm_val: str | float | None = None) -> None: """Normalize the profile to the given value. Parameters ---------- value : number or None If a number, normalize the array to that number. If None, normalizes to the maximum value. """ # backwards compatibility if norm_val == "max": norm_val = None self.array = normalize(self.array, value=norm_val)
[docs] def check_inversion( self, box_size: int = 20, position: (float, float) = (0.0, 0.0) ) -> None: """Check the image for inversion by sampling the 4 image corners. If the average value of the four corners is above the average pixel value, then it is very likely inverted. Parameters ---------- box_size : int The size in pixels of the corner box to detect inversion. position : 2-element sequence The location of the sampling boxes. """ row_pos = max(int(position[0] * self.array.shape[0]), 1) col_pos = max(int(position[1] * self.array.shape[1]), 1) lt_upper = self.array[ row_pos : row_pos + box_size, col_pos : col_pos + box_size ] rt_upper = self.array[ row_pos : row_pos + box_size, -col_pos - box_size : -col_pos ] lt_lower = self.array[ -row_pos - box_size : -row_pos, col_pos : col_pos + box_size ] rt_lower = self.array[ -row_pos - box_size : -row_pos, -col_pos - box_size : -col_pos ] avg = np.mean((lt_upper, lt_lower, rt_upper, rt_lower)) if avg > np.mean(self.array.flatten()): self.invert()
[docs] def check_inversion_by_histogram( self, percentiles: (float, float, float) = (5, 50, 95) ) -> bool: """Check the inversion of the image using histogram analysis. The assumption is that the image is mostly background-like values and that there is a relatively small amount of dose getting to the image (e.g. a picket fence image). This function looks at the distance from one percentile to another to determine if the image should be inverted. Parameters ---------- percentiles : 3-element tuple The 3 percentiles to compare. Default is (5, 50, 95). Recommend using (x, 50, y). To invert the other way (where pixel value is *decreasing* with dose, reverse the percentiles, e.g. (95, 50, 5). Returns ------- bool: Whether an inversion was performed. """ was_inverted = False p_low = np.percentile(self.array, percentiles[0]) p_mid = np.percentile(self.array, percentiles[1]) p_high = np.percentile(self.array, percentiles[2]) mid_to_low = abs(p_mid - p_low) mid_to_high = abs(p_mid - p_high) if mid_to_low > mid_to_high: was_inverted = True self.invert() return was_inverted
[docs] @argue.bounds(threshold=(0.0, 1.0)) def gamma( self, comparison_image: ImageLike, doseTA: float = 1, distTA: float = 1, threshold: float = 0.1, ground: bool = True, normalize: bool = True, ) -> np.ndarray: """Calculate the gamma between the current image (reference) and a comparison image. .. versionadded:: 1.2 The gamma calculation is based on `Bakai et al <http://iopscience.iop.org/0031-9155/48/21/006/>`_ eq.6, which is a quicker alternative to the standard Low gamma equation. Parameters ---------- comparison_image : {:class:`~pylinac.core.image.ArrayImage`, :class:`~pylinac.core.image.DicomImage`, or :class:`~pylinac.core.image.FileImage`} The comparison image. The image must have the same DPI/DPMM to be comparable. The size of the images must also be the same. doseTA : int, float Dose-to-agreement in percent; e.g. 2 is 2%. distTA : int, float Distance-to-agreement in mm. threshold : float The dose threshold percentage of the maximum dose, below which is not analyzed. Must be between 0 and 1. ground : bool Whether to "ground" the image values. If true, this sets both datasets to have the minimum value at 0. This can fix offset errors in the data. normalize : bool Whether to normalize the images. This sets the max value of each image to the same value. Returns ------- gamma_map : numpy.ndarray The calculated gamma map. See Also -------- :func:`~pylinac.core.image.equate_images` """ # error checking if not is_close(self.dpi, comparison_image.dpi, delta=0.1): raise AttributeError( f"The image DPIs to not match: {self.dpi:.2f} vs. {comparison_image.dpi:.2f}" ) same_x = is_close(self.shape[1], comparison_image.shape[1], delta=1.1) same_y = is_close(self.shape[0], comparison_image.shape[0], delta=1.1) if not (same_x and same_y): raise AttributeError( f"The images are not the same size: {self.shape} vs. {comparison_image.shape}" ) # set up reference and comparison images ref_img = ArrayImage(copy.copy(self.array)) ref_img.check_inversion_by_histogram() if ground: ref_img.ground() if normalize: ref_img.normalize() comp_img = ArrayImage(copy.copy(comparison_image.array)) comp_img.check_inversion_by_histogram() if ground: comp_img.ground() if normalize: comp_img.normalize() # invalidate dose values below threshold so gamma doesn't calculate over it ref_img.array[ref_img < threshold * np.max(ref_img)] = np.NaN # convert distance value from mm to pixels distTA_pixels = self.dpmm * distTA # construct image gradient using sobel filter img_x = spf.sobel(ref_img.as_type(np.float32), 1) img_y = spf.sobel(ref_img.as_type(np.float32), 0) grad_img = np.hypot(img_x, img_y) # equation: (measurement - reference) / sqrt ( doseTA^2 + distTA^2 * image_gradient^2 ) subtracted_img = np.abs(comp_img - ref_img) denominator = np.sqrt( ((doseTA / 100.0) ** 2) + ((distTA_pixels**2) * (grad_img**2)) ) gamma_map = subtracted_img / denominator return gamma_map
def as_type(self, dtype: np.dtype) -> np.ndarray: return self.array.astype(dtype)
[docs] def compute(self, metrics: list[MetricBase] | MetricBase) -> Any | dict[str, Any]: """Compute the given metrics on the image. This can be called multiple times to compute different metrics. Metrics are appended on each call. This allows for modification of the image between metric calls as well as the ability to compute different metrics on the same image that might depend on earlier metrics. Metrics are both returned and stored in the ``metrics`` attribute. The ``metrics`` attribute will store all metrics every calculated. The metrics returned are only those passed in the ``metrics`` argument. Parameters ---------- metrics : list[MetricBase] | MetricBase The metric(s) to compute. """ metric_data = {} if isinstance(metrics, MetricBase): metrics = [metrics] for metric in metrics: metric.inject_image(self) self.metrics.append(metric) value = metric.context_calculate() metric_data[metric.name] = value # TODO: use |= when 3.9 is min supported version self.metric_values.update(metric_data) if len(metrics) == 1: return metric_data[metrics[0].name] return metric_data
@property def shape(self) -> (int, int): return self.array.shape @property def size(self) -> int: return self.array.size @property def ndim(self) -> int: return self.array.ndim @property def dtype(self) -> np.dtype: return self.array.dtype def sum(self) -> float: return self.array.sum() def ravel(self) -> np.ndarray: return self.array.ravel() @property def flat(self) -> np.ndarray: return self.array.flat def __len__(self): return len(self.array) def __getitem__(self, item): return self.array[item]
[docs] class XIM(BaseImage): """A class to open, read, and/or export an .xim image, Varian's custom image format which is 99.999% PNG This had inspiration from a number of places: - https://gist.github.com/1328/7da697c71f9c4ef12e1e - https://medium.com/@duhroach/how-png-works-f1174e3cc7b7 - https://www.mathworks.com/matlabcentral/answers/419228-how-to-write-for-loop-and-execute-data - https://www.w3.org/TR/PNG-Filters.html - https://bitbucket.org/dmoderesearchtools/ximreader/src/master/ """ array: np.ndarray #: properties: dict #: def __init__(self, file_path: str | Path, read_pixels: bool = True): """ Parameters ---------- file_path The path to the file of interest. read_pixels Whether to read and parse the pixel information. Doing so is quite slow. Set this to false if, e.g., you are searching for images only via tags or doing a pre-filtering of image selection. """ super().__init__(path=file_path) with open(self.path, "rb") as xim: self.format_id = decode_binary(xim, str, 8) self.format_version = decode_binary(xim, int) self.img_width_px = decode_binary(xim, int) self.img_height_px = decode_binary(xim, int) self.bits_per_pixel = decode_binary(xim, int) self.bytes_per_pixel = decode_binary(xim, int) self.compression = decode_binary(xim, int) if not self.compression: pixel_buffer_size = decode_binary(xim, int) self.pixel_buffer = decode_binary( xim, str, num_values=pixel_buffer_size ) else: lookup_table_size = decode_binary(xim, int) self.lookup_table = decode_binary( xim, "B", num_values=lookup_table_size ) comp_pixel_buffer_size = decode_binary(xim, int) if read_pixels: lookup_keys = self._parse_lookup_table(self.lookup_table) self.array = self._parse_compressed_bytes( xim, lookup_table=lookup_keys ) else: _ = decode_binary(xim, "c", num_values=comp_pixel_buffer_size) decode_binary(xim, int) self.num_hist_bins = decode_binary(xim, int) self.histogram = decode_binary(xim, int, num_values=self.num_hist_bins) self.num_properties = decode_binary(xim, int) self.properties = {} for prop in range(self.num_properties): name_length = decode_binary(xim, int) name = decode_binary(xim, str, num_values=name_length) tipe = decode_binary(xim, int) if tipe == XIM_PROP_INT: value = decode_binary(xim, int) elif tipe == XIM_PROP_DOUBLE: value = decode_binary(xim, "d") elif tipe == XIM_PROP_STRING: num_bytes = decode_binary(xim, int) value = decode_binary(xim, str, num_values=num_bytes) elif tipe == XIM_PROP_DOUBLE_ARRAY: num_bytes = decode_binary(xim, int) value = decode_binary( xim, "d", num_values=int(num_bytes // 8) ) # doubles are 8 bytes elif tipe == XIM_PROP_INT_ARRAY: num_bytes = decode_binary(xim, int) value = decode_binary( xim, int, num_values=int(num_bytes // 4) ) # ints are 4 bytes self.properties[name] = value @staticmethod def _parse_lookup_table(lookup_table_bytes: np.ndarray) -> np.ndarray: """The lookup table doesn't follow normal structure conventions like 1, 2, or 4 byte values. They got smart and said each value is 2 bits. Yes, bits. This means each byte is actually 4 values. Python only reads things as granular as bytes. To get around this the general logic is: 1) interpret the data as integers at the single byte level 2) convert those integers back into bit representation; e.g. 115 => 01110011. Note the representation must contain the full byte. I.e. 3 => 11 does not work. 3) split the binary representation into the 2-bit representations; generates 4x the number of elements. 01110011 => (01, 11, 00, 11) 4) Convert the 2-bit representation back into integers (01, 11, 00, 11) => (1, 3, 0, 3) .. note:: This is ripe for optimization, but brevity and clarity won out. Options include bit-shifting (fastest) and numpy.packbits/unpackbits. """ table = [] extend = table.extend # prevent python having to do a lookup on each iteration for byte in lookup_table_bytes: byte_repr = f"{byte:08b}" # didn't actually check these indexes but I think they're right. extend( [ int(byte_repr[6:8], 2), int(byte_repr[4:6], 2), int(byte_repr[2:4], 2), int(byte_repr[0:2], 2), ] ) return np.asarray(table, dtype=np.int8) def _parse_compressed_bytes( self, xim: BinaryIO, lookup_table: np.ndarray ) -> np.ndarray: """Parse the compressed pixels. We have to do this pixel-by-pixel because each pixel can have a different number of bytes representing it Per the readme: 1) The first row is uncompressed 2) The first element of the second row is uncompressed 3) all other elements are represented by 1, 2, or 4 bytes of data (the annoying part) 4) The byte size of the element is given in the lookup table So, we have to read in 1, 2, or 4 bytes and convert to an integer depending on the lookup table, which tells us how many bytes to read in .. note:: Optimization can help here. A few ideas: - reading in groups of data of the same byte size. I already tried this, and I think it will work, but I couldn't get it going. - reading in rows of data where no byte change occurred in that row. Similar to above. - Using joblib or a processpool """ img_height = self.img_height_px img_width = self.img_width_px dtype = np.int8 if self.bytes_per_pixel == 1 else np.int16 compressed_array = a = np.zeros((img_height * img_width), dtype=dtype) # first row and 1st element, 2nd row is uncompressed # this SHOULD work by reading the # of bytes specified in the header but AFAICT this is just a standard int (4 bytes) compressed_array[: img_width + 1] = decode_binary( xim, int, num_values=img_width + 1 ) diffs = self._get_diffs(lookup_table, xim) for diff, idx in zip(diffs, range(img_width + 1, img_width * img_height)): # intermediate math can cause overflow errors. Use float for intermediate, then back to int left = float(a[idx - 1]) above = float(a[idx - img_width]) upper_left = float(a[idx - img_width - 1]) a[idx] = np.asarray(diff + left + above - upper_left, dtype=dtype) return a.reshape((img_height, img_width)) @staticmethod def _get_diffs(lookup_table: np.ndarray, xim: BinaryIO): """Read in all the pixel value 'diffs'. These can be 1, 2, or 4 bytes in size, so instead of just reading N pixels of M bytes which would be SOOOO easy, we have to read dynamically We optimize here by reading bytes in clumps, which is way faster than reading one at a time. Knowing that most values are single bytes with an occasional 2-byte element we read chunks that all look like (n 1-bytes and 1 2-byte) """ byte_changes = lookup_table.nonzero() byte_changes = np.insert(byte_changes, 0, -1) byte_changes = np.append(byte_changes, len(lookup_table) - 1) diffs = [5000] * ( len(lookup_table) - 1 ) # pre-allocate for speed; 5000 is just for debugging LOOKUP_CONVERSION = {0: "b", 1: "h", 2: "i"} for start, stop in zip(byte_changes[:-1], byte_changes[1:]): if stop - start > 1: vals = decode_binary(xim, "b", num_values=stop - start - 1) if not isinstance(vals, Iterable): vals = [ vals, ] diffs[start + 1 : stop] = vals if stop != byte_changes[-1]: diffs[stop] = decode_binary(xim, LOOKUP_CONVERSION[lookup_table[stop]]) return np.asarray(diffs, dtype=float) @property def dpmm(self) -> float: """The dots/mm value of the XIM images. The value appears to be in cm in the file.""" if self.properties["PixelWidth"] != self.properties["PixelHeight"]: raise ValueError( "The XIM image does not have the same pixel height and width" ) return 1 / (10 * self.properties["PixelHeight"])
[docs] def as_dicom(self) -> Dataset: """Save the XIM image as a *simplistic* DICOM file. Only meant for basic image storage/analysis. It appears that XIM images are in the Varian standard coordinate system. We convert to IEC61217 for more general compatibility. """ iec_g, iec_c, iec_p = convert( input_scale=MachineScale.VARIAN_STANDARD, output_scale=MachineScale.IEC61217, gantry=self.properties["GantryRtn"], collimator=self.properties["MVCollimatorRtn"], rotation=self.properties["CouchRtn"], ) uint_array = convert_to_dtype(self.array, np.uint16) file_meta = FileMetaDataset() # Main data elements ds = Dataset() ds.SOPClassUID = UID("1.2.840.10008.5.1.4.1.1.481.1") # RT Image ds.SOPInstanceUID = generate_uid() ds.SeriesInstanceUID = generate_uid() ds.Modality = "RTIMAGE" ds.ConversionType = "WSD" ds.PatientName = "Lutz^Test Tool" ds.PatientID = "Someone Important" ds.SamplesPerPixel = 1 ds.PhotometricInterpretation = "MONOCHROME2" ds.Rows = self.array.shape[0] ds.Columns = self.array.shape[1] ds.BitsAllocated = 16 ds.BitsStored = 16 ds.HighBit = 15 ds.PixelRepresentation = 0 ds.ImagePlanePixelSpacing = [1 / self.dpmm, 1 / self.dpmm] ds.RadiationMachineSAD = "1000.0" ds.RTImageSID = "1000" ds.PrimaryDosimeterUnit = "MU" ds.GantryAngle = f"{iec_g:.2f}" ds.BeamLimitingDeviceAngle = f"{iec_c:.2f}" ds.PatientSupportAngle = f"{iec_p:.2f}" ds.PixelData = uint_array ds.file_meta = file_meta ds.is_implicit_VR = True ds.is_little_endian = True return ds
[docs] def save_as(self, file: str | Path, format: str | None = None) -> None: """Save the image to a NORMAL format. PNG is highly suggested. Accepts any format supported by Pillow. Ironically, an equivalent PNG image (w/ metadata) is ~50% smaller than an .xim image. .. warning:: Any format other than PNG will not include the properties included in the .xim image! Parameters ---------- file The file to save the image to. E.g. my_xim.png format The format to save the image as. Uses the Pillow logic, which will infer the format if the file name has one. """ img = pImage.fromarray(self.array) # we construct the custom PNG tags; it won't be included for tiff or jpeg, etc but it won't error it either. metadata = PngInfo() for prop, value in self.properties.items(): if isinstance(value, np.ndarray): value = value.tolist() if not isinstance(value, str): value = json.dumps(value) metadata.add_text(prop, value) img.save(file, format=format, pnginfo=metadata)
[docs] class DicomImage(BaseImage): """An image from a DICOM RTImage file. Attributes ---------- metadata : pydicom Dataset The dataset of the file as returned by pydicom without pixel data. """ metadata: pydicom.FileDataset _sid: float _dpi: float _sad: float def __init__( self, path: str | Path | BytesIO | BufferedReader, *, dtype: np.dtype | None = None, dpi: float = None, sid: float = None, sad: float = 1000, raw_pixels: bool = False, ): """ Parameters ---------- path : str, file-object The path to the file or the data stream. dtype : dtype, None, optional The data type to cast the image data as. If None, will use whatever raw image format is. dpi : int, float The dots-per-inch of the image, defined at isocenter. .. note:: If a DPI tag is found in the image, that value will override the parameter, otherwise this one will be used. sid : int, float The Source-to-Image distance in mm. sad : float The Source-to-Axis distance in mm. raw_pixels : bool Whether to apply pixel intensity correction to the DICOM data. Typically, Rescale Slope, Rescale Intercept, and other tags are included and meant to be applied to the raw pixel data, which is potentially compressed. If True, no correction will be applied. This is typically used for scenarios when you want to match behavior to older or different software. """ super().__init__(path) self._sid = sid self._dpi = dpi self._sad = sad # read the file once to get just the DICOM metadata self.metadata = retrieve_dicom_file(path) self._original_dtype = self.metadata.pixel_array.dtype self._raw_pixels = raw_pixels # read a second time to get pixel data try: path.seek(0) except AttributeError: pass ds = retrieve_dicom_file(path) if dtype is not None: self.array = ds.pixel_array.astype(dtype) else: self.array = ds.pixel_array.copy() # convert values to HU or CU self.array = _rescale_dicom_values(self.array, ds, raw_pixels=raw_pixels)
[docs] @classmethod def from_dataset(cls, dataset: Dataset): """Create a DICOM image instance from a pydicom Dataset.""" stream = io.BytesIO() dataset.save_as(stream) return cls(path=stream)
[docs] def save(self, filename: str | Path) -> str | Path: """Save the image instance back out to a .dcm file. Parameters ---------- filename : str, Path The filename to save the DICOM file as. Returns ------- A string pointing to the new filename. """ unscaled_array = _unscale_dicom_values( self.array, self.metadata, self._raw_pixels ) # if we will have bit overflows, stretch instead max_is_too_high = ( unscaled_array.max() > get_dtype_info(self._original_dtype).max ) min_is_too_low = unscaled_array.min() < get_dtype_info(self._original_dtype).min if min_is_too_low or max_is_too_high: warnings.warn( "The pixel values of image were detected to be outside" f"the range of {self._original_dtype} values and will be normalized to fit the original dtype. " f"The maximum value will be the maximum value of the original datatype: ({get_dtype_info(self._original_dtype).max})." ) unscaled_array = convert_to_dtype(unscaled_array, self._original_dtype) if self._raw_pixels: # for raw pixels, often the values are wacky; convert them to a normal range # e.g. the range might be 0-1 for a float array. This will convert to the original dtype # This prevents such arrays from being converted to only 0s and 1s for an int dtype. unscaled_array = convert_to_dtype(unscaled_array, self._original_dtype) self.metadata.PixelData = unscaled_array.astype(self._original_dtype).tobytes() self.metadata.Columns = unscaled_array.shape[1] self.metadata.Rows = unscaled_array.shape[0] self.metadata.save_as(filename) return filename
@property def z_position(self) -> float: """The z-position of the slice. Relevant for CT and MR images.""" return z_position(self.metadata) @property def slice_spacing(self) -> float: """Determine the distance between slices. The spacing can be greater than the slice thickness (i.e. gaps). Uses the absolute version as it can apparently be negative: https://dicom.innolitics.com/ciods/nm-image/nm-reconstruction/00180088 This attempts to use the slice spacing attr and if it doesn't exist, use the slice thickness attr """ try: return abs(self.metadata.SpacingBetweenSlices) except AttributeError: return self.metadata.SliceThickness @property def sid(self) -> float: """The Source-to-Image in mm.""" try: return float(self.metadata.RTImageSID) except (AttributeError, ValueError, TypeError): return self._sid @property def sad(self) -> float: """The source to axis (iso) in mm""" try: return float(self.metadata.RadiationMachineSAD) except (AttributeError, ValueError, TypeError): return self._sad @property def dpi(self) -> float: """The dots-per-inch of the image, defined at isocenter.""" try: return self.dpmm * MM_PER_INCH except Exception: return self._dpi @property def dpmm(self) -> float: """The Dots-per-mm of the image, defined at isocenter. E.g. if an EPID image is taken at 150cm SID, the dpmm will scale back to 100cm.""" dpmm = None for tag in ("PixelSpacing", "ImagePlanePixelSpacing"): mmpd = self.metadata.get(tag) if mmpd is not None: dpmm = 1 / mmpd[0] break if dpmm is not None and self.sid is not None: dpmm *= self.sid / self.sad elif dpmm is None and self._dpi is not None: dpmm = self._dpi / MM_PER_INCH return dpmm @property def cax(self) -> Point: """The position of the beam central axis. If no DICOM translation tags are found then the center is returned. Uses this tag: https://dicom.innolitics.com/ciods/rt-beams-delivery-instruction/rt-beams-delivery-instruction/00741020/00741030/3002000d """ try: x = self.center.x - self.metadata.XRayImageReceptorTranslation[0] y = self.center.y - self.metadata.XRayImageReceptorTranslation[1] except (AttributeError, ValueError, TypeError): return self.center else: return Point(x, y)
[docs] class LinacDicomImage(DicomImage): """DICOM image taken on a linac. Also allows passing of gantry/coll/couch values via the filename.""" gantry_keyword = "Gantry" collimator_keyword = "Coll" couch_keyword = "Couch" _use_filenames: bool def __init__( self, path: str | Path | BinaryIO, use_filenames: bool = False, axes_precision: int | None = None, **kwargs, ): self._gantry = kwargs.pop("gantry", None) self._coll = kwargs.pop("coll", None) self._couch = kwargs.pop("couch", None) self._axes_precision = axes_precision super().__init__(path, **kwargs) self._use_filenames = use_filenames @property def gantry_angle(self) -> float: """Gantry angle of the irradiation.""" if self._gantry is not None: g = self._gantry else: g = self._get_axis_value(self.gantry_keyword, "GantryAngle") return wrap360(simple_round(g, self._axes_precision)) @property def collimator_angle(self) -> float: """Collimator angle of the irradiation.""" if self._coll is not None: c = self._coll else: c = self._get_axis_value(self.collimator_keyword, "BeamLimitingDeviceAngle") return wrap360(simple_round(c, self._axes_precision)) @property def couch_angle(self) -> float: """Couch angle of the irradiation.""" if self._couch is not None: c = self._couch else: c = self._get_axis_value(self.couch_keyword, "PatientSupportAngle") return wrap360(simple_round(c, self._axes_precision)) def _get_axis_value(self, axis_str: str, axis_dcm_attr: str) -> float: """Retrieve the value of the axis. This will first look in the file name for the value. If not in the filename then it will look in the DICOM metadata. If the value can be found in neither then a value of 0 is assumed. Parameters ---------- axis_str : str The string to look for in the filename. axis_dcm_attr : str The DICOM attribute that should contain the axis value. Returns ------- float """ axis_found = False if self._use_filenames: filename = osp.basename(self.path) # see if the keyword is in the filename keyword_in_filename = axis_str.lower() in filename.lower() # if it's not there, then assume it's zero if not keyword_in_filename: axis = 0 axis_found = True # if it is, then make sure it follows the naming convention of <axis###> else: match = re.search(rf"(?<={axis_str.lower()})\d+", filename.lower()) if match is None: raise ValueError( f"The filename contains '{axis_str}' but could not read a number following it. Use the format '...{axis_str}<#>...'" ) else: axis = float(match.group()) axis_found = True # try to interpret from DICOM data if not axis_found: try: axis = float(getattr(self.metadata, axis_dcm_attr)) except AttributeError: axis = 0 return axis
[docs] class FileImage(BaseImage): """An image from a "regular" file (.tif, .jpg, .bmp). Attributes ---------- info : dict The info dictionary as generated by Pillow. sid : float The SID value as passed in upon construction. """ def __init__( self, path: str | Path | BinaryIO, *, dpi: float | None = None, sid: float | None = None, dtype: np.dtype | None = None, ): """ Parameters ---------- path : str, file-object The path to the file or a data stream. dpi : int, float The dots-per-inch of the image, defined at isocenter. .. note:: If a DPI tag is found in the image, that value will override the parameter, otherwise this one will be used. sid : int, float The Source-to-Image distance in mm. dtype : numpy.dtype The data type to cast the array as. If None, will use the datatype stored in the file. If the file is multi-channel (e.g. RGB), it will be converted to int32 """ super().__init__(path) pil_image = pImage.open(path) # convert from multichannel if need be if len(pil_image.getbands()) > 1: pil_image = pil_image.convert( "I" ) # int32; uint16 preferred but not reliable using PIL self.info = pil_image.info try: # tiff tags self.tags = {TAGS[key]: pil_image.tag_v2[key] for key in pil_image.tag_v2} except AttributeError: pass self.array = np.array(pil_image, dtype=dtype) self._dpi = dpi self.sid = sid @property def dpi(self) -> float | None: """The dots-per-inch of the image, defined at isocenter.""" dpi = None for key in ("dpi", "resolution"): dpi = self.info.get(key) if dpi is not None: dpi = float(dpi[0]) if dpi < 3 and not self._dpi: raise ValueError( f"The DPI setting is abnormal or nonsensical. Got resolution of {dpi}. Pass in the dpi manually." ) if dpi < 3: dpi = None break if dpi is None: dpi = self._dpi if self.sid is not None and dpi is not None: dpi *= self.sid / 1000 return dpi @property def dpmm(self) -> float | None: """The Dots-per-mm of the image, defined at isocenter. E.g. if an EPID image is taken at 150cm SID, the dpmm will scale back to 100cm.""" try: return self.dpi / MM_PER_INCH except TypeError: return
[docs] class ArrayImage(BaseImage): """An image constructed solely from a numpy array.""" def __init__( self, array: np.ndarray, *, dpi: float = None, sid: float = None, dtype=None, ): """ Parameters ---------- array : numpy.ndarray The image array. dpi : int, float The dots-per-inch of the image, defined at isocenter. .. note:: If a DPI tag is found in the image, that value will override the parameter, otherwise this one will be used. sid : int, float The Source-to-Image distance in mm. dtype : dtype, None, optional The data type to cast the image data as. If None, will use whatever raw image format is. """ if dtype is not None: self.array = np.array(array, dtype=dtype) else: self.array = array self._dpi = dpi self.sid = sid self.metrics = [] self.metric_values = {} @property def dpmm(self) -> float | None: """The Dots-per-mm of the image, defined at isocenter. E.g. if an EPID image is taken at 150cm SID, the dpmm will scale back to 100cm.""" try: return self.dpi / MM_PER_INCH except Exception: return @property def dpi(self) -> float | None: """The dots-per-inch of the image, defined at isocenter.""" dpi = None if self._dpi is not None: dpi = self._dpi if self.sid is not None: dpi *= self.sid / 1000 return dpi def __sub__(self, other): return ArrayImage(self.array - other.array)
class LazyDicomImageStack: _image_path_keys: list[Path] metadatas: list[pydicom.Dataset] def __init__( self, folder: str | Path | Sequence[str | Path], dtype: np.dtype | None = None, min_number: int = 39, check_uid: bool = True, ): """Load a folder with DICOM CT images. This variant is more memory efficient than the standard DicomImageStack. This is done by loading images from disk on the fly. This assumes all images remain on disk for the lifetime of the instance. This does not need to be true for the original implementation. See the documentation for DicomImageStack for parameter descriptions. """ self.dtype = dtype paths = [] # load in images in their received order if isinstance(folder, (list, tuple)): paths = folder elif osp.isdir(folder): for pdir, sdir, files in os.walk(folder): for file in files: paths.append(osp.join(pdir, file)) # we only want to read the metadata once # so we read it here and then filter and sort metadatas, paths = self._get_path_metadatas(paths) # check that at least 1 image was loaded if len(paths) < 1: raise FileNotFoundError( f"No files were found in the specified location: {folder}" ) # error checking if check_uid: most_common_uid = self._get_common_uid_imgs(metadatas, min_number) metadatas = [m for m in metadatas if m.SeriesInstanceUID == most_common_uid] # sort according to physical order order = np.argsort([m.ImagePositionPatient[-1] for m in metadatas]) self.metadatas = [metadatas[i] for i in order] self._image_path_keys = [paths[i] for i in order] @classmethod def from_zip(cls, zip_path: str | Path, dtype: np.dtype | None = None): """Load a DICOM ZIP archive. Parameters ---------- zip_path : str Path to the ZIP archive. dtype : dtype, None, optional The data type to cast the image data as. If None, will use whatever raw image format is. """ with TemporaryZipDirectory(zip_path, delete=False) as tmpzip: obj = cls(tmpzip, dtype) return obj def _get_common_uid_imgs( self, metadata: list[pydicom.Dataset], min_number: int ) -> pydicom.DataElement: """Check that all the images are from the same study.""" most_common_uid = Counter(i.SeriesInstanceUID for i in metadata).most_common(1)[ 0 ] if most_common_uid[1] < min_number: raise ValueError( "The minimum number images from the same study were not found" ) return most_common_uid[0] def _get_path_metadatas( self, paths: list[Path] ) -> (list[pydicom.Dataset], list[Path]): """Get the metadata for the images. This also filters out non-image files.""" metadata = [] matched_paths = [] for path in paths: try: ds = pydicom.dcmread(path, force=True, stop_before_pixels=True) if "Image Storage" in ds.SOPClassUID.name: metadata.append(ds) matched_paths.append(path) except (InvalidDicomError, AttributeError, MemoryError): pass return metadata, matched_paths def side_view(self, axis: int) -> np.ndarray: """Return the side view of the stack. E.g. if axis=0, return the maximum value along the 0th axis.""" return np.stack([i for i in self], axis=-1).max(axis=axis) @cached_property def metadata(self) -> pydicom.FileDataset: """The metadata of the first image; shortcut attribute. Only attributes that are common throughout the stack should be used, otherwise the individual image metadata should be used.""" return self[0].metadata @cached_property def slice_spacing(self) -> float: """The slice spacing of the stack. Assumes all slices are equally spaced.""" return np.abs( self.metadatas[0].ImagePositionPatient[-1] - self.metadatas[1].ImagePositionPatient[-1] ) def __getitem__(self, item: int) -> DicomImage: return DicomImage(self._image_path_keys[item], dtype=self.dtype) def __setitem__(self, key: int, value: DicomImage): """Save the passed image to disk in place of the current image.""" current_path = self._image_path_keys[key] value.save(current_path) def __delitem__(self, key): """Delete the image from the stack and OS.""" current_path = self._image_path_keys[key] try: os.remove(current_path) except Exception: pass del self._image_path_keys[key] def __len__(self): return len(self._image_path_keys)
[docs] class DicomImageStack(LazyDicomImageStack): """A class that loads and holds a stack of DICOM images (e.g. a CT dataset). The class can take a folder or zip file and will read CT images. The images must all be the same size. Supports indexing to individual images. Attributes ---------- images : list Holds instances of :class:`~pylinac.core.image.DicomImage`. Can be accessed via index; i.e. self[0] == self.images[0]. Examples -------- Load a folder of Dicom images >>> from pylinac import image >>> img_folder = r"folder/qa/cbct/june" >>> dcm_stack = image.DicomImageStack(img_folder) # loads and sorts the images >>> dcm_stack.plot(3) # plot the 3rd image Load a zip archive >>> img_folder_zip = r"archive/qa/cbct/june.zip" # save space and zip your CBCTs >>> dcm_stack = image.DicomImageStack.from_zip(img_folder_zip) Load as a certain data type >>> dcm_stack_uint32 = image.DicomImageStack(img_folder, dtype=np.uint32) """ images: list[ImageLike] def __init__( self, folder: str | Path, dtype: np.dtype | None = None, min_number: int = 39, check_uid: bool = True, raw_pixels: bool = False, ): """Load a folder with DICOM CT images. Parameters ---------- folder : str Path to the folder. dtype : dtype, None, optional The data type to cast the image data as. If None, will use whatever raw image format is. """ super().__init__(folder, dtype, min_number, check_uid) self.images = [ DicomImage(path, dtype=dtype, raw_pixels=raw_pixels) for path in self._image_path_keys ]
[docs] @classmethod def from_zip(cls, zip_path: str | Path, dtype: np.dtype | None = None, **kwargs): """Load a DICOM ZIP archive. Parameters ---------- zip_path : str Path to the ZIP archive. dtype : dtype, None, optional The data type to cast the image data as. If None, will use whatever raw image format is. """ with TemporaryZipDirectory(zip_path) as tmpzip: obj = cls(tmpzip, dtype, **kwargs) return obj
[docs] def plot_3view(self): """Plot the stack in 3 views: axial, coronal, and sagittal.""" fig, axes = plt.subplots(1, 3) names = ("Coronal", "Sagittal", "Axial") for idx, (ax, name) in enumerate(zip(axes, names)): arry = self.side_view(idx) ax.imshow(arry, cmap="gray", aspect="equal") ax.set_title(name) plt.show()
def roll(self, direction: str, amount: int): for img in self.images: img.roll(direction, amount) def __getitem__(self, item) -> DicomImage: return self.images[item] def __setitem__(self, key, value: DicomImage): self.images[key] = value def __delitem__(self, key): """Delete the image from the stack.""" del self.images[key] def __len__(self): return len(self.images)
[docs] class NMImageStack: """A class of frames of a nuclear medicine image. A single image can have N frames. For our purposes, we can treat this as a stack of images.""" metadata: Dataset path: str | Path frames: list[DicomImage] def __init__(self, path: str | Path): """Load a single NM image with N frames.""" self.path = path self.frames = [] ds = pydicom.dcmread(path, force=True, stop_before_pixels=True) if ds.Modality != "NM": raise TypeError("The file is not a NM image") self.metadata = ds full_ds = pydicom.dcmread(path, force=True) for i in range(full_ds.NumberOfFrames): full_array = full_ds.pixel_array if full_array.ndim == 2: array = full_array else: array = full_array[i] img = DicomImage(self.path) img.array = array self.frames.append(img)
[docs] def as_3d_array(self) -> np.ndarray: """Return the frames as a 3D array.""" return np.stack([i.array for i in self.frames], axis=0)
def __len__(self): return len(self.frames)
[docs] def tiff_to_dicom( tiff_file: str | Path | BytesIO, sid: float, gantry: float, coll: float, couch: float, dpi: float | None = None, ) -> Dataset: """Converts a TIFF file into a **simplistic** DICOM file. Not meant to be a full-fledged tool. Used for conversion so that tools that are traditionally oriented towards DICOM have a path to accept TIFF. Currently used to convert files for WL. .. note:: This will convert the image into an uint16 datatype to match the native EPID datatype. Parameters ---------- tiff_file The TIFF file to be converted. sid The Source-to-Image distance in mm. dpi The dots-per-inch value of the TIFF image. gantry The gantry value that the image was taken at. coll The collimator value that the image was taken at. couch The couch value that the image was taken at. """ tiff_img = FileImage(tiff_file, dpi=dpi, sid=sid) if not tiff_img.dpmm: raise ValueError( "Automatic detection of `dpi` failed. A `dpi` value must be passed to the constructor." ) uint_array = convert_to_dtype(tiff_img.array, np.uint16) mm_pixel = 25.4 / tiff_img.dpi file_meta = FileMetaDataset() # Main data elements ds = Dataset() ds.SOPClassUID = UID("1.2.840.10008.5.1.4.1.1.481.1") ds.SOPInstanceUID = generate_uid() ds.SeriesInstanceUID = generate_uid() ds.Modality = "RTIMAGE" ds.ConversionType = "WSD" ds.PatientName = "Lutz^Test Tool" ds.PatientID = "Someone Important" ds.SamplesPerPixel = 1 ds.PhotometricInterpretation = "MONOCHROME2" ds.Rows = tiff_img.shape[0] ds.Columns = tiff_img.shape[1] ds.BitsAllocated = 16 ds.BitsStored = 16 ds.HighBit = 15 ds.PixelRepresentation = 0 ds.ImagePlanePixelSpacing = [mm_pixel, mm_pixel] ds.RadiationMachineSAD = "1000.0" ds.RTImageSID = sid ds.PrimaryDosimeterUnit = "MU" ds.GantryAngle = str(gantry) ds.BeamLimitingDeviceAngle = str(coll) ds.PatientSupportAngle = str(couch) ds.PixelData = uint_array ds.file_meta = file_meta ds.is_implicit_VR = True ds.is_little_endian = True return ds
[docs] def gamma_2d( reference: np.ndarray, evaluation: np.ndarray, dose_to_agreement: float = 1, distance_to_agreement: int = 1, gamma_cap_value: float = 2, global_dose: bool = True, dose_threshold: float = 5, fill_value: float = np.nan, ) -> np.ndarray: """Compute a 2D gamma of two 2D numpy arrays. This does NOT do size or spatial resolution checking. It performs an element-by-element evaluation. It is the responsibility of the caller to ensure the reference and evaluation have comparable spatial resolution. The algorithm follows Table I of D. Low's 2004 paper: Evaluation of the gamma dose distribution comparison method: https://aapm.onlinelibrary.wiley.com/doi/epdf/10.1118/1.1598711 This is similar to the gamma_1d function for profiles, except we must search a 2D grid around the reference point. Parameters ---------- reference The reference 2D array. evaluation The evaluation 2D array. dose_to_agreement The dose to agreement in %. E.g. 1 is 1% of global reference max dose. distance_to_agreement The distance to agreement in **elements**. E.g. if the value is 4 this means 4 elements from the reference point under calculation. Must be >0 gamma_cap_value The value to cap the gamma at. E.g. a gamma of 5.3 will get capped to 2. Useful for displaying data with a consistent range. global_dose Whether to evaluate the dose to agreement threshold based on the global max or the dose point under evaluation. dose_threshold The dose threshold as a number between 0 and 100 of the % of max dose under which a gamma is not calculated. This is not affected by the global/local dose normalization and the threshold value is evaluated against the global max dose, period. fill_value The value to give pixels that were not calculated because they were under the dose threshold. Default is NaN, but another option would be 0. If NaN, allows the user to calculate mean/median gamma over just the evaluated portion and not be skewed by 0's that should not be considered. """ if reference.ndim != 2 or evaluation.ndim != 2: raise ValueError( f"Reference and evaluation arrays must be 2D. Got reference: {reference.ndim} and evaluation: {evaluation.ndim}" ) threshold = reference.max() / 100 * dose_threshold # convert dose to agreement to % of global max; ignored later if local dose dose_ta = dose_to_agreement / 100 * reference.max() # pad eval array on both edges so our search does not go out of bounds eval_padded = np.pad(evaluation, distance_to_agreement, mode="edge") # iterate over each reference element, computing distance value and dose value gamma = np.zeros(reference.shape) for row_idx, row in enumerate(reference): for col_idx, ref_point in enumerate(row): # skip if below dose threshold if ref_point < threshold: gamma[row_idx, col_idx] = fill_value continue # use scikit-image to compute the indices of a disk around the reference point # we can then compute gamma over the eval points at these indices # unlike the 1D computation, we have to search at an index offset by the distance to agreement # we use DTA+1 in disk because it looks like the results are exclusive of edges. # https://scikit-image.org/docs/stable/api/skimage.draw.html#disk rs, cs = disk( (row_idx + distance_to_agreement, col_idx + distance_to_agreement), distance_to_agreement + 1, ) capital_gammas = [] for r, c in zip(rs, cs): eval_point = eval_padded[r, c] # for the distance, we compare the ref row/col to the eval padded matrix # but remember the padded array is padded by DTA, so to compare distances, we # have to cancel the offset we used for dose purposes. dist = math.dist( (row_idx, col_idx), (r - distance_to_agreement, c - distance_to_agreement), ) dose = eval_point - ref_point if not global_dose: dose_ta = dose_to_agreement / 100 * ref_point capital_gamma = math.sqrt( dist**2 / distance_to_agreement**2 + dose**2 / dose_ta**2 ) capital_gammas.append(capital_gamma) gamma[row_idx, col_idx] = min(np.nanmin(capital_gammas), gamma_cap_value) return np.asarray(gamma)
[docs] def z_position(metadata: pydicom.Dataset) -> float: """The 'z-position' of the image. Relevant for CT and MR images.""" try: return metadata.ImagePositionPatient[-1] except AttributeError: return metadata.SliceLocation