1# Copyright 2008-2018 pydicom authors. See LICENSE file for details.
2"""Read a dicom media file"""
3
4
5# Need zlib and io.BytesIO for deflate-compressed file
6from io import BytesIO
7import os
8from struct import (Struct, unpack)
9import sys
10from typing import (
11    BinaryIO, Union, Optional, List, Any, Callable, cast, MutableSequence,
12    Iterator, Dict
13)
14import warnings
15import zlib
16
17from pydicom import config
18from pydicom.charset import default_encoding, convert_encodings
19from pydicom.config import logger
20from pydicom.datadict import dictionary_VR
21from pydicom.dataelem import (
22    DataElement, RawDataElement, DataElement_from_raw, empty_value_for_VR
23)
24from pydicom.dataset import Dataset, FileDataset, FileMetaDataset
25from pydicom.dicomdir import DicomDir
26from pydicom.errors import InvalidDicomError
27from pydicom.filebase import DicomFileLike
28from pydicom.fileutil import (
29    read_undefined_length_value, path_from_pathlike, PathType, _unpack_tag
30)
31from pydicom.misc import size_in_bytes
32from pydicom.sequence import Sequence
33from pydicom.tag import (
34    ItemTag, SequenceDelimiterTag, TupleTag, Tag, BaseTag, TagListType
35)
36import pydicom.uid
37from pydicom.util.hexutil import bytes2hex
38from pydicom.valuerep import extra_length_VRs
39
40
def data_element_generator(
    fp: BinaryIO,
    is_implicit_VR: bool,
    is_little_endian: bool,
    stop_when: Optional[Callable[[BaseTag, Optional[str], int], bool]] = None,
    defer_size: Optional[Union[int, str, float]] = None,
    encoding: Union[str, MutableSequence[str]] = default_encoding,
    specific_tags: Optional[List[BaseTag]] = None
) -> Iterator[Union[RawDataElement, DataElement]]:
    """Create a generator to efficiently return the raw data elements.

    .. note::

        This function is used internally - usually there is no need to call it
        from user code. To read data from a DICOM file, :func:`dcmread`
        shall be used instead.

    Parameters
    ----------
    fp : file-like
        The file-like to read from. It must support ``read``, ``tell`` and
        ``seek``.
    is_implicit_VR : bool
        ``True`` if the data is encoded as implicit VR, ``False`` otherwise.
    is_little_endian : bool
        ``True`` if the data is encoded as little endian, ``False`` otherwise.
    stop_when : None, callable, optional
        If ``None`` (default), then the whole file is read. A callable which
        takes tag, VR, length, and returns ``True`` or ``False``. If it
        returns ``True``, `fp` is rewound to the start of the current data
        element and the generator stops. Note that the VR passed to the
        callable may be ``None`` (implicit VR).
    defer_size : int, str or float, optional
        See :func:`dcmread` for parameter info.
    encoding : Union[str, MutableSequence[str]]
        Encoding scheme. It is replaced by the converted value of
        (0008,0005) *Specific Character Set* once that element has been
        read, and is passed on to :func:`read_sequence` for undefined length
        sequences.
    specific_tags : list or None
        See :func:`dcmread` for parameter info.

    Yields
    -------
    RawDataElement or DataElement
        Yields DataElement for undefined length UN or SQ, RawDataElement
        otherwise.
    """
    # Summary of DICOM standard PS3.5-2008 chapter 7:
    # If Implicit VR, data element is:
    #    tag, 4-byte length, value.
    #        The 4-byte length can be FFFFFFFF (undefined length)*
    #
    # If Explicit VR:
    #    if OB, OW, OF, SQ, UN, or UT:
    #       tag, VR, 2-bytes reserved (both zero), 4-byte length, value
    #           For all but UT, the length can be FFFFFFFF (undefined length)*
    #   else: (any other VR)
    #       tag, VR, (2 byte length), value
    # * for undefined length, a Sequence Delimitation Item marks the end
    #        of the Value Field.
    # Note, except for the special_VRs, both impl and expl VR use 8 bytes;
    #    the special VRs follow the 8 bytes with a 4-byte length

    # With a generator, state is stored, so we can break down
    #    into the individual cases, and not have to check them again for each
    #    data element
    from pydicom.values import convert_string

    if is_little_endian:
        endian_chr = "<"
    else:
        endian_chr = ">"

    # assign implicit VR struct to variable as use later if VR assumed missing
    implicit_VR_struct = Struct(endian_chr + "HHL")
    if is_implicit_VR:
        element_struct = implicit_VR_struct
    else:  # Explicit VR
        # tag, VR, 2-byte length (or 0 if special VRs)
        element_struct = Struct(endian_chr + "HH2sH")
        extra_length_struct = Struct(endian_chr + "L")  # for special VRs
        extra_length_unpack = extra_length_struct.unpack  # for lookup speed

    # Make local variables so have faster lookup
    fp_read = fp.read
    fp_tell = fp.tell
    logger_debug = logger.debug
    debugging = config.debugging
    element_struct_unpack = element_struct.unpack
    defer_size = size_in_bytes(defer_size)

    # When specific tags are requested, Specific Character Set is always
    # added so that the character encoding can still be determined
    tag_set = {Tag(tag) for tag in specific_tags} if specific_tags else set()
    has_tag_set = bool(tag_set)
    if has_tag_set:
        tag_set.add(Tag(0x00080005))  # Specific Character Set

    while True:
        # VR: Optional[str]

        # Read tag, VR, length, get ready to read value
        # (both struct layouts used here are exactly 8 bytes long)
        bytes_read = fp_read(8)
        if len(bytes_read) < 8:
            return  # at end of file

        if debugging:
            debug_msg = f"{fp.tell() - 8:08x}: {bytes2hex(bytes_read)}"

        if is_implicit_VR:
            # must reset VR each time; could have set last iteration (e.g. SQ)
            VR = None
            group, elem, length = element_struct_unpack(bytes_read)
        else:  # explicit VR
            group, elem, VR, length = element_struct_unpack(bytes_read)
            # defend against switching to implicit VR, some writer do in SQ's
            # issue 1067, issue 1035

            if not (b'AA' <= VR <= b'ZZ') and config.assume_implicit_vr_switch:
                # invalid VR, must be 2 cap chrs, assume implicit and continue
                # (the same 8 bytes are re-interpreted as tag + 4-byte length)
                VR = None
                group, elem, length = implicit_VR_struct.unpack(bytes_read)
            else:
                VR = VR.decode(default_encoding)
                if VR in extra_length_VRs:
                    # the 2-byte field was reserved; the real length follows
                    # as a 4-byte value
                    bytes_read = fp_read(4)
                    length = extra_length_unpack(bytes_read)[0]
                    if debugging:
                        debug_msg += " " + bytes2hex(bytes_read)

        if debugging:
            debug_msg = "%-47s  (%04x, %04x)" % (debug_msg, group, elem)
            if not is_implicit_VR:
                debug_msg += " %s " % VR
            if length != 0xFFFFFFFF:
                debug_msg += "Length: %d" % length
            else:
                debug_msg += "Length: Undefined length (FFFFFFFF)"
            logger_debug(debug_msg)

        # Positioned to read the value, but may not want to -- check stop_when
        value_tell = fp_tell()
        tag = TupleTag((group, elem))
        if stop_when is not None:
            # XXX VR may be None here!! Should stop_when just take tag?
            if stop_when(tag, VR, length):
                if debugging:
                    logger_debug("Reading ended by stop_when callback. "
                                 "Rewinding to start of data element.")
                # Undo the header read: 8 bytes, plus the 4-byte extended
                # length if one was read for this element
                rewind_length = 8
                if not is_implicit_VR and VR in extra_length_VRs:
                    rewind_length += 4
                fp.seek(value_tell - rewind_length)
                return

        # Reading the value
        # First case (most common): reading a value with a defined length
        if length != 0xFFFFFFFF:
            # don't defer loading of Specific Character Set value as it is
            # needed immediately to get the character encoding for other tags
            if has_tag_set and tag not in tag_set:
                # skip the tag if not in specific tags
                fp.seek(fp_tell() + length)
                continue

            if (defer_size is not None and length > defer_size and
                    tag != BaseTag(0x00080005)):
                # Flag as deferred by setting value to None, and skip bytes
                value = None
                # NOTE(review): unlike the other debug calls in this function
                # this one is not guarded by `debugging`
                logger_debug("Defer size exceeded. "
                             "Skipping forward to next data element.")
                fp.seek(fp_tell() + length)
            else:
                # zero-length elements get the VR-specific empty value
                # instead of a read
                value = (
                    fp_read(length) if length > 0
                    else cast(
                        Optional[bytes], empty_value_for_VR(VR, raw=True)
                    )
                )
                if debugging:
                    # only the first 20 bytes of the value are logged
                    dotdot = "..." if length > 20 else "   "
                    displayed_value = value[:20] if value else b''
                    logger_debug("%08x: %-34s %s %r %s" %
                                 (value_tell, bytes2hex(displayed_value),
                                  dotdot, displayed_value, dotdot))

            # If the tag is (0008,0005) Specific Character Set, then store it
            if tag == BaseTag(0x00080005):
                # *Specific Character String* is b'' for empty value
                encoding = convert_string(
                    cast(bytes, value) or b'', is_little_endian
                )
                # Store the encoding value in the generator
                # for use with future elements (SQs)
                encoding = convert_encodings(encoding)

            yield RawDataElement(tag, VR, length, value, value_tell,
                                 is_implicit_VR, is_little_endian)

        # Second case: undefined length - must seek to delimiter,
        # unless is SQ type, in which case is easier to parse it, because
        # undefined length SQs and items of undefined lengths can be nested
        # and it would be error-prone to read to the correct outer delimiter
        else:
            # VR UN with undefined length shall be handled as SQ
            # see PS 3.5, section 6.2.2
            if VR == 'UN':
                VR = 'SQ'
            # Try to look up type to see if is a SQ
            # if private tag, won't be able to look it up in dictionary,
            #   in which case just ignore it and read the bytes unless it is
            #   identified as a Sequence
            # NOTE(review): `VR == 'UN'` cannot be true here because 'UN' was
            #   replaced by 'SQ' just above, so only `VR is None` can enter
            #   this branch
            if VR is None or VR == 'UN' and config.replace_un_with_known_vr:
                try:
                    VR = dictionary_VR(tag)
                except KeyError:
                    # Look ahead to see if it consists of items
                    # and is thus a SQ
                    next_tag = _unpack_tag(fp_read(4), endian_chr)
                    # Rewind the file
                    fp.seek(fp_tell() - 4)
                    if next_tag == ItemTag:
                        VR = 'SQ'

            if VR == 'SQ':
                if debugging:
                    logger_debug(
                        f"{fp_tell():08X}: Reading/parsing undefined length "
                        "sequence"
                    )

                # The sequence is always parsed (this moves `fp` past it),
                # even if the element is then dropped by the tag filter
                seq = read_sequence(fp, is_implicit_VR,
                                    is_little_endian, length, encoding)
                if has_tag_set and tag not in tag_set:
                    continue

                yield DataElement(tag, VR, seq, value_tell,
                                  is_undefined_length=True)
            else:
                delimiter = SequenceDelimiterTag
                if debugging:
                    logger_debug("Reading undefined length data element")
                value = read_undefined_length_value(
                    fp, is_little_endian, delimiter, defer_size
                )

                # tags with undefined length are skipped after read
                if has_tag_set and tag not in tag_set:
                    continue

                yield RawDataElement(tag, VR, length, value, value_tell,
                                     is_implicit_VR, is_little_endian)
