# Copyright 2008-2018 pydicom authors. See LICENSE file for details.
"""Read a dicom media file"""

import os
from struct import Struct, unpack
from types import TracebackType
from typing import (
    Iterator, Tuple, Optional, Union, Type, cast, BinaryIO, Callable
)

from pydicom.misc import size_in_bytes
from pydicom.datadict import dictionary_VR
from pydicom.tag import TupleTag, ItemTag
from pydicom.uid import UID
from pydicom.valuerep import extra_length_VRs


# Byte-string forms of the VRs for which the explicit VR header is followed
# by a separate 4-byte length field (see ``data_element_generator``).
extra_length_VRs_b = tuple(vr.encode('ascii') for vr in extra_length_VRs)
ExplicitVRLittleEndian = b'1.2.840.10008.1.2.1'
ImplicitVRLittleEndian = b'1.2.840.10008.1.2'
DeflatedExplicitVRLittleEndian = b'1.2.840.10008.1.2.1.99'
ExplicitVRBigEndian = b'1.2.840.10008.1.2.2'


# One raw data element: ((group, elem), VR, length, value, value_tell).
# ``VR`` is None when it is not known, ``value`` is None when the value was
# not read (deferred, or an undefined length sequence).
_ElementType = Tuple[
    Tuple[int, int], Optional[bytes], int, Optional[bytes], int
]


class dicomfile:
    """Context-manager based DICOM file object with data element iteration

    Attributes
    ----------
    fobj
        The underlying file object, opened in binary read mode.
    preamble
        The first 128 bytes of the file when they are followed by the
        ``b"DICM"`` prefix, otherwise ``None``.
    """

    def __init__(self, filename: Union[str, bytes, os.PathLike]) -> None:
        """Open `filename` and position the file after the preamble.

        If the 4 bytes following the first 128 bytes are not ``b"DICM"``,
        the file is rewound to its start and `preamble` is set to ``None``.
        """
        fobj = open(filename, "rb")
        try:
            # Read the DICOM preamble, if present
            preamble: Optional[bytes] = fobj.read(0x80)
            if fobj.read(4) != b"DICM":
                preamble = None
                fobj.seek(0)
        except BaseException:
            # Do not leak the handle if the initial reads fail
            fobj.close()
            raise

        self.fobj = fobj
        self.preamble = preamble

    def __enter__(self) -> "dicomfile":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType]
    ) -> Optional[bool]:
        """Close the underlying file; exceptions are never suppressed."""
        self.fobj.close()

        return None

    def __iter__(self) -> Iterator[_ElementType]:
        """Yield the file meta elements, then the main dataset elements.

        The file meta information (group 0x0002) is read as explicit VR
        little endian. The Transfer Syntax UID element (0002,0010) found
        there selects the encoding used for the rest of the file.

        Raises
        ------
        NotImplementedError
            If no transfer syntax was found in the file meta information.
        """
        # Need the transfer_syntax later
        tsyntax: Optional[UID] = None

        # Yield the file meta info elements; the generator rewinds to the
        # start of the first element whose group is not 2 and stops there
        file_meta = data_element_generator(
            self.fobj,
            is_implicit_VR=False,
            is_little_endian=True,
            stop_when=lambda group, elem: group != 2
        )

        for elem in file_meta:
            if elem[0] == (0x0002, 0x0010):
                value = cast(bytes, elem[3])
                # Strip padding (spaces / NULs) around the UID
                tsyntax = UID(value.strip(b" \0").decode('ascii'))

            yield elem

        # Continue to yield elements from the main data
        if not tsyntax:
            raise NotImplementedError("No transfer syntax in file meta info")

        # NOTE(review): a deflated transfer syntax is not inflated here; the
        # remaining bytes are parsed as-is -- confirm callers never rely on
        # this reader for such files.
        yield from data_element_generator(
            self.fobj, tsyntax.is_implicit_VR, tsyntax.is_little_endian
        )


