1 2from os.path import join as pjoin 3import glob 4 5import numpy as np 6 7from .. import Nifti1Image 8from .dicomwrappers import (wrapper_from_data, wrapper_from_file) 9 10 11class DicomReadError(Exception): 12 pass 13 14 15DPCS_TO_TAL = np.diag([-1, -1, 1, 1]) 16 17 18def mosaic_to_nii(dcm_data): 19 """ Get Nifti file from Siemens 20 21 Parameters 22 ---------- 23 dcm_data : ``dicom.DataSet`` 24 DICOM header / image as read by ``dicom`` package 25 26 Returns 27 ------- 28 img : ``Nifti1Image`` 29 Nifti image object 30 """ 31 dcm_w = wrapper_from_data(dcm_data) 32 if not dcm_w.is_mosaic: 33 raise DicomReadError('data does not appear to be in mosaic format') 34 data = dcm_w.get_data() 35 aff = np.dot(DPCS_TO_TAL, dcm_w.affine) 36 return Nifti1Image(data, aff) 37 38 39def read_mosaic_dwi_dir(dicom_path, globber='*.dcm', dicom_kwargs=None): 40 return read_mosaic_dir(dicom_path, 41 globber, 42 check_is_dwi=True, 43 dicom_kwargs=dicom_kwargs) 44 45 46def read_mosaic_dir(dicom_path, 47 globber='*.dcm', check_is_dwi=False, dicom_kwargs=None): 48 """ Read all Siemens mosaic DICOMs in directory, return arrays, params 49 50 Parameters 51 ---------- 52 dicom_path : str 53 path containing mosaic DICOM images 54 globber : str, optional 55 glob to apply within `dicom_path` to select DICOM files. Default 56 is ``*.dcm`` 57 check_is_dwi : bool, optional 58 If True, raises an error if we don't find DWI information in the 59 DICOM headers. 60 dicom_kwargs : None or dict 61 Extra keyword arguments to pass to the pydicom ``read_file`` function. 62 63 Returns 64 ------- 65 data : 4D array 66 data array with last dimension being acquisition. If there were N 67 acquisitions, each of shape (X, Y, Z), `data` will be shape (X, 68 Y, Z, N) 69 affine : (4,4) array 70 affine relating 3D voxel space in data to RAS world space 71 b_values : (N,) array 72 b values for each acquisition. nan if we did not find diffusion 73 information for these images. 74 unit_gradients : (N, 3) array 75 gradient directions of unit length for each acquisition. (nan, 76 nan, nan) if we did not find diffusion information. 77 """ 78 if dicom_kwargs is None: 79 dicom_kwargs = {} 80 full_globber = pjoin(dicom_path, globber) 81 filenames = sorted(glob.glob(full_globber)) 82 b_values = [] 83 gradients = [] 84 arrays = [] 85 if len(filenames) == 0: 86 raise IOError(f'Found no files with "{full_globber}"') 87 for fname in filenames: 88 dcm_w = wrapper_from_file(fname, **dicom_kwargs) 89 # Because the routine sorts by filename, it only makes sense to use 90 # this order for mosaic images. Slice by slice dicoms need more 91 # sensible sorting 92 if not dcm_w.is_mosaic: 93 raise DicomReadError('data does not appear to be in mosaic format') 94 arrays.append(dcm_w.get_data()[..., None]) 95 q = dcm_w.q_vector 96 if q is None: # probably not diffusion 97 if check_is_dwi: 98 raise DicomReadError( 99 f'Could not find diffusion information reading file "{fname}"; ' 100 'is it possible this is not a _raw_ diffusion directory? ' 101 'Could it be a processed dataset like ADC etc?') 102 b = np.nan 103 g = np.ones((3,)) + np.nan 104 else: 105 b = dcm_w.b_value 106 g = dcm_w.b_vector 107 b_values.append(b) 108 gradients.append(g) 109 affine = np.dot(DPCS_TO_TAL, dcm_w.affine) 110 return (np.concatenate(arrays, -1), 111 affine, 112 np.array(b_values), 113 np.array(gradients)) 114 115 116def slices_to_series(wrappers): 117 """ Sort sequence of slice wrappers into series 118 119 This follows the SPM model fairly closely 120 121 Parameters 122 ---------- 123 wrappers : sequence 124 sequence of ``Wrapper`` objects for sorting into volumes 125 126 Returns 127 ------- 128 series : sequence 129 sequence of sequences of wrapper objects, where each sequence is 130 wrapper objects comprising a series, sorted into slice order 131 """ 132 # first pass 133 volume_lists = [wrappers[0:1]] 134 for dw in wrappers[1:]: 135 for vol_list in volume_lists: 136 if dw.is_same_series(vol_list[0]): 137 vol_list.append(dw) 138 break 139 else: # no match in current volume lists 140 volume_lists.append([dw]) 141 print('We appear to have %d Series' % len(volume_lists)) 142 # second pass 143 out_vol_lists = [] 144 for vol_list in volume_lists: 145 if len(vol_list) > 1: 146 vol_list.sort(key=_slice_sorter) 147 zs = [s.slice_indicator for s in vol_list] 148 if len(set(zs)) < len(zs): # not unique zs 149 # third pass 150 out_vol_lists += _third_pass(vol_list) 151 continue 152 out_vol_lists.append(vol_list) 153 print('We have %d volumes after second pass' % len(out_vol_lists)) 154 # final pass check 155 for vol_list in out_vol_lists: 156 zs = [s.slice_indicator for s in vol_list] 157 diffs = np.diff(zs) 158 if not np.allclose(diffs, np.mean(diffs)): 159 raise DicomReadError('Largeish slice gaps - missing DICOMs?') 160 return out_vol_lists 161 162 163def _slice_sorter(s): 164 return s.slice_indicator 165 166 167def _instance_sorter(s): 168 return s.instance_number 169 170 171def _third_pass(wrappers): 172 """ What we do when there are not unique zs in a slice set """ 173 inos = [s.instance_number for s in wrappers] 174 msg_fmt = ('Plausibly matching slices, but where some have ' 175 'the same apparent slice location, and %s; ' 176 '- slices are probably unsortable') 177 if None in inos: 178 raise DicomReadError(msg_fmt % 'some or all slices with ' 179 'missing InstanceNumber') 180 if len(set(inos)) < len(inos): 181 raise DicomReadError(msg_fmt % 'some or all slices with ' 182 'the same InstanceNumber') 183 # sort by instance number 184 wrappers.sort(key=_instance_sorter) 185 # start loop, in which we start a new volume, each time we see a z 186 # we've seen already in the current volume 187 dw = wrappers[0] 188 these_zs = [dw.slice_indicator] 189 vol_list = [dw] 190 out_vol_lists = [vol_list] 191 for dw in wrappers[1:]: 192 z = dw.slice_indicator 193 if z not in these_zs: 194 # same volume 195 vol_list.append(dw) 196 these_zs.append(z) 197 continue 198 # new volumne 199 vol_list.sort(_slice_sorter) 200 vol_list = [dw] 201 these_zs = [z] 202 out_vol_lists.append(vol_list) 203 vol_list.sort(_slice_sorter) 204 return out_vol_lists 205