286
287
def _is_implicit_vr(
    fp: BinaryIO,
    implicit_vr_is_assumed: bool,
    is_little_endian: bool,
    stop_when: Optional[Callable[[BaseTag, Optional[str], int], bool]],
    is_sequence: bool
) -> bool:
    """Determine whether the next data element uses implicit VR encoding.

    Up to 6 bytes (tag + possible VR) are consumed from `fp` and the file
    position is *not* restored; the caller is responsible for rewinding.

    Parameters
    ----------
    fp : an opened file object
        Positioned at the start of the first data element to inspect.
    implicit_vr_is_assumed : bool
        The encoding the caller expects (``True`` for implicit VR). If the
        data disagrees, a :class:`UserWarning` is issued, or
        :class:`~pydicom.errors.InvalidDicomError` is raised if
        ``config.enforce_valid_values`` is set.
    is_little_endian : bool
        ``True`` for little endian data; used to decode the first tag.
    stop_when : None, optional
        Optional callback that can terminate reading. Used to check whether
        the inspected tag still belongs to the dataset being read.
    is_sequence : bool
        ``True`` when reading a sequence item, ``False`` for a top-level
        dataset.

    Returns
    -------
    bool
        ``True`` if implicit VR should be used for reading, ``False``
        otherwise.
    """
    # An implicit VR sequence never switches to explicit VR (the reverse,
    # implicit items inside an explicit dataset, is tolerated below)
    if implicit_vr_is_assumed and is_sequence:
        return True

    header = fp.read(4)
    vr_bytes = fp.read(2)
    if len(vr_bytes) < 2:
        # nothing to inspect - keep the caller's assumption
        return implicit_vr_is_assumed

    # Two upper case ASCII letters are taken as an explicit VR. For this to
    # be an implicit VR length instead, the first element would need to be
    # longer than 16 kB (> 0x4141 = 16705 bytes), which is very unlikely.
    looks_explicit = all(0x40 < byte < 0x5B for byte in vr_bytes)
    found_implicit = not looks_explicit
    if found_implicit == implicit_vr_is_assumed:
        return found_implicit

    # Mismatch. If `stop_when` says the tag is not part of this dataset,
    # the dataset is empty and there is nothing to complain about.
    byte_order = "<" if is_little_endian else ">"
    tag = _unpack_tag(header, byte_order)
    vr = vr_bytes.decode(default_encoding)
    if stop_when is not None and stop_when(tag, vr, 0):
        return found_implicit

    # sequences with undefined length can be encoded in implicit VR,
    # see PS 3.5, section 6.2.2
    if found_implicit and is_sequence:
        return True

    # A genuine mismatch - warn or raise depending on config
    if found_implicit:
        found_vr, expected_vr = 'implicit', 'explicit'
    else:
        found_vr, expected_vr = 'explicit', 'implicit'

    msg = f"Expected {expected_vr} VR, but found {found_vr} VR"
    if config.enforce_valid_values:
        raise InvalidDicomError(msg)

    warnings.warn(msg + f" - using {found_vr} VR for reading", UserWarning)

    return found_implicit