def data_element_generator(
    fp: BinaryIO,
    is_implicit_VR: bool,
    is_little_endian: bool,
    stop_when: Optional[Callable[[int, int], bool]] = None,
    defer_size: Optional[Union[str, int, float]] = None,
) -> Iterator[_ElementType]:
    """Yield the raw data elements read sequentially from `fp`.

    Parameters
    ----------
    fp
        Binary file-like object positioned at the start of a data element.
    is_implicit_VR
        ``True`` if the elements are encoded without a VR field.
    is_little_endian
        ``True`` if the elements are encoded as little endian.
    stop_when
        Optional callable taking ``(group, elem)``. When it returns a true
        value, `fp` is rewound to the start of that element's header and
        iteration stops without yielding it.
    defer_size
        Optional size limit, converted with ``size_in_bytes``. Values
        longer than this are skipped and yielded as ``None``.

    Yields
    ------
    tuple
        ``((group, elem), VR, length, value, value_tell)`` where
        ``value_tell`` is the position in `fp` at which the value starts.
        ``VR`` is ``None`` in implicit VR mode for elements of defined
        length. ``value`` is ``None`` for deferred values and for
        undefined length sequences, whose content is yielded by the
        following iterations.

    Raises
    ------
    NotImplementedError
        If an element other than a sequence has an undefined length.
    """
    endian_chr = "<" if is_little_endian else ">"

    if is_implicit_VR:
        element_struct = Struct(endian_chr + "HHL")
    else:  # Explicit VR
        # tag, VR, 2-byte length (or 0 if special VRs)
        element_struct = Struct(endian_chr + "HH2sH")
        extra_length_struct = Struct(endian_chr + "L")  # for special VRs
        extra_length_unpack = extra_length_struct.unpack  # for lookup speed

    # Make local variables so have faster lookup
    fp_read = fp.read
    fp_tell = fp.tell
    element_struct_unpack = element_struct.unpack
    defer_bytes = size_in_bytes(defer_size)

    while True:
        # Read tag, VR, length, get ready to read value
        header = fp_read(8)
        if len(header) < 8:
            return  # at end of file

        if is_implicit_VR:
            # must reset VR each time; could have set last iteration (e.g. SQ)
            VR = None
            group, elem, length = element_struct_unpack(header)
        else:  # explicit VR
            # NOTE(review): item and delimiter tags (group 0xFFFE) are also
            # unpacked with this VR layout -- confirm that is intended.
            group, elem, VR, length = element_struct_unpack(header)
            if VR in extra_length_VRs_b:
                extra_length = fp_read(4)
                if len(extra_length) < 4:
                    return  # header truncated at end of file
                length = extra_length_unpack(extra_length)[0]

        # Positioned to read the value, but may not want to -- check stop_when
        value_tell = fp_tell()
        if stop_when is not None and stop_when(group, elem):
            rewind_length = 8
            if not is_implicit_VR and VR in extra_length_VRs_b:
                rewind_length += 4
            fp.seek(value_tell - rewind_length)

            return

        # Reading the value
        # First case (most common): reading a value with a defined length
        if length != 0xFFFFFFFF:
            if defer_bytes is not None and length > defer_bytes:
                # Flag as deferred by setting value to None, and skip bytes
                value = None
                fp.seek(fp_tell() + length)
            else:
                value = fp_read(length)

            yield ((group, elem), VR, length, value, value_tell)
            continue

        # Second case: undefined length - must seek to delimiter,
        # unless is SQ type, in which case is easier to parse it, because
        # undefined length SQs and items of undefined lengths can be nested
        # and it would be error-prone to read to the correct outer delimiter

        # Try to look up type to see if is a SQ
        # if private tag, won't be able to look it up in dictionary,
        # in which case just ignore it and read the bytes unless it is
        # identified as a Sequence
        if VR is None:
            try:
                VR = dictionary_VR((group, elem)).encode('ascii')
            except KeyError:
                # Look ahead to see if it consists of items and
                # is thus a SQ
                peeked = fp_read(4)
                # Rewind the file by what was actually read
                fp.seek(fp_tell() - len(peeked))
                if len(peeked) == 4:
                    next_tag = TupleTag(
                        cast(
                            Tuple[int, int],
                            unpack(endian_chr + "HH", peeked),
                        )
                    )
                    if next_tag == ItemTag:
                        VR = b'SQ'

        if VR != b'SQ':
            raise NotImplementedError(
                "This reader does not handle undefined length except for "
                "SQ"
            )

        yield ((group, elem), VR, length, None, value_tell)