1# Copyright 2008-2020 pydicom authors. See LICENSE file for details. 2"""DICOM File-set handling.""" 3 4import copy 5import os 6from pathlib import Path 7import re 8import shutil 9from tempfile import TemporaryDirectory 10from typing import ( 11 Iterator, Optional, Union, Any, List, cast, Iterable, Dict, Callable 12) 13import warnings 14 15from pydicom.charset import default_encoding 16from pydicom.datadict import tag_for_keyword, dictionary_description 17from pydicom.dataelem import DataElement 18from pydicom.dataset import Dataset, FileMetaDataset, FileDataset 19from pydicom.filebase import DicomBytesIO, DicomFileLike 20from pydicom.filereader import dcmread 21from pydicom.filewriter import ( 22 write_dataset, write_data_element, write_file_meta_info 23) 24from pydicom._storage_sopclass_uids import MediaStorageDirectoryStorage 25import pydicom._storage_sopclass_uids as sop 26from pydicom.tag import Tag, BaseTag 27from pydicom.uid import ( 28 generate_uid, UID, ExplicitVRLittleEndian, ImplicitVRLittleEndian 29) 30 31 32# Regex for conformant File ID paths - PS3.10 Section 8.5 33_RE_FILE_ID = re.compile("^[A-Z0-9_]*$") 34# Prefixes to use when generating File ID components 35_PREFIXES = { 36 "PATIENT": "PT", 37 "STUDY": "ST", 38 "SERIES": "SE", 39 "IMAGE": "IM", 40 "RT DOSE": "RD", 41 "RT STRUCTURE SET": "RS", 42 "RT PLAN": "RP", 43 "RT TREAT RECORD": "RX", 44 "PRESENTATION": "PR", 45 "WAVEFORM": "WV", 46 "SR DOCUMENT": "SR", 47 "KEY OBJECT DOC": "KY", 48 "SPECTROSCOPY": "SP", 49 "RAW DATA": "RW", 50 "REGISTRATION": "RG", 51 "FIDUCIAL": "FD", 52 "HANGING PROTOCOL": "HG", 53 "ENCAP DOC": "ED", 54 "VALUE MAP": "VM", 55 "STEREOMETRIC": "SX", 56 "PALETTE": "PA", 57 "IMPLANT": "IP", 58 "IMPLANT ASSY": "IA", 59 "IMPLANT GROUP": "IG", 60 "PLAN": "PL", 61 "MEASUREMENT": "MX", 62 "SURFACE": "SF", 63 "SURFACE SCAN": "SS", 64 "TRACT": "TR", 65 "ASSESSMENT": "AS", 66 "RADIOTHERAPY": "RT", 67 "PRIVATE": "P", 68} 69_FIRST_OFFSET = "OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity" 70_NEXT_OFFSET = "OffsetOfTheNextDirectoryRecord" 71_LOWER_OFFSET = "OffsetOfReferencedLowerLevelDirectoryEntity" 72_LAST_OFFSET = "OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity" 73 74 75def generate_filename( 76 prefix: str = "", start: int = 0, alphanumeric: bool = False 77) -> Iterator[str]: 78 """Yield File IDs for a File-set. 79 80 Maximum number of File IDs is: 81 82 * Numeric: (10 ** (8 - `prefix`)) - `start` 83 * Alphanumeric: (36 ** (8 - `prefix`)) - `start` 84 85 Parameters 86 ---------- 87 prefix : str, optional 88 The prefix to use for all filenames, default (``""``). 89 start : int, optional 90 The starting index to use for the suffixes, (default ``0``). 91 i.e. if you want to start at ``'00010'`` then `start` should be ``10``. 92 alphanumeric : bool, optional 93 If ``False`` (default) then only generate suffixes using the characters 94 [0-9], otherwise use [0-9][A-Z]. 95 96 Yields 97 ------ 98 str 99 A unique filename with 8 characters, with each incremented by 1 from 100 the previous one (i.e. ``'00000000'``, ``'00000001'``, ``'00000002'``, 101 and so on). 102 """ 103 if len(prefix) > 7: 104 raise ValueError("The 'prefix' must be less than 8 characters long") 105 106 chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" 107 if not alphanumeric: 108 chars = chars[:10] 109 110 idx = start 111 b = len(chars) 112 length = 8 - len(prefix) 113 while idx < b ** length: 114 n = idx 115 suffix = "" 116 while n: 117 suffix += chars[n % b] 118 n //= b 119 120 yield f"{prefix}{suffix[::-1]:>0{length}}" 121 idx += 1 122 123 124def is_conformant_file_id(path: Path) -> bool: 125 """Return ``True`` if `path` is a conformant File ID. 126 127 **Conformance** 128 129 * :dcm:`No more than 8 components<part03/sect_F.3.2.2.html>` (parts) in 130 the path 131 * :dcm:`No more than 8 characters per component<part03/sect_F.3.2.2.html>` 132 * :dcm:`Characters in a component must be ASCII<part10/sect_8.2.html>` 133 * :dcm:`Valid characters in a component are 0-9, A-Z and _ 134 <part10/sect_8.5.html>` 135 136 Parameters 137 ---------- 138 path : pathlib.Path 139 The path to check, relative to the File-set root directory. 140 141 Returns 142 ------- 143 bool 144 ``True`` if `path` is conformant, ``False`` otherwise. 145 """ 146 # No more than 8 characters per component 147 parts = path.parts 148 if any([len(pp) > 8 for pp in parts]): 149 return False 150 151 # No more than 8 components 152 if len(parts) > 8: 153 return False 154 155 # Characters in the path are ASCII 156 chars = ''.join(parts) 157 try: 158 chars.encode(encoding="ascii", errors="strict") 159 except UnicodeEncodeError: 160 return False 161 162 # Characters are in [0-9][A-Z] and _ 163 if re.match(_RE_FILE_ID, chars): 164 return True 165 166 return False 167 168 169class RecordNode(Iterable["RecordNode"]): 170 """Representation of a DICOMDIR's directory record. 171 172 Attributes 173 ---------- 174 children : list of RecordNode 175 The current node's child nodes (if any) 176 instance : FileInstance or None 177 If the current node is a leaf node, a 178 :class:`~pydicom.fileset.FileInstance` for the corresponding SOP 179 Instance. 180 """ 181 def __init__(self, record: Optional[Dataset] = None) -> None: 182 """Create a new ``RecordNode``. 183 184 Parameters 185 ---------- 186 record : pydicom.dataset.Dataset, optional 187 A *Directory Record Sequence's* directory record. 188 """ 189 self.children: List["RecordNode"] = [] 190 self.instance: Optional[FileInstance] = None 191 self._parent: Optional["RecordNode"] = None 192 self._record: Dataset 193 194 if record: 195 self._set_record(record) 196 197 # When the record is encoded as part of the *Directory Record Sequence* 198 # this is the offset to the start of the sequence item containing 199 # the record - not guaranteed to be up-to-date 200 self._offset = 0 201 # The offset to the start of the encoded record's *Offset of the 202 # Next Directory Record* and *Offset of Referenced Lower Level 203 # Directory Entity* values - use _encode_record() to set them 204 self._offset_next = 0 205 self._offset_lower = 0 206 207 def add(self, leaf: "RecordNode") -> None: 208 """Add a leaf to the tree. 209 210 Parameters 211 ---------- 212 leaf : pydicom.fileset.RecordNode 213 A leaf node (i.e. one with a 214 :class:`~pydicom.fileset.FileInstance`) to be added to the tree 215 (if not already present). 216 """ 217 # Move up to the branch's furthest ancestor with a directory record 218 node = leaf.root 219 if node is self: 220 node = node.children[0] 221 222 # Move back down, inserting at the point where the node is unique 223 current = self.root 224 while node in current and node.children: 225 current = current[node] 226 node = node.children[0] 227 228 node.parent = current 229 230 @property 231 def ancestors(self) -> List["RecordNode"]: 232 """Return a list of the current node's ancestors, ordered from nearest 233 to furthest. 234 """ 235 return [nn for nn in self.reverse() if nn is not self] 236 237 @property 238 def component(self) -> str: 239 """Return a File ID component as :class:`str` for the current node.""" 240 if self.is_root: 241 raise ValueError( 242 "The root node doesn't contribute a File ID component" 243 ) 244 245 prefix = _PREFIXES[self.record_type] 246 if self.record_type == "PRIVATE": 247 prefix = f"{prefix}{self.depth}" 248 249 chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" 250 if not self.file_set._use_alphanumeric: 251 chars = chars[:10] 252 253 suffix = "" 254 n = self.index 255 b = len(chars) 256 while n: 257 suffix += chars[n % b] 258 n //= b 259 260 idx = f"{suffix[::-1]:>0{8 - len(prefix)}}" 261 262 return f"{prefix}{idx}" 263 264 def __contains__(self, key: Union[str, "RecordNode"]) -> bool: 265 """Return ``True`` if the current node has a child matching `key`.""" 266 if isinstance(key, RecordNode): 267 key = key.key 268 269 return key in [child.key for child in self.children] 270 271 def __delitem__(self, key: Union[str, "RecordNode"]) -> None: 272 """Remove one of the current node's children and if the current node 273 becomes childless recurse upwards and delete it from its parent. 274 """ 275 if isinstance(key, RecordNode): 276 key = key.key 277 278 if key not in self: 279 raise KeyError(key) 280 281 self.children = [ii for ii in self.children if ii.key != key] 282 283 # Recurse upwards to the root, removing any empty nodes 284 if not self.children and not self.is_root: 285 del self.parent[self] 286 287 @property 288 def depth(self) -> int: 289 "Return the number of nodes to the level below the tree root" 290 return len(list(self.reverse())) - 1 291 292 def _encode_record(self, force_implicit: bool = False) -> int: 293 """Encode the node's directory record. 294 295 * Encodes the record as explicit VR little endian 296 * Sets the ``RecordNode._offset_next`` and ``RecordNode._offset_lower`` 297 attributes to the position of the start of the values of the *Offset 298 of the Next Directory Record* and *Offset of Referenced Lower Level 299 Directory Entity* elements. Note that the offsets are relative to 300 the start of the current directory record. 301 302 The values for the *Offset Of The Next Directory Record* and *Offset 303 of Referenced Lower Level Directory Entity* elements are not guaranteed 304 to be correct. 305 306 Parameters 307 ---------- 308 force_implicit : bool, optional 309 ``True`` to force using implicit VR encoding, which is 310 non-conformant. Default ``False``. 311 312 Returns 313 ------- 314 int 315 The length of the encoded directory record. 316 317 See Also 318 -------- 319 :meth:`~pydicom.fileset.RecordNode._update_record_offsets` 320 """ 321 fp = DicomBytesIO() 322 fp.is_little_endian = True 323 fp.is_implicit_VR = force_implicit 324 325 encoding = self._record.get('SpecificCharacterSet', default_encoding) 326 327 for tag in sorted(self._record.keys()): 328 if tag.element == 0 and tag.group > 6: 329 continue 330 331 # (0004,1400) Offset Of The Next Directory Record 332 # (0004,1420) Offset Of Referenced Lower Level Directory Entity 333 # Offset from start of tag to start of value for VR UL is always 8 334 # however the absolute position may change with transfer syntax 335 if tag == 0x00041400: 336 self._offset_next = fp.tell() + 8 337 elif tag == 0x00041420: 338 self._offset_lower = fp.tell() + 8 339 340 write_data_element(fp, self._record[tag], encoding) 341 342 return len(fp.getvalue()) 343 344 @property 345 def _file_id(self) -> Optional[Path]: 346 """Return the *Referenced File ID* as a :class:`~pathlib.Path`. 347 348 Returns 349 ------- 350 pathlib.Path or None 351 The *Referenced File ID* from the directory record as a 352 :class:`pathlib.Path` or ``None`` if the element value is null. 353 """ 354 if "ReferencedFileID" in self._record: 355 elem = self._record["ReferencedFileID"] 356 if elem.VM == 1: 357 return Path(cast(str, self._record.ReferencedFileID)) 358 if elem.VM > 1: 359 return Path(*cast(List[str], self._record.ReferencedFileID)) 360 361 return None 362 363 raise AttributeError("No 'Referenced File ID' in the directory record") 364 365 @property 366 def file_set(self) -> "FileSet": 367 """Return the tree's :class:`~pydicom.fileset.FileSet`.""" 368 return self.root.file_set 369 370 def __getitem__(self, key: Union[str, "RecordNode"]) -> "RecordNode": 371 """Return the current node's child using it's 372 :attr:`~pydicom.fileset.RecordNode.key` 373 """ 374 if isinstance(key, RecordNode): 375 key = key.key 376 377 for child in self.children: 378 if key == child.key: 379 return child 380 381 raise KeyError(key) 382 383 @property 384 def has_instance(self) -> bool: 385 """Return ``True`` if the current node corresponds to an instance.""" 386 return self.instance is not None 387 388 @property 389 def index(self) -> int: 390 """Return the index of the current node amongst its siblings.""" 391 if not self.parent: 392 return 0 393 394 return self.parent.children.index(self) 395 396 @property 397 def is_root(self) -> bool: 398 """Return ``True`` if the current node is the tree's root node.""" 399 return False 400 401 def __iter__(self) -> Iterator["RecordNode"]: 402 """Yield this node (unless it's the root node) and all nodes below it. 403 """ 404 if not self.is_root: 405 yield self 406 407 for child in self.children: 408 yield from child 409 410 @property 411 def key(self) -> str: 412 """Return a unique key for the node's record as :class:`str`.""" 413 rtype = self.record_type 414 if rtype == "PATIENT": 415 # PS3.3, Annex F.5.1: Each Patient ID is unique within a File-set 416 return cast(str, self._record.PatientID) 417 if rtype == "STUDY": 418 # PS3.3, Annex F.5.2: Type 1C 419 if "StudyInstanceUID" in self._record: 420 return cast(UID, self._record.StudyInstanceUID) 421 else: 422 return cast(UID, self._record.ReferencedSOPInstanceUIDInFile) 423 if rtype == "SERIES": 424 return cast(UID, self._record.SeriesInstanceUID) 425 if rtype == "PRIVATE": 426 return cast(UID, self._record.PrivateRecordUID) 427 428 # PS3.3, Table F.3-3: Required if record references an instance 429 try: 430 return cast(UID, self._record.ReferencedSOPInstanceUIDInFile) 431 except AttributeError as exc: 432 raise AttributeError( 433 f"Invalid '{rtype}' record - missing required element " 434 "'Referenced SOP Instance UID in File'" 435 ) from exc 436 437 @property 438 def next(self) -> Optional["RecordNode"]: 439 """Return the node after the current one (if any), or ``None``.""" 440 if not self.parent: 441 return None 442 443 try: 444 return self.parent.children[self.index + 1] 445 except IndexError: 446 return None 447 448 @property 449 def parent(self) -> "RecordNode": 450 """Return the current node's parent (if it has one).""" 451 return cast("RecordNode", self._parent) 452 453 @parent.setter 454 def parent(self, node: "RecordNode") -> None: 455 """Set the parent of the current node.""" 456 self._parent = node 457 if node is not None and self not in node.children: 458 node.children.append(self) 459 460 def prettify(self, indent_char: str = ' ') -> List[str]: 461 """Return the tree structure as a list of pretty strings, starting at 462 the current node (unless the current node is the root node). 463 464 Parameters 465 ---------- 466 indent_char : str, optional 467 The characters to use to indent each level of the tree. 468 """ 469 def leaf_summary(node: "RecordNode", indent_char: str) -> List[str]: 470 """Summarize the leaves at the current level.""" 471 # Examples: 472 # IMAGE: 15 SOP Instances (10 initial, 9 additions, 4 removals) 473 # RTDOSE: 1 SOP Instance 474 out = [] 475 if not node.children: 476 indent = indent_char * node.depth 477 sibs = [ii for ii in node.parent if ii.has_instance] 478 # Split into record types 479 rtypes = {ii.record_type for ii in sibs} 480 for record_type in sorted(rtypes): 481 # nr = initial + additions 482 nr = [ii for ii in sibs if ii.record_type == record_type] 483 # All leaves should have a corresponding FileInstance 484 add = len( 485 [ 486 ii for ii in nr 487 if cast(FileInstance, ii.instance).for_addition 488 ] 489 ) 490 rm = len( 491 [ 492 ii for ii in nr 493 if cast(FileInstance, ii.instance).for_removal 494 ] 495 ) 496 initial = len(nr) - add 497 result = len(nr) - rm 498 499 changes = [] 500 if (add or rm) and initial > 0: 501 changes.append(f"{initial} initial") 502 if add: 503 plural = 's' if add > 1 else '' 504 changes.append(f"{add} addition{plural}") 505 if rm: 506 plural = 's' if rm > 1 else '' 507 changes.append(f"{rm} removal{plural}") 508 509 summary = ( 510 f"{indent}{record_type}: {result} " 511 f"SOP Instance{'' if result == 1 else 's'}" 512 ) 513 if changes: 514 summary += f" ({', '.join(changes)})" 515 516 out.append(summary) 517 518 return out 519 520 s = [] 521 for node in self: 522 indent = indent_char * node.depth 523 if node.children: 524 s.append(f"{indent}{str(node)}") 525 # Summarise any leaves at the next level 526 for child in node.children: 527 if child.has_instance: 528 s.extend(leaf_summary(child, indent_char)) 529 break 530 elif node.depth == 0 and node.has_instance: 531 node.instance = cast(FileInstance, node.instance) 532 # Single-level records 533 line = f"{indent}{node.record_type}: 1 SOP Instance" 534 if node.instance.for_addition: 535 line += " (to be added)" 536 elif node.instance.for_removal: 537 line += " (to be removed)" 538 539 s.append(line) 540 541 return s 542 543 @property 544 def previous(self) -> Optional["RecordNode"]: 545 """Return the node before the current one (if any), or ``None``.""" 546 if not self.parent: 547 return None 548 549 if self.index == 0: 550 return None 551 552 return self.parent.children[self.index - 1] 553 554 def _set_record(self, ds: Dataset) -> None: 555 """Set the node's initial directory record dataset. 556 557 The record is used as a starting point when filling the DICOMDIR's 558 *Directory Record Sequence* and is modified as required during 559 encoding. 560 561 Parameters 562 ---------- 563 ds : pydicom.dataset.Dataset 564 Set the node's initial directory record dataset, must be conformant 565 to :dcm:`Part 3, Annex F of the DICOM Standard 566 <part03/chapter_F.html>`. 567 """ 568 offset = getattr(ds, "seq_item_tell", None) 569 rtype = ds.get("DirectoryRecordType", None) 570 rtype = f"{rtype} " if rtype else "" 571 msg = f"The {rtype}directory record is missing" 572 if offset: 573 msg = f"The {rtype}directory record at offset {offset} is missing" 574 575 keywords = ["DirectoryRecordType"] 576 missing = [kw for kw in keywords if kw not in ds] 577 if missing: 578 msg = ( 579 f"{msg} one or more required elements: {', '.join(missing)}" 580 ) 581 raise ValueError(msg) 582 583 if _NEXT_OFFSET not in ds: 584 setattr(ds, _NEXT_OFFSET, 0) 585 if _LOWER_OFFSET not in ds: 586 setattr(ds, _LOWER_OFFSET, 0) 587 ds.RecordInUseFlag = 0xFFFF 588 self._record = ds 589 590 try: 591 self.key 592 except (AttributeError, ValueError) as exc: 593 raise ValueError(f"{msg} a required element") from exc 594 595 @property 596 def record_type(self) -> str: 597 """Return the record's *Directory Record Type* as :class:`str`.""" 598 return cast(str, self._record.DirectoryRecordType) 599 600 def remove(self, node: "RecordNode") -> None: 601 """Remove a leaf from the tree 602 603 Parameters 604 ---------- 605 node : pydicom.fileset.RecordNode 606 The leaf node (i.e. one with a 607 :class:`~pydicom.fileset.FileInstance`) to remove. 608 """ 609 if not node.has_instance: 610 raise ValueError("Only leaf nodes can be removed") 611 612 del node.parent[node] 613 614 def reverse(self) -> Iterable["RecordNode"]: 615 """Yield nodes up to the level below the tree's root node.""" 616 node = self 617 while node.parent: 618 yield node 619 node = node.parent 620 621 if not node.is_root: 622 yield node 623 624 @property 625 def root(self) -> "RecordNode": 626 """Return the tree's root node.""" 627 if self.parent: 628 return self.parent.root 629 630 return self 631 632 def __str__(self) -> str: 633 """Return a string representation of the node.""" 634 if self.is_root: 635 return "ROOT" 636 637 ds = self._record 638 record_type = f"{self.record_type}" 639 640 s = [] 641 if self.record_type == "PATIENT": 642 s += [ 643 f"PatientID='{ds.PatientID}'", 644 f"PatientName='{ds.PatientName}'" 645 ] 646 elif self.record_type == "STUDY": 647 s += [f"StudyDate={ds.StudyDate}", f"StudyTime={ds.StudyTime}"] 648 if getattr(ds, "StudyDescription", None): 649 s.append(f"StudyDescription='{ds.StudyDescription}'") 650 elif self.record_type == "SERIES": 651 s += [f"Modality={ds.Modality}", f"SeriesNumber={ds.SeriesNumber}"] 652 elif self.record_type == "IMAGE": 653 s.append(f"InstanceNumber={ds.InstanceNumber}") 654 else: 655 s.append(f"{self.key}") 656 657 return f"{record_type}: {', '.join(s)}" 658 659 def _update_record_offsets(self) -> None: 660 """Update the record's offset elements. 661 662 Updates the values for *Offset of the Next Directory Record* and 663 *Offset of Referenced Lower Level Directory Entity*, provided all of 664 the nodes have had their *_offset* attribute set correctly. 665 """ 666 next_elem = self._record[_NEXT_OFFSET] 667 next_elem.value = 0 668 if self.next: 669 next_elem.value = self.next._offset 670 671 lower_elem = self._record[_LOWER_OFFSET] 672 lower_elem.value = 0 673 if self.children: 674 self._record[_LOWER_OFFSET].value = self.children[0]._offset 675 676 677class RootNode(RecordNode): 678 """The root node for the File-set's record tree.""" 679 def __init__(self, fs: "FileSet") -> None: 680 """Create a new root node. 681 682 Parameters 683 ---------- 684 fs : pydicom.fileset.FileSet 685 The File-set the record tree belongs to. 686 """ 687 super().__init__() 688 689 self._fs = fs 690 691 @property 692 def file_set(self) -> "FileSet": 693 """Return the tree's :class:`~pydicom.fileset.FileSet`.""" 694 return self._fs 695 696 @property 697 def is_root(self) -> bool: 698 """Return ``True`` if the current node is the tree's root node.""" 699 return True 700 701 702class FileInstance: 703 """Representation of a File in the File-set. 704 705 Attributes 706 ---------- 707 node : pydicom.fileset.RecordNode 708 The leaf record that references this instance. 709 """ 710 def __init__(self, node: RecordNode) -> None: 711 """Create a new FileInstance. 712 713 Parameters 714 ---------- 715 node : pydicom.fileset.RecordNode 716 The record that references this instance. 717 """ 718 class Flags: 719 add: bool 720 remove: bool 721 722 self._flags = Flags() 723 self._apply_stage('x') 724 self._stage_path: Optional[Path] = None 725 self.node = node 726 727 def _apply_stage(self, flag: str) -> None: 728 """Apply staging to the instance. 729 730 Parameters 731 ---------- 732 flag : str 733 The staging to apply, one of ``'+'``, ``'-'`` or ``'x'``. 734 This will flag the instance for addition to or removal from the 735 File-set, or to reset the staging, respectively. 736 """ 737 # Clear flags 738 if flag == 'x': 739 self._flags.add = False 740 self._flags.remove = False 741 self._stage_path = None 742 elif flag == '+': 743 # remove + add = no change 744 if self._flags.remove: 745 self._flags.remove = False 746 self._stage_path = None 747 else: 748 self._flags.add = True 749 self._stage_path = ( 750 self.file_set._stage['path'] / self.SOPInstanceUID 751 ) 752 753 elif flag == '-': 754 # add + remove = no change 755 if self._flags.add: 756 self._flags.add = False 757 self._stage_path = None 758 else: 759 self._flags.remove = True 760 self._stage_path = None 761 762 def __contains__(self, name: Union[str, int]) -> bool: 763 """Return ``True`` if the element with keyword or tag `name` is 764 in one of the corresponding directory records. 765 766 Parameters 767 ---------- 768 name : str or int 769 The element keyword or tag to search for. 770 771 Returns 772 ------- 773 bool 774 ``True`` if the corresponding element is present, ``False`` 775 otherwise. 776 """ 777 try: 778 self[name] 779 except KeyError: 780 return False 781 782 return True 783 784 @property 785 def FileID(self) -> str: 786 """Return the File ID of the referenced instance.""" 787 root = self.node.root 788 components = [ 789 ii.component for ii in self.node.reverse() if ii is not root 790 ] 791 return os.fspath(Path(*components[::-1])) 792 793 @property 794 def file_set(self) -> "FileSet": 795 """Return the :class:`~pydicom.fileset.FileSet` this instance belongs 796 to. 797 """ 798 return self.node.file_set 799 800 @property 801 def for_addition(self) -> bool: 802 """Return ``True`` if the instance has been staged for addition to 803 the File-set. 804 """ 805 return self._flags.add 806 807 @property 808 def for_moving(self) -> bool: 809 """Return ``True`` if the instance will be moved to a new location 810 within the File-set. 811 """ 812 if self.for_addition: 813 return False 814 815 if self["ReferencedFileID"].VM == 1: 816 file_id = self.FileID.split(os.path.sep) 817 return [self.ReferencedFileID] != file_id 818 819 return cast( 820 bool, self.ReferencedFileID != self.FileID.split(os.path.sep) 821 ) 822 823 @property 824 def for_removal(self) -> bool: 825 """Return ``True`` if the instance has been staged for removal from 826 the File-set. 827 """ 828 return self._flags.remove 829 830 def __getattribute__(self, name: str) -> Any: 831 """Return the class attribute value for `name`. 832 833 Parameters 834 ---------- 835 name : str 836 An element keyword or a class attribute name. 837 838 Returns 839 ------- 840 object 841 If `name` matches a DICOM keyword and the element is 842 present in one of the directory records then returns the 843 corresponding element's value. Otherwise returns the class 844 attribute's value (if present). Directory records are searched 845 from the lowest (i.e. an IMAGE or similar record type) to the 846 highest (PATIENT or similar). 847 """ 848 tag = tag_for_keyword(name) 849 if tag is not None: 850 tag = Tag(tag) 851 for node in self.node.reverse(): 852 if tag in node._record: 853 return node._record[tag].value 854 855 return super().__getattribute__(name) 856 857 def __getitem__(self, key: Union[str, int]) -> DataElement: 858 """Return the DataElement with keyword or tag `key`. 859 860 Parameters 861 ---------- 862 key : str or int 863 An element keyword or tag. 864 865 Returns 866 ------- 867 pydicom.dataelem.DataElement 868 The DataElement corresponding to `key`, if present in one of the 869 directory records. Directory records are searched 870 from the lowest (i.e. an IMAGE or similar record type) to the 871 highest (PATIENT or similar). 872 """ 873 874 if isinstance(key, BaseTag): 875 tag = key 876 else: 877 tag = Tag(key) 878 879 if tag == 0x00080018: 880 # SOP Instance UID 881 tag = Tag(0x00041511) 882 elif tag == 0x00080016: 883 # SOP Class UID 884 tag = Tag(0x00041510) 885 elif tag == 0x00020010: 886 # Transfer Syntax UID 887 tag = Tag(0x00041512) 888 889 for node in self.node.reverse(): 890 if tag in node._record: 891 return node._record[tag] 892 893 raise KeyError(tag) 894 895 @property 896 def is_private(self) -> bool: 897 """Return ``True`` if the instance is privately defined.""" 898 return self.node.record_type == "PRIVATE" 899 900 @property 901 def is_staged(self) -> bool: 902 """Return ``True`` if the instance is staged for moving, addition or 903 removal 904 """ 905 return self.for_addition or self.for_moving or self.for_removal 906 907 def load(self) -> Dataset: 908 """Return the referenced instance as a 909 :class:`~pydicom.dataset.Dataset`. 910 """ 911 if self.for_addition: 912 return dcmread(cast(Path, self._stage_path)) 913 914 return dcmread(self.path) 915 916 @property 917 def path(self) -> str: 918 """Return the path to the corresponding instance as :class:`str`. 919 920 Returns 921 ------- 922 str 923 The absolute path to the corresponding instance. If the instance is 924 staged for addition to the File-set this will be a path to the 925 staged file in the temporary staging directory. 926 """ 927 if self.for_addition: 928 return os.fspath(cast(Path, self._stage_path)) 929 930 # If not staged for addition then File Set must exist on file system 931 return os.fspath( 932 cast(Path, self.file_set.path) / cast(Path, self.node._file_id) 933 ) 934 935 @property 936 def SOPClassUID(self) -> UID: 937 """Return the *SOP Class UID* of the referenced instance.""" 938 return cast(UID, self.ReferencedSOPClassUIDInFile) 939 940 @property 941 def SOPInstanceUID(self) -> UID: 942 """Return the *SOP Instance UID* of the referenced instance.""" 943 return cast(UID, self.ReferencedSOPInstanceUIDInFile) 944 945 @property 946 def TransferSyntaxUID(self) -> UID: 947 """Return the *Transfer Syntax UID* of the referenced instance.""" 948 return cast(UID, self.ReferencedTransferSyntaxUIDInFile) 949 950 951DSPathType = Union[Dataset, str, os.PathLike] 952 953 954class FileSet: 955 """Representation of a DICOM File-set.""" 956 def __init__(self, ds: Optional[DSPathType] = None) -> None: 957 """Create or load a File-set. 958 959 Parameters 960 ---------- 961 ds : pydicom.dataset.Dataset, str or PathLike, optional 962 If loading a File-set, the DICOMDIR dataset or the path 963 to the DICOMDIR file. 964 """ 965 # The nominal path to the root of the File-set 966 self._path: Optional[Path] = None 967 # The root node of the record tree used to fill out the DICOMDIR's 968 # *Directory Record Sequence*. 969 # The tree for instances currently in the File-set 970 self._tree = RootNode(self) 971 972 # For tracking changes to the File-set 973 self._stage: Dict[str, Any] = { 974 't': TemporaryDirectory(), 975 '+': {}, # instances staged for addition 976 '-': {}, # instances staged for removal 977 '~': False, # instances staged for moving 978 '^': False, # a File-set Identification module element has changed 979 } 980 self._stage["path"] = Path(self._stage['t'].name) 981 982 # The DICOMDIR instance, not guaranteed to be up-to-date 983 self._ds = Dataset() 984 # The File-set's managed SOP Instances as list of FileInstance 985 self._instances: List[FileInstance] = [] 986 # Use alphanumeric or numeric File IDs 987 self._use_alphanumeric = False 988 989 # The File-set ID 990 self._id: Optional[str] = None 991 # The File-set UID 992 self._uid: Optional[UID] = None 993 # The File-set Descriptor File ID 994 self._descriptor: Optional[str] = None 995 # The Specific Character Set of File-set Descriptor File 996 self._charset: Optional[str] = None 997 998 # Check the DICOMDIR dataset and create the record tree 999 if ds: 1000 self.load(ds) 1001 else: 1002 # New File-set 1003 self.UID = generate_uid() 1004 1005 def add(self, ds_or_path: DSPathType) -> FileInstance: 1006 """Stage an instance for addition to the File-set. 1007 1008 If the instance has been staged for removal then calling 1009 :meth:`~pydicom.fileset.FileSet.add` will cancel the staging 1010 and the instance will not be removed. 1011 1012 Parameters 1013 ---------- 1014 ds_or_path : pydicom.dataset.Dataset, str or PathLike 1015 The instance to add to the File-set, either as a 1016 :class:`~pydicom.dataset.Dataset` or the path to the instance. 1017 1018 Returns 1019 ------- 1020 FileInstance 1021 The :class:`~pydicom.fileset.FileInstance` that was added. 1022 1023 See Also 1024 -------- 1025 :meth:`~pydicom.fileset.FileSet.add_custom` 1026 """ 1027 ds: Union[Dataset, FileDataset] 1028 if isinstance(ds_or_path, (str, os.PathLike)): 1029 ds = dcmread(ds_or_path) 1030 else: 1031 ds = ds_or_path 1032 1033 key = ds.SOPInstanceUID 1034 have_instance = [ii for ii in self if ii.SOPInstanceUID == key] 1035 1036 # If staged for removal, keep instead - check this now because 1037 # `have_instance` is False when instance staged for removal 1038 if key in self._stage['-']: 1039 instance = self._stage['-'][key] 1040 del self._stage['-'][key] 1041 self._instances.append(instance) 1042 instance._apply_stage('+') 1043 1044 return cast(FileInstance, instance) 1045 1046 # The instance is already in the File-set (and not staged for removal) 1047 # May or may not be staged for addition/movement 1048 if have_instance: 1049 return have_instance[0] 1050 1051 # If not already in the File-set, stage for addition 1052 # Create the directory records and tree nodes for the dataset 1053 # For instances that won't contain PRIVATE records we shouldn't have 1054 # to worry about exceeding the maximum component depth of 8 1055 record_gen = self._recordify(ds) 1056 record = next(record_gen) 1057 parent = RecordNode(record) 1058 node = parent # Maybe only be a single record 1059 for record in record_gen: 1060 node = RecordNode(record) 1061 node.parent = parent 1062 parent = node 1063 1064 instance = FileInstance(node) 1065 node.instance = instance 1066 self._tree.add(node) 1067 1068 # Save the dataset to the stage 1069 self._stage['+'][instance.SOPInstanceUID] = instance 1070 self._instances.append(instance) 1071 instance._apply_stage('+') 1072 ds.save_as(instance.path, write_like_original=False) 1073 1074 return cast(FileInstance, instance) 1075 1076 def add_custom( 1077 self, ds_or_path: DSPathType, leaf: RecordNode 1078 ) -> FileInstance: 1079 """Stage an instance for addition to the File-set using custom records. 1080 1081 This method allows you to add a SOP instance and customize the 1082 directory records that will be used when writing the DICOMDIR file. It 1083 must be used when you require PRIVATE records and may be used instead 1084 of modifying :attr:`~pydicom.fileset.DIRECTORY_RECORDERS` with your 1085 own record definition functions when the default functions aren't 1086 suitable. 1087 1088 The following elements will be added automatically to the supplied 1089 directory records if required and not present: 1090 1091 * (0004,1400) *Offset of the Next Directory Record* 1092 * (0004,1410) *Record In-use Flag* 1093 * (0004,1420) *Offset of Referenced Lower-Level Directory Entity* 1094 * (0004,1500) *Referenced File ID* 1095 * (0004,1510) *Referenced SOP Class UID in File* 1096 * (0004,1511) *Referenced SOP Instance UID in File* 1097 * (0004,1512) *Referenced Transfer Syntax UID in File* 1098 1099 If the instance has been staged for removal then calling 1100 :meth:`~pydicom.fileset.FileSet.add_custom` will cancel the staging 1101 and the instance will not be removed. 1102 1103 Examples 1104 -------- 1105 1106 Add a SOP Instance using a two record hierarchy of PATIENT -> PRIVATE 1107 1108 .. code-block:: python 1109 1110 from pydicom import dcmread, Dataset 1111 from pydicom.data import get_testdata_file 1112 from pydicom.fileset import FileSet, RecordNode 1113 from pydicom.uid import generate_uid 1114 1115 # The instance to be added 1116 ds = dcmread(get_testdata_file("CT_small.dcm")) 1117 1118 # Define the leaf node (the PRIVATE record) 1119 record = Dataset() 1120 record.DirectoryRecordType = "PRIVATE" 1121 record.PrivateRecordUID = generate_uid() 1122 leaf_node = RecordNode(record) 1123 1124 # Define the top node (the PATIENT record) 1125 record = Dataset() 1126 record.DirectoryRecordType = "PATIENT" 1127 record.PatientID = ds.PatientID 1128 record.PatientName = ds.PatientName 1129 top_node = RecordNode(record) 1130 1131 # Set the node relationship 1132 leaf_node.parent = top_node 1133 1134 # Add the instance to the File-set 1135 fs = FileSet() 1136 instance = fs.add_custom(ds, leaf_node) 1137 1138 Parameters 1139 ---------- 1140 ds_or_path : pydicom.dataset.Dataset, str or PathLike 1141 The instance to add to the File-set, either as a 1142 :class:`~pydicom.dataset.Dataset` or the path to the instance. 1143 leaf : pydicom.fileset.RecordNode 1144 The leaf node for the instance, should have its ancestors nodes set 1145 correctly as well as their corresponding directory records. Should 1146 have no more than 7 ancestors due to the semantics used by 1147 :class:`~pydicom.fileset.FileSet` when creating the directory 1148 structure. 1149 1150 Returns 1151 ------- 1152 FileInstance 1153 The :class:`~pydicom.fileset.FileInstance` that was added. 1154 1155 See Also 1156 -------- 1157 :meth:`~pydicom.fileset.FileSet.add` 1158 """ 1159 ds: Union[Dataset, FileDataset] 1160 if isinstance(ds_or_path, (str, os.PathLike)): 1161 ds = dcmread(ds_or_path) 1162 else: 1163 ds = ds_or_path 1164 1165 # Check the supplied nodes 1166 if leaf.depth > 7: 1167 raise ValueError( 1168 "The 'leaf' node must not have more than 7 ancestors as " 1169 "'FileSet' supports a maximum directory structure depth of 8" 1170 ) 1171 1172 key = ds.SOPInstanceUID 1173 have_instance = [ii for ii in self if ii.SOPInstanceUID == key] 1174 1175 # If staged for removal, keep instead - check this now because 1176 # `have_instance` is False when instance staged for removal 1177 if key in self._stage['-']: 1178 instance = self._stage['-'][key] 1179 del self._stage['-'][key] 1180 self._instances.append(instance) 1181 instance._apply_stage('+') 1182 1183 return cast(FileInstance, instance) 1184 1185 if have_instance: 1186 return have_instance[0] 1187 1188 # Ensure the leaf node's record contains the required elements 1189 leaf._record.ReferencedFileID = None 1190 leaf._record.ReferencedSOPClassUIDInFile = ds.SOPClassUID 1191 leaf._record.ReferencedSOPInstanceUIDInFile = key 1192 leaf._record.ReferencedTransferSyntaxUIDInFile = ( 1193 ds.file_meta.TransferSyntaxUID 1194 ) 1195 1196 instance = FileInstance(leaf) 1197 leaf.instance = instance 1198 self._tree.add(leaf) 1199 1200 # Save the dataset to the stage 1201 self._stage['+'][instance.SOPInstanceUID] = instance 1202 self._instances.append(instance) 1203 instance._apply_stage('+') 1204 ds.save_as(instance.path, write_like_original=False) 1205 1206 return cast(FileInstance, instance) 1207 1208 def clear(self) -> None: 1209 """Clear the File-set.""" 1210 self._tree.children = [] 1211 self._instances = [] 1212 self._path = None 1213 self._ds = Dataset() 1214 self._id = None 1215 self._uid = generate_uid() 1216 self._descriptor = None 1217 self._charset = None 1218 1219 # Clean and reset the stage 1220 self._stage['+'] = {} 1221 self._stage['-'] = {} 1222 self._stage['~'] = False 1223 self._stage['^'] = False 1224 self._stage['t'].cleanup() 1225 self._stage['t'] = TemporaryDirectory() 1226 self._stage['path'] = Path(self._stage['t'].name) 1227 1228 def copy( 1229 self, path: Union[str, os.PathLike], force_implicit: bool = False 1230 ) -> "FileSet": 1231 """Copy the File-set to a new root directory and return the copied 1232 File-set. 1233 1234 Changes staged to the original :class:`~pydicom.fileset.FileSet` will 1235 be applied to the new File-set. The original 1236 :class:`~pydicom.fileset.FileSet` will remain staged. 1237 1238 Parameters 1239 ---------- 1240 path : str or PathLike 1241 The root directory where the File-set is to be copied to. 1242 force_implicit : bool, optional 1243 If ``True`` force the DICOMDIR file to be encoded using *Implicit 1244 VR Little Endian* which is non-conformant to the DICOM Standard 1245 (default ``False``). 1246 1247 Returns 1248 ------- 1249 pydicom.fileset.FileSet 1250 The copied File-set as a :class:`~pydicom.fileset.FileSet`. 1251 """ 1252 # !! We can't change anything public in the original FileSet !! 1253 1254 path = Path(path) 1255 if self.path and Path(self.path) == path: 1256 raise ValueError( 1257 "Cannot copy the File-set as the 'path' is unchanged" 1258 ) 1259 1260 if len(self) > 10**6: 1261 self._use_alphanumeric = True 1262 if len(self) > 36**6: 1263 raise NotImplementedError( 1264 "pydicom doesn't support writing File-sets with more than " 1265 "2176782336 managed instances" 1266 ) 1267 1268 # Removals are detached from the tree 1269 detached_nodes = [] 1270 for instance in self._stage['-'].values(): 1271 detached_nodes.append(instance.node) 1272 self._tree.remove(instance.node) 1273 continue 1274 1275 file_ids = [] 1276 for instance in self: 1277 file_ids.append(instance.ReferencedFileID) 1278 dst = path / Path(instance.FileID) 1279 dst.parent.mkdir(parents=True, exist_ok=True) 1280 shutil.copyfile(instance.path, dst) 1281 instance.node._record.ReferencedFileID = ( 1282 instance.FileID.split(os.path.sep) 1283 ) 1284 1285 # Create the DICOMDIR file 1286 p = path / 'DICOMDIR' 1287 with open(p, 'wb') as fp: 1288 f = DicomFileLike(fp) 1289 self._write_dicomdir( 1290 f, copy_safe=True, force_implicit=force_implicit 1291 ) 1292 1293 # Reset the *Referenced File ID* values 1294 # The order here doesn't matter because removed instances aren't 1295 # yielded by iter(self) 1296 for instance, file_id in zip(self, file_ids): 1297 instance.node._record.ReferencedFileID = file_id 1298 1299 # Reattach the removed nodes 1300 for node in detached_nodes: 1301 self._tree.add(node) 1302 1303 fs = FileSet() 1304 fs.load(p, raise_orphans=True) 1305 1306 return fs 1307 1308 def _create_dicomdir(self) -> Dataset: 1309 """Return a new minimal DICOMDIR dataset.""" 1310 ds = Dataset() 1311 ds.filename = None 1312 1313 ds.file_meta = FileMetaDataset() 1314 ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian 1315 ds.file_meta.MediaStorageSOPInstanceUID = self.UID 1316 ds.file_meta.MediaStorageSOPClassUID = MediaStorageDirectoryStorage 1317 1318 ds.FileSetID = self.ID 1319 ds.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity = 0 1320 ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity = 0 1321 ds.FileSetConsistencyFlag = 0 1322 ds.DirectoryRecordSequence = [] 1323 1324 if self.descriptor_file_id: 1325 ds.FileSetDescriptorFileID = self.descriptor_file_id 1326 if self.descriptor_character_set: 1327 ds.SpecificCharacterSetOfFileSetDescriptorFile = ( 1328 self.descriptor_character_set 1329 ) 1330 1331 return ds 1332 1333 @property 1334 def descriptor_character_set(self) -> Union[str, None]: 1335 """Return the *Specific Character Set of File-set Descriptor File* 1336 (if available) or ``None``. 1337 """ 1338 return self._charset 1339 1340 @descriptor_character_set.setter 1341 def descriptor_character_set(self, val: Union[str, None]) -> None: 1342 """Set the *Specific Character Set of File-set Descriptor File*. 1343 1344 The descriptor file itself is used for user comments related to the 1345 File-set (e.g. a README file) and is up the user to create. 1346 1347 Parameters 1348 ---------- 1349 val : str or None 1350 The value to use for the DICOMDIR's (0004,1142) *Specific 1351 Character Set of File-set Descriptor File*. See :dcm:`C.12.1.1.2 1352 in Part 3 of the DICOM Standard 1353 <part03/sect_C.12.html#sect_C.12.1.1.2>` for defined terms. 1354 1355 See Also 1356 -------- 1357 :attr:`~pydicom.fileset.FileSet.descriptor_file_id` set the descriptor 1358 file ID for the file that uses the character set. 1359 """ 1360 if val == self._charset: 1361 return 1362 1363 self._charset = val 1364 if self._ds: 1365 self._ds.SpecificCharacterSetOfFileSetDescriptorFile = val 1366 self._stage['^'] = True 1367 1368 @property 1369 def descriptor_file_id(self) -> Union[str, None]: 1370 """Return the *File-set Descriptor File ID* (if available) or ``None``. 1371 """ 1372 return self._descriptor 1373 1374 @descriptor_file_id.setter 1375 def descriptor_file_id(self, val: Union[str, None]) -> None: 1376 """Set the *File-set Descriptor File ID*. 1377 1378 The descriptor file itself is used for user comments related to the 1379 File-set (e.g. a README file) and is up the user to create. 1380 1381 Parameters 1382 ---------- 1383 val : str, list of str or None 1384 The value to use for the DICOMDIR's (0004,1141) *File-set 1385 Descriptor File ID*. Should be the relative path to the descriptor 1386 file and has a maximum length of 8 components, with each component 1387 up to 16 characters long. 1388 1389 Raises 1390 ------ 1391 ValueError 1392 If `val` has more than 8 items or if each item is longer than 16 1393 characters. 1394 1395 See Also 1396 -------- 1397 :attr:`~pydicom.fileset.FileSet.descriptor_character_set` the 1398 character set used in the descriptor file, required if an expanded or 1399 replaced character set is used. 1400 """ 1401 if val == self._descriptor: 1402 return 1403 1404 if val is None: 1405 pass 1406 elif isinstance(val, list): 1407 try: 1408 assert len(val) <= 8 1409 for component in val: 1410 assert isinstance(component, str) 1411 assert 0 <= len(component) <= 16 1412 except AssertionError: 1413 raise ValueError( 1414 "The 'File-set Descriptor File ID' has a maximum of 8 " 1415 "components, each between 0 and 16 characters long" 1416 ) 1417 1418 # Push the value through Path to clean it up and check validity 1419 val = list(Path(*val).parts) 1420 elif isinstance(val, str): 1421 if not 0 <= len(val) <= 16: 1422 raise ValueError( 1423 "Each 'File-set Descriptor File ID' component has a " 1424 "maximum length of 16 characters" 1425 ) 1426 else: 1427 raise TypeError( 1428 "The 'DescriptorFileID' must be a str, list of str, or None" 1429 ) 1430 1431 self._descriptor = val 1432 if self._ds: 1433 self._ds.FileSetDescriptorFileID = self._descriptor 1434 self._stage['^'] = True 1435 1436 def find(self, load: bool = False, **kwargs: Any) -> List[FileInstance]: 1437 """Return matching instances in the File-set 1438 1439 **Limitations** 1440 1441 * Only single value matching is supported so neither 1442 ``PatientID=['1234567', '7654321']`` or ``PatientID='1234567', 1443 PatientID='7654321'`` will work (although the first example will 1444 work if the *Patient ID* is actually multi-valued). 1445 * Repeating group and private elements cannot be used when searching. 1446 1447 Parameters 1448 ---------- 1449 load : bool, optional 1450 If ``True``, then load the SOP Instances belonging to the 1451 File-set and perform the search against their available elements. 1452 Otherwise (default) search only the elements available in the 1453 corresponding directory records (more efficient, but only a limited 1454 number of elements are available). 1455 **kwargs 1456 Search parameters, as element keyword=value (i.e. 1457 ``PatientID='1234567', StudyDescription="My study"``. 1458 1459 Returns 1460 ------- 1461 list of pydicom.fileset.FileInstance 1462 A list of matching instances. 1463 """ 1464 if not kwargs: 1465 return self._instances[:] 1466 1467 # Flag whether or not the query elements are in the DICOMDIR records 1468 has_elements = False 1469 1470 def match(ds: Union[Dataset, FileInstance], **kwargs: Any) -> bool: 1471 nonlocal has_elements 1472 if load: 1473 ds = ds.load() 1474 1475 # Check that all query elements are present 1476 if all([kw in ds for kw in kwargs]): 1477 has_elements = True 1478 1479 for kw, val in kwargs.items(): 1480 try: 1481 assert ds[kw].value == val 1482 except (AssertionError, KeyError): 1483 return False 1484 1485 return True 1486 1487 matches = [] 1488 for instance in self: 1489 if match(instance, **kwargs): 1490 matches.append(instance) 1491 1492 if not load and not has_elements: 1493 warnings.warn( 1494 "None of the records in the DICOMDIR dataset contain all " 1495 "the query elements, consider using the 'load' parameter " 1496 "to expand the search to the corresponding SOP instances" 1497 ) 1498 1499 return matches 1500 1501 def find_values( 1502 self, 1503 elements: Union[str, int, List[Union[str, int]]], 1504 instances: Optional[List[FileInstance]] = None, 1505 load: bool = False 1506 ) -> Union[List[Any], Dict[Union[str, int], List[Any]]]: 1507 """Return a list of unique values for given element(s). 1508 1509 Parameters 1510 ---------- 1511 elements : str, int or pydicom.tag.BaseTag, or list of these 1512 The keyword or tag of the element(s) to search for. 1513 instances : list of pydicom.fileset.FileInstance, optional 1514 Search within the given instances. If not used then all available 1515 instances will be searched. 1516 load : bool, optional 1517 If ``True``, then load the SOP Instances belonging to the 1518 File-set and perform the search against their available elements. 1519 Otherwise (default) search only the elements available in the 1520 corresponding directory records (more efficient, but only a limited 1521 number of elements are available). 1522 1523 Returns 1524 ------- 1525 list of object(s), or dict of lists of object(s) 1526 1527 * If single element was queried: A list of value(s) for the element 1528 available in the instances. 1529 * If list of elements was queried: A dict of element value pairs 1530 with lists of value(s) for the elements available in the instances. 1531 """ 1532 element_list = elements if isinstance(elements, list) else [elements] 1533 has_element = {element: False for element in element_list} 1534 results: Dict[Union[str, int], List[Any]] = { 1535 element: [] for element in element_list 1536 } 1537 iter_instances = instances or iter(self) 1538 instance: Union[Dataset, FileInstance] 1539 for instance in iter_instances: 1540 if load: 1541 instance = instance.load() 1542 1543 for element in element_list: 1544 if element not in instance: 1545 continue 1546 1547 has_element[element] = True 1548 val = instance[element].value 1549 # Not very efficient, but we can't use set 1550 if val not in results[element]: 1551 results[element].append(val) 1552 1553 missing_elements = [ 1554 element for element, v in has_element.items() if not v 1555 ] 1556 if not load and missing_elements: 1557 warnings.warn( 1558 "None of the records in the DICOMDIR dataset contain " 1559 f"{missing_elements}, consider using the 'load' parameter " 1560 "to expand the search to the corresponding SOP instances" 1561 ) 1562 1563 if not isinstance(elements, list): 1564 return results[element_list[0]] 1565 1566 return results 1567 1568 @property 1569 def ID(self) -> Union[str, None]: 1570 """Return the *File-set ID* (if available) or ``None``.""" 1571 return self._id 1572 1573 @ID.setter 1574 def ID(self, val: Union[str, None]) -> None: 1575 """Set the File-set ID. 1576 1577 Parameters 1578 ---------- 1579 val : str or None 1580 The value to use for the DICOMDIR's (0004,1130) *File-set ID*. 1581 1582 Raises 1583 ------ 1584 ValueError 1585 If `val` is greater than 16 characters long. 1586 """ 1587 if val == self._id: 1588 return 1589 1590 if val is None or 0 <= len(val) <= 16: 1591 self._id = val 1592 if self._ds: 1593 self._ds.FileSetID = val 1594 self._stage['^'] = True 1595 else: 1596 raise ValueError( 1597 "The maximum length of the 'File-set ID' is 16 characters" 1598 ) 1599 1600 @property 1601 def is_staged(self) -> bool: 1602 """Return ``True`` if the File-set is new or has changes staged.""" 1603 return any(self._stage[c] for c in '+-^~') 1604 1605 def __iter__(self) -> Iterator[FileInstance]: 1606 """Yield :class:`~pydicom.fileset.FileInstance` from the File-set.""" 1607 yield from self._instances[:] 1608 1609 def __len__(self) -> int: 1610 """Return the number of instances in the File-set.""" 1611 return len(self._instances) 1612 1613 def load( 1614 self, 1615 ds_or_path: DSPathType, 1616 include_orphans: bool = True, 1617 raise_orphans: bool = False, 1618 ) -> None: 1619 """Load an existing File-set. 1620 1621 Existing File-sets that do not use the same directory structure as 1622 *pydicom* will be staged to be moved to a new structure. This is 1623 because the DICOM Standard attaches no semantics to *how* the files 1624 in a File-set are to be structured so it's impossible to determine what 1625 the layout will be when changes are to be made. 1626 1627 Parameters 1628 ---------- 1629 ds_or_path : pydicom.dataset.Dataset, str or PathLike 1630 An existing File-set's DICOMDIR, either as a 1631 :class:`~pydicom.dataset.Dataset` or the path to the DICOMDIR file 1632 as :class:`str` or pathlike. 1633 include_orphans : bool, optional 1634 If ``True`` (default) include instances referenced by orphaned 1635 directory records in the File-set. 1636 raise_orphans : bool, optional 1637 If ``True`` then raise an exception if orphaned directory records 1638 are found in the File-set (default ``False``). 1639 """ 1640 if isinstance(ds_or_path, Dataset): 1641 ds = ds_or_path 1642 else: 1643 ds = dcmread(ds_or_path) 1644 1645 sop_class = ds.file_meta.get("MediaStorageSOPClassUID", None) 1646 if sop_class != MediaStorageDirectoryStorage: 1647 raise ValueError( 1648 "Unable to load the File-set as the supplied dataset is " 1649 "not a 'Media Storage Directory' instance" 1650 ) 1651 1652 tsyntax = ds.file_meta.TransferSyntaxUID 1653 if tsyntax != ExplicitVRLittleEndian: 1654 warnings.warn( 1655 "The DICOMDIR dataset uses an invalid transfer syntax " 1656 f"'{tsyntax.name}' and will be updated to use 'Explicit VR " 1657 "Little Endian'" 1658 ) 1659 1660 try: 1661 path = Path(cast(str, ds.filename)).resolve(strict=True) 1662 except FileNotFoundError: 1663 raise FileNotFoundError( 1664 "Unable to load the File-set as the 'filename' attribute " 1665 "for the DICOMDIR dataset is not a valid path: " 1666 f"{ds.filename}" 1667 ) 1668 except TypeError: 1669 # Custom message if DICOMDIR from bytes, etc 1670 raise TypeError( 1671 "Unable to load the File-set as the DICOMDIR dataset must " 1672 "have a 'filename' attribute set to the path of the " 1673 "DICOMDIR file" 1674 ) 1675 1676 self.clear() 1677 self._id = cast(Optional[str], ds.get("FileSetID", None)) 1678 uid = cast( 1679 Optional[UID], ds.file_meta.get("MediaStorageSOPInstanceUID") 1680 ) 1681 if not uid: 1682 uid = generate_uid() 1683 ds.file_meta.MediaStorageSOPInstanceUID = uid 1684 self._uid = uid 1685 self._descriptor = cast( 1686 Optional[str], ds.get("FileSetDescriptorFileID", None) 1687 ) 1688 self._charset = cast( 1689 Optional[str], 1690 ds.get("SpecificCharacterSetOfFileSetDescriptorFile", None) 1691 ) 1692 self._path = path.parent 1693 self._ds = ds 1694 1695 # Create the record tree 1696 self._parse_records(ds, include_orphans, raise_orphans) 1697 1698 bad_instances = [] 1699 for instance in self: 1700 # Check that the referenced file exists 1701 file_id = instance.node._file_id 1702 if file_id is None: 1703 bad_instances.append(instance) 1704 continue 1705 1706 try: 1707 # self.path is already set at this point 1708 (cast(Path, self.path) / file_id).resolve(strict=True) 1709 except FileNotFoundError: 1710 bad_instances.append(instance) 1711 warnings.warn( 1712 "The referenced SOP Instance for the directory record at " 1713 f"offset {instance.node._offset} does not exist: " 1714 f"{cast(Path, self.path) / file_id}" 1715 ) 1716 continue 1717 # If the instance's existing directory structure doesn't match 1718 # the pydicom semantics then stage for movement 1719 if instance.for_moving: 1720 self._stage['~'] = True 1721 1722 for instance in bad_instances: 1723 self._instances.remove(instance) 1724 1725 def _parse_records( 1726 self, 1727 ds: Dataset, 1728 include_orphans: bool, 1729 raise_orphans: bool = False 1730 ) -> None: 1731 """Parse the records in an existing DICOMDIR. 1732 1733 Parameters 1734 ---------- 1735 ds : pydicom.dataset.Dataset 1736 The File-set's DICOMDIR dataset. 1737 include_orphans : bool 1738 If ``True`` then include within the File-set orphaned records that 1739 contain a valid (and unique) *Referenced File ID* element. Orphaned 1740 records are those that aren't placed within the *Directory Record 1741 Sequence* hierarchy. 1742 raise_orphans : bool, optional 1743 If ``True`` then raise an exception if orphaned directory records 1744 are found in the File-set (default ``False``). 1745 """ 1746 # First pass: get the offsets for each record 1747 records = {} 1748 for record in cast(Iterable[Dataset], ds.DirectoryRecordSequence): 1749 offset = cast(int, record.seq_item_tell) 1750 node = RecordNode(record) 1751 node._offset = offset 1752 records[offset] = node 1753 1754 # Define the top-level nodes 1755 if records: 1756 node = records[ds[_FIRST_OFFSET].value] 1757 node.parent = self._tree 1758 while getattr(node._record, _NEXT_OFFSET, None): 1759 node = records[node._record[_NEXT_OFFSET].value] 1760 node.parent = self._tree 1761 1762 # Second pass: build the record hierarchy 1763 # Records not in the hierarchy will be ignored 1764 # Branches without a valid leaf node File ID will be removed 1765 def recurse_node(node: RecordNode) -> None: 1766 child_offset = getattr(node._record, _LOWER_OFFSET, None) 1767 if child_offset: 1768 child = records[child_offset] 1769 child.parent = node 1770 1771 next_offset = getattr(child._record, _NEXT_OFFSET, None) 1772 while next_offset: 1773 child = records[next_offset] 1774 child.parent = node 1775 next_offset = getattr(child._record, _NEXT_OFFSET, None) 1776 elif "ReferencedFileID" not in node._record: 1777 # No children = leaf node, leaf nodes must reference a File ID 1778 del node.parent[node] 1779 1780 # The leaf node references the FileInstance 1781 if "ReferencedFileID" in node._record: 1782 node.instance = FileInstance(node) 1783 self._instances.append(node.instance) 1784 1785 for child in node.children: 1786 recurse_node(child) 1787 1788 for node in self._tree.children: 1789 recurse_node(node) 1790 1791 if len(records) == len(list(iter(self._tree))): 1792 return 1793 1794 if raise_orphans: 1795 raise ValueError( 1796 "The DICOMDIR contains orphaned directory records" 1797 ) 1798 1799 # DICOMDIR contains orphaned records 1800 # Determine which nodes are both orphaned and reference an instance 1801 missing_set = set(records.keys()) - {ii._offset for ii in self._tree} 1802 missing = [records[o] for o in missing_set] 1803 missing = [r for r in missing if "ReferencedFileID" in r._record] 1804 1805 if missing and not include_orphans: 1806 warnings.warn( 1807 f"The DICOMDIR has {len(missing)} orphaned directory records " 1808 "that reference an instance that will not be included in the " 1809 "File-set" 1810 ) 1811 return 1812 1813 for node in missing: 1814 # Get the path to the orphaned instance 1815 original_value = node._record.ReferencedFileID 1816 file_id = node._file_id 1817 if file_id is None: 1818 continue 1819 1820 # self.path is set for an existing File Set 1821 path = cast(Path, self.path) / file_id 1822 if node.record_type == "PRIVATE": 1823 instance = self.add_custom(path, node) 1824 else: 1825 instance = self.add(path) 1826 1827 # Because the record is new the Referenced File ID isn't set 1828 instance.node._record.ReferencedFileID = original_value 1829 1830 @property 1831 def path(self) -> Optional[str]: 1832 """Return the absolute path to the File-set root directory as 1833 :class:`str` (if set) or ``None`` otherwise. 1834 """ 1835 if self._path is not None: 1836 return os.fspath(self._path) 1837 1838 return self._path 1839 1840 def _recordify(self, ds: Dataset) -> Iterator[Dataset]: 1841 """Yield directory records for a SOP Instance. 1842 1843 Parameters 1844 ---------- 1845 ds : pydicom.dataset.Dataset 1846 The SOP Instance to create DICOMDIR directory records for. 1847 1848 Yields 1849 ------ 1850 ds : pydicom.dataset.Dataset 1851 A directory record for the instance, ordered from highest to 1852 lowest level. 1853 1854 Raises 1855 ------ 1856 ValueError 1857 If unable to create the required directory records because of 1858 a missing required element or element value. 1859 """ 1860 # Single-level records: leaf 1861 record_type = _single_level_record_type(ds) 1862 if record_type != "PATIENT": 1863 try: 1864 record = DIRECTORY_RECORDERS[record_type](ds) 1865 except ValueError as exc: 1866 raise ValueError( 1867 f"Unable to use the default '{record_type}' " 1868 "record creator as the instance is missing a " 1869 "required element or value. Either update the instance, " 1870 "define your own record creation function or use " 1871 "'FileSet.add_custom()' instead" 1872 ) from exc 1873 1874 record.OffsetOfTheNextDirectoryRecord = 0 1875 record.RecordInUseFlag = 0xFFFF 1876 record.OffsetOfReferencedLowerLevelDirectoryEntity = 0 1877 record.DirectoryRecordType = record_type 1878 record.ReferencedFileID = None 1879 record.ReferencedSOPClassUIDInFile = ds.SOPClassUID 1880 record.ReferencedSOPInstanceUIDInFile = ds.SOPInstanceUID 1881 record.ReferencedTransferSyntaxUIDInFile = ( 1882 ds.file_meta.TransferSyntaxUID 1883 ) 1884 1885 yield record 1886 return 1887 1888 # Four-level records: PATIENT -> STUDY -> SERIES -> leaf 1889 records = [] 1890 leaf_type = _four_level_record_type(ds) 1891 for record_type in ["PATIENT", "STUDY", "SERIES", leaf_type]: 1892 try: 1893 record = DIRECTORY_RECORDERS[record_type](ds) 1894 except ValueError as exc: 1895 raise ValueError( 1896 f"Unable to use the default '{record_type}' " 1897 "record creator as the instance is missing a " 1898 "required element or value. Either update the instance, " 1899 "define your own record creation function or use " 1900 "'FileSet.add_custom()' instead" 1901 ) from exc 1902 1903 record.OffsetOfTheNextDirectoryRecord = 0 1904 record.RecordInUseFlag = 0xFFFF 1905 record.OffsetOfReferencedLowerLevelDirectoryEntity = 0 1906 record.DirectoryRecordType = record_type 1907 if "SpecificCharacterSet" in ds: 1908 record.SpecificCharacterSet = ds.SpecificCharacterSet 1909 1910 records.append(record) 1911 1912 # Add the instance referencing elements to the leaf 1913 leaf = records[3] 1914 leaf.ReferencedFileID = None 1915 leaf.ReferencedSOPClassUIDInFile = ds.SOPClassUID 1916 leaf.ReferencedSOPInstanceUIDInFile = ds.SOPInstanceUID 1917 leaf.ReferencedTransferSyntaxUIDInFile = ( 1918 ds.file_meta.TransferSyntaxUID 1919 ) 1920 1921 yield from records 1922 1923 def remove( 1924 self, instance: Union[FileInstance, List[FileInstance]] 1925 ) -> None: 1926 """Stage instance(s) for removal from the File-set. 1927 1928 If the instance has been staged for addition to the File-set, calling 1929 :meth:`~pydicom.fileset.FileSet.remove` will cancel the staging and 1930 the instance will not be added. 1931 1932 Parameters 1933 ---------- 1934 instance : pydicom.fileset.FileInstance or a list of FileInstance 1935 The instance(s) to remove from the File-set. 1936 """ 1937 if isinstance(instance, list): 1938 for item in instance: 1939 self.remove(item) 1940 return 1941 1942 if instance not in self._instances: 1943 raise ValueError("No such instance in the File-set") 1944 1945 # If staged for addition, no longer add 1946 if instance.SOPInstanceUID in self._stage['+']: 1947 leaf = instance.node 1948 del leaf.parent[leaf] 1949 del self._stage['+'][instance.SOPInstanceUID] 1950 # Delete file from stage 1951 try: 1952 Path(instance.path).unlink() 1953 except FileNotFoundError: 1954 pass 1955 instance._apply_stage('-') 1956 self._instances.remove(instance) 1957 1958 # Stage for removal if not already done 1959 elif instance.SOPInstanceUID not in self._stage['-']: 1960 instance._apply_stage('-') 1961 self._stage['-'][instance.SOPInstanceUID] = instance 1962 self._instances.remove(instance) 1963 1964 def __str__(self) -> str: 1965 """Return a string representation of the FileSet.""" 1966 s = [ 1967 "DICOM File-set", 1968 f" Root directory: {self.path or '(no value available)'}", 1969 f" File-set ID: {self.ID or '(no value available)'}", 1970 f" File-set UID: {self.UID}", 1971 ( 1972 f" Descriptor file ID: " 1973 f"{self.descriptor_file_id or '(no value available)'}" 1974 ), 1975 ( 1976 f" Descriptor file character set: " 1977 f"{self.descriptor_character_set or '(no value available)'}" 1978 ), 1979 ] 1980 if self.is_staged: 1981 changes = [] 1982 if not self._ds: 1983 changes.append("DICOMDIR creation") 1984 else: 1985 changes.append("DICOMDIR update") 1986 1987 if self._stage['~']: 1988 changes.append("directory structure update") 1989 1990 if self._stage['+']: 1991 suffix = 's' if len(self._stage['+']) > 1 else '' 1992 changes.append(f"{len(self._stage['+'])} addition{suffix}") 1993 if self._stage['-']: 1994 suffix = 's' if len(self._stage['-']) > 1 else '' 1995 changes.append(f"{len(self._stage['-'])} removal{suffix}") 1996 1997 s.append(f" Changes staged for write(): {', '.join(changes)}") 1998 1999 if not self._tree.children: 2000 return '\n'.join(s) 2001 2002 s.append("\n Managed instances:") 2003 s.extend([f" {ii}" for ii in self._tree.prettify()]) 2004 2005 return '\n'.join(s) 2006 2007 @property 2008 def UID(self) -> UID: 2009 """Return the File-set's UID.""" 2010 return cast(UID, self._uid) 2011 2012 @UID.setter 2013 def UID(self, uid: UID) -> None: 2014 """Set the File-set UID. 2015 2016 Parameters 2017 ---------- 2018 uid : pydicom.uid.UID 2019 The UID to use as the new File-set UID. 2020 """ 2021 if uid == self._uid: 2022 return 2023 2024 uid = UID(uid) 2025 assert uid.is_valid 2026 self._uid = uid 2027 if self._ds: 2028 self._ds.file_meta.MediaStorageSOPInstanceUID = uid 2029 2030 self._stage['^'] = True 2031 2032 def write( 2033 self, 2034 path: Optional[Union[str, os.PathLike]] = None, 2035 use_existing: bool = False, 2036 force_implicit: bool = False 2037 ) -> None: 2038 """Write the File-set, or changes to the File-set, to the file system. 2039 2040 .. warning:: 2041 2042 If modifying an existing File-set it's **strongly recommended** 2043 that you follow standard data management practices and ensure that 2044 you have an up-to-date backup of the original data. 2045 2046 By default, for both new or existing File-sets, *pydicom* uses the 2047 following directory structure semantics when writing out changes: 2048 2049 * For instances defined using the standard four-levels of directory 2050 records (i.e. PATIENT/STUDY/SERIES + one of the record types 2051 such as IMAGE or RT DOSE): ``PTxxxxxx/STxxxxxx/SExxxxxx/`` with a 2052 filename such as ``IMxxxxxx`` (for IMAGE), where the first two 2053 characters are dependent on the record type and ``xxxxxx`` is a 2054 numeric or alphanumeric index. 2055 * For instances defined using the standard one-level directory record 2056 (i.e. PALETTE, IMPLANT): a filename such as ``PAxxxxxx`` (for 2057 PALETTE). 2058 * For instances defined using PRIVATE directory records then the 2059 structure will be along the lines of ``P0xxxxxx/P1xxxxxx/P2xxxxxx`` 2060 for PRIVATE/PRIVATE/PRIVATE, ``PTxxxxxx/STxxxxxx/P2xxxxxx`` for 2061 PATIENT/STUDY/PRIVATE. 2062 2063 When only changes to the DICOMDIR file are required or instances have 2064 only been removed from an existing File-set you can use the 2065 `use_existing` keyword parameter to keep the existing directory 2066 structure and only update the DICOMDIR file. 2067 2068 Parameters 2069 ---------- 2070 path : str or PathLike, optional 2071 For new File-sets, the absolute path to the root directory where 2072 the File-set will be written. Using `path` with an existing 2073 File-set will raise :class:`ValueError`. 2074 use_existing : bool, optional 2075 If ``True`` and no instances have been added to the File-set 2076 (removals are OK), then only update the DICOMDIR file, keeping 2077 the current directory structure rather than converting everything 2078 to the semantics used by *pydicom* for File-sets (default 2079 ``False``). 2080 force_implicit : bool, optional 2081 If ``True`` force the DICOMDIR file to be encoded using *Implicit 2082 VR Little Endian* which is non-conformant to the DICOM Standard 2083 (default ``False``). 2084 2085 Raises 2086 ------ 2087 ValueError 2088 If `use_existing` is ``True`` but instances have been staged 2089 for addition to the File-set. 2090 """ 2091 if not path and self.path is None: 2092 raise ValueError( 2093 "The path to the root directory is required for a " 2094 "new File-set" 2095 ) 2096 2097 if path and self.path: 2098 raise ValueError( 2099 "The path for an existing File-set cannot be changed, use " 2100 "'FileSet.copy()' to write the File-set to a new location" 2101 ) 2102 2103 if path: 2104 self._path = Path(path) 2105 2106 # Don't write unless changed or new 2107 if not self.is_staged: 2108 return 2109 2110 # Path to the DICOMDIR file 2111 p = cast(Path, self._path) / 'DICOMDIR' 2112 2113 # Re-use the existing directory structure if only moves or removals 2114 # are required and `use_existing` is True 2115 major_change = bool(self._stage['+']) 2116 if use_existing and major_change: 2117 raise ValueError( 2118 "'Fileset.write()' called with 'use_existing' but additions " 2119 "to the File-set's managed instances are staged" 2120 ) 2121 2122 if not use_existing: 2123 major_change |= self._stage['~'] 2124 2125 # Worst case scenario if all instances in one directory 2126 if len(self) > 10**6: 2127 self._use_alphanumeric = True 2128 if len(self) > 36**6: 2129 raise NotImplementedError( 2130 "pydicom doesn't support writing File-sets with more than " 2131 "2176782336 managed instances" 2132 ) 2133 2134 # Remove the removals - must be first because the File IDs will be 2135 # incorrect with the removals still in the tree 2136 for instance in self._stage['-'].values(): 2137 try: 2138 Path(instance.path).unlink() 2139 except FileNotFoundError: 2140 pass 2141 self._tree.remove(instance.node) 2142 2143 if use_existing and not major_change: 2144 with open(p, 'wb') as fp: 2145 f = DicomFileLike(fp) 2146 self._write_dicomdir(f, force_implicit=force_implicit) 2147 2148 self.load(p, raise_orphans=True) 2149 2150 return 2151 2152 # We need to be careful not to overwrite the source file 2153 # for a different (later) instance 2154 # Check for collisions between the new and old File IDs 2155 # and copy any to the stage 2156 fout = {Path(ii.FileID) for ii in self} 2157 fin = { 2158 ii.node._file_id for ii in self 2159 if ii.SOPInstanceUID not in self._stage['+'] 2160 } 2161 collisions = fout & fin 2162 for instance in [ii for ii in self if ii.node._file_id in collisions]: 2163 self._stage['+'][instance.SOPInstanceUID] = instance 2164 instance._apply_stage('+') 2165 shutil.copyfile( 2166 self._path / instance.node._file_id, instance.path 2167 ) 2168 2169 for instance in self: 2170 dst = self._path / instance.FileID 2171 dst.parent.mkdir(parents=True, exist_ok=True) 2172 fn: Callable 2173 if instance.SOPInstanceUID in self._stage['+']: 2174 src = instance.path 2175 fn = shutil.copyfile 2176 else: 2177 src = self._path / instance.node._file_id 2178 fn = shutil.move 2179 2180 fn(os.fspath(src), os.fspath(dst)) 2181 instance.node._record.ReferencedFileID = ( 2182 instance.FileID.split(os.path.sep) 2183 ) 2184 2185 # Create the DICOMDIR file 2186 with open(p, 'wb') as fp: 2187 f = DicomFileLike(fp) 2188 self._write_dicomdir(f, force_implicit=force_implicit) 2189 2190 # Reload the File-set 2191 # We're doing things wrong if we have orphans so raise 2192 self.load(p, raise_orphans=True) 2193 2194 def _write_dicomdir( 2195 self, 2196 fp: DicomFileLike, 2197 copy_safe: bool = False, 2198 force_implicit: bool = False 2199 ) -> None: 2200 """Encode and write the File-set's DICOMDIR dataset. 2201 2202 Parameters 2203 ---------- 2204 fp : file-like 2205 The file-like to write the encoded DICOMDIR dataset to. Must 2206 have ``write()``, ``tell()`` and ``seek()`` methods. 2207 copy_safe : bool, optional 2208 If ``True`` then the function doesn't make any changes to the 2209 public parts of the current :class:`~pydicom.fileset.FileSet` 2210 instance. 2211 force_implicit : bool, optional 2212 Force encoding the DICOMDIR with 'Implicit VR Little Endian' which 2213 is non-conformant to the DICOM Standard (default ``False``). 2214 """ 2215 ds = self._ds 2216 if copy_safe or not ds: 2217 ds = self._create_dicomdir() 2218 2219 # By default, always convert to the correct syntax 2220 ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian 2221 seq_offset = 12 2222 if force_implicit: 2223 ds.file_meta.TransferSyntaxUID = ImplicitVRLittleEndian 2224 seq_offset = 8 2225 2226 fp.is_implicit_VR = ds.file_meta.TransferSyntaxUID.is_implicit_VR 2227 fp.is_little_endian = ds.file_meta.TransferSyntaxUID.is_little_endian 2228 2229 # Reset the offsets 2230 first_elem = ds[_FIRST_OFFSET] 2231 first_elem.value = 0 2232 last_elem = ds[_LAST_OFFSET] 2233 last_elem.value = 0 2234 2235 # Write the preamble, DICM marker and File Meta 2236 fp.write(b'\x00' * 128 + b'DICM') 2237 write_file_meta_info(fp, ds.file_meta, enforce_standard=True) 2238 2239 # Write the dataset 2240 # Write up to the *Offset of the First Directory Record...* element 2241 write_dataset(fp, ds[:0x00041200]) 2242 tell_offset_first = fp.tell() # Start of *Offset of the First...* 2243 # Write up to (but not including) the *Directory Record Sequence* 2244 write_dataset(fp, ds[0x00041200:0x00041220]) 2245 2246 # Rebuild and encode the *Directory Record Sequence* 2247 # Step 1: Determine the offsets for all the records 2248 offset = fp.tell() + seq_offset # Start of the first seq. item tag 2249 for node in self._tree: 2250 # RecordNode._offset is the start of each record's seq. item tag 2251 node._offset = offset 2252 offset += 8 # a sequence item's (tag + length) 2253 # Copy safe - only modifies RecordNode._offset 2254 offset += node._encode_record(force_implicit) 2255 2256 # Step 2: Update the records and add to *Directory Record Sequence* 2257 ds.DirectoryRecordSequence = [] 2258 for node in self._tree: 2259 record = node._record 2260 if not copy_safe: 2261 node._update_record_offsets() 2262 else: 2263 record = copy.deepcopy(record) 2264 next_elem = record[_NEXT_OFFSET] 2265 next_elem.value = 0 2266 if node.next: 2267 next_elem.value = node.next._offset 2268 2269 lower_elem = record[_LOWER_OFFSET] 2270 lower_elem.value = 0 2271 if node.children: 2272 record[_LOWER_OFFSET].value = node.children[0]._offset 2273 2274 cast(List[Dataset], ds.DirectoryRecordSequence).append(record) 2275 2276 # Step 3: Encode *Directory Record Sequence* and the rest 2277 write_dataset(fp, ds[0x00041220:]) 2278 2279 # Update the first and last record offsets 2280 if self._tree.children: 2281 first_elem.value = self._tree.children[0]._offset 2282 last_elem.value = self._tree.children[-1]._offset 2283 # Re-write the record offset pointer elements 2284 fp.seek(tell_offset_first) 2285 write_data_element(fp, first_elem) 2286 write_data_element(fp, last_elem) 2287 # Go to the end 2288 fp.seek(0, 2) 2289 2290 2291# Functions for creating Directory Records 2292def _check_dataset(ds: Dataset, keywords: List[str]) -> None: 2293 """Check the dataset module for the Type 1 `keywords`. 2294 2295 Parameters 2296 ---------- 2297 ds : pydicom.dataset.Dataset 2298 The dataset to check. 2299 keywords : list of str 2300 The DICOM keywords for Type 1 elements that are to be checked. 2301 2302 Raises 2303 ------ 2304 KeyError 2305 If an element is not in the dataset. 2306 ValueError 2307 If the element is present but has no value. 2308 """ 2309 for kw in keywords: 2310 tag = Tag(cast(int, tag_for_keyword(kw))) 2311 name = dictionary_description(tag) 2312 if kw not in ds: 2313 raise ValueError( 2314 f"The instance's {tag} '{name}' element is missing" 2315 ) 2316 2317 if ds[kw].VM != 0: 2318 continue 2319 2320 raise ValueError( 2321 f"The instance's {tag} '{name}' element cannot be empty" 2322 ) 2323 2324 2325def _define_patient(ds: Dataset) -> Dataset: 2326 """Return a PATIENT directory record from `ds`.""" 2327 _check_dataset(ds, ["PatientID"]) 2328 2329 record = Dataset() 2330 record.PatientName = ds.get("PatientName") 2331 record.PatientID = ds.PatientID 2332 2333 return record 2334 2335 2336def _define_study(ds: Dataset) -> Dataset: 2337 """Return a STUDY directory record from `ds`.""" 2338 _check_dataset(ds, ["StudyDate", "StudyTime", "StudyID"]) 2339 2340 record = Dataset() 2341 record.StudyDate = ds.StudyDate 2342 record.StudyTime = ds.StudyTime 2343 record.StudyDescription = ds.get("StudyDescription") 2344 if "StudyInstanceUID" in ds: 2345 _check_dataset(ds, ["StudyInstanceUID"]) 2346 record.StudyInstanceUID = ds.StudyInstanceUID 2347 record.StudyID = ds.StudyID 2348 record.AccessionNumber = ds.get("AccessionNumber") 2349 2350 return record 2351 2352 2353def _define_series(ds: Dataset) -> Dataset: 2354 """Return a SERIES directory record from `ds`.""" 2355 _check_dataset(ds, ["Modality", "SeriesInstanceUID", "SeriesNumber"]) 2356 2357 record = Dataset() 2358 record.Modality = ds.Modality 2359 record.SeriesInstanceUID = ds.SeriesInstanceUID 2360 record.SeriesNumber = ds.SeriesNumber 2361 2362 return record 2363 2364 2365def _define_image(ds: Dataset) -> Dataset: 2366 """Return an IMAGE directory record from `ds`.""" 2367 _check_dataset(ds, ["InstanceNumber"]) 2368 2369 record = Dataset() 2370 record.InstanceNumber = ds.InstanceNumber 2371 2372 return record 2373 2374 2375def _define_rt_dose(ds: Dataset) -> Dataset: 2376 """Return an RT DOSE directory record from `ds`.""" 2377 _check_dataset(ds, ["InstanceNumber", "DoseSummationType"]) 2378 2379 record = Dataset() 2380 record.InstanceNumber = ds.InstanceNumber 2381 record.DoseSummationType = ds.DoseSummationType 2382 2383 return record 2384 2385 2386def _define_rt_structure_set(ds: Dataset) -> Dataset: 2387 """Return an RT STRUCTURE SET directory record from `ds`.""" 2388 _check_dataset(ds, ["InstanceNumber", "StructureSetLabel"]) 2389 2390 record = Dataset() 2391 record.InstanceNumber = ds.InstanceNumber 2392 record.StructureSetLabel = ds.StructureSetLabel 2393 record.StructureSetDate = ds.get("StructureSetDate") 2394 record.StructureSetTime = ds.get("StructureSetTime") 2395 2396 return record 2397 2398 2399def _define_rt_plan(ds: Dataset) -> Dataset: 2400 """Return an RT PLAN directory record from `ds`.""" 2401 _check_dataset(ds, ["InstanceNumber", "RTPlanLabel"]) 2402 2403 record = Dataset() 2404 record.InstanceNumber = ds.InstanceNumber 2405 record.RTPlanLabel = ds.RTPlanLabel 2406 record.RTPlanDate = ds.get("RTPlanDate") 2407 record.RTPlanTime = ds.get("RTPlanTime") 2408 2409 return record 2410 2411 2412def _define_rt_treatment_record(ds: Dataset) -> Dataset: 2413 """Return an RT TREAT RECORD directory record from `ds`.""" 2414 _check_dataset(ds, ["InstanceNumber"]) 2415 2416 record = Dataset() 2417 record.InstanceNumber = ds.InstanceNumber 2418 record.TreatmentDate = ds.get("TreatmentDate") 2419 record.TreatmentTime = ds.get("TreatmentTime") 2420 2421 return record 2422 2423 2424def _define_presentation(ds: Dataset) -> Dataset: 2425 """Return a PRESENTATION directory record from `ds`.""" 2426 _check_dataset( 2427 ds, 2428 [ 2429 "PresentationCreationDate", "PresentationCreationTime", 2430 "InstanceNumber", "ContentLabel" 2431 ] 2432 ) 2433 2434 record = Dataset() 2435 record.PresentationCreationDate = ds.PresentationCreationDate 2436 record.PresentationCreationTime = ds.PresentationCreationTime 2437 # Content Identification Macro 2438 record.InstanceNumber = ds.InstanceNumber 2439 record.ContentLabel = ds.ContentLabel 2440 record.ContentDescription = ds.get("ContentDescription") 2441 record.ContentCreatorName = ds.get("ContentCreatorName") 2442 if "ReferencedSeriesSequence" in ds: 2443 _check_dataset(ds, ["ReferencedSeriesSequence"]) 2444 record.ReferencedSeriesSequence = ds.ReferencedSeriesSequence 2445 if "BlendingSequence" in ds: 2446 _check_dataset(ds, ["BlendingSequence"]) 2447 record.BlendingSequence = ds.BlendingSequence 2448 2449 return record 2450 2451 2452def _define_sr_document(ds: Dataset) -> Dataset: 2453 """Return a SR DOCUMENT directory record from `ds`.""" 2454 _check_dataset( 2455 ds, 2456 [ 2457 "InstanceNumber", "CompletionFlag", "VerificationFlag", 2458 "ContentDate", "ContentTime", "ConceptNameCodeSequence", 2459 ] 2460 ) 2461 2462 record = Dataset() 2463 record.InstanceNumber = ds.InstanceNumber 2464 record.CompletionFlag = ds.CompletionFlag 2465 record.VerificationFlag = ds.VerificationFlag 2466 record.ContentDate = ds.ContentDate 2467 record.ContentTime = ds.ContentTime 2468 if "VerificationDateTime" in ds: 2469 _check_dataset(ds, ["VerificationDateTime"]) 2470 record.VerificationDateTime = ds.VerificationDateTime 2471 record.ConceptNameCodeSequence = ds.ConceptNameCodeSequence 2472 if "ContentSequence" in ds: 2473 _check_dataset(ds, ["ContentSequence"]) 2474 record.ContentSequence = ds.ContentSequence 2475 2476 return record 2477 2478 2479def _define_key_object_doc(ds: Dataset) -> Dataset: 2480 """Return a KEY OBJECT DOC directory record from `ds`.""" 2481 _check_dataset( 2482 ds, 2483 [ 2484 "InstanceNumber", "ContentDate", "ContentTime", 2485 "ConceptNameCodeSequence", 2486 ] 2487 ) 2488 2489 record = Dataset() 2490 record.ContentDate = ds.ContentDate 2491 record.ContentTime = ds.ContentTime 2492 record.InstanceNumber = ds.InstanceNumber 2493 record.ConceptNameCodeSequence = ds.ConceptNameCodeSequence 2494 if "ContentSequence" in ds: 2495 _check_dataset(ds, ["ContentSequence"]) 2496 record.ContentSequence = ds.ContentSequence 2497 2498 return record 2499 2500 2501def _define_spectroscopy(ds: Dataset) -> Dataset: 2502 """Return an SPECTROSCOPY directory record from `ds`.""" 2503 _check_dataset( 2504 ds, 2505 [ 2506 "ImageType", "ContentDate", "ContentTime", "InstanceNumber", 2507 "NumberOfFrames", "Rows", "Columns", "DataPointRows", 2508 "DataPointColumns" 2509 ] 2510 ) 2511 2512 record = Dataset() 2513 record.ImageType = ds.ImageType 2514 record.ContentDate = ds.ContentDate 2515 record.ContentTime = ds.ContentTime 2516 record.InstanceNumber = ds.InstanceNumber 2517 if "ReferencedImageEvidenceSequence" in ds: 2518 _check_dataset(ds, ["ReferencedImageEvidenceSequence"]) 2519 2520 record.ReferencedImageEvidenceSequence = ( 2521 ds.ReferencedImageEvidenceSequence 2522 ) 2523 2524 record.NumberOfFrames = ds.NumberOfFrames 2525 record.Rows = ds.Rows 2526 record.Columns = ds.Columns 2527 record.DataPointRows = ds.DataPointRows 2528 record.DataPointColumns = ds.DataPointColumns 2529 2530 return record 2531 2532 2533def _define_hanging_protocol(ds: Dataset) -> Dataset: 2534 """Return a HANGING PROTOCOL directory record from `ds`.""" 2535 _check_dataset( 2536 ds, 2537 [ 2538 "HangingProtocolCreator", "HangingProtocolCreationDateTime", 2539 "HangingProtocolDefinitionSequence", "NumberOfPriorsReferenced", 2540 ] 2541 ) 2542 2543 record = Dataset() 2544 record.HangingProtocolCreator = ds.HangingProtocolCreator 2545 record.HangingProtocolCreationDateTime = ds.HangingProtocolCreationDateTime 2546 record.HangingProtocolDefinitionSequence = ( 2547 ds.HangingProtocolDefinitionSequence 2548 ) 2549 record.NumberOfPriorsReferenced = ds.NumberOfPriorsReferenced 2550 record.HangingProtocolUserIdentificationCodeSequence = ( 2551 ds.get("HangingProtocolUserIdentificationCodeSequence", []) 2552 ) 2553 2554 return record 2555 2556 2557def _define_encap_doc(ds: Dataset) -> Dataset: 2558 """Return an ENCAP DOC directory record from `ds`.""" 2559 _check_dataset(ds, ["InstanceNumber", "MIMETypeOfEncapsulatedDocument"]) 2560 2561 record = Dataset() 2562 record.ContentDate = ds.get("ContentDate") 2563 record.ContentTime = ds.get("ContentTime") 2564 record.InstanceNumber = ds.InstanceNumber 2565 record.DocumentTitle = ds.get("DocumentTitle") 2566 if "HL7InstanceIdentifier" in ds: 2567 _check_dataset(ds, ["HL7InstanceIdentifier"]) 2568 record.HL7InstanceIdentifier = ds.HL7InstanceIdentifier 2569 record.ConceptNameCodeSequence = ds.get("ConceptNameCodeSequence") 2570 2571 record.MIMETypeOfEncapsulatedDocument = ds.MIMETypeOfEncapsulatedDocument 2572 2573 return record 2574 2575 2576def _define_palette(ds: Dataset) -> Dataset: 2577 """Return a PALETTE directory record from `ds`.""" 2578 _check_dataset(ds, ["ContentLabel"]) 2579 2580 record = Dataset() 2581 record.ContentLabel = ds.ContentLabel 2582 record.ContentDescription = ds.get("ContentDescription") 2583 2584 return record 2585 2586 2587def _define_implant(ds: Dataset) -> Dataset: 2588 """Return a IMPLANT directory record from `ds`.""" 2589 _check_dataset(ds, ["Manufacturer", "ImplantName", "ImplantPartNumber"]) 2590 2591 record = Dataset() 2592 record.Manufacturer = ds.Manufacturer 2593 record.ImplantName = ds.ImplantName 2594 if "ImplantSize" in ds: 2595 _check_dataset(ds, ["ImplantSize"]) 2596 record.ImplantSize = ds.ImplantSize 2597 record.ImplantPartNumber = ds.ImplantPartNumber 2598 2599 return record 2600 2601 2602def _define_implant_assy(ds: Dataset) -> Dataset: 2603 """Return a IMPLANT ASSY directory record from `ds`.""" 2604 _check_dataset( 2605 ds, 2606 [ 2607 "ImplantAssemblyTemplateName", "Manufacturer", 2608 "ProcedureTypeCodeSequence" 2609 ] 2610 ) 2611 2612 record = Dataset() 2613 record.ImplantAssemblyTemplateName = ds.ImplantAssemblyTemplateName 2614 record.Manufacturer = ds.Manufacturer 2615 record.ProcedureTypeCodeSequence = ds.ProcedureTypeCodeSequence 2616 2617 return record 2618 2619 2620def _define_implant_group(ds: Dataset) -> Dataset: 2621 """Return a IMPLANT GROUP directory record from `ds`.""" 2622 _check_dataset( 2623 ds, 2624 ["ImplantTemplateGroupName", "ImplantTemplateGroupIssuer"] 2625 ) 2626 2627 record = Dataset() 2628 record.ImplantTemplateGroupName = ds.ImplantTemplateGroupName 2629 record.ImplantTemplateGroupIssuer = ds.ImplantTemplateGroupIssuer 2630 2631 return record 2632 2633 2634def _define_surface_scan(ds: Dataset) -> Dataset: 2635 """Return a SURFACE SCAN directory record from `ds`.""" 2636 _check_dataset(ds, ["ContentDate", "ContentTime"]) 2637 2638 record = Dataset() 2639 record.ContentDate = ds.ContentDate 2640 record.ContentTime = ds.ContentTime 2641 2642 return record 2643 2644 2645def _define_assessment(ds: Dataset) -> Dataset: 2646 """Return a ASSESSMENT directory record from `ds`.""" 2647 _check_dataset(ds, ["InstanceNumber", "InstanceCreationDate"]) 2648 2649 record = Dataset() 2650 record.InstanceNumber = ds.InstanceNumber 2651 record.InstanceCreationDate = ds.InstanceCreationDate 2652 record.InstanceCreationTime = ds.get("InstanceCreationTime") 2653 2654 return record 2655 2656 2657def _define_radiotherapy(ds: Dataset) -> Dataset: 2658 """Return a RADIOTHERAPY directory record from `ds`.""" 2659 _check_dataset(ds, ["InstanceNumber"]) 2660 2661 record = Dataset() 2662 record.InstanceNumber = ds.InstanceNumber 2663 if "UserContentLabel" in ds: 2664 _check_dataset(ds, ["UserContentLabel"]) 2665 record.UserContentLabel = ds.UserContentLabel 2666 if "UserContentLongLabel" in ds: 2667 _check_dataset(ds, ["UserContentLongLabel"]) 2668 record.UserContentLongLabel = ds.UserContentLongLabel 2669 2670 record.ContentDescription = ds.get("ContentDescription") 2671 record.ContentCreatorName = ds.get("ContentCreatorName") 2672 2673 return record 2674 2675 2676def _define_generic_content(ds: Dataset) -> Dataset: 2677 """Return a WAVEFORM/RAW DATA directory record from `ds`.""" 2678 _check_dataset(ds, ["InstanceNumber", "ContentDate", "ContentTime"]) 2679 2680 record = Dataset() 2681 record.InstanceNumber = ds.InstanceNumber 2682 record.ContentDate = ds.ContentDate 2683 record.ContentTime = ds.ContentTime 2684 2685 return record 2686 2687 2688def _define_generic_content_id(ds: Dataset) -> Dataset: 2689 """Return a generic content identification directory record from `ds`.""" 2690 _check_dataset( 2691 ds, 2692 ["InstanceNumber", "ContentDate", "ContentTime", "ContentLabel"] 2693 ) 2694 2695 # Content Identification Macro 2696 record = Dataset() 2697 record.InstanceNumber = ds.InstanceNumber 2698 record.ContentDate = ds.ContentDate 2699 record.ContentTime = ds.ContentTime 2700 record.ContentLabel = ds.ContentLabel 2701 record.ContentDescription = ds.get("ContentDescription") 2702 record.ContentCreatorName = ds.get("ContentCreatorName") 2703 2704 return record 2705 2706 2707def _define_empty(ds: Dataset) -> Dataset: 2708 """Return an empty directory record from `ds`.""" 2709 return Dataset() 2710 2711 2712DIRECTORY_RECORDERS = { 2713 "PATIENT": _define_patient, # TOP LEVEL 2714 "STUDY": _define_study, # INTERMEDIATE or LEAF 2715 "SERIES": _define_series, # INTERMEDIATE 2716 "IMAGE": _define_image, # LEAF 2717 "RT DOSE": _define_rt_dose, # LEAF 2718 "RT STRUCTURE SET": _define_rt_structure_set, # LEAF 2719 "RT PLAN": _define_rt_plan, # LEAF 2720 "RT TREAT RECORD": _define_rt_treatment_record, # LEAF 2721 "PRESENTATION": _define_presentation, # LEAF 2722 "WAVEFORM": _define_generic_content, # LEAF 2723 "SR DOCUMENT": _define_sr_document, # LEAF 2724 "KEY OBJECT DOC": _define_key_object_doc, # LEAF 2725 "SPECTROSCOPY": _define_spectroscopy, # LEAF 2726 "RAW DATA": _define_generic_content, # LEAF 2727 "REGISTRATION": _define_generic_content_id, # LEAF 2728 "FIDUCIAL": _define_generic_content_id, # LEAF 2729 "HANGING PROTOCOL": _define_hanging_protocol, # TOP LEVEL and LEAF 2730 "ENCAP DOC": _define_encap_doc, # LEAF 2731 "VALUE MAP": _define_generic_content_id, # LEAF 2732 "STEREOMETRIC": _define_empty, # LEAF 2733 "PALETTE": _define_palette, # TOP LEVEL and LEAF 2734 "IMPLANT": _define_implant, # TOP LEVEL and LEAF 2735 "IMPLANT ASSY": _define_implant_assy, # TOP LEVEL and LEAF 2736 "IMPLANT GROUP": _define_implant_group, # TOP LEVEL and LEAF 2737 "PLAN": _define_empty, # LEAF 2738 "MEASUREMENT": _define_generic_content_id, # LEAF 2739 "SURFACE": _define_generic_content_id, # LEAF 2740 "SURFACE SCAN": _define_surface_scan, # LEAF 2741 "TRACT": _define_generic_content_id, # LEAF 2742 "ASSESSMENT": _define_assessment, # LEAF 2743 "RADIOTHERAPY": _define_radiotherapy, # LEAF 2744} 2745"""A :class:`dict` containing the directory record creation functions. 2746 2747The functions are used to create non-PRIVATE records for a given SOP Instance 2748as ``{"RECORD TYPE": callable}``, where ``"RECORD TYPE"`` should match one of 2749the allowable values - except PRIVATE - for (0004,1430) *Directory Record 2750Type*. By overriding the function for a given record type you can customize 2751the directory records that will be included in the DICOMDIR file. 2752 2753Example 2754------- 2755 2756.. code-block:: python 2757 2758 from pydicom.fileset import DIRECTORY_RECORDERS, FileSet 2759 2760 def my_recorder(ds: Dataset) -> Dataset: 2761 record = Dataset() 2762 record.OffsetOfTheNextDirectoryRecord = 0 2763 record.RecordInUseFlag = 0xFFFF 2764 record.OffsetOfReferencedLowerLevelDirectoryEntity = 0 2765 record.DirectoryRecordType = "PATIENT" 2766 if "SpecificCharacterSet" in ds: 2767 record.SpecificCharacterSet = ds.SpecificCharacterSet 2768 2769 record.PatientName = ds.get("PatientName") 2770 record.PatientID = ds.PatientID 2771 2772 return record 2773 2774 DIRECTORY_RECORDERS["PATIENT"] = my_recorder 2775 2776 # Use the updated directory recorder 2777 fs = FileSet() 2778 fs.add('my_instance.dcm') 2779 2780The function should take a single parameter which is the SOP Instance to be 2781added to the File-set as a :class:`~pydicom.dataset.Dataset` and return a 2782:class:`~pydicom.dataset.Dataset` with a single directory record matching the 2783directory record type. See :dcm:`Annex F.3.2.2<chtml/part03/sect_F.3.2.2.html>` 2784for possible record types. 2785 2786For PRIVATE records you must use the 2787:meth:`~pydicom.fileset.FileSet.add_custom` method instead. 2788""" 2789_SINGLE_LEVEL_SOP_CLASSES = { 2790 sop.HangingProtocolStorage: "HANGING PROTOCOL", 2791 sop.ColorPaletteStorage: "PALETTE", 2792 sop.GenericImplantTemplateStorage: "IMPLANT", 2793 sop.ImplantAssemblyTemplateStorage: "IMPLANT ASSY", 2794 sop.ImplantTemplateGroupStorage: "IMPLANT GROUP", 2795} 2796_FOUR_LEVEL_SOP_CLASSES = { 2797 sop.RTDoseStorage: "RT DOSE", 2798 sop.RTStructureSetStorage: "RT STRUCTURE SET", 2799 sop.RTBeamsTreatmentRecordStorage: "RT TREAT RECORD", 2800 sop.RTBrachyTreatmentRecordStorage: "RT TREAT RECORD", 2801 sop.RTTreatmentSummaryRecordStorage: "RT TREAT RECORD", 2802 sop.RTIonBeamsTreatmentRecordStorage: "RT TREAT RECORD", 2803 sop.GrayscaleSoftcopyPresentationStateStorage: "PRESENTATION", 2804 sop.ColorSoftcopyPresentationStateStorage: "PRESENTATION", 2805 sop.PseudoColorSoftcopyPresentationStateStorage: "PRESENTATION", 2806 sop.BlendingSoftcopyPresentationStateStorage: "PRESENTATION", 2807 sop.XAXRFGrayscaleSoftcopyPresentationStateStorage: "PRESENTATION", 2808 sop.BasicStructuredDisplayStorage: "PRESENTATION", 2809 sop.BasicVoiceAudioWaveformStorage: "WAVEFORM", 2810 sop.TwelveLeadECGWaveformStorage: "WAVEFORM", 2811 sop.GeneralECGWaveformStorage: "WAVEFORM", 2812 sop.AmbulatoryECGWaveformStorage: "WAVEFORM", 2813 sop.HemodynamicWaveformStorage: "WAVEFORM", 2814 sop.CardiacElectrophysiologyWaveformStorage: "WAVEFORM", 2815 sop.ArterialPulseWaveformStorage: "WAVEFORM", 2816 sop.RespiratoryWaveformStorage: "WAVEFORM", 2817 sop.GeneralAudioWaveformStorage: "WAVEFORM", 2818 sop.RoutineScalpElectroencephalogramWaveformStorage: "WAVEFORM", 2819 sop.ElectromyogramWaveformStorage: "WAVEFORM", 2820 sop.ElectrooculogramWaveformStorage: "WAVEFORM", 2821 sop.SleepElectroencephalogramWaveformStorage: "WAVEFORM", 2822 sop.MultichannelRespiratoryWaveformStorage: "WAVEFORM", 2823 sop.BodyPositionWaveformStorage: "WAVEFORM", 2824 sop.BasicTextSRStorage: "SR DOCUMENT", 2825 sop.EnhancedSRStorage: "SR DOCUMENT", 2826 sop.ComprehensiveSRStorage: "SR DOCUMENT", 2827 sop.MammographyCADSRStorage: "SR DOCUMENT", 2828 sop.ChestCADSRStorage: "SR DOCUMENT", 2829 sop.ProcedureLogStorage: "SR DOCUMENT", 2830 sop.XRayRadiationDoseSRStorage: "SR DOCUMENT", 2831 sop.SpectaclePrescriptionReportStorage: "SR DOCUMENT", 2832 sop.ColonCADSRStorage: "SR DOCUMENT", 2833 sop.MacularGridThicknessAndVolumeReportStorage: "SR DOCUMENT", 2834 sop.ImplantationPlanSRStorage: "SR DOCUMENT", 2835 sop.Comprehensive3DSRStorage: "SR DOCUMENT", 2836 sop.RadiopharmaceuticalRadiationDoseSRStorage: "SR DOCUMENT", 2837 sop.ExtensibleSRStorage: "SR DOCUMENT", 2838 sop.AcquisitionContextSRStorage: "SR DOCUMENT", 2839 sop.SimplifiedAdultEchoSRStorage: "SR DOCUMENT", 2840 sop.PatientRadiationDoseSRStorage: "SR DOCUMENT", 2841 sop.PlannedImagingAgentAdministrationSRStorage: "SR DOCUMENT", 2842 sop.PerformedImagingAgentAdministrationSRStorage: "SR DOCUMENT", 2843 sop.KeyObjectSelectionDocumentStorage: "KEY OBJECT DOC", 2844 sop.MRSpectroscopyStorage: "SPECTROSCOPY", 2845 sop.RawDataStorage: "RAW DATA", 2846 sop.SpatialRegistrationStorage: "REGISTRATION", 2847 sop.DeformableSpatialRegistrationStorage: "REGISTRATION", 2848 sop.SpatialFiducialsStorage: "FIDUCIAL", 2849 sop.RealWorldValueMappingStorage: "VALUE MAP", 2850 sop.StereometricRelationshipStorage: "STEREOMETRIC", 2851 sop.LensometryMeasurementsStorage: "MEASUREMENT", 2852 sop.AutorefractionMeasurementsStorage: "MEASUREMENT", 2853 sop.KeratometryMeasurementsStorage: "MEASUREMENT", 2854 sop.SubjectiveRefractionMeasurementsStorage: "MEASUREMENT", 2855 sop.VisualAcuityMeasurementsStorage: "MEASUREMENT", 2856 sop.OphthalmicAxialMeasurementsStorage: "MEASUREMENT", 2857 sop.OphthalmicVisualFieldStaticPerimetryMeasurementsStorage: "MEASUREMENT", 2858 sop.SurfaceSegmentationStorage: "SURFACE", 2859 sop.SurfaceScanMeshStorage: "SURFACE SCAN", 2860 sop.SurfaceScanPointCloudStorage: "SURFACE SCAN", 2861 sop.TractographyResultsStorage: "TRACT", 2862 sop.ContentAssessmentResultsStorage: "ASSESSMENT", 2863} 2864 2865 2866def _single_level_record_type(ds: Dataset) -> str: 2867 """Return a single-level *Directory Record Type* for `ds`.""" 2868 sop_class = getattr(ds, "SOPClassUID", None) 2869 2870 try: 2871 return _SINGLE_LEVEL_SOP_CLASSES[sop_class] 2872 except KeyError: 2873 return "PATIENT" 2874 2875 2876def _four_level_record_type(ds: Dataset) -> str: 2877 """Return the fourth-level *Directory Record Type* for `ds`.""" 2878 modality = getattr(ds, "Modality", None) 2879 if modality in ["RTINTENT", "RTSEGANN", "RTRAD"]: 2880 return "RADIOTHERAPY" 2881 2882 if modality == "PLAN": 2883 return "PLAN" 2884 2885 if "EncapsulatedDocument" in ds: 2886 return "ENCAP DOC" 2887 2888 if "RTPlanLabel" in ds: 2889 return "RT PLAN" 2890 2891 sop_class = getattr(ds, "SOPClassUID", None) 2892 2893 try: 2894 return _FOUR_LEVEL_SOP_CLASSES[sop_class] 2895 except KeyError: 2896 return "IMAGE" 2897