357
358
def read_dataset(
    fp: BinaryIO,
    is_implicit_VR: bool,
    is_little_endian: bool,
    bytelength: Optional[int] = None,
    stop_when: Optional[Callable[[BaseTag, Optional[str], int], bool]] = None,
    defer_size: Optional[Union[str, int, float]] = None,
    parent_encoding: Union[str, MutableSequence[str]] = default_encoding,
    specific_tags: Optional[List[BaseTag]] = None,
    at_top_level: bool = True
) -> Dataset:
    """Return a :class:`~pydicom.dataset.Dataset` instance containing the next
    dataset in the file.

    Reading stops at end of file, when `bytelength` bytes have been consumed,
    when `stop_when` returns ``True`` or when an Item Delimiter (FFFE,E00D)
    is found. An :class:`EOFError` raised while reading is re-raised if
    ``config.enforce_valid_values`` is set, otherwise it is turned into a
    :class:`UserWarning`; a :class:`NotImplementedError` is logged as an
    error. In both of the latter cases the elements read so far are returned.

    Parameters
    ----------
    fp : file-like
        An opened file-like object.
    is_implicit_VR : bool
        ``True`` if file transfer syntax is implicit VR. The value is checked
        against the first element of the data and corrected if necessary.
    is_little_endian : bool
        ``True`` if file has little endian transfer syntax.
    bytelength : int, None, optional
        ``None`` to read until end of file or ItemDelimiterTag, else a fixed
        number of bytes to read
    stop_when : None, optional
        Optional call_back function which can terminate reading. See help for
        :func:`data_element_generator` for details
    defer_size : int, str or float, optional
        Size to avoid loading large elements in memory. See :func:`dcmread` for
        more parameter info.
    parent_encoding : str or List[str]
        Optional encoding to use as a default in case (0008,0005) *Specific
        Character Set* isn't specified.
    specific_tags : list of BaseTag, optional
        See :func:`dcmread` for parameter info.
    at_top_level: bool
        If dataset is top level (not within a sequence).
        Used to turn off explicit VR heuristic within sequences

    Returns
    -------
    dataset.Dataset
        A Dataset instance.

    See Also
    --------
    :class:`~pydicom.dataset.Dataset`
        A collection (dictionary) of DICOM
        :class:`~pydicom.dataelem.DataElement` instances.
    """
    raw_data_elements: Dict[BaseTag, Union[RawDataElement, DataElement]] = {}
    fp_start = fp.tell()
    # Peek at the first element to verify the VR encoding, then rewind
    # because the check consumes bytes from `fp`
    is_implicit_VR = _is_implicit_vr(
        fp, is_implicit_VR, is_little_endian, stop_when,
        is_sequence=not at_top_level
    )
    fp.seek(fp_start)
    de_gen = data_element_generator(
        fp,
        is_implicit_VR,
        is_little_endian,
        stop_when,
        defer_size,
        parent_encoding,
        specific_tags,
    )
    try:
        while (bytelength is None) or (fp.tell() - fp_start < bytelength):
            raw_data_element = next(de_gen)
            # Read data elements. Stop on some errors, but return what was read
            tag = raw_data_element.tag
            # Check for ItemDelimiterTag --dataset is an item in a sequence
            if tag == BaseTag(0xFFFEE00D):
                break
            raw_data_elements[tag] = raw_data_element
    except StopIteration:
        # generator exhausted: end of file or `stop_when` triggered
        pass
    except EOFError as details:
        if config.enforce_valid_values:
            raise
        # `fp.name` is not guaranteed to be a str (e.g. an int for a file
        # opened from a descriptor), so format it rather than concatenate
        msg = f"{details} in file {getattr(fp, 'name', '<no filename>')}"
        warnings.warn(msg, UserWarning)
    except NotImplementedError as details:
        logger.error(details)

    ds = Dataset(raw_data_elements)

    # Use the dataset's own Specific Character Set if present, otherwise
    # inherit the encoding of the parent
    encoding: Union[str, MutableSequence[str]]
    if 0x00080005 in raw_data_elements:
        elem = cast(RawDataElement, raw_data_elements[BaseTag(0x00080005)])
        char_set = cast(
            Optional[Union[str, MutableSequence[str]]],
            DataElement_from_raw(elem).value
        )
        encoding = convert_encodings(char_set)  # -> List[str]
    else:
        encoding = parent_encoding  # -> Union[str, MutableSequence[str]]

    ds.set_original_encoding(is_implicit_VR, is_little_endian, encoding)
    return ds
