1# Copyright 2008-2018 pydicom authors. See LICENSE file for details.
2"""Read a dicom media file"""
3
4import os
5from struct import Struct, unpack
6from types import TracebackType
7from typing import (
8    Iterator, Tuple, Optional, Union, Type, cast, BinaryIO, Callable
9)
10
11from pydicom.misc import size_in_bytes
12from pydicom.datadict import dictionary_VR
13from pydicom.tag import TupleTag, ItemTag
14from pydicom.uid import UID
15from pydicom.valuerep import extra_length_VRs
16
17
# Byte-string forms of the VRs in ``extra_length_VRs``, so they can be compared
# directly with the 2-byte VR field unpacked from an explicit VR element header
extra_length_VRs_b = tuple(vr.encode('ascii') for vr in extra_length_VRs)
# Transfer Syntax UIDs encoded as bytes
ExplicitVRLittleEndian = b'1.2.840.10008.1.2.1'
ImplicitVRLittleEndian = b'1.2.840.10008.1.2'
DeflatedExplicitVRLittleEndian = b'1.2.840.10008.1.2.1.99'
ExplicitVRBigEndian = b'1.2.840.10008.1.2.2'


# Shape of each item yielded by ``data_element_generator``:
# ((group, element), VR, length, value, value_tell)
_ElementType = Tuple[
    Tuple[int, int], Optional[bytes], int, Optional[bytes], int
]
28
29
class dicomfile:
    """Context-manager based DICOM file object with data element iteration.

    The file is opened on construction and closed when the ``with`` block
    exits. Iterating yields the File Meta Information elements (read as
    explicit VR little endian while the group is 0x0002) followed by the
    remaining elements, decoded according to the Transfer Syntax UID found
    in element (0002,0010). Each item is the tuple produced by
    ``data_element_generator``.

    Every iteration starts again from the first data element, so the object
    can be iterated more than once while the file is open.
    """

    def __init__(self, filename: Union[str, bytes, os.PathLike]) -> None:
        """Open `filename` and skip the preamble and ``DICM`` prefix.

        :param filename: path of the file to open in binary mode.

        ``self.preamble`` holds the 128 bytes preceding the ``DICM`` prefix,
        or ``None`` when the prefix is absent (the file is then read from
        offset 0).
        """
        self.fobj = fobj = open(filename, "rb")
        try:
            # Read the DICOM preamble, if present
            self.preamble: Optional[bytes] = fobj.read(0x80)
            dicom_prefix = fobj.read(4)
            if dicom_prefix != b"DICM":
                self.preamble = None
                fobj.seek(0)

            # Offset of the first data element; __iter__ seeks here so that
            # iterating again does not start from wherever the last pass
            # left the file position.
            self._data_start: int = fobj.tell()
        except BaseException:
            # Do not leak the handle if the header could not be read
            fobj.close()
            raise

    def __enter__(self) -> "dicomfile":
        """Return the object itself for use in a ``with`` statement."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType]
    ) -> Optional[bool]:
        """Close the underlying file; exceptions are never suppressed."""
        self.fobj.close()

        return None

    def __iter__(self) -> "Iterator[_ElementType]":
        """Yield the file meta elements, then the data set elements.

        :raises NotImplementedError: if the file meta information contains
            no (non-empty) Transfer Syntax UID.
        """
        # Always start from the first element, whatever was read before
        self.fobj.seek(self._data_start)

        # Need the transfer_syntax later
        tsyntax: Optional[UID] = None

        # Yield the file meta info elements; the generator rewinds to the
        # start of the first element whose group is not 0x0002 and stops.
        file_meta = data_element_generator(
            self.fobj,
            is_implicit_VR=False,
            is_little_endian=True,
            stop_when=lambda group, elem: group != 2
        )

        for elem in file_meta:
            if elem[0] == (0x0002, 0x0010):
                # Value is padded with spaces and/or NULs
                value = cast(bytes, elem[3])
                tsyntax = UID(value.strip(b" \0").decode('ascii'))

            yield elem

        # Continue to yield elements from the main data
        if not tsyntax:
            raise NotImplementedError("No transfer syntax in file meta info")

        # NOTE(review): nothing here inflates the data set, so a deflated
        # transfer syntax would be parsed as raw bytes -- confirm whether
        # such files need to be supported.
        yield from data_element_generator(
            self.fobj, tsyntax.is_implicit_VR, tsyntax.is_little_endian
        )
84
85
def data_element_generator(
    fp: BinaryIO,
    is_implicit_VR: bool,
    is_little_endian: bool,
    stop_when: Optional[Callable[[int, int], bool]] = None,
    defer_size: Optional[Union[str, int, float]] = None,
) -> Iterator[_ElementType]:
    """Yield the raw data elements read sequentially from `fp`.

    :param fp: seekable binary file-like, positioned at the start of a data
        element.
    :param is_implicit_VR: ``True`` if the element headers carry no VR field
        (group, element, 4-byte length); ``False`` for explicit VR headers.
    :param is_little_endian: byte order used to decode the headers.
    :param stop_when: optional callable invoked as ``stop_when(group, elem)``
        for each element header; when it returns true the file is
        repositioned at the start of that element and iteration ends
        without yielding it.
    :param defer_size: converted with ``size_in_bytes``; when the result is
        not ``None``, defined-length values longer than it are skipped
        rather than read and are reported with a value of ``None``.
    :return: an iterator of ``((group, elem), VR, length, value, value_tell)``
        where `VR` is the 2-byte VR (``None`` for implicit VR elements of
        defined length), `value` is the raw value bytes or ``None`` (deferred
        values and undefined length sequences) and `value_tell` is the file
        offset at which the value starts.
    :raises NotImplementedError: for an element of undefined length that is
        not identified as a sequence (SQ).

    Iteration ends quietly when the file ends before a complete element
    header (including the extra 4-byte length of the special VRs) could be
    read. After an undefined length sequence is yielded the file is left
    just after its header, so its content is read as the following elements.
    """
    endian_chr = "<" if is_little_endian else ">"

    if is_implicit_VR:
        # tag, 4-byte length
        element_struct = Struct(endian_chr + "HHL")
    else:  # Explicit VR
        # tag, VR, 2-byte length (or 0 if special VRs)
        element_struct = Struct(endian_chr + "HH2sH")
        # separate 4-byte length used by the special VRs
        extra_length_unpack = Struct(endian_chr + "L").unpack

    # Make local variables so have faster lookup
    fp_read = fp.read
    fp_tell = fp.tell
    fp_seek = fp.seek
    element_struct_unpack = element_struct.unpack
    defer_limit = size_in_bytes(defer_size)

    while True:
        # Read tag, VR, length, get ready to read value
        header = fp_read(8)
        if len(header) < 8:
            return  # at end of file

        if is_implicit_VR:
            # must reset VR each time; could have set last iteration (e.g. SQ)
            VR = None
            group, elem, length = element_struct_unpack(header)
        else:  # explicit VR
            group, elem, VR, length = element_struct_unpack(header)

        # May not want this element at all -- check stop_when before reading
        # any further, so that only the 8 header bytes need to be given back
        if stop_when is not None and stop_when(group, elem):
            fp_seek(fp_tell() - 8)
            return

        if not is_implicit_VR and VR in extra_length_VRs_b:
            extra = fp_read(4)
            if len(extra) < 4:
                # Header cut short by the end of the file: handle it like a
                # truncated 8-byte header instead of failing in struct
                return
            length = extra_length_unpack(extra)[0]

        # Positioned to read the value
        value_tell = fp_tell()

        # First case (most common): reading a value with a defined length
        if length != 0xFFFFFFFF:
            if defer_limit is not None and length > defer_limit:
                # Flag as deferred by setting value to None, and skip bytes
                value = None
                fp_seek(fp_tell() + length)
            else:
                value = fp_read(length)
            yield ((group, elem), VR, length, value, value_tell)
            continue

        # Second case: undefined length - must seek to delimiter,
        # unless is SQ type, in which case is easier to parse it, because
        # undefined length SQs and items of undefined lengths can be nested
        # and it would be error-prone to read to the correct outer delimiter
        if VR is None:
            # Try to look up type to see if is a SQ
            # if private tag, won't be able to look it up in dictionary,
            #   in which case just ignore it and read the bytes unless it is
            #   identified as a Sequence
            try:
                VR = dictionary_VR((group, elem)).encode('ascii')
            except KeyError:
                # Look ahead to see if it consists of items and
                # is thus a SQ
                peeked = fp_read(4)
                # Rewind the file by however much was actually read
                fp_seek(fp_tell() - len(peeked))
                if len(peeked) == 4:
                    next_tag = TupleTag(
                        cast(
                            Tuple[int, int],
                            unpack(endian_chr + "HH", peeked),
                        )
                    )
                    if next_tag == ItemTag:
                        VR = b'SQ'

        if VR != b'SQ':
            raise NotImplementedError(
                "This reader does not handle undefined length except for "
                "SQ"
            )

        yield ((group, elem), VR, length, None, value_tell)
183