1 2import os 3from pathlib import Path 4import shutil 5from tempfile import TemporaryDirectory 6 7import pytest 8 9from pydicom import config, dcmread 10from pydicom.data import get_testdata_file 11from pydicom.dataset import Dataset, FileMetaDataset 12from pydicom.filebase import DicomBytesIO 13from pydicom.fileset import ( 14 FileSet, FileInstance, RecordNode, is_conformant_file_id, 15 generate_filename, _define_patient, _define_study, _define_series, 16 _define_image, _PREFIXES 17) 18from pydicom.filewriter import write_dataset 19from pydicom.tag import Tag, BaseTag 20from pydicom._storage_sopclass_uids import ( 21 MediaStorageDirectoryStorage, ComputedRadiographyImageStorage, 22 CTImageStorage, RTBeamsTreatmentRecordStorage, RTPlanStorage, 23 GrayscaleSoftcopyPresentationStateStorage, BasicTextSRStorage, 24 KeyObjectSelectionDocumentStorage, MRSpectroscopyStorage, 25 HangingProtocolStorage, EncapsulatedPDFStorage, ColorPaletteStorage, 26 GenericImplantTemplateStorage, ImplantAssemblyTemplateStorage, 27 ImplantTemplateGroupStorage, TwelveLeadECGWaveformStorage, 28 RawDataStorage, SpatialRegistrationStorage, SpatialFiducialsStorage, 29 RealWorldValueMappingStorage, StereometricRelationshipStorage, 30 LensometryMeasurementsStorage, SurfaceSegmentationStorage, 31 TractographyResultsStorage, SurfaceScanMeshStorage, RTDoseStorage, 32 ContentAssessmentResultsStorage, RTStructureSetStorage, 33 RTBeamsDeliveryInstructionStorage, CArmPhotonElectronRadiationStorage 34) 35from pydicom.uid import ( 36 ExplicitVRLittleEndian, generate_uid, ImplicitVRLittleEndian 37) 38 39 40TEST_FILE = get_testdata_file('DICOMDIR') 41TINY_ALPHA_FILESET = get_testdata_file("tiny_alpha/DICOMDIR") 42IMPLICIT_TEST_FILE = get_testdata_file('DICOMDIR-implicit') 43BIGENDIAN_TEST_FILE = get_testdata_file('DICOMDIR-bigEnd') 44 45 46@pytest.fixture 47def tiny(): 48 """Return the tiny alphanumeric File-set.""" 49 return dcmread(TINY_ALPHA_FILESET) 50 51 52@pytest.fixture 53def dicomdir(): 54 """Return the DICOMDIR dataset.""" 55 return dcmread(TEST_FILE) 56 57 58@pytest.fixture 59def dicomdir_copy(): 60 """Copy the File-set to a temporary directory and return its DICOMDIR.""" 61 t = TemporaryDirectory() 62 src = Path(TEST_FILE).parent 63 dst = Path(t.name) 64 65 shutil.copyfile(src / 'DICOMDIR', dst / 'DICOMDIR') 66 shutil.copytree(src / "77654033", dst / "77654033") 67 shutil.copytree(src / "98892003", dst / "98892003") 68 shutil.copytree(src / "98892001", dst / "98892001") 69 70 return t, dcmread(dst / "DICOMDIR") 71 72 73@pytest.fixture 74def ct(): 75 """Return a DICOMDIR dataset.""" 76 return dcmread(get_testdata_file("CT_small.dcm")) 77 78 79@pytest.fixture 80def tdir(): 81 """Return a TemporaryDirectory instance.""" 82 return TemporaryDirectory() 83 84 85@pytest.fixture 86def custom_leaf(): 87 """Return the leaf node from a custom 4-level record hierarchy""" 88 ct = dcmread(get_testdata_file("CT_small.dcm")) 89 patient = _define_patient(ct) 90 study = _define_study(ct) 91 series = _define_series(ct) 92 image = _define_image(ct) 93 for ii, record in enumerate([patient, study, series, image]): 94 rtypes = ["PATIENT", "STUDY", "SERIES", "IMAGE"] 95 record.DirectoryRecordType = rtypes[ii] 96 record.OffsetOfTheNextDirectoryRecord = 0 97 record.RecordInUseFlag = 0xFFFF 98 record.OffsetOfReferencedLowerLevelDirectoryEntity = 0 99 100 patient = RecordNode(patient) 101 study = RecordNode(study) 102 series = RecordNode(series) 103 104 image.ReferencedFileID = None 105 image.ReferencedSOPClassUIDInFile = ct.SOPClassUID 106 image.ReferencedSOPInstanceUIDInFile = ct.SOPInstanceUID 107 image.ReferencedTransferSyntaxUIDInFile = ( 108 ct.file_meta.TransferSyntaxUID 109 ) 110 image = RecordNode(image) 111 112 image.parent = series 113 series.parent = study 114 study.parent = patient 115 116 return image 117 118 119@pytest.fixture 120def private(dicomdir): 121 """Return a DICOMDIR dataset with PRIVATE records.""" 122 def write_record(ds): 123 """Return `ds` as explicit little encoded bytes.""" 124 fp = DicomBytesIO() 125 fp.is_implicit_VR = False 126 fp.is_little_endian = True 127 write_dataset(fp, ds) 128 129 return fp.parent.getvalue() 130 131 def private_record(): 132 record = Dataset() 133 record.OffsetOfReferencedLowerLevelDirectoryEntity = 0 134 record.RecordInUseFlag = 65535 135 record.OffsetOfTheNextDirectoryRecord = 0 136 record.DirectoryRecordType = "PRIVATE" 137 record.PrivateRecordUID = generate_uid() 138 139 return record 140 141 ds = dicomdir 142 143 top = private_record() 144 middle = private_record() 145 bottom = private_record() 146 bottom.ReferencedSOPClassUIDInFile = "1.2.3.4" 147 bottom.ReferencedFileID = [ 148 "tiny_alpha", "PT000000", "ST000000", "SE000000", "IM000000" 149 ] 150 bottom.ReferencedSOPInstanceUIDInFile = ( 151 "1.2.276.0.7230010.3.1.4.0.31906.1359940846.78187" 152 ) 153 bottom.ReferencedTransferSyntaxUIDInFile = ExplicitVRLittleEndian 154 155 len_top = len(write_record(top)) # 112 156 len_middle = len(write_record(middle)) # 112 157 len_bottom = len(write_record(bottom)) # 238 158 len_last = len(write_record(ds.DirectoryRecordSequence[-1])) # 248 159 160 records = {} 161 for item in ds.DirectoryRecordSequence: 162 records[item.seq_item_tell] = item 163 164 # Top PRIVATE 165 # Offset to the top PRIVATE - 10860 + 248 + 8 166 offset = ds.DirectoryRecordSequence[-1].seq_item_tell + len_last + 8 167 168 # Change the last top-level record to point at the top PRIVATE 169 # Original is 3126 170 last = ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity 171 record = records[last] 172 record.OffsetOfTheNextDirectoryRecord = offset 173 174 # Change the last record offset 175 ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity = offset 176 top.seq_item_tell = offset 177 178 # Offset to the middle PRIVATE 179 offset += len_top + 8 180 top.OffsetOfReferencedLowerLevelDirectoryEntity = offset 181 ds.DirectoryRecordSequence.append(top) 182 middle.seq_item_tell = offset 183 184 # Middle PRIVATE 185 # Offset to the bottom PRIVATE 186 offset += len_middle + 8 187 middle.OffsetOfReferencedLowerLevelDirectoryEntity = offset 188 ds.DirectoryRecordSequence.append(middle) 189 190 # Bottom PRIVATE 191 ds.DirectoryRecordSequence.append(bottom) 192 bottom.seq_item_tell = offset 193 194 # Redo the record parsing to reflect changes 195 ds.parse_records() 196 197 return ds 198 199 200@pytest.fixture 201def dummy(): 202 """Return a dummy dataset used for testing the record creators""" 203 ds = Dataset() 204 ds.file_meta = FileMetaDataset() 205 ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian 206 ds.PatientID = "12345678" 207 ds.PatientName = "Citizen^Jan" 208 ds.StudyDate = "20201001" 209 ds.StudyTime = "120000" 210 ds.StudyID = "1" 211 ds.StudyInstanceUID = "1.2.3" 212 ds.SeriesInstanceUID = "1.2.3.4" 213 ds.SeriesNumber = "1" 214 ds.SOPInstanceUID = "1.2.3.4.5" 215 ds.InstanceNumber = "1" 216 ds.ContentDate = "20201002" 217 ds.ContentTime = "120100" 218 ds.ContentLabel = "Content label" 219 ds.ContentDescription = "Content description" 220 ds.ContentCreatorName = "Content^Creator^Name" 221 ds.TreatmentDate = "20201003" 222 ds.TreatmentTime = "120200" 223 ds.PresentationCreationDate = "20201004" 224 ds.PresentationCreationTime = "120300" 225 ds.InstanceCreationDate = "20200105" 226 ds.InstanceCreationTime = "120400" 227 ds.CompletionFlag = "COMPLETE" 228 ds.VerificationFlag = "VERIFIED" 229 ds.ConceptNameCodeSequence = [Dataset()] 230 ds.ImageType = "ADDITION" 231 ds.NumberOfFrames = 7 232 ds.Rows = 10 233 ds.Columns = 11 234 ds.DataPointRows = 12 235 ds.DataPointColumns = 13 236 ds.HangingProtocolCreator = "HP Creator" 237 ds.HangingProtocolCreationDateTime = "20201001120000" 238 ds.HangingProtocolDefinitionSequence = [Dataset()] 239 ds.NumberOfPriorsReferenced = 2 240 ds.HangingProtocolUserIdentificationCodeSequence = [Dataset()] 241 ds.DocumentTitle = "Document title" 242 ds.MIMETypeOfEncapsulatedDocument = "PDF" 243 ds.Manufacturer = "Implant manufacturer" 244 ds.ImplantName = "Implant name" 245 ds.ImplantPartNumber = "PN01" 246 ds.ImplantAssemblyTemplateName = "Template name" 247 ds.ProcedureTypeCodeSequence = [Dataset()] 248 ds.ImplantTemplateGroupName = "Group name" 249 ds.ImplantTemplateGroupIssuer = "Group issuer" 250 ds.RTPlanDate = "20201006" 251 ds.RTPlanTime = "120600" 252 ds.DoseSummationType = "PLAN" 253 ds.StructureSetLabel = "Structure set label" 254 ds.StructureSetDate = "20201007" 255 ds.StructureSetTime = "120700" 256 257 # To be customised 258 ds.Modality = "CT" # PLAN, STSEGANN 259 ds.SOPClassUID = CTImageStorage 260 261 # To be added 262 # ds.EncapsulatedDocument = None 263 # ds.RTPlanLabel = None 264 265 # 1C elements 266 opt = Dataset() 267 opt.SpecificCharacterSet = "ISO_IR 100" 268 opt.BlendingSequence = [Dataset()] 269 opt.ReferencedSeriesSequence = [Dataset()] 270 opt.VerificationDateTime = "20201001120000" 271 opt.ContentSequence = [Dataset()] 272 opt.ReferencedImageEvidenceSequence = [Dataset()] 273 opt.HL7InstanceIdentifier = "HL7 identifier" 274 opt.ImplantSize = "13.4x12.5" 275 opt.UserContentLabel = "Content label" 276 opt.UserContentLongLabel = "Content long label" 277 278 return ds, opt 279 280 281def write_fs(fs, path=None): 282 """Call FileSet.write(path). 283 284 Returns 285 ------- 286 pydicom.dataset.Dataset 287 The resulting DICOMDIR dataset 288 list of PathLike 289 A list of paths for the non-DICOMDIR files in the File-set. 290 """ 291 fs.write(path) 292 path = Path(fs.path) 293 paths = [ 294 p for p in path.glob('**/*') 295 if p.is_file() and p.name != 'DICOMDIR' 296 ] 297 return dcmread(path / "DICOMDIR"), sorted(paths) 298 299 300def copy_fs(fs, path, as_implicit=False): 301 """Call FileSet.copy(path). 302 303 Returns 304 ------- 305 pydicom.fileset.FileSet 306 The new FileSet, 307 pydicom.dataset.Dataset 308 The new File-set's DICOMDIR dataset 309 list of PathLike 310 A list of paths for the non-DICOMDIR files in the new File-set. 311 """ 312 path = Path(path) 313 fs = fs.copy(path, force_implicit=as_implicit) 314 paths = [ 315 p for p in path.glob('**/*') 316 if p.is_file() and p.name != 'DICOMDIR' 317 ] 318 return fs, dcmread(path / "DICOMDIR"), sorted(paths) 319 320 321def temporary_fs(ds): 322 """Copy a File-set to a temporary directory.""" 323 t = TemporaryDirectory() 324 src = Path(ds.filename).parent 325 dst = Path(t.name) 326 327 shutil.copyfile(src / 'DICOMDIR', dst / 'DICOMDIR') 328 for d in src.glob('*'): 329 if d.is_dir(): 330 shutil.copytree(d, dst / d.name) 331 332 return t, dcmread(dst / "DICOMDIR") 333 334 335def test_is_conformant_file_id(): 336 """Test conformant and non-conformant File ID paths""" 337 bad = [ 338 "aBCDEF123", "aBCD1234", "ABCD!", "1234)", " ", 339 "1/2/3/4/5/6/7/8/9", "لنزار", "ABCD.DCM", "123 ABCD" 340 ] 341 for p in bad: 342 assert not is_conformant_file_id(Path(p)) 343 344 good = [ 345 "ACBDEFGH", "12345678", "1/2/3/4/5/6/7/8", "0", "9", "A", "Z", 346 "ABCD1234", "1234ABCD", "_", "_ABCD", "ABCD_", "AB_CD", "________", 347 "A_______", "_______1" 348 ] 349 for p in good: 350 assert is_conformant_file_id(Path(p)) 351 352 353def test_prefixes(): 354 """Test that the file ID prefixes are unique.""" 355 prefixes = set(_PREFIXES.values()) 356 assert len(_PREFIXES) == len(prefixes) 357 358 359class TestGenerateFilename: 360 """Tests for generate_filename().""" 361 def test_numeric(self): 362 """Test generating numeric suffixes.""" 363 gen = generate_filename(start=0, alphanumeric=False) 364 assert '00000000' == next(gen) 365 assert '00000001' == next(gen) 366 assert '00000002' == next(gen) 367 assert '00000003' == next(gen) 368 assert '00000004' == next(gen) 369 assert '00000005' == next(gen) 370 assert '00000006' == next(gen) 371 assert '00000007' == next(gen) 372 assert '00000008' == next(gen) 373 assert '00000009' == next(gen) 374 assert '00000010' == next(gen) 375 376 def test_numeric_prefix(self): 377 """Test prefix for numeric filenames.""" 378 for ii in range(1, 8): 379 prefix = "A" * ii 380 gen = generate_filename( 381 prefix="A" * ii, start=0, alphanumeric=False 382 ) 383 assert prefix + '0' * (8 - ii) == next(gen) 384 385 def test_numeric_start(self): 386 """Test start point with numeric suffixes.""" 387 gen = generate_filename(start=10, alphanumeric=False) 388 assert '00000010' == next(gen) 389 assert '00000011' == next(gen) 390 assert '00000012' == next(gen) 391 392 def test_alphanumeric(self): 393 """Test generating alphanumeric suffixes.""" 394 gen = generate_filename(start=0, alphanumeric=True) 395 assert '00000000' == next(gen) 396 assert '00000001' == next(gen) 397 assert '00000002' == next(gen) 398 assert '00000003' == next(gen) 399 assert '00000004' == next(gen) 400 assert '00000005' == next(gen) 401 assert '00000006' == next(gen) 402 assert '00000007' == next(gen) 403 assert '00000008' == next(gen) 404 assert '00000009' == next(gen) 405 assert '0000000A' == next(gen) 406 for ii in range(24): 407 next(gen) 408 assert '0000000Z' == next(gen) 409 assert '00000010' == next(gen) 410 411 def test_alphanumeric_prefix(self): 412 """Test length of the suffixes.""" 413 for ii in range(1, 8): 414 prefix = "A" * ii 415 gen = generate_filename( 416 prefix="A" * ii, start=0, alphanumeric=True 417 ) 418 assert prefix + '0' * (8 - ii) == next(gen) 419 assert prefix + '0' * (7 - ii) + '1' == next(gen) 420 assert prefix + '0' * (7 - ii) + '2' == next(gen) 421 assert prefix + '0' * (7 - ii) + '3' == next(gen) 422 assert prefix + '0' * (7 - ii) + '4' == next(gen) 423 assert prefix + '0' * (7 - ii) + '5' == next(gen) 424 assert prefix + '0' * (7 - ii) + '6' == next(gen) 425 assert prefix + '0' * (7 - ii) + '7' == next(gen) 426 assert prefix + '0' * (7 - ii) + '8' == next(gen) 427 assert prefix + '0' * (7 - ii) + '9' == next(gen) 428 assert prefix + '0' * (7 - ii) + 'A' == next(gen) 429 430 def test_alphanumeric_start(self): 431 """Test start point with alphanumeric suffixes.""" 432 gen = generate_filename(start=10, alphanumeric=True) 433 assert '0000000A' == next(gen) 434 assert '0000000B' == next(gen) 435 assert '0000000C' == next(gen) 436 437 def test_long_prefix_raises(self): 438 """Test too long a prefix.""" 439 msg = r"The 'prefix' must be less than 8 characters long" 440 with pytest.raises(ValueError, match=msg): 441 next(generate_filename('A' * 8)) 442 443 444@pytest.mark.filterwarnings("ignore:The 'DicomDir'") 445class TestRecordNode: 446 """Tests for RecordNode.""" 447 def test_root(self, private): 448 """Tests the root node.""" 449 fs = FileSet(private) 450 root = fs._tree 451 assert [] == root.ancestors 452 msg = r"The root node doesn't contribute a File ID component" 453 with pytest.raises(ValueError, match=msg): 454 root.component 455 456 assert root.file_set == fs 457 assert root.parent is None 458 assert 3 == len(root.children) 459 assert 55 == len(list(iter(root))) 460 461 # Test __contains__ 462 for child in root.children: 463 assert child in root 464 assert child.key in root 465 466 assert -1 == root.depth 467 assert 0 == root.index 468 assert not root.has_instance 469 assert root.is_root 470 assert root.previous is None 471 assert root.next is None 472 assert root == root.root 473 474 assert "ROOT" == str(root) 475 476 msg = r"'RootNode' object has no attribute '_record'" 477 with pytest.raises(AttributeError, match=msg): 478 root.key 479 480 with pytest.raises(AttributeError, match=msg): 481 root.record_type 482 483 assert pytest.raises(StopIteration, next, root.reverse()) 484 485 # Test __getitem__ 486 for child in root.children: 487 assert child == root[child.key] 488 assert child == root[child] # bit silly 489 490 child = child.children[0] 491 with pytest.raises(KeyError): 492 root[child.key] 493 494 with pytest.raises(KeyError): 495 root[child] 496 497 with pytest.raises(KeyError): 498 del root[child.key] 499 500 with pytest.raises(KeyError): 501 del root[child] 502 503 # Test __delitem__ 504 del root[root.children[0]] 505 assert 2 == len(root.children) 506 assert 41 == len(list(iter(root))) 507 508 # Test __iter__ 509 gen = iter(root) 510 assert "PatientID='98890234'" in str(next(gen)) 511 assert "StudyDate=20010101" in str(next(gen)) 512 assert "SeriesNumber=4" in str(next(gen)) 513 assert "InstanceNumber=1" in str(next(gen)) 514 assert "InstanceNumber=2" in str(next(gen)) 515 assert "SeriesNumber=5" in str(next(gen)) 516 assert "InstanceNumber=6" in str(next(gen)) 517 assert "InstanceNumber=7" in str(next(gen)) 518 for ii in range(29): 519 next(gen) 520 521 assert "InstanceNumber=7" in str(next(gen)) 522 assert "PRIVATE" in str(next(gen)) 523 assert "PRIVATE" in str(next(gen)) 524 assert "PRIVATE" in str(next(gen)) 525 assert pytest.raises(StopIteration, next, gen) 526 527 def test_leaf(self, private): 528 """Test a leaf node.""" 529 fs = FileSet(private) 530 531 # non-PRIVATE 532 leaf = fs._instances[5].node 533 assert [] == leaf.children 534 assert leaf.has_instance 535 ancestors = leaf.ancestors 536 assert 3 == len(ancestors) 537 assert "IMAGE" == leaf.record_type 538 assert "IM000002" == leaf.component 539 assert "SERIES" in str(ancestors[0]) 540 assert "SE000000" == ancestors[0].component 541 assert "STUDY" in str(ancestors[1]) 542 assert "ST000001" == ancestors[1].component 543 assert "PATIENT" in str(ancestors[2]) 544 assert "PT000000" == ancestors[2].component 545 assert 3 == leaf.depth 546 assert fs == leaf.file_set 547 assert 2 == leaf.index 548 assert not leaf.is_root 549 gen = iter(leaf) 550 assert leaf == next(gen) 551 assert pytest.raises(StopIteration, next, gen) 552 assert leaf.parent.children[3] == leaf.next 553 assert leaf.parent.children[1] == leaf.previous 554 555 # PRIVATE 556 leaf = fs._instances[-1].node 557 assert [] == leaf.children 558 assert leaf.has_instance 559 ancestors = leaf.ancestors 560 assert 2 == len(ancestors) 561 assert "PRIVATE" == leaf.record_type 562 assert "P2000000" == leaf.component 563 assert "PRIVATE" in str(ancestors[0]) 564 assert "P1000000" == ancestors[0].component 565 assert "PRIVATE" in str(ancestors[1]) 566 assert "P0000002" == ancestors[1].component 567 assert 2 == leaf.depth 568 assert fs == leaf.file_set 569 assert 0 == leaf.index 570 assert not leaf.is_root 571 gen = iter(leaf) 572 assert leaf == next(gen) 573 assert pytest.raises(StopIteration, next, gen) 574 assert leaf.next is None 575 assert leaf.previous is None 576 577 def test_add(self, private, ct): 578 """Test instance added at end of children""" 579 fs = FileSet(private) 580 instance = fs._instances[0] 581 parent = instance.node.parent 582 assert 1 == len(parent.children) 583 assert 0 == instance.node.index 584 assert instance.node.next is None 585 assert instance.node.previous is None 586 587 ct.PatientID = instance.PatientID 588 ct.StudyInstanceUID = instance.StudyInstanceUID 589 ct.SeriesInstanceUID = instance.SeriesInstanceUID 590 added = fs.add(ct) 591 assert 2 == len(parent.children) 592 assert 0 == instance.node.index 593 assert added.node == instance.node.next 594 assert instance.node.previous is None 595 assert 1 == added.node.index 596 assert instance.node == added.node.previous 597 assert added.node.next is None 598 599 def test_key(self, private): 600 """Test the record keys.""" 601 fs = FileSet(private) 602 root = fs._tree 603 node = root.children[0] 604 assert node._record.PatientID == node.key 605 node = node.children[0] 606 assert node._record.StudyInstanceUID == node.key 607 node = node.children[0] 608 assert node._record.SeriesInstanceUID == node.key 609 node = node.children[0] 610 assert node._record.ReferencedSOPInstanceUIDInFile == node.key 611 612 node = root.children[-1] 613 assert node._record.PrivateRecordUID == node.key 614 node = node.children[-1] 615 assert node._record.PrivateRecordUID == node.key 616 node = node.children[-1] 617 assert node._record.PrivateRecordUID == node.key 618 619 # Test STUDY directly referencing an instance 620 ds = private 621 seq = ds.DirectoryRecordSequence 622 uid = seq[3].ReferencedSOPInstanceUIDInFile 623 seq[1].ReferencedSOPInstanceUIDInFile = uid 624 seq[1].ReferencedTransferSyntaxUIDInFile = ExplicitVRLittleEndian 625 seq[1].ReferencedSOPClassUID = ComputedRadiographyImageStorage 626 seq[1].OffsetOfReferencedLowerLevelDirectoryEntity = 0 627 seq[1].ReferencedFileID = seq[3].ReferencedFileID 628 del seq[1].StudyInstanceUID 629 fs = FileSet(ds) 630 # The leaf STUDY node 631 node = fs._tree.children[0].children[0] 632 assert [] == node.children 633 assert uid == node.key 634 assert node.has_instance 635 636 def test_key_raises(self, dummy): 637 """Test missing required element raises.""" 638 ds, opt = dummy 639 ds.SOPClassUID = ColorPaletteStorage 640 fs = FileSet() 641 instance = fs.add(ds) 642 del instance.node._record.ReferencedSOPInstanceUIDInFile 643 644 msg = ( 645 r"Invalid 'PALETTE' record - missing required element " 646 r"'Referenced SOP Instance UID in File'" 647 ) 648 with pytest.raises(AttributeError, match=msg): 649 instance.node.key 650 651 def test_bad_record(self, private): 652 """Test a bad directory record raises an exception when loading.""" 653 del private.DirectoryRecordSequence[0].PatientID 654 msg = ( 655 r"The PATIENT directory record at offset 396 is missing a " 656 r"required element" 657 ) 658 with pytest.raises(ValueError, match=msg): 659 FileSet(private) 660 661 private.DirectoryRecordSequence[0].PatientID = 77654033 662 del private.DirectoryRecordSequence[1].StudyInstanceUID 663 msg = ( 664 r"The STUDY directory record at offset 510 is missing a required " 665 r"element" 666 ) 667 with pytest.raises(ValueError, match=msg): 668 FileSet(private) 669 670 def test_bad_record_missing_req(self, private): 671 """Test bad directory record raises if missing required element.""" 672 del private.DirectoryRecordSequence[0].DirectoryRecordType 673 msg = ( 674 r"The directory record at offset 396 is missing one or more " 675 r"required elements: DirectoryRecordType" 676 ) 677 with pytest.raises(ValueError, match=msg): 678 FileSet(private) 679 680 def test_encoding(self, private, tdir): 681 """Test group element not added when encoding.""" 682 fs = FileSet(private) 683 node = fs._instances[0].node 684 fs._instances[0].node._record.add_new(0x00080000, "UL", 128) 685 fs._instances[0].node._record.PatientSex = 'F' 686 fs, ds, paths = copy_fs(fs, tdir.name) 687 item = ds.DirectoryRecordSequence[3] 688 assert 0x00080000 not in item 689 assert "PatientSex" in item 690 691 def test_remove_raises(self, private): 692 """Test RecordNode.remove() raises if not a leaf.""" 693 fs = FileSet(private) 694 node = fs._tree.children[0] 695 assert not node.has_instance 696 msg = r"Only leaf nodes can be removed" 697 with pytest.raises(ValueError, match=msg): 698 fs._tree.remove(node) 699 700 def test_file_id_singleton(self, ct, tdir): 701 """Test a singleton File ID.""" 702 fs = FileSet() 703 p = Path(tdir.name) 704 ct.save_as(p / "01") 705 fs.add(p / "01") 706 fs.write(p) 707 ds = dcmread(p / "DICOMDIR") 708 item = ds.DirectoryRecordSequence[-1] 709 assert "IMAGE" == item.DirectoryRecordType 710 item.ReferencedFileID = "01" 711 ds.save_as(p / "DICOMDIR") 712 fs = FileSet(ds) 713 assert fs._instances[0].node._file_id == Path("01") 714 715 def test_file_id_missing(self, ct): 716 """Test RecordNode._file_id if no Referenced File ID.""" 717 fs = FileSet() 718 instance = fs.add(ct) 719 del instance.node._record.ReferencedFileID 720 msg = r"No 'Referenced File ID' in the directory record" 721 with pytest.raises(AttributeError, match=msg): 722 instance.node._file_id 723 724 725@pytest.mark.filterwarnings("ignore:The 'DicomDir'") 726class TestFileInstance: 727 """Tests for FileInstance.""" 728 def test_getattr(self, dicomdir): 729 """Test FileInstance.__getattribute__.""" 730 fs = FileSet(dicomdir) 731 instance = fs._instances[0] 732 assert "20010101" == instance.StudyDate 733 instance.my_attr = 1234 734 assert 1234 == instance.my_attr 735 msg = r"'FileInstance' object has no attribute 'missing_attr'" 736 with pytest.raises(AttributeError, match=msg): 737 instance.missing_attr 738 739 def test_getattr_order(self, private): 740 """Test records are searched closest to furthest""" 741 fs = FileSet(private) 742 instance = fs._instances[-1] 743 assert instance.is_private 744 # a: root, b: middle, c: bottom, 745 c = instance.node 746 b = c.parent 747 a = b.parent 748 assert c._record.PrivateRecordUID != b._record.PrivateRecordUID 749 assert c._record.PrivateRecordUID != a._record.PrivateRecordUID 750 assert instance.PrivateRecordUID == c._record.PrivateRecordUID 751 752 def test_getitem(self, dicomdir): 753 """Test FileInstance.__getitem__.""" 754 fs = FileSet(dicomdir) 755 instance = fs._instances[0] 756 assert "20010101" == instance["StudyDate"].value 757 assert "20010101" == instance[0x00080020].value 758 assert "20010101" == instance[Tag(0x00080020)].value 759 assert "20010101" == instance[(0x0008, 0x0020)].value 760 assert "20010101" == instance["0x00080020"].value 761 762 with pytest.raises(KeyError, match=r"(0000, 0000)"): 763 instance[0x00000000] 764 765 def test_getitem_special(self, tiny): 766 """Test FileInstance.__getitem__ for the three special elements.""" 767 fs = FileSet(tiny) 768 instance = fs._instances[0] 769 elem = instance["SOPInstanceUID"] 770 assert ( 771 "1.2.826.0.1.3680043.8.498.66612287766462461480665815941164330386" 772 ) == elem.value 773 elem = instance["SOPClassUID"] 774 assert CTImageStorage == elem.value 775 elem = instance["TransferSyntaxUID"] 776 assert ExplicitVRLittleEndian == elem.value 777 778 def test_getitem_order(self, private): 779 """Test records are searched closest to furthest""" 780 fs = FileSet(private) 781 instance = fs._instances[-1] 782 assert instance.is_private 783 # a: root, b: middle, c: bottom, 784 c = instance.node 785 b = c.parent 786 a = b.parent 787 assert c._record["PrivateRecordUID"] != b._record["PrivateRecordUID"] 788 assert c._record["PrivateRecordUID"] != a._record["PrivateRecordUID"] 789 assert instance["PrivateRecordUID"] == c._record["PrivateRecordUID"] 790 791 def test_contains(self, dicomdir): 792 """Test FileInstance.__contains__.""" 793 fs = FileSet(dicomdir) 794 instance = fs._instances[0] 795 assert "StudyDate" in instance 796 assert 0x00080020 in instance 797 assert Tag(0x00080020) in instance 798 assert (0x0008, 0x0020) in instance 799 assert "0x00080020" in instance 800 assert 'bad' not in instance 801 802 def test_is_private(self, private): 803 """Test FileInstance.is_private""" 804 fs = FileSet(private) 805 instance = fs._instances[-1] 806 assert instance.is_private 807 instance = fs._instances[0] 808 assert not instance.is_private 809 810 def test_properties(self, dicomdir): 811 """Test the FileInstance properties.""" 812 fs = FileSet(dicomdir) 813 instance = fs._instances[0] 814 assert fs == instance.file_set 815 assert os.fspath(Path("77654033/CR1/6154")) in instance.path 816 assert isinstance(instance.path, str) 817 sop_instance = "1.3.6.1.4.1.5962.1.1.0.0.0.1196527414.5534.0.11" 818 819 nodes = [node for node in instance.node.ancestors] 820 assert 3 == len(nodes) 821 assert nodes[0].record_type == "SERIES" 822 assert nodes[1].record_type == "STUDY" 823 assert nodes[2].record_type == "PATIENT" 824 record = instance.node._record 825 assert sop_instance == record.ReferencedSOPInstanceUIDInFile 826 assert sop_instance == instance.SOPInstanceUID 827 assert ExplicitVRLittleEndian == instance.TransferSyntaxUID 828 assert "1.2.840.10008.5.1.4.1.1.1" == instance.SOPClassUID 829 830 def test_path(self, ct, tdir): 831 """Test FileInstance.path when not staged.""" 832 fs = FileSet() 833 fs.add(ct) 834 ds, paths = write_fs(fs, tdir.name) 835 836 assert 1 == len(fs) 837 instance = fs._instances[0] 838 assert not instance.is_staged 839 assert (Path(fs.path) / Path(instance.FileID)) == Path(instance.path) 840 841 def test_path_add(self, ct, tdir): 842 """Test FileInstance.path when staged for addition.""" 843 fs = FileSet() 844 fs.add(ct) 845 assert 1 == len(fs) 846 instance = fs._instances[0] 847 assert instance.is_staged 848 assert instance.for_addition 849 assert ( 850 Path(fs._stage['path']) / Path(instance.SOPInstanceUID) 851 ) == Path(instance.path) 852 assert isinstance(instance.path, str) 853 854 def test_path_move(self, dicomdir): 855 """Test FileInstance.path for an instance to be move.""" 856 fs = FileSet(dicomdir) 857 assert fs._stage['~'] 858 instance = fs._instances[0] 859 assert instance.is_staged 860 assert instance.for_moving 861 assert ( 862 Path(fs.path) / Path(*instance.ReferencedFileID) 863 ) == Path(instance.path) 864 assert isinstance(instance.path, str) 865 866 def test_path_removal(self, dicomdir, tdir): 867 """Test FileInstance.FileID when staged for removal.""" 868 fs = FileSet(dicomdir) 869 instance = fs._instances[0] 870 fs.remove(instance) 871 assert instance.is_staged 872 assert instance.for_removal 873 assert ( 874 Path(fs.path) / Path(*instance.ReferencedFileID) 875 ) == Path(instance.path) 876 assert isinstance(instance.path, str) 877 878 def test_load(self, ct, tdir): 879 """Test FileInstance.load() when not staged.""" 880 fs = FileSet() 881 fs.add(ct) 882 ds, paths = write_fs(fs, tdir.name) 883 884 assert 1 == len(fs) 885 instance = fs._instances[0] 886 assert not instance.is_staged 887 ds = instance.load() 888 assert isinstance(ds, Dataset) 889 assert ct.SOPInstanceUID == ds.SOPInstanceUID 890 891 def test_load_staged_add(self, ct, tdir): 892 """Test FileInstance.load() when staged for addition.""" 893 fs = FileSet() 894 fs.add(ct) 895 assert 1 == len(fs) 896 instance = fs._instances[0] 897 assert instance.is_staged 898 assert instance.for_addition 899 ds = instance.load() 900 assert isinstance(ds, Dataset) 901 assert ct.SOPInstanceUID == ds.SOPInstanceUID 902 903 def test_load_staged_move(self, dicomdir): 904 """Test FileInstance.load() for an instance to be moved.""" 905 fs = FileSet(dicomdir) 906 instance = fs._instances[0] 907 assert instance.is_staged 908 assert instance.for_moving 909 assert fs.is_staged 910 # At least one instance needs to be moved 911 assert fs._stage['~'] 912 ds = instance.load() 913 assert isinstance(ds, Dataset) 914 sop_instance = "1.3.6.1.4.1.5962.1.1.0.0.0.1196527414.5534.0.11" 915 assert sop_instance == ds.SOPInstanceUID 916 917 def test_load_staged_removal(self, dicomdir, tdir): 918 """Test FileInstance.load() when staged for removal.""" 919 fs = FileSet(dicomdir) 920 instance = fs._instances[0] 921 fs.remove(instance) 922 assert instance.is_staged 923 assert instance.for_removal 924 ds = instance.load() 925 assert isinstance(ds, Dataset) 926 sop_instance = "1.3.6.1.4.1.5962.1.1.0.0.0.1196527414.5534.0.11" 927 assert sop_instance == ds.SOPInstanceUID 928 929 def test_for_moving(self, dummy, ct, tdir): 930 """Test FileInstance.for_moving.""" 931 ds, opt = dummy 932 ds.SOPClassUID = ColorPaletteStorage 933 fs = FileSet() 934 # Single level File ID 935 instance = fs.add(ds) 936 assert instance.for_addition 937 assert not instance.for_removal 938 assert not instance.for_moving 939 940 # Four level File ID 941 instance = fs.add(ct) 942 assert instance.for_addition 943 assert not instance.for_removal 944 assert not instance.for_moving 945 946 ds, paths = write_fs(fs, tdir.name) 947 for instance in fs: 948 assert not instance.for_addition 949 assert not instance.for_removal 950 assert not instance.for_moving 951 952 def test_fileid(self, ct, tdir): 953 """Test FileInstance.FileID when not staged.""" 954 fs = FileSet() 955 fs.add(ct) 956 ds, paths = write_fs(fs, tdir.name) 957 958 assert 1 == len(fs) 959 instance = fs._instances[0] 960 assert not instance.is_staged 961 fileid = Path("PT000000/ST000000/SE000000/IM000000") 962 assert os.fspath(fileid) == instance.FileID 963 964 def test_fileid_add(self, ct, tdir): 965 """Test FileInstance.FileID when staged for addition.""" 966 fs = FileSet() 967 fs.add(ct) 968 assert 1 == len(fs) 969 instance = fs._instances[0] 970 assert instance.is_staged 971 assert instance.for_addition 972 fileid = Path("PT000000/ST000000/SE000000/IM000000") 973 assert os.fspath(fileid) == instance.FileID 974 975 def test_fileid_move(self, dicomdir): 976 """Test FileInstance.FileID for an instance to be moved.""" 977 fs = FileSet(dicomdir) 978 assert fs.is_staged 979 # At least one instance needs to be moved 980 assert fs._stage['~'] 981 instance = fs._instances[0] 982 assert instance.is_staged 983 assert instance.for_moving 984 fileid = Path("PT000000/ST000000/SE000000/IM000000") 985 assert os.fspath(fileid) == instance.FileID 986 987 def test_fileid_removal(self, dicomdir, tdir): 988 """Test FileInstance.FileID when staged for removal.""" 989 fs = FileSet(dicomdir) 990 instance = fs._instances[0] 991 fs.remove(instance) 992 assert instance.is_staged 993 assert instance.for_removal 994 fileid = Path("PT000000/ST000000/SE000000/IM000000") 995 assert os.fspath(fileid) == instance.FileID 996 997 def test_private(self, private): 998 """Test FileInstance with PRIVATE records.""" 999 fs = FileSet(private) 1000 1001 instances = fs._instances 1002 assert 32 == len(instances) 1003 1004 instance = instances[-1] 1005 assert 2 == len(instance.node.ancestors) 1006 for node in instance.node.reverse(): 1007 assert node.record_type == "PRIVATE" 1008 1009 path = os.fspath( 1010 Path("tiny_alpha/PT000000/ST000000/SE000000/IM000000") 1011 ) 1012 assert path in instances[-1].path 1013 1014 assert "1.2.3.4" == instance.SOPClassUID 1015 assert "1.2.276.0.7230010.3.1.4.0.31906.1359940846.78187" == ( 1016 instance.SOPInstanceUID 1017 ) 1018 assert ExplicitVRLittleEndian == instance.TransferSyntaxUID 1019 1020 1021@pytest.mark.filterwarnings("ignore:The 'DicomDir'") 1022class TestFileSet: 1023 """Tests for FileSet.""" 1024 def test_empty(self): 1025 """Test an new and empty File-set.""" 1026 fs = FileSet() 1027 assert 0 == len(fs) 1028 assert fs.ID is None 1029 assert fs.UID.is_valid 1030 assert fs.path is None 1031 assert fs.is_staged # New datasets are staged 1032 1033 with pytest.raises(StopIteration): 1034 next(iter(fs)) 1035 1036 s = str(fs) 1037 assert "DICOM File-set" in s 1038 assert "Root directory: (no value available)" in s 1039 assert "File-set ID: (no value available)" in s 1040 assert f"File-set UID: {fs.UID}" in s 1041 assert "Managed instances" not in s 1042 1043 def test_id(self, tdir): 1044 """Test the FileSet.ID property.""" 1045 fs = FileSet() 1046 assert fs.is_staged 1047 assert fs.ID is None 1048 fs.ID = "MYID" 1049 assert fs.is_staged 1050 assert "MYID" == fs.ID 1051 1052 s = str(fs) 1053 assert "DICOM File-set" in s 1054 assert "Root directory: (no value available)" in s 1055 assert "File-set ID: MYID" in s 1056 assert f"File-set UID: {fs.UID}" in s 1057 assert "Managed instances" not in s 1058 1059 ds, paths = write_fs(fs, tdir.name) 1060 assert not fs.is_staged 1061 assert "Changes staged for write():" not in str(fs) 1062 1063 fs.ID = "MYID" 1064 assert not fs.is_staged 1065 1066 fsids = [None, "A", "1" * 16] 1067 for fsid in fsids: 1068 fs.ID = fsid 1069 assert fs.is_staged 1070 assert fsid == fs.ID 1071 ds, paths = write_fs(fs) 1072 assert [] == paths 1073 if fsid is None: 1074 fsid = "" 1075 assert fsid == ds.FileSetID 1076 1077 msg = r"The maximum length of the 'File-set ID' is 16 characters" 1078 with pytest.raises(ValueError, match=msg): 1079 fs.ID = "1" * 17 1080 1081 assert "1" * 16 == fs.ID 1082 1083 def test_uid(self, tdir): 1084 """Test the FileSet.UID property.""" 1085 fs = FileSet() 1086 assert fs.is_staged 1087 uid = fs.UID 1088 ds, paths = write_fs(fs, tdir.name) 1089 assert [] == paths 1090 assert fs.UID == ds.file_meta.MediaStorageSOPInstanceUID 1091 1092 s = str(fs) 1093 assert "DICOM File-set" in s 1094 assert f"Root directory: (no value available)" not in s 1095 assert "File-set ID: (no value available)" in s 1096 assert f"File-set UID: {fs.UID}" in s 1097 assert "Managed instances" not in s 1098 assert "Changes staged for write():" not in s 1099 assert not fs.is_staged 1100 1101 fs.UID = uid 1102 assert not fs.is_staged 1103 1104 fs.UID = generate_uid() 1105 assert uid != fs.UID 1106 assert fs.is_staged 1107 1108 def test_descriptor(self): 1109 """Test FileSet.descriptor_file_id.""" 1110 fs = FileSet() 1111 assert fs.descriptor_file_id is None 1112 assert fs.is_staged 1113 fs._stage['^'] = False # Override 1114 assert not fs.is_staged 1115 fs.descriptor_file_id = None 1116 assert not fs.is_staged 1117 assert fs.descriptor_file_id is None 1118 fs.descriptor_file_id = "README" 1119 assert fs.is_staged 1120 assert "README" == fs.descriptor_file_id 1121 fs.descriptor_file_id = "README" 1122 assert "README" == fs.descriptor_file_id 1123 fs.descriptor_file_id = "A" * 16 1124 assert "A" * 16 == fs.descriptor_file_id 1125 fs.descriptor_file_id = None 1126 assert fs.descriptor_file_id is None 1127 fs.descriptor_file_id = ['A'] * 8 1128 assert ['A'] * 8 == fs.descriptor_file_id 1129 fs.descriptor_file_id = ["A", "", "B", "C"] 1130 assert ['A', 'B', 'C'] == fs.descriptor_file_id 1131 1132 # Test exceptions 1133 msg = r"The 'DescriptorFileID' must be a str, list of str, or None" 1134 with pytest.raises(TypeError, match=msg): 1135 fs.descriptor_file_id = 12 1136 msg = ( 1137 r"The 'File-set Descriptor File ID' has a maximum of 8 " 1138 r"components, each between 0 and 16 characters long" 1139 ) 1140 with pytest.raises(ValueError, match=msg): 1141 fs.descriptor_file_id = ['A'] * 9 1142 with pytest.raises(ValueError, match=msg): 1143 fs.descriptor_file_id = ['A' * 17] 1144 with pytest.raises(ValueError, match=msg): 1145 fs.descriptor_file_id = ['A', 1] 1146 msg = ( 1147 r"Each 'File-set Descriptor File ID' component has a " 1148 r"maximum length of 16 characters" 1149 ) 1150 with pytest.raises(ValueError, match=msg): 1151 fs.descriptor_file_id = "A" * 17 1152 1153 assert ['A', 'B', 'C'] == fs.descriptor_file_id 1154 1155 def test_descriptor_and_charset_written(self, tdir): 1156 """Test that the File-set Descriptor File ID gets written.""" 1157 fs = FileSet() 1158 fs.descriptor_file_id = "README" 1159 fs.descriptor_character_set = "ISO_IR 100" 1160 ds, paths = write_fs(fs, tdir.name) 1161 assert "README" == ds.FileSetDescriptorFileID 1162 assert "ISO_IR 100" == ds.SpecificCharacterSetOfFileSetDescriptorFile 1163 1164 def test_descriptor_dicomdir(self, dicomdir): 1165 """Test FileSet.descriptor_file_id with a DICOMDIR file.""" 1166 fs = FileSet(dicomdir) 1167 ds = fs._ds 1168 assert fs.descriptor_file_id is None 1169 assert "FileSetDescriptorFileID" not in ds 1170 assert fs.is_staged 1171 fs._stage['^'] = False # Override 1172 fs._stage['~'] = {} 1173 assert not fs.is_staged 1174 fs.descriptor_file_id = None 1175 assert "FileSetDescriptorFileID" not in ds 1176 assert not fs.is_staged 1177 assert fs.descriptor_file_id is None 1178 fs.descriptor_file_id = "README" 1179 assert "README" == ds.FileSetDescriptorFileID 1180 assert fs.is_staged 1181 assert "README" == fs.descriptor_file_id 1182 fs.descriptor_file_id = "README" 1183 assert "README" == fs.descriptor_file_id 1184 fs.descriptor_file_id = "A" * 16 1185 assert "A" * 16 == fs.descriptor_file_id 1186 assert "A" * 16 == ds.FileSetDescriptorFileID 1187 fs.descriptor_file_id = None 1188 assert fs.descriptor_file_id is None 1189 assert ds.FileSetDescriptorFileID is None 1190 fs.descriptor_file_id = ['A'] * 8 1191 assert ['A'] * 8 == fs.descriptor_file_id 1192 assert ['A'] * 8 == ds.FileSetDescriptorFileID 1193 1194 def test_descriptor_charset(self): 1195 """Test FileSet.descriptor_character_set.""" 1196 fs = FileSet() 1197 assert fs.descriptor_character_set is None 1198 assert fs.is_staged 1199 fs._stage['^'] = False # Override 1200 assert not fs.is_staged 1201 fs.descriptor_character_set = None 1202 assert not fs.is_staged 1203 assert fs.descriptor_character_set is None 1204 fs.descriptor_character_set = "README" 1205 assert fs.is_staged 1206 assert "README" == fs.descriptor_character_set 1207 1208 def test_descriptor_charset_dicomdir(self, dicomdir): 1209 """Test FileSet.descriptor_character_set.""" 1210 fs = FileSet(dicomdir) 1211 ds = fs._ds 1212 assert fs.descriptor_character_set is None 1213 assert "SpecificCharacterSetOfFileSetDescriptorFile" not in ds 1214 assert fs.is_staged 1215 fs._stage['^'] = False # Override 1216 fs._stage['~'] = {} 1217 assert not fs.is_staged 1218 fs.descriptor_character_set = None 1219 assert "SpecificCharacterSetOfFileSetDescriptorFile" not in ds 1220 assert not fs.is_staged 1221 assert fs.descriptor_character_set is None 1222 fs.descriptor_character_set = "README" 1223 assert "README" == ds.SpecificCharacterSetOfFileSetDescriptorFile 1224 assert fs.is_staged 1225 1226 def test_path(self, tdir): 1227 """Test setting the File-set's path.""" 1228 fs = FileSet() 1229 assert fs.path is None 1230 with pytest.raises(AttributeError, match=r"can't set attribute"): 1231 fs.path = tdir.name 1232 1233 # Test with str 1234 path = tdir.name 1235 assert isinstance(path, str) 1236 ds, paths = write_fs(fs, path) 1237 assert Path(path).parts[-2:] == Path(fs.path).parts[-2:] 1238 assert [] == paths 1239 root = os.fspath(Path(*Path(tdir.name).parts[-2:])) 1240 assert root in ds.filename 1241 assert root in str(fs) 1242 assert not fs.is_staged 1243 1244 # Test with PathLike 1245 fs = FileSet() 1246 path = Path(tdir.name) 1247 ds, paths = write_fs(fs, path) 1248 assert [] == paths 1249 root = os.fspath(Path(*Path(tdir.name).parts[-2:])) 1250 assert root in ds.filename 1251 assert root in str(fs) 1252 assert not fs.is_staged 1253 1254 def test_empty_write(self, tdir): 1255 """Test writing an empty File-set.""" 1256 fs = FileSet() 1257 uid = fs.UID 1258 msg = ( 1259 r"The path to the root directory is required for a new File-set" 1260 ) 1261 with pytest.raises(ValueError, match=msg): 1262 fs.write() 1263 1264 path = Path(tdir.name) 1265 fs.write(path) 1266 1267 # Should be the DICOMDIR file 1268 contents = list(path.glob('**/*')) 1269 assert "DICOMDIR" == contents[0].name 1270 assert 1 == len(contents) 1271 1272 ds = dcmread(contents[0]) 1273 meta = ds.file_meta 1274 assert MediaStorageDirectoryStorage == meta.MediaStorageSOPClassUID 1275 assert uid == meta.MediaStorageSOPInstanceUID 1276 assert meta.TransferSyntaxUID == ExplicitVRLittleEndian 1277 assert "" == ds.FileSetID 1278 assert 0 == ds.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity 1279 assert 0 == ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity 1280 assert 0 == ds.FileSetConsistencyFlag 1281 assert [] == ds.DirectoryRecordSequence 1282 1283 def test_add_dataset(self, ct, tdir): 1284 """Test FileSet.add() with a Dataset.""" 1285 fs = FileSet() 1286 assert fs.is_staged 1287 fs.write(tdir.name) # write empty to unstage 1288 assert not fs.is_staged 1289 fs.add(ct) 1290 assert fs.is_staged 1291 1292 s = str(fs) 1293 assert "Managed instances" in s 1294 assert ( 1295 "PATIENT: PatientID='1CT1', " 1296 "PatientName='CompressedSamples^CT1'" 1297 ) in s 1298 assert ( 1299 "STUDY: StudyDate=20040119, StudyTime=072730, " 1300 "StudyDescription='e+1'" in s 1301 ) 1302 assert "SERIES: Modality=CT, SeriesNumber=1" in s 1303 assert 1 == len(fs) 1304 instances = [ii for ii in fs] 1305 file_id = Path("PT000000", "ST000000", "SE000000", "IM000000") 1306 assert os.fspath(file_id) == instances[0].FileID 1307 1308 ds, paths = write_fs(fs) 1309 assert 1 == len(fs) 1310 1311 # Test the DICOMDIR 1312 assert 398 == ( 1313 ds.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity 1314 ) 1315 assert 398 == ( 1316 ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity 1317 ) 1318 1319 seq = ds.DirectoryRecordSequence 1320 assert 4 == len(seq) 1321 1322 item = seq[0] 1323 assert item.seq_item_tell == 398 1324 assert "PATIENT" == item.DirectoryRecordType 1325 assert ct.PatientName == item.PatientName 1326 assert ct.PatientID == item.PatientID 1327 assert 0xFFFF == item.RecordInUseFlag 1328 assert 0 == item.OffsetOfTheNextDirectoryRecord 1329 assert "ISO_IR 100" == item.SpecificCharacterSet 1330 assert 516 == item.OffsetOfReferencedLowerLevelDirectoryEntity 1331 1332 item = seq[1] 1333 assert item.seq_item_tell == 516 1334 assert "STUDY" == item.DirectoryRecordType 1335 assert ct.StudyDate == item.StudyDate 1336 assert ct.StudyTime == item.StudyTime 1337 assert ct.AccessionNumber == item.AccessionNumber 1338 assert ct.StudyDescription == item.StudyDescription 1339 assert ct.StudyInstanceUID == item.StudyInstanceUID 1340 assert 0xFFFF == item.RecordInUseFlag 1341 assert 0 == item.OffsetOfTheNextDirectoryRecord 1342 assert "ISO_IR 100" == item.SpecificCharacterSet 1343 assert 704 == item.OffsetOfReferencedLowerLevelDirectoryEntity 1344 1345 item = seq[2] 1346 assert item.seq_item_tell == 704 1347 assert "SERIES" == item.DirectoryRecordType 1348 assert ct.Modality == item.Modality 1349 assert ct.SeriesInstanceUID == item.SeriesInstanceUID 1350 assert ct.SeriesNumber == item.SeriesNumber 1351 assert 0xFFFF == item.RecordInUseFlag 1352 assert 0 == item.OffsetOfTheNextDirectoryRecord 1353 assert "ISO_IR 100" == item.SpecificCharacterSet 1354 assert 852 == item.OffsetOfReferencedLowerLevelDirectoryEntity 1355 1356 item = seq[3] 1357 assert item.seq_item_tell == 852 1358 assert "IMAGE" == item.DirectoryRecordType 1359 assert ['PT000000', 'ST000000', 'SE000000', 'IM000000'] == ( 1360 item.ReferencedFileID 1361 ) 1362 assert ct.SOPInstanceUID == item.ReferencedSOPInstanceUIDInFile 1363 assert ct.InstanceNumber == item.InstanceNumber 1364 assert ct.SOPClassUID == item.ReferencedSOPClassUIDInFile 1365 assert ct.file_meta.TransferSyntaxUID == ( 1366 item.ReferencedTransferSyntaxUIDInFile 1367 ) 1368 assert 0xFFFF == item.RecordInUseFlag 1369 assert 0 == item.OffsetOfTheNextDirectoryRecord 1370 assert "ISO_IR 100" == item.SpecificCharacterSet 1371 assert 0 == item.OffsetOfReferencedLowerLevelDirectoryEntity 1372 1373 assert not fs.is_staged 1374 s = str(fs) 1375 root = os.fspath(Path(*Path(tdir.name).parts[-2:])) 1376 assert root in s 1377 assert 1 == len(paths) 1378 assert ct == dcmread(paths[0]) 1379 1380 # Calling write() again shouldn't change anything 1381 ds2, paths = write_fs(fs) 1382 assert ds == ds2 1383 assert ds2.filename == ds.filename 1384 assert 1 == len(paths) 1385 assert ct == dcmread(paths[0]) 1386 1387 def test_add_bad_dataset(self, ct): 1388 """Test adding a dataset missing Type 1 element value.""" 1389 ct.PatientID = None 1390 fs = FileSet() 1391 msg = ( 1392 r"Unable to use the default 'PATIENT' record creator " 1393 r"as the instance is missing a required element or value. Either " 1394 r"update the instance, define your own record creation function " 1395 r"or use 'FileSet.add_custom\(\)' instead" 1396 ) 1397 with pytest.raises(ValueError, match=msg): 1398 fs.add(ct) 1399 1400 def test_add_path(self, tdir): 1401 """Test FileSet.add() with a Dataset.""" 1402 fs = FileSet() 1403 fs.write(tdir.name) 1404 assert not fs.is_staged 1405 fs.add(get_testdata_file("CT_small.dcm")) 1406 assert fs.is_staged 1407 1408 def test_add_add(self, ct, tdir): 1409 """Test calling FileSet.add() on the same Dataset.""" 1410 fs = FileSet() 1411 fs.add(ct) 1412 fs.add(ct) 1413 assert fs.is_staged 1414 assert 1 == len(fs) 1415 1416 ds, paths = write_fs(fs, tdir.name) 1417 assert 4 == len(ds.DirectoryRecordSequence) 1418 assert 1 == len(paths) 1419 1420 def test_remove(self, ct, tdir): 1421 """Test removing an instance.""" 1422 fs = FileSet() 1423 fs.add(ct) 1424 fs.write(tdir.name) 1425 assert "Managed instances" in str(fs) 1426 1427 instance = next(iter(fs)) 1428 assert isinstance(instance, FileInstance) 1429 fs.remove(instance) 1430 assert 0 == len(fs) 1431 with pytest.raises(StopIteration): 1432 next(iter(fs)) 1433 1434 ds, paths = write_fs(fs) 1435 assert [] == ds.DirectoryRecordSequence 1436 assert 0 == ds.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity 1437 assert 0 == ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity 1438 assert [] == paths 1439 1440 def test_remove_iter(self, tiny): 1441 """Test FileSet.remove() with iter(FileSet).""" 1442 fs = FileSet(tiny) 1443 for instance in fs: 1444 fs.remove(instance) 1445 1446 assert 0 == len(fs) 1447 1448 def test_remove_remove(self, ct, tdir): 1449 """Test removing an instance that's already removed.""" 1450 fs = FileSet() 1451 fs.add(ct) 1452 fs.write(tdir.name) 1453 1454 instance = next(iter(fs)) 1455 assert isinstance(instance, FileInstance) 1456 fs.remove(instance) 1457 msg = r"No such instance in the File-set" 1458 with pytest.raises(ValueError, match=msg): 1459 fs.remove(instance) 1460 1461 def test_remove_add(self, ct, tdir): 1462 """Test adding an instance that's removed.""" 1463 fs = FileSet() 1464 fs.add(ct) 1465 fs.write(tdir.name) 1466 1467 instance = next(iter(fs)) 1468 assert isinstance(instance, FileInstance) 1469 fs.remove(instance) 1470 assert fs.is_staged 1471 fs.add(ct) 1472 assert not fs.is_staged 1473 ds, paths = write_fs(fs) 1474 assert 4 == len(ds.DirectoryRecordSequence) 1475 assert 1 == len(paths) 1476 1477 def test_add_remove(self, ct, tdir): 1478 """Test removing an instance that's added.""" 1479 fs = FileSet() 1480 fs.write(tdir.name) 1481 assert not fs.is_staged 1482 fs.add(ct) 1483 assert fs.is_staged 1484 instance = next(iter(fs)) 1485 assert isinstance(instance, FileInstance) 1486 fs.remove(instance) 1487 assert not fs.is_staged 1488 1489 ds, paths = write_fs(fs) 1490 assert [] == ds.DirectoryRecordSequence 1491 assert [] == paths 1492 1493 def test_file_ids_unique(self, dicomdir): 1494 """That that the File IDs are all unique within the File-set.""" 1495 fs = FileSet(dicomdir) 1496 ids = set([ii.FileID for ii in fs]) 1497 assert len(fs._instances) == len(ids) 1498 1499 def test_add_custom(self, ct, tdir, custom_leaf): 1500 """Test FileSet.add_custom() with a standard IOD.""" 1501 fs = FileSet() 1502 fs.add_custom(ct, custom_leaf) 1503 assert 1 == len(fs) 1504 assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID)) 1505 assert fs.is_staged 1506 instance = fs._instances[0] 1507 assert instance.SOPInstanceUID in fs._stage['+'] 1508 1509 ds, paths = write_fs(fs, tdir.name) 1510 assert 1 == len(paths) 1511 assert 4 == len(ds.DirectoryRecordSequence) 1512 assert Dataset(ct) == dcmread(paths[0]) 1513 1514 def test_add_custom_path(self, ct, tdir, custom_leaf): 1515 """Test add_custom() with a path.""" 1516 fs = FileSet() 1517 fs.add_custom(ct.filename, custom_leaf) 1518 assert 1 == len(fs) 1519 assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID)) 1520 assert fs.is_staged 1521 instance = fs._instances[0] 1522 assert instance.SOPInstanceUID in fs._stage['+'] 1523 1524 ds, paths = write_fs(fs, tdir.name) 1525 assert 1 == len(paths) 1526 assert 4 == len(ds.DirectoryRecordSequence) 1527 assert Dataset(ct) == dcmread(paths[0]) 1528 1529 def test_add_custom_private(self, ct, tdir): 1530 """Test add_custom() with a private instance.""" 1531 # Maximum of 8, including the top (root) node 1532 patient = _define_patient(ct) 1533 patient.DirectoryRecordType = "PATIENT" 1534 patient.OffsetOfTheNextDirectoryRecord = 0 1535 patient.RecordInUseFlag = 0xFFFF 1536 patient.OffsetOfReferencedLowerLevelDirectoryEntity = 0 1537 patient = RecordNode(patient) 1538 ds = Dataset() 1539 ds.PrivateRecordUID = generate_uid() 1540 ds.DirectoryRecordType = "PRIVATE" 1541 ds.ReferencedFileID = None 1542 ds.ReferencedSOPClassUIDInFile = ct.SOPClassUID 1543 ds.ReferencedSOPInstanceUIDInFile = ct.SOPInstanceUID 1544 ds.ReferencedTransferSyntaxUIDInFile = ( 1545 ct.file_meta.TransferSyntaxUID 1546 ) 1547 private = RecordNode(ds) 1548 private.parent = patient 1549 1550 assert 1 == private.depth 1551 1552 fs = FileSet() 1553 fs.add_custom(ct, private) 1554 1555 assert 1 == len(fs) 1556 assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID)) 1557 assert fs.is_staged 1558 instance = fs._instances[0] 1559 assert instance.SOPInstanceUID in fs._stage['+'] 1560 1561 ds, paths = write_fs(fs, tdir.name) 1562 assert 1 == len(paths) 1563 assert 2 == len(ds.DirectoryRecordSequence) 1564 assert "PATIENT" == ds.DirectoryRecordSequence[0].DirectoryRecordType 1565 assert "PRIVATE" == ds.DirectoryRecordSequence[1].DirectoryRecordType 1566 assert Dataset(ct) == dcmread(paths[0]) 1567 1568 def test_add_custom_too_deep(self, ct): 1569 """Test adding too many nodes raises exception.""" 1570 # Maximum of 8, including the top (root) node 1571 top = _define_patient(ct) 1572 top.DirectoryRecordType = "PATIENT" 1573 top.OffsetOfTheNextDirectoryRecord = 0 1574 top.RecordInUseFlag = 0xFFFF 1575 top.OffsetOfReferencedLowerLevelDirectoryEntity = 0 1576 top = RecordNode(top) 1577 for ii in range(8): 1578 ds = Dataset() 1579 ds.PrivateRecordUID = generate_uid() 1580 ds.DirectoryRecordType = "PRIVATE" 1581 1582 node = RecordNode(ds) 1583 node.parent = top 1584 top = node 1585 1586 top._record.ReferencedFileID = None 1587 top._record.ReferencedSOPClassUIDInFile = ct.SOPClassUID 1588 top._record.ReferencedSOPInstanceUIDInFile = ct.SOPInstanceUID 1589 top._record.ReferencedTransferSyntaxUIDInFile = ( 1590 ct.file_meta.TransferSyntaxUID 1591 ) 1592 assert 8 == top.depth 1593 1594 fs = FileSet() 1595 msg = ( 1596 r"The 'leaf' node must not have more than 7 ancestors as " 1597 r"'FileSet' supports a maximum directory structure depth of 8" 1598 ) 1599 with pytest.raises(ValueError, match=msg): 1600 fs.add_custom(ct, top) 1601 1602 def test_add_custom_bad_leaf(self, ct, tdir, custom_leaf): 1603 """Test FileSet.add_custom() with a bad leaf record.""" 1604 del custom_leaf._record.ReferencedSOPClassUIDInFile 1605 del custom_leaf._record.ReferencedFileID 1606 del custom_leaf._record.ReferencedSOPInstanceUIDInFile 1607 del custom_leaf._record.ReferencedTransferSyntaxUIDInFile 1608 1609 fs = FileSet() 1610 instance = fs.add_custom(ct, custom_leaf) 1611 assert 1 == len(fs) 1612 assert ct.SOPClassUID == instance.ReferencedSOPClassUIDInFile 1613 assert instance.ReferencedFileID is None 1614 assert ct.SOPInstanceUID == instance.ReferencedSOPInstanceUIDInFile 1615 assert ct.file_meta.TransferSyntaxUID == ( 1616 instance.ReferencedTransferSyntaxUIDInFile 1617 ) 1618 1619 def test_add_custom_add_add(self, ct, tdir, custom_leaf): 1620 """Test add_custom() if the instance is already in the File-set.""" 1621 fs = FileSet() 1622 fs.add(ct) 1623 assert 1 == len(fs) 1624 assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID)) 1625 1626 fs.add_custom(ct, custom_leaf) 1627 assert 1 == len(fs) 1628 assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID)) 1629 1630 def test_add_custom_remove_add(self, ct, tdir, custom_leaf): 1631 """Test adding a removed instance.""" 1632 fs = FileSet() 1633 fs.add_custom(ct, custom_leaf) 1634 ds, paths = write_fs(fs, tdir.name) 1635 assert not fs.is_staged 1636 assert 1 == len(fs) 1637 fs.remove(fs._instances[0]) 1638 fs.add_custom(ct, custom_leaf) 1639 assert 1 == len(fs) 1640 assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID)) 1641 1642 def test_clear(self, dicomdir, tdir): 1643 """Test FileSet.clear().""" 1644 fs = FileSet(dicomdir) 1645 fs.ID = "TESTID" 1646 fs.descriptor_file_id = "README" 1647 fs.descriptor_character_set = "ISO 1" 1648 fs, ds, paths = copy_fs(fs, tdir.name) 1649 assert "README" == fs.descriptor_file_id 1650 assert "ISO 1" == fs.descriptor_character_set 1651 assert [] != fs._instances 1652 assert fs._id is not None 1653 assert fs._path is not None 1654 uid = fs._uid 1655 assert fs._uid is not None 1656 assert fs._ds is not None 1657 assert fs._descriptor is not None 1658 assert fs._charset is not None 1659 assert [] != fs._tree.children 1660 1661 fs.clear() 1662 assert [] == fs._instances 1663 assert fs._id is None 1664 assert fs._path is None 1665 assert uid != fs._uid 1666 assert fs._uid.is_valid 1667 assert fs._ds == Dataset() 1668 assert fs._descriptor is None 1669 assert fs._charset is None 1670 assert [] == fs._tree.children 1671 1672 def test_str_empty(self, tdir): 1673 """Test str(FileSet) on an empty File-set.""" 1674 fs = FileSet() 1675 s = str(fs) 1676 assert "DICOM File-set" in s 1677 assert "Root directory: (no value available)" in s 1678 assert "File-set ID: (no value available)" in s 1679 assert "File-set UID: 1.2.826.0.1" in s 1680 assert "Descriptor file ID: (no value available)" in s 1681 assert "Descriptor file character set: (no value available)" in s 1682 assert "Changes staged for write(): DICOMDIR creation" in s 1683 assert "addition" not in s 1684 assert "removal" not in s 1685 assert "Managed instances:" not in s 1686 1687 # Set DICOMDIR elements 1688 fs.ID = "TEST ID" 1689 fs.descriptor_file_id = "README" 1690 fs.descriptor_character_set = "ISO WHATEVER" 1691 ds, paths = write_fs(fs, tdir.name) 1692 s = str(fs) 1693 assert "DICOM File-set" in s 1694 assert "Root directory: (no value available)" not in s 1695 assert "File-set ID: TEST ID" in s 1696 assert "File-set UID: 1.2.826.0.1" in s 1697 assert "Descriptor file ID: README" in s 1698 assert "Descriptor file character set: ISO WHATEVER" in s 1699 assert "Changes staged for write(): DICOMDIR creation" not in s 1700 assert "addition" not in s 1701 assert "removal" not in s 1702 assert "Managed instances:" not in s 1703 1704 def test_str(self, ct, dummy, tdir): 1705 """Test str(FileSet) with empty + additions.""" 1706 fs = FileSet() 1707 fs.add(ct) 1708 fs.add(get_testdata_file("MR_small.dcm")) 1709 1710 for p in list(Path(TINY_ALPHA_FILESET).parent.glob('**/*'))[::2]: 1711 if p.is_file() and p.name not in ['DICOMDIR', 'README']: 1712 fs.add(p) 1713 1714 instance = fs._instances[-1] 1715 ds = dcmread(get_testdata_file("rtdose.dcm")) 1716 ds.PatientID = '12345678' 1717 ds.InstanceNumber = '1' 1718 ds.StudyInstanceUID = instance.StudyInstanceUID 1719 ds.SeriesInstanceUID = instance.SeriesInstanceUID 1720 fs.add(ds) 1721 1722 ds = dcmread(get_testdata_file("rtplan.dcm")) 1723 ds.PatientID = '12345678' 1724 ds.InstanceNumber = '1' 1725 ds.StudyInstanceUID = instance.StudyInstanceUID 1726 ds.SeriesInstanceUID = instance.SeriesInstanceUID 1727 fs.add(ds) 1728 1729 ds, opt = dummy 1730 ds.SOPClassUID = ColorPaletteStorage 1731 fs.add(ds) 1732 1733 ref = ( 1734 "DICOM File-set\n" 1735 " Root directory: (no value available)\n" 1736 " File-set ID: (no value available)\n" 1737 f" File-set UID: {fs.UID}\n" 1738 " Descriptor file ID: (no value available)\n" 1739 " Descriptor file character set: (no value available)\n" 1740 " Changes staged for write(): DICOMDIR creation, 30 additions\n" 1741 "\n" 1742 " Managed instances:\n" 1743 " PATIENT: PatientID='1CT1', " 1744 "PatientName='CompressedSamples^CT1'\n" 1745 " STUDY: StudyDate=20040119, StudyTime=072730, " 1746 "StudyDescription='e+1'\n" 1747 " SERIES: Modality=CT, SeriesNumber=1\n" 1748 " IMAGE: 1 SOP Instance (1 addition)\n" 1749 " PATIENT: PatientID='4MR1', " 1750 "PatientName='CompressedSamples^MR1'\n" 1751 " STUDY: StudyDate=20040826, StudyTime=185059\n" 1752 " SERIES: Modality=MR, SeriesNumber=1\n" 1753 " IMAGE: 1 SOP Instance (1 addition)\n" 1754 " PATIENT: PatientID='12345678', PatientName='Citizen^Jan'\n" 1755 " STUDY: StudyDate=20200913, StudyTime=161900, " 1756 "StudyDescription='Testing File-set'\n" 1757 " SERIES: Modality=CT, SeriesNumber=1\n" 1758 " IMAGE: 25 SOP Instances (25 additions)\n" 1759 " RT DOSE: 1 SOP Instance (1 addition)\n" 1760 " RT PLAN: 1 SOP Instance (1 addition)\n" 1761 " PALETTE: 1 SOP Instance (to be added)" 1762 ) 1763 1764 assert ref == str(fs) 1765 1766 ds, paths = write_fs(fs, tdir.name) 1767 1768 ref = ( 1769 " File-set ID: (no value available)\n" 1770 f" File-set UID: {fs.UID}\n" 1771 " Descriptor file ID: (no value available)\n" 1772 " Descriptor file character set: (no value available)\n" 1773 "\n" 1774 " Managed instances:\n" 1775 " PATIENT: PatientID='1CT1', " 1776 "PatientName='CompressedSamples^CT1'\n" 1777 " STUDY: StudyDate=20040119, StudyTime=072730, " 1778 "StudyDescription='e+1'\n" 1779 " SERIES: Modality=CT, SeriesNumber=1\n" 1780 " IMAGE: 1 SOP Instance\n" 1781 " PATIENT: PatientID='4MR1', " 1782 "PatientName='CompressedSamples^MR1'\n" 1783 " STUDY: StudyDate=20040826, StudyTime=185059\n" 1784 " SERIES: Modality=MR, SeriesNumber=1\n" 1785 " IMAGE: 1 SOP Instance\n" 1786 " PATIENT: PatientID='12345678', PatientName='Citizen^Jan'\n" 1787 " STUDY: StudyDate=20200913, StudyTime=161900, " 1788 "StudyDescription='Testing File-set'\n" 1789 " SERIES: Modality=CT, SeriesNumber=1\n" 1790 " IMAGE: 25 SOP Instances\n" 1791 " RT DOSE: 1 SOP Instance\n" 1792 " RT PLAN: 1 SOP Instance\n" 1793 " PALETTE: 1 SOP Instance" 1794 ) 1795 1796 assert ref in str(fs) 1797 1798 for instance in fs: 1799 fs.remove(instance) 1800 1801 for p in list(Path(TINY_ALPHA_FILESET).parent.glob('**/*'))[1:40:2]: 1802 if p.is_file() and p.name not in ['DICOMDIR', 'README']: 1803 fs.add(p) 1804 1805 ref = ( 1806 " File-set ID: (no value available)\n" 1807 f" File-set UID: {fs.UID}\n" 1808 " Descriptor file ID: (no value available)\n" 1809 " Descriptor file character set: (no value available)\n" 1810 " Changes staged for write(): DICOMDIR update, 18 additions, " 1811 "30 removals\n" 1812 "\n" 1813 " Managed instances:\n" 1814 " PATIENT: PatientID='1CT1', " 1815 "PatientName='CompressedSamples^CT1'\n" 1816 " STUDY: StudyDate=20040119, StudyTime=072730, " 1817 "StudyDescription='e+1'\n" 1818 " SERIES: Modality=CT, SeriesNumber=1\n" 1819 " IMAGE: 0 SOP Instances (1 initial, 1 removal)\n" 1820 " PATIENT: PatientID='4MR1', " 1821 "PatientName='CompressedSamples^MR1'\n" 1822 " STUDY: StudyDate=20040826, StudyTime=185059\n" 1823 " SERIES: Modality=MR, SeriesNumber=1\n" 1824 " IMAGE: 0 SOP Instances (1 initial, 1 removal)\n" 1825 " PATIENT: PatientID='12345678', PatientName='Citizen^Jan'\n" 1826 " STUDY: StudyDate=20200913, StudyTime=161900, " 1827 "StudyDescription='Testing File-set'\n" 1828 " SERIES: Modality=CT, SeriesNumber=1\n" 1829 " IMAGE: 18 SOP Instances (25 initial, 18 additions, " 1830 "25 removals)\n" 1831 " RT DOSE: 0 SOP Instances (1 initial, 1 removal)\n" 1832 " RT PLAN: 0 SOP Instances (1 initial, 1 removal)\n" 1833 " PALETTE: 1 SOP Instance (to be removed)" 1834 ) 1835 1836 assert ref in str(fs) 1837 1838 def test_str_update_structure(self, dicomdir): 1839 """Test that the update structure comment appears.""" 1840 fs = FileSet(dicomdir) 1841 assert ( 1842 "Changes staged for write(): DICOMDIR update, directory " 1843 "structure update" 1844 ) in str(fs) 1845 1846 1847@pytest.mark.filterwarnings("ignore:The 'DicomDir'") 1848class TestFileSet_Load: 1849 """Tests for a loaded File-set.""" 1850 def test_write_dicomdir(self, dicomdir): 1851 """Test DICOMDIR writing""" 1852 fs = FileSet(dicomdir) 1853 out = DicomBytesIO() 1854 out.is_little_endian = True 1855 out.is_implicit_VR = False 1856 fs._write_dicomdir(out) 1857 out.seek(0) 1858 1859 new = dcmread(out) 1860 assert dicomdir.DirectoryRecordSequence == new.DirectoryRecordSequence 1861 assert ( 1862 396 == new.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity 1863 ) 1864 assert ( 1865 3126 == new.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity 1866 ) 1867 1868 def test_write_new_path(self, dicomdir): 1869 """Test writing to a new path.""" 1870 fs = FileSet(dicomdir) 1871 assert fs.path is not None 1872 msg = ( 1873 r"The path for an existing File-set cannot be changed, use " 1874 r"'FileSet.copy\(\)' to write the File-set to a new location" 1875 ) 1876 with pytest.raises(ValueError, match=msg): 1877 fs.write("MYNEWPATH") 1878 1879 def test_bad_sop_class_raises(self, dicomdir): 1880 """Test loading using non-DICOMDIR.""" 1881 dicomdir.file_meta.MediaStorageSOPClassUID = '1.2.3' 1882 msg = ( 1883 r"Unable to load the File-set as the supplied dataset is " 1884 r"not a 'Media Storage Directory' instance" 1885 ) 1886 with pytest.raises(ValueError, match=msg): 1887 fs = FileSet(dicomdir) 1888 1889 def test_bad_filename_raises(self, dicomdir): 1890 """Test loading with a bad path.""" 1891 dicomdir.filename = 'bad' 1892 msg = ( 1893 r"Unable to load the File-set as the 'filename' attribute " 1894 r"for the DICOMDIR dataset is not a valid path: " 1895 r"bad" 1896 ) 1897 with pytest.raises(FileNotFoundError, match=msg): 1898 FileSet(dicomdir) 1899 1900 def test_bad_filename_type_raises(self, dicomdir): 1901 """Test loading with a bad DICOMDIR filename type.""" 1902 dicomdir.filename = None 1903 msg = ( 1904 r"Unable to load the File-set as the DICOMDIR dataset must " 1905 r"have a 'filename' attribute set to the path of the " 1906 r"DICOMDIR file" 1907 ) 1908 with pytest.raises(TypeError, match=msg): 1909 FileSet(dicomdir) 1910 1911 def test_find(self, dicomdir): 1912 """Tests for FileSet.find().""" 1913 fs = FileSet(dicomdir) 1914 assert 31 == len(fs.find()) 1915 assert 7 == len(fs.find(PatientID='77654033')) 1916 assert 24 == len(fs.find(PatientID='98890234')) 1917 1918 matches = fs.find(PatientID='98890234', StudyDate="20030505") 1919 assert 17 == len(matches) 1920 for ii in matches: 1921 assert isinstance(ii, FileInstance) 1922 1923 sop_instances = [ii.SOPInstanceUID for ii in matches] 1924 assert 17 == len(list(set(sop_instances))) 1925 1926 def test_find_load(self, private): 1927 """Test FileSet.find(load=True).""" 1928 fs = FileSet(private) 1929 msg = ( 1930 r"None of the records in the DICOMDIR dataset contain all " 1931 r"the query elements, consider using the 'load' parameter " 1932 r"to expand the search to the corresponding SOP instances" 1933 ) 1934 with pytest.warns(UserWarning, match=msg): 1935 results = fs.find( 1936 load=False, PhotometricInterpretation="MONOCHROME1" 1937 ) 1938 assert not results 1939 1940 results = fs.find( 1941 load=True, PhotometricInterpretation="MONOCHROME1" 1942 ) 1943 assert 3 == len(results) 1944 1945 def test_find_values(self, private): 1946 """Test searching the FileSet for element values.""" 1947 fs = FileSet(private) 1948 expected = { 1949 "PatientID": ['77654033', '98890234'], 1950 "StudyDescription": [ 1951 'XR C Spine Comp Min 4 Views', 1952 'CT, HEAD/BRAIN WO CONTRAST', 1953 '', 1954 'Carotids', 1955 'Brain', 1956 'Brain-MRA', 1957 ], 1958 } 1959 for k, v in expected.items(): 1960 assert fs.find_values(k) == v 1961 assert fs.find_values(list(expected.keys())) == expected 1962 1963 def test_find_values_load(self, private): 1964 """Test FileSet.find_values(load=True).""" 1965 fs = FileSet(private) 1966 search_element = "PhotometricInterpretation" 1967 msg = ( 1968 r"None of the records in the DICOMDIR dataset contain " 1969 fr"\['{search_element}'\], consider using the 'load' parameter " 1970 r"to expand the search to the corresponding SOP instances" 1971 ) 1972 with pytest.warns(UserWarning, match=msg): 1973 results = fs.find_values(search_element, load=False) 1974 assert not results 1975 1976 assert fs.find_values(search_element, load=True) == [ 1977 'MONOCHROME1', 'MONOCHROME2' 1978 ] 1979 1980 with pytest.warns(UserWarning, match=msg): 1981 results = fs.find_values([search_element], load=False) 1982 assert not results[search_element] 1983 1984 assert ( 1985 fs.find_values([search_element], load=True) 1986 ) == {search_element: ['MONOCHROME1', 'MONOCHROME2']} 1987 1988 def test_empty_file_id(self, dicomdir): 1989 """Test loading a record with an empty File ID.""" 1990 item = dicomdir.DirectoryRecordSequence[5] 1991 item.ReferencedFileID = None 1992 uid = item.ReferencedSOPInstanceUIDInFile 1993 fs = FileSet(dicomdir) 1994 assert [] == fs.find(SOPInstanceUID=uid) 1995 assert 30 == len(fs) 1996 1997 def test_bad_file_id(self, dicomdir): 1998 """Test loading a record with a bad File ID.""" 1999 item = dicomdir.DirectoryRecordSequence[5] 2000 item.ReferencedFileID[-1] = "MISSING" 2001 uid = item.ReferencedSOPInstanceUIDInFile 2002 msg = ( 2003 r"The referenced SOP Instance for the directory record at offset " 2004 r"1220 does not exist:" 2005 ) 2006 with pytest.warns(UserWarning, match=msg): 2007 fs = FileSet(dicomdir) 2008 2009 assert [] == fs.find(SOPInstanceUID=uid) 2010 assert 30 == len(fs) 2011 2012 def test_load_orphans_raise(self, private): 2013 """Test loading orphaned records raises exception.""" 2014 ds = private 2015 seq = ds.DirectoryRecordSequence 2016 uid = seq[3].ReferencedSOPInstanceUIDInFile 2017 seq[1].ReferencedSOPInstanceUIDInFile = uid 2018 seq[1].ReferencedTransferSyntaxUIDInFile = ExplicitVRLittleEndian 2019 seq[1].ReferencedSOPClassUID = ComputedRadiographyImageStorage 2020 seq[1].OffsetOfReferencedLowerLevelDirectoryEntity = 0 2021 seq[1].ReferencedFileID = seq[3].ReferencedFileID 2022 del seq[1].StudyInstanceUID 2023 fs = FileSet() 2024 msg = r"The DICOMDIR contains orphaned directory records" 2025 with pytest.raises(ValueError, match=msg): 2026 fs.load(ds, raise_orphans=True) 2027 2028 def test_load_orphans_exclude(self, private): 2029 """Test loading and ignore orphaned records.""" 2030 # The first study includes 3 series, each series with 1 image 2031 # so we're orphaning 3 instances 2032 seq = private.DirectoryRecordSequence 2033 seq[1].OffsetOfReferencedLowerLevelDirectoryEntity = 0 2034 fs = FileSet() 2035 msg = ( 2036 r"The DICOMDIR has 3 orphaned directory records that reference " 2037 r"an instance that will not be included in the File-set" 2038 ) 2039 with pytest.warns(UserWarning, match=msg): 2040 fs.load(private, include_orphans=False) 2041 2042 assert 29 == len(fs) 2043 assert [] == fs.find(StudyInstanceUID=seq[1].StudyInstanceUID) 2044 assert [] == fs.find(SeriesInstanceUID=seq[2].SeriesInstanceUID) 2045 for ii in range(3, 9, 2): 2046 assert "IMAGE" == seq[ii].DirectoryRecordType 2047 assert [] == fs.find( 2048 SOPInstanceUID=seq[ii].ReferencedSOPInstanceUIDInFile 2049 ) 2050 2051 def test_load_orphans_no_file_id(self, private): 2052 """Test loading orphaned records without a valid File ID.""" 2053 # The first study includes 3 series, each series with 1 image 2054 # so we're orphaning 3 instances, 1 with an invalid File ID 2055 seq = private.DirectoryRecordSequence 2056 seq[1].OffsetOfReferencedLowerLevelDirectoryEntity = 0 2057 seq[5].ReferencedFileID = None 2058 fs = FileSet() 2059 fs.load(private) 2060 2061 assert 31 == len(fs) 2062 assert "IMAGE" == seq[5].DirectoryRecordType 2063 assert [] == fs.find( 2064 SOPInstanceUID=seq[5].ReferencedSOPInstanceUIDInFile 2065 ) 2066 assert 1 == len( 2067 fs.find(SOPInstanceUID=seq[3].ReferencedSOPInstanceUIDInFile) 2068 ) 2069 assert 1 == len( 2070 fs.find(SOPInstanceUID=seq[7].ReferencedSOPInstanceUIDInFile) 2071 ) 2072 2073 def test_load_orphans_private(self, private): 2074 """Test loading an orphaned PRIVATE record.""" 2075 seq = private.DirectoryRecordSequence 2076 seq[-2].OffsetOfReferencedLowerLevelDirectoryEntity = 0 2077 fs = FileSet() 2078 fs.load(private) 2079 assert 32 == len(fs) 2080 assert 1 == len( 2081 fs.find(SOPInstanceUID=seq[-1].ReferencedSOPInstanceUIDInFile) 2082 ) 2083 2084 def test_load_dicomdir_big_endian(self, dicomdir, tdir): 2085 """Test loading a big endian DICOMDIR""" 2086 with pytest.warns(UserWarning): 2087 ds = dcmread(BIGENDIAN_TEST_FILE) 2088 msg = ( 2089 r"The DICOMDIR dataset uses an invalid transfer syntax " 2090 r"'Explicit VR Big Endian' and will be updated to use 'Explicit " 2091 r"VR Little Endian'" 2092 ) 2093 with pytest.warns(UserWarning, match=msg): 2094 fs = FileSet(ds) 2095 2096 # Should be written out as explicit little 2097 fs, ds, paths = copy_fs(fs, tdir.name) 2098 assert ExplicitVRLittleEndian == ds.file_meta.TransferSyntaxUID 2099 2100 ref = FileSet(dicomdir) 2101 assert len(ref) == len(fs) 2102 for ii, rr in zip(fs, ref): 2103 assert ii.SOPInstanceUID == rr.SOPInstanceUID 2104 2105 def test_load_dicomdir_implicit(self, dicomdir, tdir): 2106 """Test loading an implicit VR DICOMDIR.""" 2107 with pytest.warns(UserWarning): 2108 ds = dcmread(IMPLICIT_TEST_FILE) 2109 msg = ( 2110 r"The DICOMDIR dataset uses an invalid transfer syntax " 2111 r"'Implicit VR Little Endian' and will be updated to use " 2112 r"'Explicit VR Little Endian'" 2113 ) 2114 with pytest.warns(UserWarning, match=msg): 2115 fs = FileSet(ds) 2116 2117 # Should be written out as explicit little 2118 fs, ds, paths = copy_fs(fs, tdir.name) 2119 assert ExplicitVRLittleEndian == ds.file_meta.TransferSyntaxUID 2120 2121 ref = FileSet(dicomdir) 2122 assert len(ref) == len(fs) 2123 for ii, rr in zip(fs, ref): 2124 assert ii.SOPInstanceUID == rr.SOPInstanceUID 2125 2126 def test_load_dicomdir_reordered(self, dicomdir): 2127 """Test loading DICOMDIR-reordered""" 2128 ds = dcmread(get_testdata_file('DICOMDIR-reordered')) 2129 fs = FileSet(ds) 2130 ref = FileSet(dicomdir) 2131 assert len(ref) == len(fs) 2132 for ii, rr in zip(fs, ref): 2133 assert ii.SOPInstanceUID == rr.SOPInstanceUID 2134 2135 def test_load_dicomdir_no_offset(self, dicomdir): 2136 """Test loading DICOMDIR-nooffset""" 2137 ds = dcmread(get_testdata_file('DICOMDIR-nooffset')) 2138 fs = FileSet(ds) 2139 ref = FileSet(dicomdir) 2140 assert len(ref) == len(fs) 2141 for ii, rr in zip(fs, ref): 2142 assert ii.SOPInstanceUID == rr.SOPInstanceUID 2143 2144 def test_load_dicomdir_no_uid(self, dicomdir): 2145 """Test loading DICOMDIR with no UID""" 2146 del dicomdir.file_meta.MediaStorageSOPInstanceUID 2147 fs = FileSet(dicomdir) 2148 assert fs.UID.is_valid 2149 assert fs.UID == dicomdir.file_meta.MediaStorageSOPInstanceUID 2150 2151 2152@pytest.mark.filterwarnings("ignore:The 'DicomDir'") 2153class TestFileSet_Modify: 2154 """Tests for a modified File-set.""" 2155 def setup(self): 2156 self.fn = FileSet.__len__ 2157 2158 def teardown(self): 2159 FileSet.__len__ = self.fn 2160 2161 def test_write_dicomdir_fs_changes(self, dicomdir_copy): 2162 """Test FileSet.write() with only ^ changes.""" 2163 t, ds = dicomdir_copy 2164 fs = FileSet(ds) 2165 ds, paths = write_fs(fs) 2166 assert not fs._stage['^'] 2167 fs.descriptor_file_id = ["1", "2", "3"] 2168 assert fs._stage['^'] 2169 assert not fs._stage['~'] 2170 assert not fs._stage['+'] 2171 assert not fs._stage['-'] 2172 fs.write() 2173 assert not fs._stage['^'] 2174 ds = dcmread(Path(fs.path) / "DICOMDIR") 2175 assert ["1", "2", "3"] == ds.FileSetDescriptorFileID 2176 2177 def test_write_dicomdir_use_existing(self, dicomdir_copy): 2178 """Test FileSet.write() with use_existing.""" 2179 tdir, ds = dicomdir_copy 2180 assert "FileSetDescriptorFileID" not in ds 2181 fs = FileSet(ds) 2182 assert fs._stage['~'] 2183 assert not fs._stage['+'] 2184 assert not fs._stage['-'] 2185 fs.descriptor_file_id = ["1", "2", "3"] 2186 fs.write(use_existing=True) 2187 t = Path(tdir.name) 2188 # File IDs haven't changed 2189 assert [] == list(t.glob("PT000000")) 2190 assert 1 == len(list(t.glob("98892003"))) 2191 ds = dcmread(t / "DICOMDIR") 2192 assert ["1", "2", "3"] == ds.FileSetDescriptorFileID 2193 2194 def test_write_dicomdir_use_existing_raises(self, dicomdir_copy, ct): 2195 """Test FileSet.write() with use_existing raises with +/- changes.""" 2196 tdir, ds = dicomdir_copy 2197 assert "FileSetDescriptorFileID" not in ds 2198 fs = FileSet(ds) 2199 fs.add(ct) 2200 assert fs._stage['~'] 2201 assert fs._stage['+'] 2202 assert not fs._stage['-'] 2203 fs.descriptor_file_id = ["1", "2", "3"] 2204 msg = ( 2205 r"'Fileset.write\(\)' called with 'use_existing' but additions to " 2206 r"the File-set's managed instances are staged" 2207 ) 2208 with pytest.raises(ValueError, match=msg): 2209 fs.write(use_existing=True) 2210 2211 def test_remove_addition_bad_path(self, dicomdir, ct): 2212 """Test removing a missing file from the File-set's stage.""" 2213 fs = FileSet(dicomdir) 2214 fs.add(ct) 2215 instance = fs.find(SOPInstanceUID=ct.SOPInstanceUID)[0] 2216 assert instance.SOPInstanceUID in fs._stage['+'] 2217 assert instance in fs 2218 2219 path = instance._stage_path 2220 instance._stage_path = Path(fs.path) / "BADFILE" 2221 fs.remove(instance) 2222 assert instance not in fs 2223 assert instance.SOPInstanceUID not in fs._stage['-'] 2224 assert instance.SOPInstanceUID not in fs._stage['+'] 2225 # File should still exist 2226 assert path.exists() 2227 2228 def test_write_file_id(self, tiny): 2229 """Test that the File IDs character sets switch correctly.""" 2230 tdir, ds = temporary_fs(tiny) 2231 2232 def my_len(self): 2233 return 10**6 + 1 2234 2235 FileSet.__len__ = my_len 2236 fs = FileSet(ds) 2237 assert 10**6 + 1 == len(fs) 2238 ds, paths = write_fs(fs) 2239 instance = fs._instances[-1] 2240 # Was written with alphanumeric File IDs 2241 assert "IM00001D" in instance.path 2242 2243 def my_len(self): 2244 return 36**6 + 1 2245 2246 FileSet.__len__ = my_len 2247 fs = FileSet(ds) 2248 assert 36**6 + 1 == len(fs) 2249 msg = ( 2250 r"pydicom doesn't support writing File-sets with more than " 2251 r"2176782336 managed instances" 2252 ) 2253 with pytest.raises(NotImplementedError, match=msg): 2254 fs.write() 2255 2256 def test_write_missing_removal(self, tiny): 2257 """Test that missing files are ignored when removing during write.""" 2258 tdir, ds = temporary_fs(tiny) 2259 fs = FileSet(ds) 2260 instance = fs._instances[0] 2261 path = Path(instance.path) 2262 fs.remove(instance) 2263 assert path.exists() 2264 path.unlink() 2265 assert not path.exists() 2266 ds, paths = write_fs(fs) 2267 assert [] == fs.find(SOPInstanceUID=instance.SOPInstanceUID) 2268 assert 49 == len(fs) 2269 2270 def test_write_removal_addition_collision(self, tiny): 2271 """Test re-adding files staged for removal that also collide.""" 2272 # The colliding files are IM000010 to IM000019 which get 2273 # overwritten by IM00000A to IM00000J 2274 tdir, ds = temporary_fs(tiny) 2275 fs = FileSet(ds) 2276 # IM000010 to IM000013 2277 instances = fs._instances[36:40] 2278 assert "IM000010" == Path(instances[0].path).name 2279 assert "IM000011" == Path(instances[1].path).name 2280 assert "IM000012" == Path(instances[2].path).name 2281 assert "IM000013" == Path(instances[3].path).name 2282 fs.remove(instances) 2283 assert 46 == len(fs) 2284 for instance in instances: 2285 path = Path(instance.path) 2286 fs.add(path) 2287 2288 ds, paths = write_fs(fs) 2289 assert 50 == len(paths) 2290 original = FileSet(tiny) 2291 assert len(original) == len(fs) 2292 for ref, ii in zip(original, fs): 2293 assert ref.path != ii.path 2294 assert ref.SOPInstanceUID == ii.SOPInstanceUID 2295 rs = ref.load() 2296 ts = ii.load() 2297 assert Dataset(rs) == ts 2298 2299 def test_write_implicit(self, dicomdir, dicomdir_copy, tdir): 2300 """Test writing the DICOMDIR using Implicit VR""" 2301 tdir, ds = dicomdir_copy 2302 fs = FileSet(ds) 2303 with pytest.warns(UserWarning): 2304 fs.write(force_implicit=True, use_existing=True) 2305 with pytest.warns(UserWarning): 2306 ds = dcmread(Path(fs.path) / "DICOMDIR") 2307 assert ImplicitVRLittleEndian == ds.file_meta.TransferSyntaxUID 2308 2309 with pytest.warns(UserWarning): 2310 ref_ds = dcmread(IMPLICIT_TEST_FILE) 2311 assert Dataset(ref_ds) == ds 2312 2313 ref = FileSet(dicomdir) 2314 assert len(ref) == len(fs) 2315 for ii, rr in zip(fs, ref): 2316 assert ii.SOPInstanceUID == rr.SOPInstanceUID 2317 2318 def test_write_use_existing(self, dicomdir_copy): 2319 """Test write() with use_existing.""" 2320 tdir, ds = dicomdir_copy 2321 assert 52 == len(ds.DirectoryRecordSequence) 2322 fs = FileSet(ds) 2323 orig_paths = [p for p in fs._path.glob('**/*') if p.is_file()] 2324 instance = fs._instances[0] 2325 assert Path(instance.path) in orig_paths 2326 fs.remove(instance) 2327 orig_file_ids = [ii.ReferencedFileID for ii in fs] 2328 fs.write(use_existing=True) 2329 assert 50 == len(fs._ds.DirectoryRecordSequence) 2330 paths = [p for p in fs._path.glob('**/*') if p.is_file()] 2331 assert orig_file_ids == [ii.ReferencedFileID for ii in fs] 2332 assert Path(instance.path) not in paths 2333 assert sorted(orig_paths)[1:] == sorted(paths) 2334 assert {} == fs._stage['-'] 2335 assert not fs._stage['^'] 2336 assert {} == fs._stage['+'] 2337 assert fs._stage['~'] 2338 2339 def test_write_use_existing_raises(self, dicomdir, ct): 2340 """Test write() with use_existing raises if additions.""" 2341 fs = FileSet(dicomdir) 2342 fs.remove(fs._instances[0]) 2343 fs.add(ct) 2344 msg = ( 2345 r"'Fileset.write\(\)' called with 'use_existing' but additions " 2346 r"to the File-set's managed instances are staged" 2347 ) 2348 with pytest.raises(ValueError, match=msg): 2349 fs.write(use_existing=True) 2350 2351 def test_add_instance_missing(self, tdir): 2352 """Test adding an instance missing a required value.""" 2353 fs = FileSet() 2354 ds = dcmread(get_testdata_file("rtdose.dcm")) 2355 del ds.InstanceNumber 2356 msg = ( 2357 r"Unable to use the default 'RT DOSE' record creator " 2358 r"as the instance is missing a required element or value. Either " 2359 r"update the instance, define your own record creation function " 2360 r"or use 'FileSet.add_custom\(\)' instead" 2361 ) 2362 with pytest.raises(ValueError, match=msg): 2363 fs.add(ds) 2364 2365 def test_add_instance_missing_required_value(self, tdir): 2366 """Test adding an instance missing a required value.""" 2367 fs = FileSet() 2368 ds = dcmread(get_testdata_file("rtdose.dcm")) 2369 ds.InstanceNumber = None 2370 msg = ( 2371 r"Unable to use the default 'RT DOSE' record creator " 2372 r"as the instance is missing a required element or value. Either " 2373 r"update the instance, define your own record creation function " 2374 r"or use 'FileSet.add_custom\(\)' instead" 2375 ) 2376 with pytest.raises(ValueError, match=msg): 2377 fs.add(ds) 2378 2379 def test_add_rt_dose(self, tdir): 2380 """Test adding an RT Dose instance.""" 2381 fs = FileSet() 2382 ds = dcmread(get_testdata_file("rtdose.dcm")) 2383 ds.SpecificCharacterSet = "ISO_IR 100" 2384 ds.InstanceNumber = 1 # Type 2, but Type 1 in the RT DOSE record 2385 fs.add(ds) 2386 assert 1 == len(fs.find(SOPInstanceUID=ds.SOPInstanceUID)) 2387 dicomdir, paths = write_fs(fs, tdir.name) 2388 assert 1 == len(paths) 2389 seq = dicomdir.DirectoryRecordSequence 2390 assert "PATIENT" == seq[0].DirectoryRecordType 2391 assert "STUDY" == seq[1].DirectoryRecordType 2392 assert "SERIES" == seq[2].DirectoryRecordType 2393 assert "RT DOSE" == seq[3].DirectoryRecordType 2394 assert Dataset(ds) == dcmread(paths[0]) 2395 2396 def test_add_rt_structure_set(self, tdir): 2397 """Test adding an RT Structure Set instance.""" 2398 ds = dcmread(get_testdata_file("rtstruct.dcm"), force=True) 2399 ds.file_meta = FileMetaDataset() 2400 ds.file_meta.TransferSyntaxUID = ImplicitVRLittleEndian 2401 ds.StudyDate = "20201001" 2402 ds.StudyTime = "120000" 2403 2404 fs = FileSet() 2405 fs.add(ds) 2406 assert 1 == len(fs.find(SOPInstanceUID=ds.SOPInstanceUID)) 2407 dicomdir, paths = write_fs(fs, tdir.name) 2408 assert 1 == len(paths) 2409 seq = dicomdir.DirectoryRecordSequence 2410 assert "PATIENT" == seq[0].DirectoryRecordType 2411 assert "STUDY" == seq[1].DirectoryRecordType 2412 assert "SERIES" == seq[2].DirectoryRecordType 2413 assert "RT STRUCTURE SET" == seq[3].DirectoryRecordType 2414 assert Dataset(ds) == dcmread(paths[0]) 2415 2416 def test_add_rt_plan(self, tdir): 2417 """Test adding an RT Plan instance.""" 2418 ds = dcmread(get_testdata_file("rtplan.dcm"), force=True) 2419 ds.InstanceNumber = 1 2420 2421 fs = FileSet() 2422 fs.add(ds) 2423 assert 1 == len(fs.find(SOPInstanceUID=ds.SOPInstanceUID)) 2424 dicomdir, paths = write_fs(fs, tdir.name) 2425 assert 1 == len(paths) 2426 seq = dicomdir.DirectoryRecordSequence 2427 assert "PATIENT" == seq[0].DirectoryRecordType 2428 assert "STUDY" == seq[1].DirectoryRecordType 2429 assert "SERIES" == seq[2].DirectoryRecordType 2430 assert "RT PLAN" == seq[3].DirectoryRecordType 2431 assert Dataset(ds) == dcmread(paths[0]) 2432 2433 def test_remove_list(self, dicomdir, tdir): 2434 """Test remove using a list of instances.""" 2435 fs = FileSet(dicomdir) 2436 instances = fs.find(StudyDescription='XR C Spine Comp Min 4 Views') 2437 fs.remove(instances) 2438 assert 28 == len(fs) 2439 2440 def test_add_bad_one_level(self, dummy): 2441 """Test adding a bad one-level dataset raises.""" 2442 ds, opt = dummy 2443 ds.SOPClassUID = HangingProtocolStorage 2444 del ds.HangingProtocolCreator 2445 fs = FileSet() 2446 msg = ( 2447 r"Unable to use the default 'HANGING PROTOCOL' record creator " 2448 r"as the instance is missing a required element or value. Either " 2449 r"update the instance, define your own record creation function " 2450 r"or use 'FileSet.add_custom\(\)' instead" 2451 ) 2452 with pytest.raises(ValueError, match=msg): 2453 fs.add(ds) 2454 2455 2456@pytest.mark.filterwarnings("ignore:The 'DicomDir'") 2457class TestFileSet_Copy: 2458 """Tests for copying a File-set.""" 2459 def setup(self): 2460 self.orig = FileSet.__len__ 2461 2462 def teardown(self): 2463 FileSet.__len__ = self.orig 2464 2465 def test_copy(self, dicomdir, tdir): 2466 """Test FileSet.copy()""" 2467 orig_root = Path(dicomdir.filename).parent 2468 fs = FileSet(dicomdir) 2469 2470 fs.ID = "NEW ID" 2471 uid = fs.UID = generate_uid() 2472 fs.descriptor_file_id = "README" 2473 fs.descriptor_character_set = "ISO_IR 100" 2474 cp, ds, paths = copy_fs(fs, tdir.name) 2475 assert 31 == len(paths) 2476 assert ( 2477 ('PT000000', 'ST000000', 'SE000000', 'IM000000') 2478 ) == paths[0].parts[-4:] 2479 assert ( 2480 ('PT000001', 'ST000003', 'SE000002', 'IM000006') 2481 ) == paths[-1].parts[-4:] 2482 2483 # Check existing File-set remains the same 2484 assert "NEW ID" == fs.ID 2485 assert dicomdir.file_meta.TransferSyntaxUID == ExplicitVRLittleEndian 2486 assert uid == fs.UID 2487 assert dicomdir.file_meta.MediaStorageSOPInstanceUID == fs.UID 2488 assert "README" == fs.descriptor_file_id 2489 assert "ISO_IR 100" == fs.descriptor_character_set 2490 assert not bool(fs._stage['+']) 2491 assert not bool(fs._stage['-']) 2492 assert fs.is_staged 2493 paths = list(orig_root.glob('98892001/**/*')) 2494 paths += list(orig_root.glob('98892003/**/*')) 2495 paths += list(orig_root.glob('77654033/**/*')) 2496 paths = [p for p in paths if p.is_file()] 2497 2498 # Test new File-set 2499 assert len(fs) == len(cp) 2500 for ref, instance in zip(fs, cp): 2501 assert ref.SOPInstanceUID == instance.SOPInstanceUID 2502 2503 assert ds.file_meta.TransferSyntaxUID == ExplicitVRLittleEndian 2504 assert not ds.is_implicit_VR 2505 assert ds.is_little_endian 2506 assert not cp.is_staged 2507 assert "NEW ID" == cp.ID 2508 assert uid == cp.UID 2509 assert ds.file_meta.MediaStorageSOPInstanceUID == cp.UID 2510 assert "README" == cp.descriptor_file_id 2511 assert "ISO_IR 100" == cp.descriptor_character_set 2512 2513 def test_copy_raises(self, dicomdir, tdir): 2514 """Test exceptions raised by FileSet.copy().""" 2515 fs = FileSet(dicomdir) 2516 msg = r"Cannot copy the File-set as the 'path' is unchanged" 2517 with pytest.raises(ValueError, match=msg): 2518 fs.copy(fs.path) 2519 2520 def test_copy_implicit(self, dicomdir, tdir): 2521 """Test copy() with implicit VR.""" 2522 assert not dicomdir.is_implicit_VR 2523 fs = FileSet(dicomdir) 2524 with pytest.warns(UserWarning): 2525 cp, ds, paths = copy_fs(fs, tdir.name, as_implicit=True) 2526 2527 # Check existing File-set remains the same 2528 assert "PYDICOM_TEST" == fs.ID 2529 assert dicomdir.file_meta.TransferSyntaxUID == ExplicitVRLittleEndian 2530 assert dicomdir.file_meta.MediaStorageSOPInstanceUID == fs.UID 2531 assert fs.descriptor_file_id is None 2532 assert fs.descriptor_character_set is None 2533 assert not bool(fs._stage['+']) 2534 assert not bool(fs._stage['-']) 2535 2536 assert 31 == len(paths) 2537 2538 assert len(fs) == len(cp) 2539 for ref, instance in zip(fs, cp): 2540 assert ref.SOPInstanceUID == instance.SOPInstanceUID 2541 2542 assert ds.file_meta.TransferSyntaxUID == ImplicitVRLittleEndian 2543 assert ds.is_implicit_VR 2544 assert ds.is_little_endian 2545 2546 def test_file_id(self, tiny, tdir): 2547 """Test that the File IDs character sets switch correctly.""" 2548 def my_len(self): 2549 return 10**6 + 1 2550 2551 FileSet.__len__ = my_len 2552 fs = FileSet(tiny) 2553 assert 10**6 + 1 == len(fs) 2554 fs, ds, paths = copy_fs(fs, tdir.name) 2555 instance = fs._instances[-1] 2556 # Was written with alphanumeric File IDs 2557 assert "IM00001D" in instance.path 2558 2559 def my_len(self): 2560 return 36**6 + 1 2561 2562 FileSet.__len__ = my_len 2563 fs = FileSet(tiny) 2564 assert 36**6 + 1 == len(fs) 2565 msg = ( 2566 r"pydicom doesn't support writing File-sets with more than " 2567 r"2176782336 managed instances" 2568 ) 2569 with pytest.raises(NotImplementedError, match=msg): 2570 fs.copy(tdir.name) 2571 2572 def test_additions(self, tiny, ct, tdir): 2573 """Test that additions get added when copying.""" 2574 fs = FileSet(tiny) 2575 assert [] == fs.find(PatientID="1CT1") 2576 fs.add(ct) 2577 cp, ds, paths = copy_fs(fs, tdir.name) 2578 assert 51 == len(paths) 2579 assert ( 2580 ('PT000001', 'ST000000', 'SE000000', 'IM000000') 2581 ) == paths[-1].parts[-4:] 2582 assert 51 == len(cp) 2583 assert not cp.is_staged 2584 instances = cp.find(PatientID="1CT1") 2585 assert 1 == len(instances) 2586 assert ct.SOPInstanceUID == instances[0].SOPInstanceUID 2587 2588 # Test addition is still staged in original fs 2589 assert fs.is_staged 2590 assert 51 == len(fs) 2591 assert 1 == len(fs._stage['+']) 2592 assert ct.SOPInstanceUID in fs._stage['+'] 2593 2594 def test_removals(self, tiny, tdir): 2595 """Test that additions get added when copying.""" 2596 fs = FileSet(tiny) 2597 instance = fs._instances[0] 2598 uid = instance.SOPInstanceUID 2599 instances = fs.find(SOPInstanceUID=uid) 2600 assert 1 == len(instances) 2601 fs.remove(instance) 2602 assert fs.is_staged 2603 2604 cp, ds, paths = copy_fs(fs, tdir.name) 2605 assert 49 == len(paths) 2606 assert ( 2607 ('PT000000', 'ST000000', 'SE000000', 'IM000048') 2608 ) == paths[-1].parts[-4:] 2609 assert not cp.is_staged 2610 names = [p.name for p in paths] 2611 assert 49 == len(names) 2612 assert "IM000000" in names 2613 assert "IM000048" in names 2614 assert "IM000049" not in names 2615 assert [] == cp.find(SOPInstanceUID=uid) 2616 2617 # Test removal is still staged in original fs 2618 assert fs.is_staged 2619 assert 1 == len(fs._stage['-']) 2620 assert uid in fs._stage['-'] 2621 2622 def test_additions_removals(self, tiny, ct, tdir): 2623 """Test copying with additions and removals.""" 2624 mr = dcmread(get_testdata_file("MR_small.dcm")) 2625 fs = FileSet(tiny) 2626 assert [] == fs.find(PatientID=ct.PatientID) 2627 assert [] == fs.find(PatientID=mr.PatientID) 2628 fs.add(ct) 2629 fs.add(mr) 2630 2631 instances = fs._instances[:5] 2632 for instance in instances: 2633 matches = fs.find(SOPInstanceUID=instance.SOPInstanceUID) 2634 assert 1 == len(matches) 2635 fs.remove(instance) 2636 2637 assert fs.is_staged 2638 cp, ds, paths = copy_fs(fs, tdir.name) 2639 2640 # Test written instances 2641 parts = [p.parts for p in paths] 2642 assert 47 == len(parts) 2643 assert 'IM000000' == parts[0][-1] 2644 assert 'IM000044' == parts[44][-1] 2645 for ii in range(45): 2646 assert ('PT000000', 'ST000000', 'SE000000') == parts[ii][-4:-1] 2647 2648 assert ( 2649 ('PT000001', 'ST000000', 'SE000000', 'IM000000') == parts[45][-4:] 2650 ) 2651 assert ( 2652 ('PT000002', 'ST000000', 'SE000000', 'IM000000') == parts[46][-4:] 2653 ) 2654 2655 # Test copied fileset 2656 assert not cp.is_staged 2657 assert 1 == len(cp.find(SOPInstanceUID=ct.SOPInstanceUID)) 2658 assert 1 == len(cp.find(SOPInstanceUID=mr.SOPInstanceUID)) 2659 for instance in instances: 2660 matches = cp.find(SOPInstanceUID=instance.SOPInstanceUID) 2661 assert 0 == len(matches) 2662 assert instance.SOPInstanceUID not in cp._stage['-'] 2663 2664 # Test original fileset 2665 assert fs.is_staged 2666 for instance in instances: 2667 assert instance.SOPInstanceUID in fs._stage['-'] 2668 assert 2 == len(fs._stage['+']) 2669 assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID)) 2670 assert 1 == len(fs.find(SOPInstanceUID=mr.SOPInstanceUID)) 2671 2672 2673# record type 2674REFERENCE_1LEVEL = [ 2675 ("HANGING PROTOCOL", HangingProtocolStorage), 2676 ("IMPLANT", GenericImplantTemplateStorage), 2677 ("IMPLANT ASSY", ImplantAssemblyTemplateStorage), 2678 ("IMPLANT GROUP", ImplantTemplateGroupStorage), 2679 ("PALETTE", ColorPaletteStorage), 2680] 2681# PATIENT -> STUDY -> SERIES -> record type 2682REFERENCE_4LEVEL = [ 2683 # Record type, SOP Class, Modality, Optional element to include 2684 ("IMAGE", CTImageStorage, "CT", None), 2685 ("RT DOSE", RTDoseStorage, "RTDOSE", None), 2686 ("RT STRUCTURE SET", RTStructureSetStorage, "RTSTRUCT", None), 2687 ("RT PLAN", RTPlanStorage, "RTPLAN", "RTPlanLabel"), 2688 ("RT TREAT RECORD", RTBeamsTreatmentRecordStorage, "RTRECORD", None), 2689 ("PRESENTATION", GrayscaleSoftcopyPresentationStateStorage, "PR", None), 2690 ("WAVEFORM", TwelveLeadECGWaveformStorage, "ECG", None), 2691 ("SR DOCUMENT", BasicTextSRStorage, "SR", None), 2692 ("KEY OBJECT DOC", KeyObjectSelectionDocumentStorage, "KO", None), 2693 ("SPECTROSCOPY", MRSpectroscopyStorage, "MS", None), 2694 ("RAW DATA", RawDataStorage, "OT", None), 2695 ("REGISTRATION", SpatialRegistrationStorage, "REG", None), 2696 ("FIDUCIAL", SpatialFiducialsStorage, "FID", None), 2697 ("ENCAP DOC", EncapsulatedPDFStorage, "DOC", "EncapsulatedDocument"), 2698 ("VALUE MAP", RealWorldValueMappingStorage, "RWV", None), 2699 ("STEREOMETRIC", StereometricRelationshipStorage, "SMR", None), 2700 ("PLAN", RTBeamsDeliveryInstructionStorage, "PLAN", None), 2701 ("MEASUREMENT", LensometryMeasurementsStorage, "LEN", None), 2702 ("SURFACE", SurfaceSegmentationStorage, "LS", None), 2703 ("SURFACE SCAN", SurfaceScanMeshStorage, "LS", None), 2704 ("TRACT", TractographyResultsStorage, "None", None), 2705 ("ASSESSMENT", ContentAssessmentResultsStorage, "ASMT", None), 2706 ("RADIOTHERAPY", CArmPhotonElectronRadiationStorage, "RTRAD", None), 2707] 2708 2709 2710@pytest.mark.filterwarnings("ignore:The 'DicomDir'") 2711@pytest.mark.parametrize("rtype, sop", REFERENCE_1LEVEL) 2712def test_one_level_record(rtype, sop, dummy, tdir): 2713 """Test adding instances that require a single level hierarchy.""" 2714 ds, opt = dummy 2715 ds.SOPClassUID = sop 2716 2717 fs = FileSet() 2718 fs.add(ds) 2719 assert 1 == len(fs.find(SOPInstanceUID=ds.SOPInstanceUID)) 2720 dicomdir, paths = write_fs(fs, tdir.name) 2721 assert 1 == len(paths) 2722 [leaf] = dicomdir.DirectoryRecordSequence 2723 assert rtype == leaf.DirectoryRecordType 2724 assert sop == leaf.ReferencedSOPClassUIDInFile 2725 assert ds.SOPInstanceUID == leaf.ReferencedSOPInstanceUIDInFile 2726 assert ds.file_meta.TransferSyntaxUID == ( 2727 leaf.ReferencedTransferSyntaxUIDInFile 2728 ) 2729 file_id = list(paths[0].relative_to(fs.path).parts) 2730 assert file_id == [leaf.ReferencedFileID] 2731 assert Dataset(ds) == dcmread(paths[0]) 2732 2733 # Test the 1C elements 2734 ds.update(opt) 2735 fs = FileSet() 2736 fs.add(ds) 2737 2738 2739@pytest.mark.filterwarnings("ignore:The 'DicomDir'") 2740@pytest.mark.parametrize("rtype, sop, modality, kw", REFERENCE_4LEVEL) 2741def test_four_level_record(rtype, sop, modality, kw, dummy, tdir): 2742 """Test adding instances that require the 4-level hierarchy.""" 2743 ds, opt = dummy 2744 ds.SOPClassUID = sop 2745 ds.Modality = modality 2746 if kw == "RTPlanLabel": 2747 setattr(ds, kw, "Value") 2748 elif kw == "EncapsulatedDocument": 2749 setattr(ds, kw, b'\x00\x01') 2750 2751 fs = FileSet() 2752 fs.add(ds) 2753 assert 1 == len(fs.find(SOPInstanceUID=ds.SOPInstanceUID)) 2754 dicomdir, paths = write_fs(fs, tdir.name) 2755 assert 1 == len(paths) 2756 [pt, st, se, leaf] = dicomdir.DirectoryRecordSequence 2757 assert "PATIENT" == pt.DirectoryRecordType 2758 assert ds.PatientID == pt.PatientID 2759 assert ds.PatientName == pt.PatientName 2760 assert "STUDY" == st.DirectoryRecordType 2761 assert ds.StudyDate == st.StudyDate 2762 assert ds.StudyTime == st.StudyTime 2763 assert ds.StudyInstanceUID == st.StudyInstanceUID 2764 assert "SERIES" == se.DirectoryRecordType 2765 assert modality == se.Modality 2766 assert rtype == leaf.DirectoryRecordType 2767 assert sop == leaf.ReferencedSOPClassUIDInFile 2768 assert ds.SOPInstanceUID == leaf.ReferencedSOPInstanceUIDInFile 2769 assert ds.file_meta.TransferSyntaxUID == ( 2770 leaf.ReferencedTransferSyntaxUIDInFile 2771 ) 2772 file_id = list(paths[0].relative_to(fs.path).parts) 2773 assert file_id == leaf.ReferencedFileID 2774 assert Dataset(ds) == dcmread(paths[0]) 2775 2776 # Test the 1C elements 2777 ds.update(opt) 2778 fs = FileSet() 2779 fs.add(ds) 2780