460
461
def read_sequence(
    fp: BinaryIO,
    is_implicit_VR: bool,
    is_little_endian: bool,
    bytelength: int,
    encoding: Union[str, MutableSequence[str]],
    offset: int = 0
) -> Sequence:
    """Read and return a :class:`~pydicom.sequence.Sequence` -- i.e. a
    :class:`list` of :class:`Datasets<pydicom.dataset.Dataset>`.

    Parameters
    ----------
    fp : file-like
        Positioned at the first item of the sequence.
    is_implicit_VR : bool
        ``True`` if the items are encoded as implicit VR.
    is_little_endian : bool
        ``True`` if the items are encoded as little endian.
    bytelength : int
        Length of the sequence value in bytes; ``0`` for an empty sequence
        or ``0xffffffff`` for undefined length, in which case items are read
        until :func:`read_sequence_item` returns ``None``.
    encoding : str or MutableSequence[str]
        Character encoding passed on to the items.
    offset : int, optional
        Added to the ``file_tell`` value recorded for each item.

    Returns
    -------
    sequence.Sequence
        The items read, with ``is_undefined_length`` set accordingly.
    """
    items = []  # plain list while reading for speed, wrapped at the end
    undefined_length = bytelength == 0xffffffff

    # SQ of length 0 possible (PS 3.5-2008 7.5.1a (p.40)
    if bytelength != 0:
        # a limit of 0 means "no limit": read until the sequence delimiter
        limit = 0 if undefined_length else bytelength
        tell = fp.tell  # for speed in loop
        start = tell()
        while True:
            if limit and tell() - start >= limit:
                break

            item_position = tell()
            item = read_sequence_item(
                fp, is_implicit_VR, is_little_endian, encoding, offset
            )
            if item is None:  # hit the Sequence Delimiter
                break

            item.file_tell = item_position + offset
            items.append(item)

    sequence = Sequence(items)
    sequence.is_undefined_length = undefined_length
    return sequence
496
497
def read_sequence_item(
    fp: BinaryIO,
    is_implicit_VR: bool,
    is_little_endian: bool,
    encoding: Union[str, MutableSequence[str]],
    offset: int = 0
) -> Optional[Dataset]:
    """Read and return a single :class:`~pydicom.sequence.Sequence` item, i.e.
    a :class:`~pydicom.dataset.Dataset`.

    Parameters
    ----------
    fp : file-like
        Positioned at the start of the item (or of the sequence delimiter).
    is_implicit_VR : bool
        ``True`` if the item is encoded as implicit VR.
    is_little_endian : bool
        ``True`` if the item is encoded as little endian.
    encoding : str or MutableSequence[str]
        Parent character encoding, used as the default for the item.
    offset : int, optional
        Added to file positions reported in log messages and stored in the
        returned dataset's ``seq_item_tell``.

    Returns
    -------
    dataset.Dataset or None
        The item, or ``None`` if the Sequence Delimiter was read instead.

    Raises
    ------
    IOError
        If the 8 byte tag/length header of the item could not be read. The
        underlying error is available as ``__cause__``.
    """
    seq_item_tell = fp.tell() + offset
    if is_little_endian:
        tag_length_format = "<HHL"
    else:
        tag_length_format = ">HHL"
    try:
        bytes_read = fp.read(8)
        group, element, length = unpack(tag_length_format, bytes_read)
    except Exception as exc:
        # Only ordinary errors are converted (not KeyboardInterrupt or
        # SystemExit), and the original error is kept as the cause
        raise IOError(
            f"No tag to read at file position {fp.tell() + offset:X}"
        ) from exc

    tag = (group, element)
    if tag == SequenceDelimiterTag:  # No more items, time to stop reading
        logger.debug(
            f"{fp.tell() - 8 + offset:08x}: End of Sequence"
        )
        if length != 0:
            logger.warning(
                f"Expected 0x00000000 after delimiter, found 0x{length:X}, "
                f"at position 0x{fp.tell() - 4 + offset:X}"
            )
        return None

    # An unexpected tag is only reported; reading continues as if it were
    # an item
    if tag != ItemTag:
        logger.warning(
            f"Expected sequence item with tag {ItemTag} at file position "
            f"0x{fp.tell() - 4 + offset:X}"
        )
    else:
        logger.debug(
            f"{fp.tell() - 4 + offset:08x}: {bytes2hex(bytes_read)}  "
            "Found Item tag (start of item)"
        )

    if length == 0xFFFFFFFF:
        # undefined length: read until the item delimiter or end of file
        ds = read_dataset(fp, is_implicit_VR, is_little_endian,
                          bytelength=None, parent_encoding=encoding,
                          at_top_level=False)
        ds.is_undefined_length_sequence_item = True
    else:
        ds = read_dataset(fp, is_implicit_VR, is_little_endian, length,
                          parent_encoding=encoding,
                          at_top_level=False)
        ds.is_undefined_length_sequence_item = False

        logger.debug(f"{fp.tell() + offset:08X}: Finished sequence item")

    ds.seq_item_tell = seq_item_tell
    return ds
