"""I/O helper functions for pylinac."""
import os
import os.path as osp
import struct
import zipfile
from tempfile import TemporaryDirectory
from typing import Callable, List, Tuple
from urllib.error import HTTPError, URLError
from urllib.request import urlretrieve, urlopen
import numpy as np
import pydicom
from tqdm import tqdm
from pylinac.core.profile import SingleProfile
[docs]def is_dicom(file: str) -> bool:
"""Boolean specifying if file is a proper DICOM file.
This function is a pared down version of read_preamble meant for a fast return.
The file is read for a proper preamble ('DICM'), returning True if so,
and False otherwise. This is a conservative approach.
Parameters
----------
file : str
The path to the file.
See Also
--------
pydicom.filereader.read_preamble
pydicom.filereader.read_partial
"""
with open(file, 'rb') as fp:
fp.read(0x80)
prefix = fp.read(4)
return prefix == b"DICM"
[docs]def is_dicom_image(file: str) -> bool:
"""Boolean specifying if file is a proper DICOM file with a image
Parameters
----------
file : str
The path to the file.
See Also
--------
pydicom.filereader.read_preamble
pydicom.filereader.read_partial
"""
result = False
try:
img = pydicom.dcmread(file, force=True)
if 'TransferSyntaxUID' not in img.file_meta:
img.file_meta.TransferSyntaxUID = pydicom.uid.ImplicitVRLittleEndian
img.pixel_array
result = True
except (AttributeError, TypeError, KeyError, struct.error):
pass
return result
[docs]def retrieve_dicom_file(file: str) -> pydicom.FileDataset:
"""Read and return the DICOM dataset.
Parameters
----------
file : str
The path to the file.
"""
img = pydicom.dcmread(file, force=True)
if 'TransferSyntaxUID' not in img.file_meta:
img.file_meta.TransferSyntaxUID = pydicom.uid.ImplicitVRLittleEndian
return img
[docs]def is_zipfile(file: str) -> bool:
"""Wrapper function for detecting if file is a true ZIP archive"""
return zipfile.is_zipfile(file)
[docs]class TemporaryZipDirectory(TemporaryDirectory):
"""Creates a temporary directory that unpacks a ZIP archive."""
def __init__(self, zfile):
"""
Parameters
----------
zfile : str
String that points to a ZIP archive.
"""
super().__init__()
zfiles = zipfile.ZipFile(zfile)
zfiles.extractall(path=self.name)
[docs]def retrieve_filenames(directory: str, func: Callable=None, recursive: bool=True, **kwargs) -> List[str]:
"""Retrieve file names in a directory.
Parameters
----------
directory : str
The directory to walk over recursively.
func : function, None
The function that validates if the file name should be kept.
If None, no validation will be performed and all file names will be returned.
recursive : bool
Whether to search only the root directory.
kwargs
Additional arguments passed to the func parameter.
"""
filenames = []
if func is None:
func = lambda x: True
for pdir, _, files in os.walk(directory):
for file in files:
filename = osp.join(pdir, file)
if func(filename, **kwargs):
filenames.append(filename)
if not recursive:
break
return filenames
[docs]def retrieve_demo_file(url: str, force: bool = False) -> str:
"""Retrieve the demo file either by getting it from file or from a URL.
If the file is already on disk it returns the file name. If the file isn't
on disk, get the file from the URL and put it at the expected demo file location
on disk for lazy loading next time.
Parameters
----------
url : str
The suffix to the url (location within the S3 bucket) pointing to the demo file.
"""
true_url = r'https://storage.googleapis.com/pylinac_demo_files/' + url
demo_file = osp.join(osp.dirname(osp.dirname(__file__)), 'demo_files', url)
demo_dir = osp.dirname(demo_file)
if not osp.exists(demo_dir):
os.makedirs(demo_dir)
if force or not osp.isfile(demo_file):
get_url(true_url, destination=demo_file)
return demo_file
[docs]def is_url(url: str) -> bool:
"""Determine whether a given string is a valid URL.
Parameters
----------
url : str
Returns
-------
bool
"""
try:
with urlopen(url) as r:
return r.status == 200
except:
return False
[docs]def get_url(url: str, destination: str=None, progress_bar: bool=True) -> str:
"""Download a URL to a local file.
Parameters
----------
url : str
The URL to download.
destination : str, None
The destination of the file. If None is given the file is saved to a temporary directory.
progress_bar : bool
Whether to show a command-line progress bar while downloading.
Returns
-------
filename : str
The location of the downloaded file.
Notes
-----
Progress bar use/example adapted from tqdm documentation: https://github.com/tqdm/tqdm
"""
def my_hook(t):
last_b = [0]
def inner(b=1, bsize=1, tsize=None):
if tsize is not None:
t.total = tsize
if b > 0:
t.update((b - last_b[0]) * bsize)
last_b[0] = b
return inner
try:
if progress_bar:
with tqdm(unit='B', unit_scale=True, miniters=1, desc=url.split('/')[-1]) as t:
filename, _ = urlretrieve(url, filename=destination, reporthook=my_hook(t))
else:
filename, _ = urlretrieve(url, filename=destination)
except (HTTPError, URLError, ValueError) as e:
raise e
return filename
# this is easier with pandas, but I don't want that as a dependency at this point
[docs]class SNCProfiler:
"""Load a file from a Sun Nuclear Profiler device. This accepts .prs files."""
def __init__(self, path: str, detector_row: int = 106, bias_row: int = 107, calibration_row: int = 108,
data_row: int = -1, data_columns: slice = slice(5, 259)):
"""
Parameters
----------
path : str
Path to the .prs file.
detector_row
bias_row
calibration_row
data_row
data_columns
The range of columns that the data is in. Usually, there are some columns before and after the real data.
"""
with open(path, encoding='cp437') as f:
raw_data = f.read().splitlines()
self.detectors = raw_data[detector_row].split('\t')[data_columns]
self.bias = np.array(raw_data[bias_row].split('\t')[data_columns]).astype(float)
self.calibration = np.array(raw_data[calibration_row].split('\t')[data_columns]).astype(float)
self.data = np.array(raw_data[data_row].split('\t')[data_columns]).astype(float)
self.timetic = float(raw_data[bias_row].split('\t')[2])
self.integrated_dose = self.calibration * (self.data - self.bias * self.timetic)
[docs] def to_profiles(self, n_detectors_row: int = 63, **kwargs) -> Tuple[SingleProfile, SingleProfile, SingleProfile, SingleProfile]:
"""Convert the SNC data to SingleProfiles. These can be analyzed directly or passed to other modules like flat/sym.
Parameters
----------
n_detectors_row : int
The number of detectors in a given row. Note that they Y profile includes 2 extra detectors from the other 3.
"""
x_prof = SingleProfile(self.integrated_dose[:n_detectors_row], **kwargs)
y_prof = SingleProfile(self.integrated_dose[n_detectors_row:2*n_detectors_row+2], **kwargs)
pos_prof = SingleProfile(self.integrated_dose[2*n_detectors_row+2:3*n_detectors_row+2], **kwargs)
neg_prof = SingleProfile(self.integrated_dose[3*n_detectors_row+2:4*n_detectors_row+2], **kwargs)
return x_prof, y_prof, pos_prof, neg_prof