559
560
def _read_command_set_elements(fp: BinaryIO) -> Dataset:
    """Read any leading Command Set (0000,eeee) elements from `fp`.

    Command Set elements are always Implicit VR Little Endian (DICOM Standard,
    Part 7, :dcm:`Section 6.3<part07/sect_6.3.html>`), so the elements are
    read with that encoding. Reading stops at the first element whose group
    is not 0x0000, leaving `fp` at the start of that element.

    Parameters
    ----------
    fp : file-like
        The file-like positioned at the start of any command set elements.

    Returns
    -------
    dataset.Dataset
        The command set elements as a Dataset instance. May be empty if no
        command set elements are present.
    """

    def _left_command_group(
        tag: BaseTag, VR: Optional[str], length: int
    ) -> bool:
        """Stop condition: ``True`` once `tag` is outside group 0x0000."""
        return tag.group != 0

    command_set = read_dataset(
        fp,
        is_implicit_VR=True,
        is_little_endian=True,
        stop_when=_left_command_group,
    )
    return command_set
592
593
def _read_file_meta_info(fp: BinaryIO) -> FileMetaDataset:
    """Read any leading File Meta (0002,eeee) elements from `fp`.

    File Meta elements are always Explicit VR Little Endian (DICOM Standard,
    Part 10, :dcm:`Section 7<part10/chapter_7.html>`), so that encoding is
    tried first. If the first element cannot be converted, the group is read
    again as Implicit VR Little Endian. Reading stops at the first element
    whose group is not 0x0002, leaving `fp` at the start of that element.

    Parameters
    ----------
    fp : file-like
        The file-like positioned at the start of any File Meta Information
        group elements.

    Returns
    -------
    dataset.FileMetaDataset
        The File Meta elements. May be empty if no File Meta elements are
        present.
    """

    def _left_meta_group(
        tag: BaseTag, VR: Optional[str], length: int
    ) -> bool:
        """Stop condition: ``True`` once `tag` is outside group 0x0002."""
        return tag.group != 2

    def _parse_group(implicit_vr: bool) -> FileMetaDataset:
        """Read the group 0x0002 elements from the current position."""
        return FileMetaDataset(
            read_dataset(
                fp, is_implicit_VR=implicit_vr, is_little_endian=True,
                stop_when=_left_meta_group
            )
        )

    meta_start = fp.tell()
    file_meta = _parse_group(False)
    if not file_meta._dict:
        return file_meta

    # Accessing an element forces its conversion. If that fails for the
    #   first element the explicit VR interpretation was wrong, so retry
    #   with implicit VR (issue #503)
    try:
        first_elem = list(file_meta.elements())[0]
        file_meta[first_elem.tag]
    except NotImplementedError:
        fp.seek(meta_start)
        file_meta = _parse_group(True)

    if 'FileMetaInformationGroupLength' not in file_meta:
        return file_meta

    # Log if the Group Length doesn't match actual length.
    # The FileMetaInformationGroupLength element itself is 12 bytes long and
    #   its value counts from the beginning of the next element to the end
    #   of the file meta elements
    actual_len = fp.tell() - (meta_start + 12)
    elem_len = file_meta.FileMetaInformationGroupLength
    if elem_len != actual_len:
        logger.info(
            "_read_file_meta_info: (0002,0000) 'File Meta Information "
            "Group Length' value doesn't match the actual File Meta "
            f"Information length ({elem_len} vs {actual_len} bytes)"
        )

    return file_meta
658
659
def read_file_meta_info(filename: PathType) -> FileMetaDataset:
    """Read and return only the DICOM file meta information of a file.

    Intended for user code that needs to scan many files quickly, e.g. to
    find the one referenced by a particular SOP, without reading each file
    completely.

    Parameters
    ----------
    filename : PathType
        Path of the file to read; it is opened in binary mode and closed
        again before returning.

    Returns
    -------
    dataset.FileMetaDataset
        The File Meta elements found after the preamble.

    Raises
    ------
    InvalidDicomError
        If the file has no DICOM header (the preamble is read with
        ``force=False``).
    """
    with open(filename, 'rb') as stream:
        # force=False: a missing header raises instead of being skipped
        read_preamble(stream, False)
        meta = _read_file_meta_info(stream)

    return meta
670
671
def read_preamble(fp: BinaryIO, force: bool) -> Optional[bytes]:
    """Return the 128-byte DICOM preamble in `fp` if present.

    `fp` should be positioned at the start of the file-like. 128 bytes of
    preamble and the 4 byte prefix are read. If the prefix is ``b'DICM'``,
    `fp` is left positioned directly after the prefix. If it is not and
    `force` is ``True``, `fp` is rewound to the start of the file-like.

    Parameters
    ----------
    fp : file-like object
        The file-like to read the preamble from.
    force : bool
        Flag to force reading of a file even if no header is found.

    Returns
    -------
    preamble : bytes or None
        The 128-byte DICOM preamble if the ``'DICM'`` prefix follows it,
        ``None`` if the prefix is not found and `force` is ``True``.

    Raises
    ------
    InvalidDicomError
        If `force` is ``False`` and no appropriate header information found.
    """
    logger.debug("Reading File Meta Information preamble...")
    preamble = fp.read(128)
    if config.debugging:
        # only the first and last 8 bytes of the preamble are logged
        sample = bytes2hex(preamble[:8]) + "..." + bytes2hex(preamble[-8:])
        logger.debug(f"{fp.tell() - 128:08x}: {sample}")

    logger.debug("Reading File Meta Information prefix...")
    magic = fp.read(4)

    if magic == b"DICM":
        logger.debug(f"{fp.tell() - 4:08x}: 'DICM' prefix found")
        return preamble

    if not force:
        raise InvalidDicomError(
            "File is missing DICOM File Meta Information header or the 'DICM' "
            "prefix is missing from the header. Use force=True to force "
            "reading."
        )

    # No header, but the caller wants to read anyway: start over from the
    # beginning of the file-like
    logger.info(
        "File is not conformant with the DICOM File Format: 'DICM' "
        "prefix is missing from the File Meta Information header "
        "or the header itself is missing. Assuming no header and "
        "continuing."
    )
    fp.seek(0)
    return None
733
734
def _at_pixel_data(tag: BaseTag, VR: Optional[str], length: int) -> bool:
    """Return ``True`` if `tag` is (7FE0,0008), (7FE0,0009) or (7FE0,0010).

    The signature matches the `stop_when` callback; `VR` and `length` are
    not used.
    """
    return tag in {0x7fe00008, 0x7fe00009, 0x7fe00010}
738
739
740def read_partial(
741    fileobj: BinaryIO,
742    stop_when: Optional[Callable[[BaseTag, Optional[str], int], bool]] = None,
743    defer_size: Optional[Union[int, str, float]] = None,
744    force: bool = False,
745    specific_tags: Optional[List[BaseTag]] = None
746) -> Union[FileDataset, DicomDir]:
747    """Parse a DICOM file until a condition is met.
748
749    Parameters
750    ----------
751    fileobj : a file-like object
752        Note that the file will not close when the function returns.
753    stop_when :
754        Stop condition. See :func:`read_dataset` for more info.
755    defer_size : int, str or float, optional
756        See :func:`dcmread` for parameter info.
757    force : bool
758        See :func:`dcmread` for parameter info.
759    specific_tags : list or None
760        See :func:`dcmread` for parameter info.
761
762    Notes
763    -----
764    Use :func:`dcmread` unless you need to stop on some condition other than
765    reaching pixel data.
766
767    Returns
768    -------
769    dataset.FileDataset or dicomdir.DicomDir
770        The read dataset.
771
772    See Also
773    --------
774    dcmread
775        More generic file reading function.
776    """
777    # Read File Meta Information
778
779    # Read preamble (if present)
780    preamble = read_preamble(fileobj, force)
781    # Read any File Meta Information group (0002,eeee) elements (if present)
782    file_meta_dataset = _read_file_meta_info(fileobj)
783
784    # Read Dataset
785
786    # Read any Command Set group (0000,eeee) elements (if present)
787    command_set = _read_command_set_elements(fileobj)
788
789    # Check to see if there's anything left to read
790    peek = fileobj.read(1)
791    if peek != b'':
792        fileobj.seek(-1, 1)
793
    # `fileobj` should be positioned at the start of the dataset by now.
795    # Ensure we have appropriate values for `is_implicit_VR` and
796    # `is_little_endian` before we try decoding. We assume an initial
797    # transfer syntax of implicit VR little endian and correct it as necessary
798    is_implicit_VR = True
799    is_little_endian = True
800    transfer_syntax = file_meta_dataset.get("TransferSyntaxUID")
801    if peek == b'':  # EOF
802        pass
803    elif transfer_syntax is None:  # issue 258
804        # If no TransferSyntaxUID element then we have to try and figure out
805        #   the correct values for `is_little_endian` and `is_implicit_VR`.
806        # Peek at the first 6 bytes to get the first element's tag group and
807        #   (possibly) VR
808        group, _, VR = unpack("<HH2s", fileobj.read(6))
809        fileobj.seek(-6, 1)
810
811        # Test the VR to see if it's valid, and if so then assume explicit VR
812        from pydicom.values import converters
813        VR = VR.decode(default_encoding)
814        if VR in converters.keys():
815            is_implicit_VR = False
816            # Big endian encoding can only be explicit VR
817            #   Big endian 0x0004 decoded as little endian will be 1024
818            #   Big endian 0x0100 decoded as little endian will be 1
819            # Therefore works for big endian tag groups up to 0x00FF after
820            #   which it will fail, in which case we leave it as little endian
821            #   and hope for the best (big endian is retired anyway)
822            if group >= 1024:
823                is_little_endian = False
824    elif transfer_syntax == pydicom.uid.ImplicitVRLittleEndian:
825        pass
826    elif transfer_syntax == pydicom.uid.ExplicitVRLittleEndian:
827        is_implicit_VR = False
828    elif transfer_syntax == pydicom.uid.ExplicitVRBigEndian:
829        is_implicit_VR = False
830        is_little_endian = False
831    elif transfer_syntax == pydicom.uid.DeflatedExplicitVRLittleEndian:
832        # See PS3.5 section A.5
833        # when written, the entire dataset following
834        #     the file metadata was prepared the normal way,
835        #     then "deflate" compression applied.
836        #  All that is needed here is to decompress and then
837        #     use as normal in a file-like object
838        zipped = fileobj.read()
839        # -MAX_WBITS part is from comp.lang.python answer:
840        # groups.google.com/group/comp.lang.python/msg/e95b3b38a71e6799
841        unzipped = zlib.decompress(zipped, -zlib.MAX_WBITS)
842        fileobj = BytesIO(unzipped)  # a file-like object
843        is_implicit_VR = False
844    else:
845        # Any other syntax should be Explicit VR Little Endian,
846        #   e.g. all Encapsulated (JPEG etc) are ExplVR-LE
847        #        by Standard PS 3.5-2008 A.4 (p63)
848        is_implicit_VR = False
849
850    # Try and decode the dataset
851    #   By this point we should be at the start of the dataset and have
852    #   the transfer syntax (whether read from the file meta or guessed at)
853    try:
854        dataset = read_dataset(
855            fileobj,
856            is_implicit_VR,
857            is_little_endian,
858            stop_when=stop_when,
859            defer_size=defer_size,
860            specific_tags=specific_tags,
861        )
862    except EOFError:
863        if config.enforce_valid_values:
864            raise
865        # warning already logged in read_dataset
866
867    # Add the command set elements to the dataset (if any)
868    dataset.update(command_set)
869
870    class_uid = cast(
871        pydicom.uid.UID, file_meta_dataset.get("MediaStorageSOPClassUID", None)
872    )
873    ds: Union[DicomDir, FileDataset]
874    if class_uid and class_uid.name == "Media Storage Directory Storage":
875        warnings.warn(
876            "The 'DicomDir' class is deprecated and will be removed in v3.0, "
877            "after which 'dcmread()' will return a normal 'FileDataset' "
878            "instance for 'Media Storage Directory' SOP Instances.",
879            DeprecationWarning
880        )
881        ds = DicomDir(
882            fileobj,
883            dataset,
884            preamble,
885            file_meta_dataset,
886            is_implicit_VR,
887            is_little_endian,
888        )
889    else:
890        ds = FileDataset(
891            fileobj,
892            dataset,
893            preamble,
894            file_meta_dataset,
895            is_implicit_VR,
896            is_little_endian,
897        )
898    # save the originally read transfer syntax properties in the dataset
899    ds.set_original_encoding(
900        is_implicit_VR, is_little_endian, dataset._character_set
901    )
902    return ds
903
904
def dcmread(
    fp: Union[PathType, BinaryIO, DicomFileLike],
    defer_size: Optional[Union[str, int, float]] = None,
    stop_before_pixels: bool = False,
    force: bool = False,
    specific_tags: Optional[TagListType] = None
) -> Union[FileDataset, DicomDir]:
    """Read and parse a DICOM dataset stored in the DICOM File Format.

    Read a DICOM dataset stored in accordance with the :dcm:`DICOM File
    Format <part10/chapter_7.html>`. If the dataset is not stored in
    accordance with the File Format (i.e. the preamble and prefix are missing,
    there are missing required Type 1 *File Meta Information Group* elements
    or the entire *File Meta Information* is missing) then you will have to
    set `force` to ``True``.

    .. deprecated:: 2.2

        Returning a :class:`~pydicom.dicomdir.DicomDir` is deprecated and
        will be removed in v3.0. Use :class:`~pydicom.fileset.FileSet` instead.


    Examples
    --------
    Read and return a dataset stored in accordance with the DICOM File Format:

    >>> ds = pydicom.dcmread("CT_small.dcm")
    >>> ds.PatientName

    Read and return a dataset not in accordance with the DICOM File Format:

    >>> ds = pydicom.dcmread("rtplan.dcm", force=True)
    >>> ds.PatientName

    Use within a context manager:

    >>> with pydicom.dcmread("rtplan.dcm") as ds:
    ...     ds.PatientName

    Parameters
    ----------
    fp : str or PathLike or file-like
        Either a file-like object, a string containing the file name or the
        path to the file. The file-like object must have ``seek()``,
        ``read()`` and ``tell()`` methods and the caller is responsible for
        closing it (if required).
    defer_size : int, str or float, optional
        If not used then all elements are read into memory. If specified,
        then if a data element's stored value is larger than `defer_size`, the
        value is not read into memory until it is accessed in code. Should be
        the number of bytes to be read as :class:`int` or as a :class:`str`
        with units, e.g. ``'512 KB'``, ``'2 MB'``.
    stop_before_pixels : bool, optional
        If ``False`` (default), the full file will be read and parsed. Set
        ``True`` to stop before reading (7FE0,0010) *Pixel Data* (and all
        subsequent elements).
    force : bool, optional
        If ``False`` (default), raises an
        :class:`~pydicom.errors.InvalidDicomError` if the file is
        missing the *File Meta Information* header. Set to ``True`` to force
        reading even if no *File Meta Information* header is found.
    specific_tags : list of (int or str or 2-tuple of int), optional
        If used then only the supplied tags will be returned. The supplied
        elements can be tags or keywords. Note that the element (0008,0005)
        *Specific Character Set* is always returned if present - this ensures
        correct decoding of returned text values.

    Returns
    -------
    FileDataset or DicomDir
        An instance of :class:`~pydicom.dataset.FileDataset` that represents
        a parsed DICOM file, unless the dataset is a *Media Storage Directory*
        instance in which case it will be a
        :class:`~pydicom.dicomdir.DicomDir`.

    Raises
    ------
    InvalidDicomError
        If `force` is ``False`` and the file is not a valid DICOM file.
    TypeError
        If `fp` is ``None`` or of an unsupported type.

    See Also
    --------
    pydicom.dataset.FileDataset
        Data class that is returned.
    pydicom.filereader.read_partial
        Only read part of a DICOM file, stopping on given conditions.
    """
    # Open file if not already a file object
    caller_owns_file = True
    fp = path_from_pathlike(fp)
    if isinstance(fp, str):
        # caller provided a file name; we own the file handle
        caller_owns_file = False
        logger.debug("Reading file '{0}'".format(fp))
        fp = open(fp, 'rb')
    elif fp is None or not hasattr(fp, "read") or not hasattr(fp, "seek"):
        raise TypeError("dcmread: Expected a file path or a file-like, "
                        "but got " + type(fp).__name__)

    # Everything from here on runs inside the try so that a handle opened
    # above is released even if logging or tag conversion raises
    try:
        if config.debugging:
            logger.debug("\n" + "-" * 80)
            logger.debug("Call to dcmread()")
            msg = ("filename:'%s', defer_size='%s', "
                   "stop_before_pixels=%s, force=%s, specific_tags=%s")
            # Not every file-like has a `name` attribute (e.g. io.BytesIO)
            logger.debug(
                msg % (
                    getattr(fp, "name", "<none>"),
                    defer_size,
                    stop_before_pixels,
                    force,
                    specific_tags,
                )
            )
            if caller_owns_file:
                logger.debug("Caller passed file object")
            else:
                logger.debug("Caller passed file name")
            logger.debug("-" * 80)

        if specific_tags:
            specific_tags = [Tag(t) for t in specific_tags]

        specific_tags = cast(Optional[List[BaseTag]], specific_tags)

        # Iterate through all items and store them --include file meta if
        #   present
        stop_when = _at_pixel_data if stop_before_pixels else None
        return read_partial(
            fp,
            stop_when,
            defer_size=size_in_bytes(defer_size),
            force=force,
            specific_tags=specific_tags,
        )
    finally:
        # Only close handles that were opened by this function
        if not caller_owns_file:
            fp.close()
1041
1042
def __getattr__(name: str) -> Any:
    """Module-level attribute hook serving the deprecated ``read_file`` name.

    Parameters
    ----------
    name : str
        The name of the attribute being requested from this module.

    Returns
    -------
    Any
        The module's ``dcmread`` object when `name` is ``'read_file'``; a
        :class:`DeprecationWarning` is issued before it is returned.

    Raises
    ------
    AttributeError
        If `name` is anything other than ``'read_file'``.
    """
    # Guard clause: the deprecated alias is the only name handled here
    if name != 'read_file':
        raise AttributeError(f"module {__name__} has no attribute {name}")

    deprecation_msg = (
        "'read_file' is deprecated and will be removed in v3.0, use "
        "'dcmread' instead"
    )
    warnings.warn(deprecation_msg, DeprecationWarning)

    return globals()['dcmread']
1053
1054
# Module-level ``__getattr__`` (PEP 562) is only consulted from Python 3.7
# onwards, so older interpreters get the deprecated alias bound directly.
if sys.version_info < (3, 7):
    read_file = dcmread
1057
1058
def read_dicomdir(filename: PathType = "DICOMDIR") -> DicomDir:
    """Read a DICOMDIR file and return a :class:`~pydicom.dicomdir.DicomDir`.

    This is a wrapper around :func:`dcmread` which gives a default file name.

    .. deprecated:: 2.1

        ``read_dicomdir()`` is deprecated and will be removed in v3.0. Use
        :func:`~pydicom.filereader.dcmread` instead.

    Parameters
    ----------
    filename : str or PathLike, optional
        Full path and name to DICOMDIR file to open

    Returns
    -------
    DicomDir

    Raises
    ------
    InvalidDicomError
        Raised if filename is not a DICOMDIR file.
    """
    warnings.warn(
        "'read_dicomdir()' is deprecated and will be removed in v3.0, use "
        "'dcmread()' instead",
        DeprecationWarning
    )

    str_or_obj = path_from_pathlike(filename)
    ds = dcmread(str_or_obj)
    if not isinstance(ds, DicomDir):
        # The name is already wrapped in quotes by the message itself, so
        # format it as plain text rather than with repr() to avoid output
        # such as ``File ''DICOMDIR'' is not ...``
        raise InvalidDicomError(
            f"File '{filename}' is not a Media Storage Directory file"
        )

    return ds
1097
1098
def data_element_offset_to_value(
    is_implicit_VR: bool, VR: Optional[str]
) -> int:
    """Return the byte offset from the start of an element to its value.

    Parameters
    ----------
    is_implicit_VR : bool
        ``True`` if the element is encoded as implicit VR, ``False`` if
        explicit VR.
    VR : str or None
        The element's VR, only consulted for explicit VR encoding.

    Returns
    -------
    int
        ``12`` for an explicit VR element whose VR is in
        ``extra_length_VRs``, otherwise ``8``.
    """
    # Header layouts:
    #   implicit VR            : 4 tag + 4 length                      = 8
    #   explicit VR            : 4 tag + 2 VR + 2 length               = 8
    #   explicit VR, long form : 4 tag + 2 VR + 2 reserved + 4 length  = 12
    if not is_implicit_VR and cast(str, VR) in extra_length_VRs:
        return 12

    return 8
1111
1112
def read_deferred_data_element(
    fileobj_type: Any,
    filename_or_obj: Union[PathType, BinaryIO],
    timestamp: Optional[float],
    raw_data_elem: RawDataElement
) -> RawDataElement:
    """Read the previously deferred value from the file into memory
    and return a raw data element.

    .. note::

        This is called internally by pydicom and will normally not be
        needed in user code.

    Parameters
    ----------
    fileobj_type : type
        The type of the original file object.
    filename_or_obj : str or file-like
        The filename of the original file if one exists, or the file-like
        object where the data element persists. A file-like passed in here
        is left open; only a file opened by this function from a filename
        is closed again.
    timestamp : float or None
        The time (as given by stat.st_mtime) the original file has been
        read, if not a file-like.
    raw_data_elem : dataelem.RawDataElement
        The raw data element with no value set.

    Returns
    -------
    dataelem.RawDataElement
        The data element with the value set.

    Raises
    ------
    IOError
        If `filename_or_obj` is ``None``.
    IOError
        If `filename_or_obj` is a filename and the corresponding file does
        not exist.
    ValueError
        If the VR or tag of `raw_data_elem` does not match the read value.
    """
    logger.debug("Reading deferred element %r" % str(raw_data_elem.tag))
    # If it wasn't read from a file, then return an error
    if filename_or_obj is None:
        raise IOError(
            "Deferred read -- original filename not stored. Cannot re-open"
        )

    # Check that the file is the same as when originally read
    is_filename = isinstance(filename_or_obj, str)
    if isinstance(filename_or_obj, str):
        if not os.path.exists(filename_or_obj):
            raise IOError(
                f"Deferred read -- original file {filename_or_obj} is missing"
            )

        if timestamp is not None:
            statinfo = os.stat(filename_or_obj)
            if statinfo.st_mtime != timestamp:
                warnings.warn(
                    "Deferred read warning -- file modification time has "
                    "changed"
                )

    # Open the file (if given a filename), position to the right place
    fp = (
        fileobj_type(filename_or_obj, 'rb') if is_filename
        else filename_or_obj
    )
    try:
        is_implicit_VR = raw_data_elem.is_implicit_VR
        is_little_endian = raw_data_elem.is_little_endian
        offset = data_element_offset_to_value(
            is_implicit_VR, raw_data_elem.VR
        )
        # Seek back to the start of the deferred element
        fp.seek(raw_data_elem.value_tell - offset)
        elem_gen = data_element_generator(
            fp, is_implicit_VR, is_little_endian, defer_size=None
        )

        # Read the data element and check matches what was stored before
        # The first element out of the iterator should be the same type as
        #   the deferred element == RawDataElement
        elem = cast(RawDataElement, next(elem_gen))
    finally:
        # Release the handle even if seeking/reading failed, but never close
        #   a file-like that was supplied to us: it may be needed again for
        #   further deferred reads
        if is_filename:
            fp.close()

    if elem.VR != raw_data_elem.VR:
        raise ValueError(
            f"Deferred read VR {elem.VR} does not match original "
            f"{raw_data_elem.VR}"
        )

    if elem.tag != raw_data_elem.tag:
        raise ValueError(
            f"Deferred read tag {elem.tag!r} does not match "
            f"original {raw_data_elem.tag!r}"
        )

    # Everything is ok, now this object should act like usual DataElement
    return elem
1211