1
2import os
3from pathlib import Path
4import shutil
5from tempfile import TemporaryDirectory
6
7import pytest
8
9from pydicom import config, dcmread
10from pydicom.data import get_testdata_file
11from pydicom.dataset import Dataset, FileMetaDataset
12from pydicom.filebase import DicomBytesIO
13from pydicom.fileset import (
14    FileSet, FileInstance, RecordNode, is_conformant_file_id,
15    generate_filename, _define_patient, _define_study, _define_series,
16    _define_image, _PREFIXES
17)
18from pydicom.filewriter import write_dataset
19from pydicom.tag import Tag, BaseTag
20from pydicom._storage_sopclass_uids import (
21    MediaStorageDirectoryStorage, ComputedRadiographyImageStorage,
22    CTImageStorage, RTBeamsTreatmentRecordStorage, RTPlanStorage,
23    GrayscaleSoftcopyPresentationStateStorage, BasicTextSRStorage,
24    KeyObjectSelectionDocumentStorage, MRSpectroscopyStorage,
25    HangingProtocolStorage, EncapsulatedPDFStorage, ColorPaletteStorage,
26    GenericImplantTemplateStorage, ImplantAssemblyTemplateStorage,
27    ImplantTemplateGroupStorage, TwelveLeadECGWaveformStorage,
28    RawDataStorage, SpatialRegistrationStorage, SpatialFiducialsStorage,
29    RealWorldValueMappingStorage, StereometricRelationshipStorage,
30    LensometryMeasurementsStorage, SurfaceSegmentationStorage,
31    TractographyResultsStorage, SurfaceScanMeshStorage, RTDoseStorage,
32    ContentAssessmentResultsStorage, RTStructureSetStorage,
33    RTBeamsDeliveryInstructionStorage, CArmPhotonElectronRadiationStorage
34)
35from pydicom.uid import (
36    ExplicitVRLittleEndian, generate_uid, ImplicitVRLittleEndian
37)
38
39
# Paths to the DICOMDIR test files, looked up by name with
# pydicom.data.get_testdata_file()
TEST_FILE = get_testdata_file('DICOMDIR')
TINY_ALPHA_FILESET = get_testdata_file("tiny_alpha/DICOMDIR")
IMPLICIT_TEST_FILE = get_testdata_file('DICOMDIR-implicit')
BIGENDIAN_TEST_FILE = get_testdata_file('DICOMDIR-bigEnd')
44
45
@pytest.fixture
def tiny():
    """Return the DICOMDIR dataset of the tiny alphanumeric File-set."""
    tiny_ds = dcmread(TINY_ALPHA_FILESET)
    return tiny_ds
50
51
@pytest.fixture
def dicomdir():
    """Return the dataset read from the DICOMDIR test file."""
    dicomdir_ds = dcmread(TEST_FILE)
    return dicomdir_ds
56
57
@pytest.fixture
def dicomdir_copy():
    """Return a temporary copy of the test File-set.

    Returns
    -------
    tempfile.TemporaryDirectory
        The temporary directory the File-set was copied to.
    pydicom.dataset.Dataset
        The DICOMDIR dataset read from the copied File-set.
    """
    tmp = TemporaryDirectory()
    source_root = Path(TEST_FILE).parent
    target_root = Path(tmp.name)

    shutil.copyfile(source_root / 'DICOMDIR', target_root / 'DICOMDIR')
    # The directories of the File-set that get copied
    for folder in ("77654033", "98892003", "98892001"):
        shutil.copytree(source_root / folder, target_root / folder)

    return tmp, dcmread(target_root / "DICOMDIR")
71
72
@pytest.fixture
def ct():
    """Return the dataset read from the 'CT_small.dcm' test file."""
    ct_path = get_testdata_file("CT_small.dcm")
    return dcmread(ct_path)
77
78
@pytest.fixture
def tdir():
    """Return a new TemporaryDirectory instance."""
    tmp = TemporaryDirectory()
    return tmp
83
84
@pytest.fixture
def custom_leaf():
    """Return the IMAGE leaf node of a PATIENT/STUDY/SERIES/IMAGE chain.

    The four records are created from the 'CT_small.dcm' test dataset and
    linked child to parent so the returned node has three ancestors.
    """
    source = dcmread(get_testdata_file("CT_small.dcm"))
    patient_rec = _define_patient(source)
    study_rec = _define_study(source)
    series_rec = _define_series(source)
    image_rec = _define_image(source)

    # Add the elements common to every directory record
    typed_records = zip(
        ("PATIENT", "STUDY", "SERIES", "IMAGE"),
        (patient_rec, study_rec, series_rec, image_rec),
    )
    for rtype, rec in typed_records:
        rec.DirectoryRecordType = rtype
        rec.OffsetOfTheNextDirectoryRecord = 0
        rec.RecordInUseFlag = 0xFFFF
        rec.OffsetOfReferencedLowerLevelDirectoryEntity = 0

    patient_node = RecordNode(patient_rec)
    study_node = RecordNode(study_rec)
    series_node = RecordNode(series_rec)

    # Only the IMAGE record gets the elements referencing the instance
    image_rec.ReferencedFileID = None
    image_rec.ReferencedSOPClassUIDInFile = source.SOPClassUID
    image_rec.ReferencedSOPInstanceUIDInFile = source.SOPInstanceUID
    transfer_syntax = source.file_meta.TransferSyntaxUID
    image_rec.ReferencedTransferSyntaxUIDInFile = transfer_syntax
    image_node = RecordNode(image_rec)

    # Link from the leaf upwards
    image_node.parent = series_node
    series_node.parent = study_node
    study_node.parent = patient_node

    return image_node
117
118
@pytest.fixture
def private(dicomdir):
    """Return a DICOMDIR dataset with PRIVATE records.

    Three PRIVATE directory records (top, middle and bottom) are appended
    to the Directory Record Sequence of `dicomdir`. The top record is
    linked in as the new last record of the root directory entity, the
    top record's lower level offset points at the middle record and the
    middle record's lower level offset points at the bottom record, which
    is the only one of the three that references a file.
    """
    def write_record(ds):
        """Return `ds` as explicit little encoded bytes."""
        fp = DicomBytesIO()
        fp.is_implicit_VR = False
        fp.is_little_endian = True
        write_dataset(fp, ds)

        return fp.parent.getvalue()

    def private_record():
        """Return a new PRIVATE record with zeroed offsets and a new UID."""
        record = Dataset()
        record.OffsetOfReferencedLowerLevelDirectoryEntity = 0
        record.RecordInUseFlag = 65535
        record.OffsetOfTheNextDirectoryRecord = 0
        record.DirectoryRecordType = "PRIVATE"
        record.PrivateRecordUID = generate_uid()

        return record

    # The fixture's dataset is modified in place
    ds = dicomdir

    top = private_record()
    middle = private_record()
    bottom = private_record()
    bottom.ReferencedSOPClassUIDInFile = "1.2.3.4"
    bottom.ReferencedFileID = [
        "tiny_alpha", "PT000000", "ST000000", "SE000000", "IM000000"
    ]
    bottom.ReferencedSOPInstanceUIDInFile = (
        "1.2.276.0.7230010.3.1.4.0.31906.1359940846.78187"
    )
    bottom.ReferencedTransferSyntaxUIDInFile = ExplicitVRLittleEndian

    # Encoded lengths of the records, used to calculate the new offsets
    len_top = len(write_record(top))  # 112
    len_middle = len(write_record(middle))  # 112
    len_bottom = len(write_record(bottom))  # 238
    len_last = len(write_record(ds.DirectoryRecordSequence[-1]))  # 248

    # Map each existing record's offset to the record itself
    records = {}
    for item in ds.DirectoryRecordSequence:
        records[item.seq_item_tell] = item

    # Top PRIVATE
    # Offset to the top PRIVATE - 10860 + 248 + 8
    # NOTE(review): the extra 8 is presumably the sequence item's tag and
    #   length fields - confirm
    offset = ds.DirectoryRecordSequence[-1].seq_item_tell + len_last + 8

    # Change the last top-level record to point at the top PRIVATE
    # Original is 3126
    last = ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity
    record = records[last]
    record.OffsetOfTheNextDirectoryRecord = offset

    # Change the last record offset
    ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity = offset
    top.seq_item_tell = offset

    # Offset to the middle PRIVATE
    offset += len_top + 8
    top.OffsetOfReferencedLowerLevelDirectoryEntity = offset
    ds.DirectoryRecordSequence.append(top)
    middle.seq_item_tell = offset

    # Middle PRIVATE
    # Offset to the bottom PRIVATE
    offset += len_middle + 8
    middle.OffsetOfReferencedLowerLevelDirectoryEntity = offset
    ds.DirectoryRecordSequence.append(middle)

    # Bottom PRIVATE
    ds.DirectoryRecordSequence.append(bottom)
    bottom.seq_item_tell = offset

    # Redo the record parsing to reflect changes
    ds.parse_records()

    return ds
198
199
@pytest.fixture
def dummy():
    """Return the dummy datasets used for testing the record creators.

    Returns
    -------
    pydicom.dataset.Dataset
        A dataset with file meta information and the elements listed in
        `required` below.
    pydicom.dataset.Dataset
        A separate dataset holding the 1C elements.
    """
    ds = Dataset()
    ds.file_meta = FileMetaDataset()
    ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian

    required = {
        "PatientID": "12345678",
        "PatientName": "Citizen^Jan",
        "StudyDate": "20201001",
        "StudyTime": "120000",
        "StudyID": "1",
        "StudyInstanceUID": "1.2.3",
        "SeriesInstanceUID": "1.2.3.4",
        "SeriesNumber": "1",
        "SOPInstanceUID": "1.2.3.4.5",
        "InstanceNumber": "1",
        "ContentDate": "20201002",
        "ContentTime": "120100",
        "ContentLabel": "Content label",
        "ContentDescription": "Content description",
        "ContentCreatorName": "Content^Creator^Name",
        "TreatmentDate": "20201003",
        "TreatmentTime": "120200",
        "PresentationCreationDate": "20201004",
        "PresentationCreationTime": "120300",
        "InstanceCreationDate": "20200105",
        "InstanceCreationTime": "120400",
        "CompletionFlag": "COMPLETE",
        "VerificationFlag": "VERIFIED",
        "ConceptNameCodeSequence": [Dataset()],
        "ImageType": "ADDITION",
        "NumberOfFrames": 7,
        "Rows": 10,
        "Columns": 11,
        "DataPointRows": 12,
        "DataPointColumns": 13,
        "HangingProtocolCreator": "HP Creator",
        "HangingProtocolCreationDateTime": "20201001120000",
        "HangingProtocolDefinitionSequence": [Dataset()],
        "NumberOfPriorsReferenced": 2,
        "HangingProtocolUserIdentificationCodeSequence": [Dataset()],
        "DocumentTitle": "Document title",
        "MIMETypeOfEncapsulatedDocument": "PDF",
        "Manufacturer": "Implant manufacturer",
        "ImplantName": "Implant name",
        "ImplantPartNumber": "PN01",
        "ImplantAssemblyTemplateName": "Template name",
        "ProcedureTypeCodeSequence": [Dataset()],
        "ImplantTemplateGroupName": "Group name",
        "ImplantTemplateGroupIssuer": "Group issuer",
        "RTPlanDate": "20201006",
        "RTPlanTime": "120600",
        "DoseSummationType": "PLAN",
        "StructureSetLabel": "Structure set label",
        "StructureSetDate": "20201007",
        "StructureSetTime": "120700",
        # To be customised
        "Modality": "CT",  # PLAN, STSEGANN
        "SOPClassUID": CTImageStorage,
    }
    for keyword, value in required.items():
        setattr(ds, keyword, value)

    # To be added: EncapsulatedDocument, RTPlanLabel

    # 1C elements
    conditional = {
        "SpecificCharacterSet": "ISO_IR 100",
        "BlendingSequence": [Dataset()],
        "ReferencedSeriesSequence": [Dataset()],
        "VerificationDateTime": "20201001120000",
        "ContentSequence": [Dataset()],
        "ReferencedImageEvidenceSequence": [Dataset()],
        "HL7InstanceIdentifier": "HL7 identifier",
        "ImplantSize": "13.4x12.5",
        "UserContentLabel": "Content label",
        "UserContentLongLabel": "Content long label",
    }
    opt = Dataset()
    for keyword, value in conditional.items():
        setattr(opt, keyword, value)

    return ds, opt
279
280
def write_fs(fs, path=None):
    """Call FileSet.write(path) and return what was written.

    Parameters
    ----------
    fs : pydicom.fileset.FileSet
        The File-set to write.
    path : str or PathLike, optional
        Passed unchanged to ``FileSet.write()``.

    Returns
    -------
    pydicom.dataset.Dataset
        The resulting DICOMDIR dataset
    list of PathLike
        A sorted list of paths for the non-DICOMDIR files in the File-set.
    """
    fs.write(path)
    root = Path(fs.path)
    # Everything under the File-set's root except the DICOMDIR file itself
    found = []
    for entry in root.glob('**/*'):
        if entry.is_file() and entry.name != 'DICOMDIR':
            found.append(entry)

    return dcmread(root / "DICOMDIR"), sorted(found)
298
299
def copy_fs(fs, path, as_implicit=False):
    """Call FileSet.copy(path) and return what was written.

    Parameters
    ----------
    fs : pydicom.fileset.FileSet
        The File-set to copy.
    path : str or PathLike
        The directory the File-set is copied to.
    as_implicit : bool, optional
        Passed to ``FileSet.copy()`` as `force_implicit`, default ``False``.

    Returns
    -------
    pydicom.fileset.FileSet
        The new FileSet
    pydicom.dataset.Dataset
        The new File-set's DICOMDIR dataset
    list of PathLike
        A sorted list of paths for the non-DICOMDIR files in the new
        File-set.
    """
    root = Path(path)
    fs = fs.copy(root, force_implicit=as_implicit)
    # Everything under the new root except the DICOMDIR file itself
    found = []
    for entry in root.glob('**/*'):
        if entry.is_file() and entry.name != 'DICOMDIR':
            found.append(entry)

    return fs, dcmread(root / "DICOMDIR"), sorted(found)
319
320
def temporary_fs(ds):
    """Copy the File-set that `ds` belongs to into a temporary directory.

    Parameters
    ----------
    ds : pydicom.dataset.Dataset
        A DICOMDIR dataset whose `filename` is the path it was read from.

    Returns
    -------
    tempfile.TemporaryDirectory
        The temporary directory the File-set was copied to.
    pydicom.dataset.Dataset
        The DICOMDIR dataset read from the copied File-set.
    """
    tmp = TemporaryDirectory()
    source_root = Path(ds.filename).parent
    target_root = Path(tmp.name)

    shutil.copyfile(source_root / 'DICOMDIR', target_root / 'DICOMDIR')
    # Copy every directory found next to the DICOMDIR file
    subdirs = (entry for entry in source_root.glob('*') if entry.is_dir())
    for subdir in subdirs:
        shutil.copytree(subdir, target_root / subdir.name)

    return tmp, dcmread(target_root / "DICOMDIR")
333
334
def test_is_conformant_file_id():
    """Test is_conformant_file_id() with bad and good File ID paths."""
    non_conformant = [
        "aBCDEF123", "aBCD1234", "ABCD!", "1234)", " ",
        "1/2/3/4/5/6/7/8/9", "لنزار", "ABCD.DCM", "123 ABCD"
    ]
    for file_id in non_conformant:
        assert not is_conformant_file_id(Path(file_id))

    conformant = [
        "ACBDEFGH", "12345678", "1/2/3/4/5/6/7/8", "0", "9", "A", "Z",
        "ABCD1234", "1234ABCD", "_", "_ABCD", "ABCD_", "AB_CD", "________",
        "A_______", "_______1"
    ]
    for file_id in conformant:
        assert is_conformant_file_id(Path(file_id))
351
352
def test_prefixes():
    """Test that no value in _PREFIXES is used more than once."""
    assert len(_PREFIXES) == len(set(_PREFIXES.values()))
357
358
class TestGenerateFilename:
    """Tests for generate_filename()."""
    def test_numeric(self):
        """Test the first eleven filenames with numeric suffixes."""
        gen = generate_filename(start=0, alphanumeric=False)
        # '00000000' through '00000010'
        for count in range(11):
            assert str(count).zfill(8) == next(gen)

    def test_numeric_prefix(self):
        """Test prefixes of 1 to 7 characters with numeric suffixes."""
        for length in range(1, 8):
            prefix = "A" * length
            gen = generate_filename(
                prefix=prefix, start=0, alphanumeric=False
            )
            # The prefix is padded with zeros to 8 characters
            assert prefix + '0' * (8 - length) == next(gen)

    def test_numeric_start(self):
        """Test a non-zero start point with numeric suffixes."""
        gen = generate_filename(start=10, alphanumeric=False)
        for expected in ('00000010', '00000011', '00000012'):
            assert expected == next(gen)

    def test_alphanumeric(self):
        """Test generating alphanumeric suffixes."""
        gen = generate_filename(start=0, alphanumeric=True)
        # '00000000' through '00000009' and then '0000000A'
        for char in "0123456789A":
            assert '0000000' + char == next(gen)

        # Skip the 24 filenames between '0000000A' and '0000000Z'
        for _ in range(24):
            next(gen)

        assert '0000000Z' == next(gen)
        assert '00000010' == next(gen)

    def test_alphanumeric_prefix(self):
        """Test prefixes of 1 to 7 characters with alphanumeric suffixes."""
        for length in range(1, 8):
            prefix = "A" * length
            gen = generate_filename(
                prefix=prefix, start=0, alphanumeric=True
            )
            assert prefix + '0' * (8 - length) == next(gen)
            # One character shorter to leave room for the final character
            padding = '0' * (7 - length)
            for char in "123456789A":
                assert prefix + padding + char == next(gen)

    def test_alphanumeric_start(self):
        """Test a non-zero start point with alphanumeric suffixes."""
        gen = generate_filename(start=10, alphanumeric=True)
        for expected in ('0000000A', '0000000B', '0000000C'):
            assert expected == next(gen)

    def test_long_prefix_raises(self):
        """Test that an 8 character prefix raises an exception."""
        expected = r"The 'prefix' must be less than 8 characters long"
        with pytest.raises(ValueError, match=expected):
            next(generate_filename('A' * 8))
442
443
@pytest.mark.filterwarnings("ignore:The 'DicomDir'")
class TestRecordNode:
    """Tests for RecordNode.

    Most tests use the `private` fixture, which is the DICOMDIR test
    dataset with three nested PRIVATE records appended.
    """
    def test_root(self, private):
        """Tests the root node."""
        fs = FileSet(private)
        root = fs._tree
        assert [] == root.ancestors
        msg = r"The root node doesn't contribute a File ID component"
        with pytest.raises(ValueError, match=msg):
            root.component

        assert root.file_set == fs
        assert root.parent is None
        assert 3 == len(root.children)
        assert 55 == len(list(iter(root)))

        # Test __contains__
        for child in root.children:
            assert child in root
            assert child.key in root

        assert -1 == root.depth
        assert 0 == root.index
        assert not root.has_instance
        assert root.is_root
        assert root.previous is None
        assert root.next is None
        assert root == root.root

        assert "ROOT" == str(root)

        msg = r"'RootNode' object has no attribute '_record'"
        with pytest.raises(AttributeError, match=msg):
            root.key

        with pytest.raises(AttributeError, match=msg):
            root.record_type

        assert pytest.raises(StopIteration, next, root.reverse())

        # Test __getitem__
        for child in root.children:
            assert child == root[child.key]
            assert child == root[child]  # bit silly

        # `child` is still the last of the root's children here, so this
        #   is a grandchild of the root and not one of its direct children
        child = child.children[0]
        with pytest.raises(KeyError):
            root[child.key]

        with pytest.raises(KeyError):
            root[child]

        with pytest.raises(KeyError):
            del root[child.key]

        with pytest.raises(KeyError):
            del root[child]

        # Test __delitem__
        del root[root.children[0]]
        assert 2 == len(root.children)
        assert 41 == len(list(iter(root)))

        # Test __iter__
        gen = iter(root)
        assert "PatientID='98890234'" in str(next(gen))
        assert "StudyDate=20010101" in str(next(gen))
        assert "SeriesNumber=4" in str(next(gen))
        assert "InstanceNumber=1" in str(next(gen))
        assert "InstanceNumber=2" in str(next(gen))
        assert "SeriesNumber=5" in str(next(gen))
        assert "InstanceNumber=6" in str(next(gen))
        assert "InstanceNumber=7" in str(next(gen))
        # Skip ahead: 8 nodes checked above + 29 skipped + 4 checked below
        #   accounts for all 41 remaining nodes
        for _ in range(29):
            next(gen)

        assert "InstanceNumber=7" in str(next(gen))
        assert "PRIVATE" in str(next(gen))
        assert "PRIVATE" in str(next(gen))
        assert "PRIVATE" in str(next(gen))
        assert pytest.raises(StopIteration, next, gen)

    def test_leaf(self, private):
        """Test a leaf node."""
        fs = FileSet(private)

        # non-PRIVATE
        leaf = fs._instances[5].node
        assert [] == leaf.children
        assert leaf.has_instance
        ancestors = leaf.ancestors
        assert 3 == len(ancestors)
        assert "IMAGE" == leaf.record_type
        assert "IM000002" == leaf.component
        assert "SERIES" in str(ancestors[0])
        assert "SE000000" == ancestors[0].component
        assert "STUDY" in str(ancestors[1])
        assert "ST000001" == ancestors[1].component
        assert "PATIENT" in str(ancestors[2])
        assert "PT000000" == ancestors[2].component
        assert 3 == leaf.depth
        assert fs == leaf.file_set
        assert 2 == leaf.index
        assert not leaf.is_root
        gen = iter(leaf)
        assert leaf == next(gen)
        assert pytest.raises(StopIteration, next, gen)
        assert leaf.parent.children[3] == leaf.next
        assert leaf.parent.children[1] == leaf.previous

        # PRIVATE
        leaf = fs._instances[-1].node
        assert [] == leaf.children
        assert leaf.has_instance
        ancestors = leaf.ancestors
        assert 2 == len(ancestors)
        assert "PRIVATE" == leaf.record_type
        assert "P2000000" == leaf.component
        assert "PRIVATE" in str(ancestors[0])
        assert "P1000000" == ancestors[0].component
        assert "PRIVATE" in str(ancestors[1])
        assert "P0000002" == ancestors[1].component
        assert 2 == leaf.depth
        assert fs == leaf.file_set
        assert 0 == leaf.index
        assert not leaf.is_root
        gen = iter(leaf)
        assert leaf == next(gen)
        assert pytest.raises(StopIteration, next, gen)
        assert leaf.next is None
        assert leaf.previous is None

    def test_add(self, private, ct):
        """Test instance added at end of children"""
        fs = FileSet(private)
        instance = fs._instances[0]
        parent = instance.node.parent
        assert 1 == len(parent.children)
        assert 0 == instance.node.index
        assert instance.node.next is None
        assert instance.node.previous is None

        # Give `ct` the same patient, study and series as `instance` so
        #   that the two end up as siblings
        ct.PatientID = instance.PatientID
        ct.StudyInstanceUID = instance.StudyInstanceUID
        ct.SeriesInstanceUID = instance.SeriesInstanceUID
        added = fs.add(ct)
        assert 2 == len(parent.children)
        assert 0 == instance.node.index
        assert added.node == instance.node.next
        assert instance.node.previous is None
        assert 1 == added.node.index
        assert instance.node == added.node.previous
        assert added.node.next is None

    def test_key(self, private):
        """Test the record keys."""
        fs = FileSet(private)
        root = fs._tree
        node = root.children[0]
        assert node._record.PatientID == node.key
        node = node.children[0]
        assert node._record.StudyInstanceUID == node.key
        node = node.children[0]
        assert node._record.SeriesInstanceUID == node.key
        node = node.children[0]
        assert node._record.ReferencedSOPInstanceUIDInFile == node.key

        node = root.children[-1]
        assert node._record.PrivateRecordUID == node.key
        node = node.children[-1]
        assert node._record.PrivateRecordUID == node.key
        node = node.children[-1]
        assert node._record.PrivateRecordUID == node.key

        # Test STUDY directly referencing an instance
        ds = private
        seq = ds.DirectoryRecordSequence
        uid = seq[3].ReferencedSOPInstanceUIDInFile
        seq[1].ReferencedSOPInstanceUIDInFile = uid
        seq[1].ReferencedTransferSyntaxUIDInFile = ExplicitVRLittleEndian
        # NOTE(review): this sets ReferencedSOPClassUID, unlike the other
        #   elements here which use the '...InFile' keywords - confirm
        #   that ReferencedSOPClassUIDInFile wasn't intended
        seq[1].ReferencedSOPClassUID = ComputedRadiographyImageStorage
        seq[1].OffsetOfReferencedLowerLevelDirectoryEntity = 0
        seq[1].ReferencedFileID = seq[3].ReferencedFileID
        del seq[1].StudyInstanceUID
        fs = FileSet(ds)
        # The leaf STUDY node
        node = fs._tree.children[0].children[0]
        assert [] == node.children
        assert uid == node.key
        assert node.has_instance

    def test_key_raises(self, dummy):
        """Test missing required element raises."""
        ds, opt = dummy
        ds.SOPClassUID = ColorPaletteStorage
        fs = FileSet()
        instance = fs.add(ds)
        del instance.node._record.ReferencedSOPInstanceUIDInFile

        msg = (
            r"Invalid 'PALETTE' record - missing required element "
            r"'Referenced SOP Instance UID in File'"
        )
        with pytest.raises(AttributeError, match=msg):
            instance.node.key

    def test_bad_record(self, private):
        """Test a bad directory record raises an exception when loading."""
        del private.DirectoryRecordSequence[0].PatientID
        msg = (
            r"The PATIENT directory record at offset 396 is missing a "
            r"required element"
        )
        with pytest.raises(ValueError, match=msg):
            FileSet(private)

        # Restore the PATIENT record before breaking the STUDY record
        # NOTE(review): the Patient ID is restored as an int, not a str
        private.DirectoryRecordSequence[0].PatientID = 77654033
        del private.DirectoryRecordSequence[1].StudyInstanceUID
        msg = (
            r"The STUDY directory record at offset 510 is missing a required "
            r"element"
        )
        with pytest.raises(ValueError, match=msg):
            FileSet(private)

    def test_bad_record_missing_req(self, private):
        """Test bad directory record raises if missing required element."""
        del private.DirectoryRecordSequence[0].DirectoryRecordType
        msg = (
            r"The directory record at offset 396 is missing one or more "
            r"required elements: DirectoryRecordType"
        )
        with pytest.raises(ValueError, match=msg):
            FileSet(private)

    def test_encoding(self, private, tdir):
        """Test group element not added when encoding."""
        fs = FileSet(private)
        node = fs._instances[0].node
        # Add a group length element and an ordinary element to the record
        node._record.add_new(0x00080000, "UL", 128)
        node._record.PatientSex = 'F'
        fs, ds, paths = copy_fs(fs, tdir.name)
        item = ds.DirectoryRecordSequence[3]
        assert 0x00080000 not in item
        assert "PatientSex" in item

    def test_remove_raises(self, private):
        """Test RecordNode.remove() raises if not a leaf."""
        fs = FileSet(private)
        node = fs._tree.children[0]
        assert not node.has_instance
        msg = r"Only leaf nodes can be removed"
        with pytest.raises(ValueError, match=msg):
            fs._tree.remove(node)

    def test_file_id_singleton(self, ct, tdir):
        """Test a singleton File ID."""
        fs = FileSet()
        p = Path(tdir.name)
        ct.save_as(p / "01")
        fs.add(p / "01")
        fs.write(p)
        ds = dcmread(p / "DICOMDIR")
        item = ds.DirectoryRecordSequence[-1]
        assert "IMAGE" == item.DirectoryRecordType
        # Point the record at the file saved directly in the root directory
        item.ReferencedFileID = "01"
        ds.save_as(p / "DICOMDIR")
        fs = FileSet(ds)
        assert fs._instances[0].node._file_id == Path("01")

    def test_file_id_missing(self, ct):
        """Test RecordNode._file_id if no Referenced File ID."""
        fs = FileSet()
        instance = fs.add(ct)
        del instance.node._record.ReferencedFileID
        msg = r"No 'Referenced File ID' in the directory record"
        with pytest.raises(AttributeError, match=msg):
            instance.node._file_id
723
724
725@pytest.mark.filterwarnings("ignore:The 'DicomDir'")
726class TestFileInstance:
727    """Tests for FileInstance."""
728    def test_getattr(self, dicomdir):
729        """Test FileInstance.__getattribute__."""
730        fs = FileSet(dicomdir)
731        instance = fs._instances[0]
732        assert "20010101" == instance.StudyDate
733        instance.my_attr = 1234
734        assert 1234 == instance.my_attr
735        msg = r"'FileInstance' object has no attribute 'missing_attr'"
736        with pytest.raises(AttributeError, match=msg):
737            instance.missing_attr
738
739    def test_getattr_order(self, private):
740        """Test records are searched closest to furthest"""
741        fs = FileSet(private)
742        instance = fs._instances[-1]
743        assert instance.is_private
744        # a: root, b: middle, c: bottom,
745        c = instance.node
746        b = c.parent
747        a = b.parent
748        assert c._record.PrivateRecordUID != b._record.PrivateRecordUID
749        assert c._record.PrivateRecordUID != a._record.PrivateRecordUID
750        assert instance.PrivateRecordUID == c._record.PrivateRecordUID
751
752    def test_getitem(self, dicomdir):
753        """Test FileInstance.__getitem__."""
754        fs = FileSet(dicomdir)
755        instance = fs._instances[0]
756        assert "20010101" == instance["StudyDate"].value
757        assert "20010101" == instance[0x00080020].value
758        assert "20010101" == instance[Tag(0x00080020)].value
759        assert "20010101" == instance[(0x0008, 0x0020)].value
760        assert "20010101" == instance["0x00080020"].value
761
762        with pytest.raises(KeyError, match=r"(0000, 0000)"):
763            instance[0x00000000]
764
765    def test_getitem_special(self, tiny):
766        """Test FileInstance.__getitem__ for the three special elements."""
767        fs = FileSet(tiny)
768        instance = fs._instances[0]
769        elem = instance["SOPInstanceUID"]
770        assert (
771            "1.2.826.0.1.3680043.8.498.66612287766462461480665815941164330386"
772        ) == elem.value
773        elem = instance["SOPClassUID"]
774        assert CTImageStorage == elem.value
775        elem = instance["TransferSyntaxUID"]
776        assert ExplicitVRLittleEndian == elem.value
777
778    def test_getitem_order(self, private):
779        """Test records are searched closest to furthest"""
780        fs = FileSet(private)
781        instance = fs._instances[-1]
782        assert instance.is_private
783        # a: root, b: middle, c: bottom,
784        c = instance.node
785        b = c.parent
786        a = b.parent
787        assert c._record["PrivateRecordUID"] != b._record["PrivateRecordUID"]
788        assert c._record["PrivateRecordUID"] != a._record["PrivateRecordUID"]
789        assert instance["PrivateRecordUID"] == c._record["PrivateRecordUID"]
790
791    def test_contains(self, dicomdir):
792        """Test FileInstance.__contains__."""
793        fs = FileSet(dicomdir)
794        instance = fs._instances[0]
795        assert "StudyDate" in instance
796        assert 0x00080020 in instance
797        assert Tag(0x00080020) in instance
798        assert (0x0008, 0x0020) in instance
799        assert "0x00080020" in instance
800        assert 'bad' not in instance
801
802    def test_is_private(self, private):
803        """Test FileInstance.is_private"""
804        fs = FileSet(private)
805        instance = fs._instances[-1]
806        assert instance.is_private
807        instance = fs._instances[0]
808        assert not instance.is_private
809
810    def test_properties(self, dicomdir):
811        """Test the FileInstance properties."""
812        fs = FileSet(dicomdir)
813        instance = fs._instances[0]
814        assert fs == instance.file_set
815        assert os.fspath(Path("77654033/CR1/6154")) in instance.path
816        assert isinstance(instance.path, str)
817        sop_instance = "1.3.6.1.4.1.5962.1.1.0.0.0.1196527414.5534.0.11"
818
819        nodes = [node for node in instance.node.ancestors]
820        assert 3 == len(nodes)
821        assert nodes[0].record_type == "SERIES"
822        assert nodes[1].record_type == "STUDY"
823        assert nodes[2].record_type == "PATIENT"
824        record = instance.node._record
825        assert sop_instance == record.ReferencedSOPInstanceUIDInFile
826        assert sop_instance == instance.SOPInstanceUID
827        assert ExplicitVRLittleEndian == instance.TransferSyntaxUID
828        assert "1.2.840.10008.5.1.4.1.1.1" == instance.SOPClassUID
829
830    def test_path(self, ct, tdir):
831        """Test FileInstance.path when not staged."""
832        fs = FileSet()
833        fs.add(ct)
834        ds, paths = write_fs(fs, tdir.name)
835
836        assert 1 == len(fs)
837        instance = fs._instances[0]
838        assert not instance.is_staged
839        assert (Path(fs.path) / Path(instance.FileID)) == Path(instance.path)
840
841    def test_path_add(self, ct, tdir):
842        """Test FileInstance.path when staged for addition."""
843        fs = FileSet()
844        fs.add(ct)
845        assert 1 == len(fs)
846        instance = fs._instances[0]
847        assert instance.is_staged
848        assert instance.for_addition
849        assert (
850            Path(fs._stage['path']) / Path(instance.SOPInstanceUID)
851        ) == Path(instance.path)
852        assert isinstance(instance.path, str)
853
854    def test_path_move(self, dicomdir):
855        """Test FileInstance.path for an instance to be move."""
856        fs = FileSet(dicomdir)
857        assert fs._stage['~']
858        instance = fs._instances[0]
859        assert instance.is_staged
860        assert instance.for_moving
861        assert (
862            Path(fs.path) / Path(*instance.ReferencedFileID)
863        ) == Path(instance.path)
864        assert isinstance(instance.path, str)
865
866    def test_path_removal(self, dicomdir, tdir):
867        """Test FileInstance.FileID when staged for removal."""
868        fs = FileSet(dicomdir)
869        instance = fs._instances[0]
870        fs.remove(instance)
871        assert instance.is_staged
872        assert instance.for_removal
873        assert (
874            Path(fs.path) / Path(*instance.ReferencedFileID)
875        ) == Path(instance.path)
876        assert isinstance(instance.path, str)
877
878    def test_load(self, ct, tdir):
879        """Test FileInstance.load() when not staged."""
880        fs = FileSet()
881        fs.add(ct)
882        ds, paths = write_fs(fs, tdir.name)
883
884        assert 1 == len(fs)
885        instance = fs._instances[0]
886        assert not instance.is_staged
887        ds = instance.load()
888        assert isinstance(ds, Dataset)
889        assert ct.SOPInstanceUID == ds.SOPInstanceUID
890
891    def test_load_staged_add(self, ct, tdir):
892        """Test FileInstance.load() when staged for addition."""
893        fs = FileSet()
894        fs.add(ct)
895        assert 1 == len(fs)
896        instance = fs._instances[0]
897        assert instance.is_staged
898        assert instance.for_addition
899        ds = instance.load()
900        assert isinstance(ds, Dataset)
901        assert ct.SOPInstanceUID == ds.SOPInstanceUID
902
903    def test_load_staged_move(self, dicomdir):
904        """Test FileInstance.load() for an instance to be moved."""
905        fs = FileSet(dicomdir)
906        instance = fs._instances[0]
907        assert instance.is_staged
908        assert instance.for_moving
909        assert fs.is_staged
910        # At least one instance needs to be moved
911        assert fs._stage['~']
912        ds = instance.load()
913        assert isinstance(ds, Dataset)
914        sop_instance = "1.3.6.1.4.1.5962.1.1.0.0.0.1196527414.5534.0.11"
915        assert sop_instance == ds.SOPInstanceUID
916
917    def test_load_staged_removal(self, dicomdir, tdir):
918        """Test FileInstance.load() when staged for removal."""
919        fs = FileSet(dicomdir)
920        instance = fs._instances[0]
921        fs.remove(instance)
922        assert instance.is_staged
923        assert instance.for_removal
924        ds = instance.load()
925        assert isinstance(ds, Dataset)
926        sop_instance = "1.3.6.1.4.1.5962.1.1.0.0.0.1196527414.5534.0.11"
927        assert sop_instance == ds.SOPInstanceUID
928
929    def test_for_moving(self, dummy, ct, tdir):
930        """Test FileInstance.for_moving."""
931        ds, opt = dummy
932        ds.SOPClassUID = ColorPaletteStorage
933        fs = FileSet()
934        # Single level File ID
935        instance = fs.add(ds)
936        assert instance.for_addition
937        assert not instance.for_removal
938        assert not instance.for_moving
939
940        # Four level File ID
941        instance = fs.add(ct)
942        assert instance.for_addition
943        assert not instance.for_removal
944        assert not instance.for_moving
945
946        ds, paths = write_fs(fs, tdir.name)
947        for instance in fs:
948            assert not instance.for_addition
949            assert not instance.for_removal
950            assert not instance.for_moving
951
952    def test_fileid(self, ct, tdir):
953        """Test FileInstance.FileID when not staged."""
954        fs = FileSet()
955        fs.add(ct)
956        ds, paths = write_fs(fs, tdir.name)
957
958        assert 1 == len(fs)
959        instance = fs._instances[0]
960        assert not instance.is_staged
961        fileid = Path("PT000000/ST000000/SE000000/IM000000")
962        assert os.fspath(fileid) == instance.FileID
963
964    def test_fileid_add(self, ct, tdir):
965        """Test FileInstance.FileID when staged for addition."""
966        fs = FileSet()
967        fs.add(ct)
968        assert 1 == len(fs)
969        instance = fs._instances[0]
970        assert instance.is_staged
971        assert instance.for_addition
972        fileid = Path("PT000000/ST000000/SE000000/IM000000")
973        assert os.fspath(fileid) == instance.FileID
974
975    def test_fileid_move(self, dicomdir):
976        """Test FileInstance.FileID for an instance to be moved."""
977        fs = FileSet(dicomdir)
978        assert fs.is_staged
979        # At least one instance needs to be moved
980        assert fs._stage['~']
981        instance = fs._instances[0]
982        assert instance.is_staged
983        assert instance.for_moving
984        fileid = Path("PT000000/ST000000/SE000000/IM000000")
985        assert os.fspath(fileid) == instance.FileID
986
987    def test_fileid_removal(self, dicomdir, tdir):
988        """Test FileInstance.FileID when staged for removal."""
989        fs = FileSet(dicomdir)
990        instance = fs._instances[0]
991        fs.remove(instance)
992        assert instance.is_staged
993        assert instance.for_removal
994        fileid = Path("PT000000/ST000000/SE000000/IM000000")
995        assert os.fspath(fileid) == instance.FileID
996
997    def test_private(self, private):
998        """Test FileInstance with PRIVATE records."""
999        fs = FileSet(private)
1000
1001        instances = fs._instances
1002        assert 32 == len(instances)
1003
1004        instance = instances[-1]
1005        assert 2 == len(instance.node.ancestors)
1006        for node in instance.node.reverse():
1007            assert node.record_type == "PRIVATE"
1008
1009        path = os.fspath(
1010            Path("tiny_alpha/PT000000/ST000000/SE000000/IM000000")
1011        )
1012        assert path in instances[-1].path
1013
1014        assert "1.2.3.4" == instance.SOPClassUID
1015        assert "1.2.276.0.7230010.3.1.4.0.31906.1359940846.78187" == (
1016            instance.SOPInstanceUID
1017        )
1018        assert ExplicitVRLittleEndian == instance.TransferSyntaxUID
1019
1020
1021@pytest.mark.filterwarnings("ignore:The 'DicomDir'")
1022class TestFileSet:
1023    """Tests for FileSet."""
1024    def test_empty(self):
1025        """Test an new and empty File-set."""
1026        fs = FileSet()
1027        assert 0 == len(fs)
1028        assert fs.ID is None
1029        assert fs.UID.is_valid
1030        assert fs.path is None
1031        assert fs.is_staged  # New datasets are staged
1032
1033        with pytest.raises(StopIteration):
1034            next(iter(fs))
1035
1036        s = str(fs)
1037        assert "DICOM File-set" in s
1038        assert "Root directory: (no value available)" in s
1039        assert "File-set ID: (no value available)" in s
1040        assert f"File-set UID: {fs.UID}" in s
1041        assert "Managed instances" not in s
1042
1043    def test_id(self, tdir):
1044        """Test the FileSet.ID property."""
1045        fs = FileSet()
1046        assert fs.is_staged
1047        assert fs.ID is None
1048        fs.ID = "MYID"
1049        assert fs.is_staged
1050        assert "MYID" == fs.ID
1051
1052        s = str(fs)
1053        assert "DICOM File-set" in s
1054        assert "Root directory: (no value available)" in s
1055        assert "File-set ID: MYID" in s
1056        assert f"File-set UID: {fs.UID}" in s
1057        assert "Managed instances" not in s
1058
1059        ds, paths = write_fs(fs, tdir.name)
1060        assert not fs.is_staged
1061        assert "Changes staged for write():" not in str(fs)
1062
1063        fs.ID = "MYID"
1064        assert not fs.is_staged
1065
1066        fsids = [None, "A", "1" * 16]
1067        for fsid in fsids:
1068            fs.ID = fsid
1069            assert fs.is_staged
1070            assert fsid == fs.ID
1071            ds, paths = write_fs(fs)
1072            assert [] == paths
1073            if fsid is None:
1074                fsid = ""
1075            assert fsid == ds.FileSetID
1076
1077        msg = r"The maximum length of the 'File-set ID' is 16 characters"
1078        with pytest.raises(ValueError, match=msg):
1079            fs.ID = "1" * 17
1080
1081        assert "1" * 16 == fs.ID
1082
1083    def test_uid(self, tdir):
1084        """Test the FileSet.UID property."""
1085        fs = FileSet()
1086        assert fs.is_staged
1087        uid = fs.UID
1088        ds, paths = write_fs(fs, tdir.name)
1089        assert [] == paths
1090        assert fs.UID == ds.file_meta.MediaStorageSOPInstanceUID
1091
1092        s = str(fs)
1093        assert "DICOM File-set" in s
1094        assert f"Root directory: (no value available)" not in s
1095        assert "File-set ID: (no value available)" in s
1096        assert f"File-set UID: {fs.UID}" in s
1097        assert "Managed instances" not in s
1098        assert "Changes staged for write():" not in s
1099        assert not fs.is_staged
1100
1101        fs.UID = uid
1102        assert not fs.is_staged
1103
1104        fs.UID = generate_uid()
1105        assert uid != fs.UID
1106        assert fs.is_staged
1107
1108    def test_descriptor(self):
1109        """Test FileSet.descriptor_file_id."""
1110        fs = FileSet()
1111        assert fs.descriptor_file_id is None
1112        assert fs.is_staged
1113        fs._stage['^'] = False  # Override
1114        assert not fs.is_staged
1115        fs.descriptor_file_id = None
1116        assert not fs.is_staged
1117        assert fs.descriptor_file_id is None
1118        fs.descriptor_file_id = "README"
1119        assert fs.is_staged
1120        assert "README" == fs.descriptor_file_id
1121        fs.descriptor_file_id = "README"
1122        assert "README" == fs.descriptor_file_id
1123        fs.descriptor_file_id = "A" * 16
1124        assert "A" * 16 == fs.descriptor_file_id
1125        fs.descriptor_file_id = None
1126        assert fs.descriptor_file_id is None
1127        fs.descriptor_file_id = ['A'] * 8
1128        assert ['A'] * 8 == fs.descriptor_file_id
1129        fs.descriptor_file_id = ["A", "", "B", "C"]
1130        assert ['A', 'B', 'C'] == fs.descriptor_file_id
1131
1132        # Test exceptions
1133        msg = r"The 'DescriptorFileID' must be a str, list of str, or None"
1134        with pytest.raises(TypeError, match=msg):
1135            fs.descriptor_file_id = 12
1136        msg = (
1137            r"The 'File-set Descriptor File ID' has a maximum of 8 "
1138            r"components, each between 0 and 16 characters long"
1139        )
1140        with pytest.raises(ValueError, match=msg):
1141            fs.descriptor_file_id = ['A'] * 9
1142        with pytest.raises(ValueError, match=msg):
1143            fs.descriptor_file_id = ['A' * 17]
1144        with pytest.raises(ValueError, match=msg):
1145            fs.descriptor_file_id = ['A', 1]
1146        msg = (
1147            r"Each 'File-set Descriptor File ID' component has a "
1148            r"maximum length of 16 characters"
1149        )
1150        with pytest.raises(ValueError, match=msg):
1151            fs.descriptor_file_id = "A" * 17
1152
1153        assert ['A', 'B', 'C'] == fs.descriptor_file_id
1154
1155    def test_descriptor_and_charset_written(self, tdir):
1156        """Test that the File-set Descriptor File ID gets written."""
1157        fs = FileSet()
1158        fs.descriptor_file_id = "README"
1159        fs.descriptor_character_set = "ISO_IR 100"
1160        ds, paths = write_fs(fs, tdir.name)
1161        assert "README" == ds.FileSetDescriptorFileID
1162        assert "ISO_IR 100" == ds.SpecificCharacterSetOfFileSetDescriptorFile
1163
1164    def test_descriptor_dicomdir(self, dicomdir):
1165        """Test FileSet.descriptor_file_id with a DICOMDIR file."""
1166        fs = FileSet(dicomdir)
1167        ds = fs._ds
1168        assert fs.descriptor_file_id is None
1169        assert "FileSetDescriptorFileID" not in ds
1170        assert fs.is_staged
1171        fs._stage['^'] = False  # Override
1172        fs._stage['~'] = {}
1173        assert not fs.is_staged
1174        fs.descriptor_file_id = None
1175        assert "FileSetDescriptorFileID" not in ds
1176        assert not fs.is_staged
1177        assert fs.descriptor_file_id is None
1178        fs.descriptor_file_id = "README"
1179        assert "README" == ds.FileSetDescriptorFileID
1180        assert fs.is_staged
1181        assert "README" == fs.descriptor_file_id
1182        fs.descriptor_file_id = "README"
1183        assert "README" == fs.descriptor_file_id
1184        fs.descriptor_file_id = "A" * 16
1185        assert "A" * 16 == fs.descriptor_file_id
1186        assert "A" * 16 == ds.FileSetDescriptorFileID
1187        fs.descriptor_file_id = None
1188        assert fs.descriptor_file_id is None
1189        assert ds.FileSetDescriptorFileID is None
1190        fs.descriptor_file_id = ['A'] * 8
1191        assert ['A'] * 8 == fs.descriptor_file_id
1192        assert ['A'] * 8 == ds.FileSetDescriptorFileID
1193
1194    def test_descriptor_charset(self):
1195        """Test FileSet.descriptor_character_set."""
1196        fs = FileSet()
1197        assert fs.descriptor_character_set is None
1198        assert fs.is_staged
1199        fs._stage['^'] = False  # Override
1200        assert not fs.is_staged
1201        fs.descriptor_character_set = None
1202        assert not fs.is_staged
1203        assert fs.descriptor_character_set is None
1204        fs.descriptor_character_set = "README"
1205        assert fs.is_staged
1206        assert "README" == fs.descriptor_character_set
1207
1208    def test_descriptor_charset_dicomdir(self, dicomdir):
1209        """Test FileSet.descriptor_character_set."""
1210        fs = FileSet(dicomdir)
1211        ds = fs._ds
1212        assert fs.descriptor_character_set is None
1213        assert "SpecificCharacterSetOfFileSetDescriptorFile" not in ds
1214        assert fs.is_staged
1215        fs._stage['^'] = False  # Override
1216        fs._stage['~'] = {}
1217        assert not fs.is_staged
1218        fs.descriptor_character_set = None
1219        assert "SpecificCharacterSetOfFileSetDescriptorFile" not in ds
1220        assert not fs.is_staged
1221        assert fs.descriptor_character_set is None
1222        fs.descriptor_character_set = "README"
1223        assert "README" == ds.SpecificCharacterSetOfFileSetDescriptorFile
1224        assert fs.is_staged
1225
1226    def test_path(self, tdir):
1227        """Test setting the File-set's path."""
1228        fs = FileSet()
1229        assert fs.path is None
1230        with pytest.raises(AttributeError, match=r"can't set attribute"):
1231            fs.path = tdir.name
1232
1233        # Test with str
1234        path = tdir.name
1235        assert isinstance(path, str)
1236        ds, paths = write_fs(fs, path)
1237        assert Path(path).parts[-2:] == Path(fs.path).parts[-2:]
1238        assert [] == paths
1239        root = os.fspath(Path(*Path(tdir.name).parts[-2:]))
1240        assert root in ds.filename
1241        assert root in str(fs)
1242        assert not fs.is_staged
1243
1244        # Test with PathLike
1245        fs = FileSet()
1246        path = Path(tdir.name)
1247        ds, paths = write_fs(fs, path)
1248        assert [] == paths
1249        root = os.fspath(Path(*Path(tdir.name).parts[-2:]))
1250        assert root in ds.filename
1251        assert root in str(fs)
1252        assert not fs.is_staged
1253
1254    def test_empty_write(self, tdir):
1255        """Test writing an empty File-set."""
1256        fs = FileSet()
1257        uid = fs.UID
1258        msg = (
1259            r"The path to the root directory is required for a new File-set"
1260        )
1261        with pytest.raises(ValueError, match=msg):
1262            fs.write()
1263
1264        path = Path(tdir.name)
1265        fs.write(path)
1266
1267        # Should be the DICOMDIR file
1268        contents = list(path.glob('**/*'))
1269        assert "DICOMDIR" == contents[0].name
1270        assert 1 == len(contents)
1271
1272        ds = dcmread(contents[0])
1273        meta = ds.file_meta
1274        assert MediaStorageDirectoryStorage == meta.MediaStorageSOPClassUID
1275        assert uid == meta.MediaStorageSOPInstanceUID
1276        assert meta.TransferSyntaxUID == ExplicitVRLittleEndian
1277        assert "" == ds.FileSetID
1278        assert 0 == ds.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity
1279        assert 0 == ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity
1280        assert 0 == ds.FileSetConsistencyFlag
1281        assert [] == ds.DirectoryRecordSequence
1282
1283    def test_add_dataset(self, ct, tdir):
1284        """Test FileSet.add() with a Dataset."""
1285        fs = FileSet()
1286        assert fs.is_staged
1287        fs.write(tdir.name)  # write empty to unstage
1288        assert not fs.is_staged
1289        fs.add(ct)
1290        assert fs.is_staged
1291
1292        s = str(fs)
1293        assert "Managed instances" in s
1294        assert (
1295            "PATIENT: PatientID='1CT1', "
1296            "PatientName='CompressedSamples^CT1'"
1297        ) in s
1298        assert (
1299            "STUDY: StudyDate=20040119, StudyTime=072730, "
1300            "StudyDescription='e+1'" in s
1301        )
1302        assert "SERIES: Modality=CT, SeriesNumber=1" in s
1303        assert 1 == len(fs)
1304        instances = [ii for ii in fs]
1305        file_id = Path("PT000000", "ST000000", "SE000000", "IM000000")
1306        assert os.fspath(file_id) == instances[0].FileID
1307
1308        ds, paths = write_fs(fs)
1309        assert 1 == len(fs)
1310
1311        # Test the DICOMDIR
1312        assert 398 == (
1313            ds.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity
1314        )
1315        assert 398 == (
1316            ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity
1317        )
1318
1319        seq = ds.DirectoryRecordSequence
1320        assert 4 == len(seq)
1321
1322        item = seq[0]
1323        assert item.seq_item_tell == 398
1324        assert "PATIENT" == item.DirectoryRecordType
1325        assert ct.PatientName == item.PatientName
1326        assert ct.PatientID == item.PatientID
1327        assert 0xFFFF == item.RecordInUseFlag
1328        assert 0 == item.OffsetOfTheNextDirectoryRecord
1329        assert "ISO_IR 100" == item.SpecificCharacterSet
1330        assert 516 == item.OffsetOfReferencedLowerLevelDirectoryEntity
1331
1332        item = seq[1]
1333        assert item.seq_item_tell == 516
1334        assert "STUDY" == item.DirectoryRecordType
1335        assert ct.StudyDate == item.StudyDate
1336        assert ct.StudyTime == item.StudyTime
1337        assert ct.AccessionNumber == item.AccessionNumber
1338        assert ct.StudyDescription == item.StudyDescription
1339        assert ct.StudyInstanceUID == item.StudyInstanceUID
1340        assert 0xFFFF == item.RecordInUseFlag
1341        assert 0 == item.OffsetOfTheNextDirectoryRecord
1342        assert "ISO_IR 100" == item.SpecificCharacterSet
1343        assert 704 == item.OffsetOfReferencedLowerLevelDirectoryEntity
1344
1345        item = seq[2]
1346        assert item.seq_item_tell == 704
1347        assert "SERIES" == item.DirectoryRecordType
1348        assert ct.Modality == item.Modality
1349        assert ct.SeriesInstanceUID == item.SeriesInstanceUID
1350        assert ct.SeriesNumber == item.SeriesNumber
1351        assert 0xFFFF == item.RecordInUseFlag
1352        assert 0 == item.OffsetOfTheNextDirectoryRecord
1353        assert "ISO_IR 100" == item.SpecificCharacterSet
1354        assert 852 == item.OffsetOfReferencedLowerLevelDirectoryEntity
1355
1356        item = seq[3]
1357        assert item.seq_item_tell == 852
1358        assert "IMAGE" == item.DirectoryRecordType
1359        assert ['PT000000', 'ST000000', 'SE000000', 'IM000000'] == (
1360            item.ReferencedFileID
1361        )
1362        assert ct.SOPInstanceUID == item.ReferencedSOPInstanceUIDInFile
1363        assert ct.InstanceNumber == item.InstanceNumber
1364        assert ct.SOPClassUID == item.ReferencedSOPClassUIDInFile
1365        assert ct.file_meta.TransferSyntaxUID == (
1366            item.ReferencedTransferSyntaxUIDInFile
1367        )
1368        assert 0xFFFF == item.RecordInUseFlag
1369        assert 0 == item.OffsetOfTheNextDirectoryRecord
1370        assert "ISO_IR 100" == item.SpecificCharacterSet
1371        assert 0 == item.OffsetOfReferencedLowerLevelDirectoryEntity
1372
1373        assert not fs.is_staged
1374        s = str(fs)
1375        root = os.fspath(Path(*Path(tdir.name).parts[-2:]))
1376        assert root in s
1377        assert 1 == len(paths)
1378        assert ct == dcmread(paths[0])
1379
1380        # Calling write() again shouldn't change anything
1381        ds2, paths = write_fs(fs)
1382        assert ds == ds2
1383        assert ds2.filename == ds.filename
1384        assert 1 == len(paths)
1385        assert ct == dcmread(paths[0])
1386
1387    def test_add_bad_dataset(self, ct):
1388        """Test adding a dataset missing Type 1 element value."""
1389        ct.PatientID = None
1390        fs = FileSet()
1391        msg = (
1392            r"Unable to use the default 'PATIENT' record creator "
1393            r"as the instance is missing a required element or value. Either "
1394            r"update the instance, define your own record creation function "
1395            r"or use 'FileSet.add_custom\(\)' instead"
1396        )
1397        with pytest.raises(ValueError, match=msg):
1398            fs.add(ct)
1399
1400    def test_add_path(self, tdir):
1401        """Test FileSet.add() with a Dataset."""
1402        fs = FileSet()
1403        fs.write(tdir.name)
1404        assert not fs.is_staged
1405        fs.add(get_testdata_file("CT_small.dcm"))
1406        assert fs.is_staged
1407
1408    def test_add_add(self, ct, tdir):
1409        """Test calling FileSet.add() on the same Dataset."""
1410        fs = FileSet()
1411        fs.add(ct)
1412        fs.add(ct)
1413        assert fs.is_staged
1414        assert 1 == len(fs)
1415
1416        ds, paths = write_fs(fs, tdir.name)
1417        assert 4 == len(ds.DirectoryRecordSequence)
1418        assert 1 == len(paths)
1419
1420    def test_remove(self, ct, tdir):
1421        """Test removing an instance."""
1422        fs = FileSet()
1423        fs.add(ct)
1424        fs.write(tdir.name)
1425        assert "Managed instances" in str(fs)
1426
1427        instance = next(iter(fs))
1428        assert isinstance(instance, FileInstance)
1429        fs.remove(instance)
1430        assert 0 == len(fs)
1431        with pytest.raises(StopIteration):
1432            next(iter(fs))
1433
1434        ds, paths = write_fs(fs)
1435        assert [] == ds.DirectoryRecordSequence
1436        assert 0 == ds.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity
1437        assert 0 == ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity
1438        assert [] == paths
1439
1440    def test_remove_iter(self, tiny):
1441        """Test FileSet.remove() with iter(FileSet)."""
1442        fs = FileSet(tiny)
1443        for instance in fs:
1444            fs.remove(instance)
1445
1446        assert 0 == len(fs)
1447
1448    def test_remove_remove(self, ct, tdir):
1449        """Test removing an instance that's already removed."""
1450        fs = FileSet()
1451        fs.add(ct)
1452        fs.write(tdir.name)
1453
1454        instance = next(iter(fs))
1455        assert isinstance(instance, FileInstance)
1456        fs.remove(instance)
1457        msg = r"No such instance in the File-set"
1458        with pytest.raises(ValueError, match=msg):
1459            fs.remove(instance)
1460
1461    def test_remove_add(self, ct, tdir):
1462        """Test adding an instance that's removed."""
1463        fs = FileSet()
1464        fs.add(ct)
1465        fs.write(tdir.name)
1466
1467        instance = next(iter(fs))
1468        assert isinstance(instance, FileInstance)
1469        fs.remove(instance)
1470        assert fs.is_staged
1471        fs.add(ct)
1472        assert not fs.is_staged
1473        ds, paths = write_fs(fs)
1474        assert 4 == len(ds.DirectoryRecordSequence)
1475        assert 1 == len(paths)
1476
1477    def test_add_remove(self, ct, tdir):
1478        """Test removing an instance that's added."""
1479        fs = FileSet()
1480        fs.write(tdir.name)
1481        assert not fs.is_staged
1482        fs.add(ct)
1483        assert fs.is_staged
1484        instance = next(iter(fs))
1485        assert isinstance(instance, FileInstance)
1486        fs.remove(instance)
1487        assert not fs.is_staged
1488
1489        ds, paths = write_fs(fs)
1490        assert [] == ds.DirectoryRecordSequence
1491        assert [] == paths
1492
1493    def test_file_ids_unique(self, dicomdir):
1494        """That that the File IDs are all unique within the File-set."""
1495        fs = FileSet(dicomdir)
1496        ids = set([ii.FileID for ii in fs])
1497        assert len(fs._instances) == len(ids)
1498
1499    def test_add_custom(self, ct, tdir, custom_leaf):
1500        """Test FileSet.add_custom() with a standard IOD."""
1501        fs = FileSet()
1502        fs.add_custom(ct, custom_leaf)
1503        assert 1 == len(fs)
1504        assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID))
1505        assert fs.is_staged
1506        instance = fs._instances[0]
1507        assert instance.SOPInstanceUID in fs._stage['+']
1508
1509        ds, paths = write_fs(fs, tdir.name)
1510        assert 1 == len(paths)
1511        assert 4 == len(ds.DirectoryRecordSequence)
1512        assert Dataset(ct) == dcmread(paths[0])
1513
1514    def test_add_custom_path(self, ct, tdir, custom_leaf):
1515        """Test add_custom() with a path."""
1516        fs = FileSet()
1517        fs.add_custom(ct.filename, custom_leaf)
1518        assert 1 == len(fs)
1519        assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID))
1520        assert fs.is_staged
1521        instance = fs._instances[0]
1522        assert instance.SOPInstanceUID in fs._stage['+']
1523
1524        ds, paths = write_fs(fs, tdir.name)
1525        assert 1 == len(paths)
1526        assert 4 == len(ds.DirectoryRecordSequence)
1527        assert Dataset(ct) == dcmread(paths[0])
1528
1529    def test_add_custom_private(self, ct, tdir):
1530        """Test add_custom() with a private instance."""
1531        # Maximum of 8, including the top (root) node
1532        patient = _define_patient(ct)
1533        patient.DirectoryRecordType = "PATIENT"
1534        patient.OffsetOfTheNextDirectoryRecord = 0
1535        patient.RecordInUseFlag = 0xFFFF
1536        patient.OffsetOfReferencedLowerLevelDirectoryEntity = 0
1537        patient = RecordNode(patient)
1538        ds = Dataset()
1539        ds.PrivateRecordUID = generate_uid()
1540        ds.DirectoryRecordType = "PRIVATE"
1541        ds.ReferencedFileID = None
1542        ds.ReferencedSOPClassUIDInFile = ct.SOPClassUID
1543        ds.ReferencedSOPInstanceUIDInFile = ct.SOPInstanceUID
1544        ds.ReferencedTransferSyntaxUIDInFile = (
1545            ct.file_meta.TransferSyntaxUID
1546        )
1547        private = RecordNode(ds)
1548        private.parent = patient
1549
1550        assert 1 == private.depth
1551
1552        fs = FileSet()
1553        fs.add_custom(ct, private)
1554
1555        assert 1 == len(fs)
1556        assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID))
1557        assert fs.is_staged
1558        instance = fs._instances[0]
1559        assert instance.SOPInstanceUID in fs._stage['+']
1560
1561        ds, paths = write_fs(fs, tdir.name)
1562        assert 1 == len(paths)
1563        assert 2 == len(ds.DirectoryRecordSequence)
1564        assert "PATIENT" == ds.DirectoryRecordSequence[0].DirectoryRecordType
1565        assert "PRIVATE" == ds.DirectoryRecordSequence[1].DirectoryRecordType
1566        assert Dataset(ct) == dcmread(paths[0])
1567
1568    def test_add_custom_too_deep(self, ct):
1569        """Test adding too many nodes raises exception."""
1570        # Maximum of 8, including the top (root) node
1571        top = _define_patient(ct)
1572        top.DirectoryRecordType = "PATIENT"
1573        top.OffsetOfTheNextDirectoryRecord = 0
1574        top.RecordInUseFlag = 0xFFFF
1575        top.OffsetOfReferencedLowerLevelDirectoryEntity = 0
1576        top = RecordNode(top)
1577        for ii in range(8):
1578            ds = Dataset()
1579            ds.PrivateRecordUID = generate_uid()
1580            ds.DirectoryRecordType = "PRIVATE"
1581
1582            node = RecordNode(ds)
1583            node.parent = top
1584            top = node
1585
1586        top._record.ReferencedFileID = None
1587        top._record.ReferencedSOPClassUIDInFile = ct.SOPClassUID
1588        top._record.ReferencedSOPInstanceUIDInFile = ct.SOPInstanceUID
1589        top._record.ReferencedTransferSyntaxUIDInFile = (
1590            ct.file_meta.TransferSyntaxUID
1591        )
1592        assert 8 == top.depth
1593
1594        fs = FileSet()
1595        msg = (
1596            r"The 'leaf' node must not have more than 7 ancestors as "
1597            r"'FileSet' supports a maximum directory structure depth of 8"
1598        )
1599        with pytest.raises(ValueError, match=msg):
1600            fs.add_custom(ct, top)
1601
1602    def test_add_custom_bad_leaf(self, ct, tdir, custom_leaf):
1603        """Test FileSet.add_custom() with a bad leaf record."""
1604        del custom_leaf._record.ReferencedSOPClassUIDInFile
1605        del custom_leaf._record.ReferencedFileID
1606        del custom_leaf._record.ReferencedSOPInstanceUIDInFile
1607        del custom_leaf._record.ReferencedTransferSyntaxUIDInFile
1608
1609        fs = FileSet()
1610        instance = fs.add_custom(ct, custom_leaf)
1611        assert 1 == len(fs)
1612        assert ct.SOPClassUID == instance.ReferencedSOPClassUIDInFile
1613        assert instance.ReferencedFileID is None
1614        assert ct.SOPInstanceUID == instance.ReferencedSOPInstanceUIDInFile
1615        assert ct.file_meta.TransferSyntaxUID == (
1616            instance.ReferencedTransferSyntaxUIDInFile
1617        )
1618
1619    def test_add_custom_add_add(self, ct, tdir, custom_leaf):
1620        """Test add_custom() if the instance is already in the File-set."""
1621        fs = FileSet()
1622        fs.add(ct)
1623        assert 1 == len(fs)
1624        assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID))
1625
1626        fs.add_custom(ct, custom_leaf)
1627        assert 1 == len(fs)
1628        assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID))
1629
1630    def test_add_custom_remove_add(self, ct, tdir, custom_leaf):
1631        """Test adding a removed instance."""
1632        fs = FileSet()
1633        fs.add_custom(ct, custom_leaf)
1634        ds, paths = write_fs(fs, tdir.name)
1635        assert not fs.is_staged
1636        assert 1 == len(fs)
1637        fs.remove(fs._instances[0])
1638        fs.add_custom(ct, custom_leaf)
1639        assert 1 == len(fs)
1640        assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID))
1641
1642    def test_clear(self, dicomdir, tdir):
1643        """Test FileSet.clear()."""
1644        fs = FileSet(dicomdir)
1645        fs.ID = "TESTID"
1646        fs.descriptor_file_id = "README"
1647        fs.descriptor_character_set = "ISO 1"
1648        fs, ds, paths = copy_fs(fs, tdir.name)
1649        assert "README" == fs.descriptor_file_id
1650        assert "ISO 1" == fs.descriptor_character_set
1651        assert [] != fs._instances
1652        assert fs._id is not None
1653        assert fs._path is not None
1654        uid = fs._uid
1655        assert fs._uid is not None
1656        assert fs._ds is not None
1657        assert fs._descriptor is not None
1658        assert fs._charset is not None
1659        assert [] != fs._tree.children
1660
1661        fs.clear()
1662        assert [] == fs._instances
1663        assert fs._id is None
1664        assert fs._path is None
1665        assert uid != fs._uid
1666        assert fs._uid.is_valid
1667        assert fs._ds == Dataset()
1668        assert fs._descriptor is None
1669        assert fs._charset is None
1670        assert [] == fs._tree.children
1671
1672    def test_str_empty(self, tdir):
1673        """Test str(FileSet) on an empty File-set."""
1674        fs = FileSet()
1675        s = str(fs)
1676        assert "DICOM File-set" in s
1677        assert "Root directory: (no value available)" in s
1678        assert "File-set ID: (no value available)" in s
1679        assert "File-set UID: 1.2.826.0.1" in s
1680        assert "Descriptor file ID: (no value available)" in s
1681        assert "Descriptor file character set: (no value available)" in s
1682        assert "Changes staged for write(): DICOMDIR creation" in s
1683        assert "addition" not in s
1684        assert "removal" not in s
1685        assert "Managed instances:" not in s
1686
1687        # Set DICOMDIR elements
1688        fs.ID = "TEST ID"
1689        fs.descriptor_file_id = "README"
1690        fs.descriptor_character_set = "ISO WHATEVER"
1691        ds, paths = write_fs(fs, tdir.name)
1692        s = str(fs)
1693        assert "DICOM File-set" in s
1694        assert "Root directory: (no value available)" not in s
1695        assert "File-set ID: TEST ID" in s
1696        assert "File-set UID: 1.2.826.0.1" in s
1697        assert "Descriptor file ID: README" in s
1698        assert "Descriptor file character set: ISO WHATEVER" in s
1699        assert "Changes staged for write(): DICOMDIR creation" not in s
1700        assert "addition" not in s
1701        assert "removal" not in s
1702        assert "Managed instances:" not in s
1703
1704    def test_str(self, ct, dummy, tdir):
1705        """Test str(FileSet) with empty + additions."""
1706        fs = FileSet()
1707        fs.add(ct)
1708        fs.add(get_testdata_file("MR_small.dcm"))
1709
1710        for p in list(Path(TINY_ALPHA_FILESET).parent.glob('**/*'))[::2]:
1711            if p.is_file() and p.name not in ['DICOMDIR', 'README']:
1712                fs.add(p)
1713
1714        instance = fs._instances[-1]
1715        ds = dcmread(get_testdata_file("rtdose.dcm"))
1716        ds.PatientID = '12345678'
1717        ds.InstanceNumber = '1'
1718        ds.StudyInstanceUID = instance.StudyInstanceUID
1719        ds.SeriesInstanceUID = instance.SeriesInstanceUID
1720        fs.add(ds)
1721
1722        ds = dcmread(get_testdata_file("rtplan.dcm"))
1723        ds.PatientID = '12345678'
1724        ds.InstanceNumber = '1'
1725        ds.StudyInstanceUID = instance.StudyInstanceUID
1726        ds.SeriesInstanceUID = instance.SeriesInstanceUID
1727        fs.add(ds)
1728
1729        ds, opt = dummy
1730        ds.SOPClassUID = ColorPaletteStorage
1731        fs.add(ds)
1732
1733        ref = (
1734            "DICOM File-set\n"
1735            "  Root directory: (no value available)\n"
1736            "  File-set ID: (no value available)\n"
1737            f"  File-set UID: {fs.UID}\n"
1738            "  Descriptor file ID: (no value available)\n"
1739            "  Descriptor file character set: (no value available)\n"
1740            "  Changes staged for write(): DICOMDIR creation, 30 additions\n"
1741            "\n"
1742            "  Managed instances:\n"
1743            "    PATIENT: PatientID='1CT1', "
1744            "PatientName='CompressedSamples^CT1'\n"
1745            "      STUDY: StudyDate=20040119, StudyTime=072730, "
1746            "StudyDescription='e+1'\n"
1747            "        SERIES: Modality=CT, SeriesNumber=1\n"
1748            "          IMAGE: 1 SOP Instance (1 addition)\n"
1749            "    PATIENT: PatientID='4MR1', "
1750            "PatientName='CompressedSamples^MR1'\n"
1751            "      STUDY: StudyDate=20040826, StudyTime=185059\n"
1752            "        SERIES: Modality=MR, SeriesNumber=1\n"
1753            "          IMAGE: 1 SOP Instance (1 addition)\n"
1754            "    PATIENT: PatientID='12345678', PatientName='Citizen^Jan'\n"
1755            "      STUDY: StudyDate=20200913, StudyTime=161900, "
1756            "StudyDescription='Testing File-set'\n"
1757            "        SERIES: Modality=CT, SeriesNumber=1\n"
1758            "          IMAGE: 25 SOP Instances (25 additions)\n"
1759            "          RT DOSE: 1 SOP Instance (1 addition)\n"
1760            "          RT PLAN: 1 SOP Instance (1 addition)\n"
1761            "    PALETTE: 1 SOP Instance (to be added)"
1762        )
1763
1764        assert ref == str(fs)
1765
1766        ds, paths = write_fs(fs, tdir.name)
1767
1768        ref = (
1769            "  File-set ID: (no value available)\n"
1770            f"  File-set UID: {fs.UID}\n"
1771            "  Descriptor file ID: (no value available)\n"
1772            "  Descriptor file character set: (no value available)\n"
1773            "\n"
1774            "  Managed instances:\n"
1775            "    PATIENT: PatientID='1CT1', "
1776            "PatientName='CompressedSamples^CT1'\n"
1777            "      STUDY: StudyDate=20040119, StudyTime=072730, "
1778            "StudyDescription='e+1'\n"
1779            "        SERIES: Modality=CT, SeriesNumber=1\n"
1780            "          IMAGE: 1 SOP Instance\n"
1781            "    PATIENT: PatientID='4MR1', "
1782            "PatientName='CompressedSamples^MR1'\n"
1783            "      STUDY: StudyDate=20040826, StudyTime=185059\n"
1784            "        SERIES: Modality=MR, SeriesNumber=1\n"
1785            "          IMAGE: 1 SOP Instance\n"
1786            "    PATIENT: PatientID='12345678', PatientName='Citizen^Jan'\n"
1787            "      STUDY: StudyDate=20200913, StudyTime=161900, "
1788            "StudyDescription='Testing File-set'\n"
1789            "        SERIES: Modality=CT, SeriesNumber=1\n"
1790            "          IMAGE: 25 SOP Instances\n"
1791            "          RT DOSE: 1 SOP Instance\n"
1792            "          RT PLAN: 1 SOP Instance\n"
1793            "    PALETTE: 1 SOP Instance"
1794        )
1795
1796        assert ref in str(fs)
1797
1798        for instance in fs:
1799            fs.remove(instance)
1800
1801        for p in list(Path(TINY_ALPHA_FILESET).parent.glob('**/*'))[1:40:2]:
1802            if p.is_file() and p.name not in ['DICOMDIR', 'README']:
1803                fs.add(p)
1804
1805        ref = (
1806            "  File-set ID: (no value available)\n"
1807            f"  File-set UID: {fs.UID}\n"
1808            "  Descriptor file ID: (no value available)\n"
1809            "  Descriptor file character set: (no value available)\n"
1810            "  Changes staged for write(): DICOMDIR update, 18 additions, "
1811            "30 removals\n"
1812            "\n"
1813            "  Managed instances:\n"
1814            "    PATIENT: PatientID='1CT1', "
1815            "PatientName='CompressedSamples^CT1'\n"
1816            "      STUDY: StudyDate=20040119, StudyTime=072730, "
1817            "StudyDescription='e+1'\n"
1818            "        SERIES: Modality=CT, SeriesNumber=1\n"
1819            "          IMAGE: 0 SOP Instances (1 initial, 1 removal)\n"
1820            "    PATIENT: PatientID='4MR1', "
1821            "PatientName='CompressedSamples^MR1'\n"
1822            "      STUDY: StudyDate=20040826, StudyTime=185059\n"
1823            "        SERIES: Modality=MR, SeriesNumber=1\n"
1824            "          IMAGE: 0 SOP Instances (1 initial, 1 removal)\n"
1825            "    PATIENT: PatientID='12345678', PatientName='Citizen^Jan'\n"
1826            "      STUDY: StudyDate=20200913, StudyTime=161900, "
1827            "StudyDescription='Testing File-set'\n"
1828            "        SERIES: Modality=CT, SeriesNumber=1\n"
1829            "          IMAGE: 18 SOP Instances (25 initial, 18 additions, "
1830            "25 removals)\n"
1831            "          RT DOSE: 0 SOP Instances (1 initial, 1 removal)\n"
1832            "          RT PLAN: 0 SOP Instances (1 initial, 1 removal)\n"
1833            "    PALETTE: 1 SOP Instance (to be removed)"
1834        )
1835
1836        assert ref in str(fs)
1837
1838    def test_str_update_structure(self, dicomdir):
1839        """Test that the update structure comment appears."""
1840        fs = FileSet(dicomdir)
1841        assert (
1842            "Changes staged for write(): DICOMDIR update, directory "
1843            "structure update"
1844        ) in str(fs)
1845
1846
@pytest.mark.filterwarnings("ignore:The 'DicomDir'")
class TestFileSet_Load:
    """Tests for a loaded File-set."""

    @staticmethod
    def _assert_same_instances(fs, ref):
        """Assert `fs` and `ref` yield the same SOP Instance UIDs in order."""
        assert len(ref) == len(fs)
        for ii, rr in zip(fs, ref):
            assert ii.SOPInstanceUID == rr.SOPInstanceUID

    def test_write_dicomdir(self, dicomdir):
        """Test DICOMDIR writing"""
        fs = FileSet(dicomdir)
        # Write to an in-memory buffer as explicit VR little endian
        out = DicomBytesIO()
        out.is_little_endian = True
        out.is_implicit_VR = False
        fs._write_dicomdir(out)
        out.seek(0)

        # The round-tripped records and root entity offsets must match
        new = dcmread(out)
        assert dicomdir.DirectoryRecordSequence == new.DirectoryRecordSequence
        assert (
            396 == new.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity
        )
        assert (
            3126 == new.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity
        )

    def test_write_new_path(self, dicomdir):
        """Test writing to a new path."""
        fs = FileSet(dicomdir)
        assert fs.path is not None
        msg = (
            r"The path for an existing File-set cannot be changed, use "
            r"'FileSet.copy\(\)' to write the File-set to a new location"
        )
        with pytest.raises(ValueError, match=msg):
            fs.write("MYNEWPATH")

    def test_bad_sop_class_raises(self, dicomdir):
        """Test loading using non-DICOMDIR."""
        dicomdir.file_meta.MediaStorageSOPClassUID = '1.2.3'
        msg = (
            r"Unable to load the File-set as the supplied dataset is "
            r"not a 'Media Storage Directory' instance"
        )
        with pytest.raises(ValueError, match=msg):
            FileSet(dicomdir)

    def test_bad_filename_raises(self, dicomdir):
        """Test loading with a bad path."""
        dicomdir.filename = 'bad'
        msg = (
            r"Unable to load the File-set as the 'filename' attribute "
            r"for the DICOMDIR dataset is not a valid path: "
            r"bad"
        )
        with pytest.raises(FileNotFoundError, match=msg):
            FileSet(dicomdir)

    def test_bad_filename_type_raises(self, dicomdir):
        """Test loading with a bad DICOMDIR filename type."""
        dicomdir.filename = None
        msg = (
            r"Unable to load the File-set as the DICOMDIR dataset must "
            r"have a 'filename' attribute set to the path of the "
            r"DICOMDIR file"
        )
        with pytest.raises(TypeError, match=msg):
            FileSet(dicomdir)

    def test_find(self, dicomdir):
        """Tests for FileSet.find()."""
        fs = FileSet(dicomdir)
        assert 31 == len(fs.find())
        assert 7 == len(fs.find(PatientID='77654033'))
        assert 24 == len(fs.find(PatientID='98890234'))

        matches = fs.find(PatientID='98890234', StudyDate="20030505")
        assert 17 == len(matches)
        for ii in matches:
            assert isinstance(ii, FileInstance)

        # Every match must be a distinct SOP instance
        sop_instances = [ii.SOPInstanceUID for ii in matches]
        assert 17 == len(set(sop_instances))

    def test_find_load(self, private):
        """Test FileSet.find(load=True)."""
        fs = FileSet(private)
        msg = (
            r"None of the records in the DICOMDIR dataset contain all "
            r"the query elements, consider using the 'load' parameter "
            r"to expand the search to the corresponding SOP instances"
        )
        # Searching only the records warns and finds nothing
        with pytest.warns(UserWarning, match=msg):
            results = fs.find(
                load=False, PhotometricInterpretation="MONOCHROME1"
            )
            assert not results

        results = fs.find(
            load=True, PhotometricInterpretation="MONOCHROME1"
        )
        assert 3 == len(results)

    def test_find_values(self, private):
        """Test searching the FileSet for element values."""
        fs = FileSet(private)
        expected = {
            "PatientID": ['77654033', '98890234'],
            "StudyDescription": [
                'XR C Spine Comp Min 4 Views',
                'CT, HEAD/BRAIN WO CONTRAST',
                '',
                'Carotids',
                'Brain',
                'Brain-MRA',
            ],
        }
        # A single keyword returns a list, a list of keywords returns a dict
        for k, v in expected.items():
            assert fs.find_values(k) == v
        assert fs.find_values(list(expected.keys())) == expected

    def test_find_values_load(self, private):
        """Test FileSet.find_values(load=True)."""
        fs = FileSet(private)
        search_element = "PhotometricInterpretation"
        msg = (
            r"None of the records in the DICOMDIR dataset contain "
            fr"\['{search_element}'\], consider using the 'load' parameter "
            r"to expand the search to the corresponding SOP instances"
        )
        # Single keyword form
        with pytest.warns(UserWarning, match=msg):
            results = fs.find_values(search_element, load=False)
            assert not results

        assert fs.find_values(search_element, load=True) == [
            'MONOCHROME1', 'MONOCHROME2'
        ]

        # List of keywords form
        with pytest.warns(UserWarning, match=msg):
            results = fs.find_values([search_element], load=False)
            assert not results[search_element]

        assert (
            fs.find_values([search_element], load=True)
        ) == {search_element: ['MONOCHROME1', 'MONOCHROME2']}

    def test_empty_file_id(self, dicomdir):
        """Test loading a record with an empty File ID."""
        item = dicomdir.DirectoryRecordSequence[5]
        item.ReferencedFileID = None
        uid = item.ReferencedSOPInstanceUIDInFile
        fs = FileSet(dicomdir)
        # The record without a File ID isn't loaded as a managed instance
        assert [] == fs.find(SOPInstanceUID=uid)
        assert 30 == len(fs)

    def test_bad_file_id(self, dicomdir):
        """Test loading a record with a bad File ID."""
        item = dicomdir.DirectoryRecordSequence[5]
        item.ReferencedFileID[-1] = "MISSING"
        uid = item.ReferencedSOPInstanceUIDInFile
        msg = (
            r"The referenced SOP Instance for the directory record at offset "
            r"1220 does not exist:"
        )
        with pytest.warns(UserWarning, match=msg):
            fs = FileSet(dicomdir)

        # The record pointing at a missing file isn't loaded
        assert [] == fs.find(SOPInstanceUID=uid)
        assert 30 == len(fs)

    def test_load_orphans_raise(self, private):
        """Test loading orphaned records raises exception."""
        ds = private
        seq = ds.DirectoryRecordSequence
        # Modify record 1 to reference the same file as record 3 and cut
        #   its link to any lower level records
        uid = seq[3].ReferencedSOPInstanceUIDInFile
        seq[1].ReferencedSOPInstanceUIDInFile = uid
        seq[1].ReferencedTransferSyntaxUIDInFile = ExplicitVRLittleEndian
        seq[1].ReferencedSOPClassUID = ComputedRadiographyImageStorage
        seq[1].OffsetOfReferencedLowerLevelDirectoryEntity = 0
        seq[1].ReferencedFileID = seq[3].ReferencedFileID
        del seq[1].StudyInstanceUID
        fs = FileSet()
        msg = r"The DICOMDIR contains orphaned directory records"
        with pytest.raises(ValueError, match=msg):
            fs.load(ds, raise_orphans=True)

    def test_load_orphans_exclude(self, private):
        """Test loading and ignore orphaned records."""
        # The first study includes 3 series, each series with 1 image
        #   so we're orphaning 3 instances
        seq = private.DirectoryRecordSequence
        seq[1].OffsetOfReferencedLowerLevelDirectoryEntity = 0
        fs = FileSet()
        msg = (
            r"The DICOMDIR has 3 orphaned directory records that reference "
            r"an instance that will not be included in the File-set"
        )
        with pytest.warns(UserWarning, match=msg):
            fs.load(private, include_orphans=False)

        assert 29 == len(fs)
        assert [] == fs.find(StudyInstanceUID=seq[1].StudyInstanceUID)
        assert [] == fs.find(SeriesInstanceUID=seq[2].SeriesInstanceUID)
        # Records 3, 5 and 7 are the orphaned IMAGE records
        for ii in range(3, 9, 2):
            assert "IMAGE" == seq[ii].DirectoryRecordType
            assert [] == fs.find(
                SOPInstanceUID=seq[ii].ReferencedSOPInstanceUIDInFile
            )

    def test_load_orphans_no_file_id(self, private):
        """Test loading orphaned records without a valid File ID."""
        # The first study includes 3 series, each series with 1 image
        #   so we're orphaning 3 instances, 1 with an invalid File ID
        seq = private.DirectoryRecordSequence
        seq[1].OffsetOfReferencedLowerLevelDirectoryEntity = 0
        seq[5].ReferencedFileID = None
        fs = FileSet()
        fs.load(private)

        assert 31 == len(fs)
        assert "IMAGE" == seq[5].DirectoryRecordType
        # The orphan without a File ID is dropped, the other two are kept
        assert [] == fs.find(
            SOPInstanceUID=seq[5].ReferencedSOPInstanceUIDInFile
        )
        assert 1 == len(
            fs.find(SOPInstanceUID=seq[3].ReferencedSOPInstanceUIDInFile)
        )
        assert 1 == len(
            fs.find(SOPInstanceUID=seq[7].ReferencedSOPInstanceUIDInFile)
        )

    def test_load_orphans_private(self, private):
        """Test loading an orphaned PRIVATE record."""
        seq = private.DirectoryRecordSequence
        seq[-2].OffsetOfReferencedLowerLevelDirectoryEntity = 0
        fs = FileSet()
        fs.load(private)
        assert 32 == len(fs)
        assert 1 == len(
            fs.find(SOPInstanceUID=seq[-1].ReferencedSOPInstanceUIDInFile)
        )

    def test_load_dicomdir_big_endian(self, dicomdir, tdir):
        """Test loading a big endian DICOMDIR"""
        with pytest.warns(UserWarning):
            ds = dcmread(BIGENDIAN_TEST_FILE)
        msg = (
            r"The DICOMDIR dataset uses an invalid transfer syntax "
            r"'Explicit VR Big Endian' and will be updated to use 'Explicit "
            r"VR Little Endian'"
        )
        with pytest.warns(UserWarning, match=msg):
            fs = FileSet(ds)

        # Should be written out as explicit little
        fs, ds, paths = copy_fs(fs, tdir.name)
        assert ExplicitVRLittleEndian == ds.file_meta.TransferSyntaxUID

        self._assert_same_instances(fs, FileSet(dicomdir))

    def test_load_dicomdir_implicit(self, dicomdir, tdir):
        """Test loading an implicit VR DICOMDIR."""
        with pytest.warns(UserWarning):
            ds = dcmread(IMPLICIT_TEST_FILE)
        msg = (
            r"The DICOMDIR dataset uses an invalid transfer syntax "
            r"'Implicit VR Little Endian' and will be updated to use "
            r"'Explicit VR Little Endian'"
        )
        with pytest.warns(UserWarning, match=msg):
            fs = FileSet(ds)

        # Should be written out as explicit little
        fs, ds, paths = copy_fs(fs, tdir.name)
        assert ExplicitVRLittleEndian == ds.file_meta.TransferSyntaxUID

        self._assert_same_instances(fs, FileSet(dicomdir))

    def test_load_dicomdir_reordered(self, dicomdir):
        """Test loading DICOMDIR-reordered"""
        ds = dcmread(get_testdata_file('DICOMDIR-reordered'))
        fs = FileSet(ds)
        self._assert_same_instances(fs, FileSet(dicomdir))

    def test_load_dicomdir_no_offset(self, dicomdir):
        """Test loading DICOMDIR-nooffset"""
        ds = dcmread(get_testdata_file('DICOMDIR-nooffset'))
        fs = FileSet(ds)
        self._assert_same_instances(fs, FileSet(dicomdir))

    def test_load_dicomdir_no_uid(self, dicomdir):
        """Test loading DICOMDIR with no UID"""
        del dicomdir.file_meta.MediaStorageSOPInstanceUID
        fs = FileSet(dicomdir)
        # A valid UID is present after loading and is set on the dataset too
        assert fs.UID.is_valid
        assert fs.UID == dicomdir.file_meta.MediaStorageSOPInstanceUID
2150
2151
2152@pytest.mark.filterwarnings("ignore:The 'DicomDir'")
2153class TestFileSet_Modify:
2154    """Tests for a modified File-set."""
2155    def setup(self):
2156        self.fn = FileSet.__len__
2157
2158    def teardown(self):
2159        FileSet.__len__ = self.fn
2160
2161    def test_write_dicomdir_fs_changes(self, dicomdir_copy):
2162        """Test FileSet.write() with only ^ changes."""
2163        t, ds = dicomdir_copy
2164        fs = FileSet(ds)
2165        ds, paths = write_fs(fs)
2166        assert not fs._stage['^']
2167        fs.descriptor_file_id = ["1", "2", "3"]
2168        assert fs._stage['^']
2169        assert not fs._stage['~']
2170        assert not fs._stage['+']
2171        assert not fs._stage['-']
2172        fs.write()
2173        assert not fs._stage['^']
2174        ds = dcmread(Path(fs.path) / "DICOMDIR")
2175        assert ["1", "2", "3"] == ds.FileSetDescriptorFileID
2176
2177    def test_write_dicomdir_use_existing(self, dicomdir_copy):
2178        """Test FileSet.write() with use_existing."""
2179        tdir, ds = dicomdir_copy
2180        assert "FileSetDescriptorFileID" not in ds
2181        fs = FileSet(ds)
2182        assert fs._stage['~']
2183        assert not fs._stage['+']
2184        assert not fs._stage['-']
2185        fs.descriptor_file_id = ["1", "2", "3"]
2186        fs.write(use_existing=True)
2187        t = Path(tdir.name)
2188        # File IDs haven't changed
2189        assert [] == list(t.glob("PT000000"))
2190        assert 1 == len(list(t.glob("98892003")))
2191        ds = dcmread(t / "DICOMDIR")
2192        assert ["1", "2", "3"] == ds.FileSetDescriptorFileID
2193
2194    def test_write_dicomdir_use_existing_raises(self, dicomdir_copy, ct):
2195        """Test FileSet.write() with use_existing raises with +/- changes."""
2196        tdir, ds = dicomdir_copy
2197        assert "FileSetDescriptorFileID" not in ds
2198        fs = FileSet(ds)
2199        fs.add(ct)
2200        assert fs._stage['~']
2201        assert fs._stage['+']
2202        assert not fs._stage['-']
2203        fs.descriptor_file_id = ["1", "2", "3"]
2204        msg = (
2205            r"'Fileset.write\(\)' called with 'use_existing' but additions to "
2206            r"the File-set's managed instances are staged"
2207        )
2208        with pytest.raises(ValueError, match=msg):
2209            fs.write(use_existing=True)
2210
2211    def test_remove_addition_bad_path(self, dicomdir, ct):
2212        """Test removing a missing file from the File-set's stage."""
2213        fs = FileSet(dicomdir)
2214        fs.add(ct)
2215        instance = fs.find(SOPInstanceUID=ct.SOPInstanceUID)[0]
2216        assert instance.SOPInstanceUID in fs._stage['+']
2217        assert instance in fs
2218
2219        path = instance._stage_path
2220        instance._stage_path = Path(fs.path) / "BADFILE"
2221        fs.remove(instance)
2222        assert instance not in fs
2223        assert instance.SOPInstanceUID not in fs._stage['-']
2224        assert instance.SOPInstanceUID not in fs._stage['+']
2225        # File should still exist
2226        assert path.exists()
2227
2228    def test_write_file_id(self, tiny):
2229        """Test that the File IDs character sets switch correctly."""
2230        tdir, ds = temporary_fs(tiny)
2231
2232        def my_len(self):
2233            return 10**6 + 1
2234
2235        FileSet.__len__ = my_len
2236        fs = FileSet(ds)
2237        assert 10**6 + 1 == len(fs)
2238        ds, paths = write_fs(fs)
2239        instance = fs._instances[-1]
2240        # Was written with alphanumeric File IDs
2241        assert "IM00001D" in instance.path
2242
2243        def my_len(self):
2244            return 36**6 + 1
2245
2246        FileSet.__len__ = my_len
2247        fs = FileSet(ds)
2248        assert 36**6 + 1 == len(fs)
2249        msg = (
2250            r"pydicom doesn't support writing File-sets with more than "
2251            r"2176782336 managed instances"
2252        )
2253        with pytest.raises(NotImplementedError, match=msg):
2254            fs.write()
2255
2256    def test_write_missing_removal(self, tiny):
2257        """Test that missing files are ignored when removing during write."""
2258        tdir, ds = temporary_fs(tiny)
2259        fs = FileSet(ds)
2260        instance = fs._instances[0]
2261        path = Path(instance.path)
2262        fs.remove(instance)
2263        assert path.exists()
2264        path.unlink()
2265        assert not path.exists()
2266        ds, paths = write_fs(fs)
2267        assert [] == fs.find(SOPInstanceUID=instance.SOPInstanceUID)
2268        assert 49 == len(fs)
2269
2270    def test_write_removal_addition_collision(self, tiny):
2271        """Test re-adding files staged for removal that also collide."""
2272        # The colliding files are IM000010 to IM000019 which get
2273        #   overwritten by IM00000A to IM00000J
2274        tdir, ds = temporary_fs(tiny)
2275        fs = FileSet(ds)
2276        # IM000010 to IM000013
2277        instances = fs._instances[36:40]
2278        assert "IM000010" == Path(instances[0].path).name
2279        assert "IM000011" == Path(instances[1].path).name
2280        assert "IM000012" == Path(instances[2].path).name
2281        assert "IM000013" == Path(instances[3].path).name
2282        fs.remove(instances)
2283        assert 46 == len(fs)
2284        for instance in instances:
2285            path = Path(instance.path)
2286            fs.add(path)
2287
2288        ds, paths = write_fs(fs)
2289        assert 50 == len(paths)
2290        original = FileSet(tiny)
2291        assert len(original) == len(fs)
2292        for ref, ii in zip(original, fs):
2293            assert ref.path != ii.path
2294            assert ref.SOPInstanceUID == ii.SOPInstanceUID
2295            rs = ref.load()
2296            ts = ii.load()
2297            assert Dataset(rs) == ts
2298
2299    def test_write_implicit(self, dicomdir, dicomdir_copy, tdir):
2300        """Test writing the DICOMDIR using Implicit VR"""
2301        tdir, ds = dicomdir_copy
2302        fs = FileSet(ds)
2303        with pytest.warns(UserWarning):
2304            fs.write(force_implicit=True, use_existing=True)
2305        with pytest.warns(UserWarning):
2306            ds = dcmread(Path(fs.path) / "DICOMDIR")
2307        assert ImplicitVRLittleEndian == ds.file_meta.TransferSyntaxUID
2308
2309        with pytest.warns(UserWarning):
2310            ref_ds = dcmread(IMPLICIT_TEST_FILE)
2311        assert Dataset(ref_ds) == ds
2312
2313        ref = FileSet(dicomdir)
2314        assert len(ref) == len(fs)
2315        for ii, rr in zip(fs, ref):
2316            assert ii.SOPInstanceUID == rr.SOPInstanceUID
2317
2318    def test_write_use_existing(self, dicomdir_copy):
2319        """Test write() with use_existing."""
2320        tdir, ds = dicomdir_copy
2321        assert 52 == len(ds.DirectoryRecordSequence)
2322        fs = FileSet(ds)
2323        orig_paths = [p for p in fs._path.glob('**/*') if p.is_file()]
2324        instance = fs._instances[0]
2325        assert Path(instance.path) in orig_paths
2326        fs.remove(instance)
2327        orig_file_ids = [ii.ReferencedFileID for ii in fs]
2328        fs.write(use_existing=True)
2329        assert 50 == len(fs._ds.DirectoryRecordSequence)
2330        paths = [p for p in fs._path.glob('**/*') if p.is_file()]
2331        assert orig_file_ids == [ii.ReferencedFileID for ii in fs]
2332        assert Path(instance.path) not in paths
2333        assert sorted(orig_paths)[1:] == sorted(paths)
2334        assert {} == fs._stage['-']
2335        assert not fs._stage['^']
2336        assert {} == fs._stage['+']
2337        assert fs._stage['~']
2338
2339    def test_write_use_existing_raises(self, dicomdir, ct):
2340        """Test write() with use_existing raises if additions."""
2341        fs = FileSet(dicomdir)
2342        fs.remove(fs._instances[0])
2343        fs.add(ct)
2344        msg = (
2345            r"'Fileset.write\(\)' called with 'use_existing' but additions "
2346            r"to the File-set's managed instances are staged"
2347        )
2348        with pytest.raises(ValueError, match=msg):
2349            fs.write(use_existing=True)
2350
2351    def test_add_instance_missing(self, tdir):
2352        """Test adding an instance missing a required value."""
2353        fs = FileSet()
2354        ds = dcmread(get_testdata_file("rtdose.dcm"))
2355        del ds.InstanceNumber
2356        msg = (
2357            r"Unable to use the default 'RT DOSE' record creator "
2358            r"as the instance is missing a required element or value. Either "
2359            r"update the instance, define your own record creation function "
2360            r"or use 'FileSet.add_custom\(\)' instead"
2361        )
2362        with pytest.raises(ValueError, match=msg):
2363            fs.add(ds)
2364
2365    def test_add_instance_missing_required_value(self, tdir):
2366        """Test adding an instance missing a required value."""
2367        fs = FileSet()
2368        ds = dcmread(get_testdata_file("rtdose.dcm"))
2369        ds.InstanceNumber = None
2370        msg = (
2371            r"Unable to use the default 'RT DOSE' record creator "
2372            r"as the instance is missing a required element or value. Either "
2373            r"update the instance, define your own record creation function "
2374            r"or use 'FileSet.add_custom\(\)' instead"
2375        )
2376        with pytest.raises(ValueError, match=msg):
2377            fs.add(ds)
2378
2379    def test_add_rt_dose(self, tdir):
2380        """Test adding an RT Dose instance."""
2381        fs = FileSet()
2382        ds = dcmread(get_testdata_file("rtdose.dcm"))
2383        ds.SpecificCharacterSet = "ISO_IR 100"
2384        ds.InstanceNumber = 1  # Type 2, but Type 1 in the RT DOSE record
2385        fs.add(ds)
2386        assert 1 == len(fs.find(SOPInstanceUID=ds.SOPInstanceUID))
2387        dicomdir, paths = write_fs(fs, tdir.name)
2388        assert 1 == len(paths)
2389        seq = dicomdir.DirectoryRecordSequence
2390        assert "PATIENT" == seq[0].DirectoryRecordType
2391        assert "STUDY" == seq[1].DirectoryRecordType
2392        assert "SERIES" == seq[2].DirectoryRecordType
2393        assert "RT DOSE" == seq[3].DirectoryRecordType
2394        assert Dataset(ds) == dcmread(paths[0])
2395
2396    def test_add_rt_structure_set(self, tdir):
2397        """Test adding an RT Structure Set instance."""
2398        ds = dcmread(get_testdata_file("rtstruct.dcm"), force=True)
2399        ds.file_meta = FileMetaDataset()
2400        ds.file_meta.TransferSyntaxUID = ImplicitVRLittleEndian
2401        ds.StudyDate = "20201001"
2402        ds.StudyTime = "120000"
2403
2404        fs = FileSet()
2405        fs.add(ds)
2406        assert 1 == len(fs.find(SOPInstanceUID=ds.SOPInstanceUID))
2407        dicomdir, paths = write_fs(fs, tdir.name)
2408        assert 1 == len(paths)
2409        seq = dicomdir.DirectoryRecordSequence
2410        assert "PATIENT" == seq[0].DirectoryRecordType
2411        assert "STUDY" == seq[1].DirectoryRecordType
2412        assert "SERIES" == seq[2].DirectoryRecordType
2413        assert "RT STRUCTURE SET" == seq[3].DirectoryRecordType
2414        assert Dataset(ds) == dcmread(paths[0])
2415
2416    def test_add_rt_plan(self, tdir):
2417        """Test adding an RT Plan instance."""
2418        ds = dcmread(get_testdata_file("rtplan.dcm"), force=True)
2419        ds.InstanceNumber = 1
2420
2421        fs = FileSet()
2422        fs.add(ds)
2423        assert 1 == len(fs.find(SOPInstanceUID=ds.SOPInstanceUID))
2424        dicomdir, paths = write_fs(fs, tdir.name)
2425        assert 1 == len(paths)
2426        seq = dicomdir.DirectoryRecordSequence
2427        assert "PATIENT" == seq[0].DirectoryRecordType
2428        assert "STUDY" == seq[1].DirectoryRecordType
2429        assert "SERIES" == seq[2].DirectoryRecordType
2430        assert "RT PLAN" == seq[3].DirectoryRecordType
2431        assert Dataset(ds) == dcmread(paths[0])
2432
2433    def test_remove_list(self, dicomdir, tdir):
2434        """Test remove using a list of instances."""
2435        fs = FileSet(dicomdir)
2436        instances = fs.find(StudyDescription='XR C Spine Comp Min 4 Views')
2437        fs.remove(instances)
2438        assert 28 == len(fs)
2439
2440    def test_add_bad_one_level(self, dummy):
2441        """Test adding a bad one-level dataset raises."""
2442        ds, opt = dummy
2443        ds.SOPClassUID = HangingProtocolStorage
2444        del ds.HangingProtocolCreator
2445        fs = FileSet()
2446        msg = (
2447            r"Unable to use the default 'HANGING PROTOCOL' record creator "
2448            r"as the instance is missing a required element or value. Either "
2449            r"update the instance, define your own record creation function "
2450            r"or use 'FileSet.add_custom\(\)' instead"
2451        )
2452        with pytest.raises(ValueError, match=msg):
2453            fs.add(ds)
2454
2455
@pytest.mark.filterwarnings("ignore:The 'DicomDir'")
class TestFileSet_Copy:
    """Tests for copying a File-set."""
    def setup_method(self):
        """Remember ``FileSet.__len__`` so a test may safely replace it."""
        # The xunit-style hook names are required here: pytest deprecated the
        # nose-style ``setup``/``teardown`` names in 7.2 and no longer calls
        # them from 8.0, which would leave the patched ``__len__`` from
        # ``test_file_id`` in place for every test that runs afterwards.
        self.orig = FileSet.__len__

    def teardown_method(self):
        """Restore the ``FileSet.__len__`` saved by ``setup_method``."""
        FileSet.__len__ = self.orig

    def test_copy(self, dicomdir, tdir):
        """Test FileSet.copy()"""
        orig_root = Path(dicomdir.filename).parent
        fs = FileSet(dicomdir)

        fs.ID = "NEW ID"
        uid = fs.UID = generate_uid()
        fs.descriptor_file_id = "README"
        fs.descriptor_character_set = "ISO_IR 100"
        cp, ds, paths = copy_fs(fs, tdir.name)
        assert 31 == len(paths)
        assert (
            ('PT000000', 'ST000000', 'SE000000', 'IM000000')
        ) == paths[0].parts[-4:]
        assert (
            ('PT000001', 'ST000003', 'SE000002', 'IM000006')
        ) == paths[-1].parts[-4:]

        # Check existing File-set remains the same
        assert "NEW ID" == fs.ID
        assert dicomdir.file_meta.TransferSyntaxUID == ExplicitVRLittleEndian
        assert uid == fs.UID
        assert dicomdir.file_meta.MediaStorageSOPInstanceUID == fs.UID
        assert "README" == fs.descriptor_file_id
        assert "ISO_IR 100" == fs.descriptor_character_set
        assert not bool(fs._stage['+'])
        assert not bool(fs._stage['-'])
        assert fs.is_staged
        # NOTE(review): the original files are collected here but nothing is
        # asserted on them afterwards - confirm whether a check is missing
        paths = list(orig_root.glob('98892001/**/*'))
        paths += list(orig_root.glob('98892003/**/*'))
        paths += list(orig_root.glob('77654033/**/*'))
        paths = [p for p in paths if p.is_file()]

        # Test new File-set
        assert len(fs) == len(cp)
        for ref, instance in zip(fs, cp):
            assert ref.SOPInstanceUID == instance.SOPInstanceUID

        assert ds.file_meta.TransferSyntaxUID == ExplicitVRLittleEndian
        assert not ds.is_implicit_VR
        assert ds.is_little_endian
        assert not cp.is_staged
        assert "NEW ID" == cp.ID
        assert uid == cp.UID
        assert ds.file_meta.MediaStorageSOPInstanceUID == cp.UID
        assert "README" == cp.descriptor_file_id
        assert "ISO_IR 100" == cp.descriptor_character_set

    def test_copy_raises(self, dicomdir, tdir):
        """Test exceptions raised by FileSet.copy()."""
        fs = FileSet(dicomdir)
        msg = r"Cannot copy the File-set as the 'path' is unchanged"
        with pytest.raises(ValueError, match=msg):
            fs.copy(fs.path)

    def test_copy_implicit(self, dicomdir, tdir):
        """Test copy() with implicit VR."""
        assert not dicomdir.is_implicit_VR
        fs = FileSet(dicomdir)
        with pytest.warns(UserWarning):
            cp, ds, paths = copy_fs(fs, tdir.name, as_implicit=True)

        # Check existing File-set remains the same
        assert "PYDICOM_TEST" == fs.ID
        assert dicomdir.file_meta.TransferSyntaxUID == ExplicitVRLittleEndian
        assert dicomdir.file_meta.MediaStorageSOPInstanceUID == fs.UID
        assert fs.descriptor_file_id is None
        assert fs.descriptor_character_set is None
        assert not bool(fs._stage['+'])
        assert not bool(fs._stage['-'])

        assert 31 == len(paths)

        assert len(fs) == len(cp)
        for ref, instance in zip(fs, cp):
            assert ref.SOPInstanceUID == instance.SOPInstanceUID

        assert ds.file_meta.TransferSyntaxUID == ImplicitVRLittleEndian
        assert ds.is_implicit_VR
        assert ds.is_little_endian

    def test_file_id(self, tiny, tdir):
        """Test that the File IDs character sets switch correctly."""
        # ``FileSet.__len__`` is replaced to fake a very large File-set; the
        # original is put back by ``teardown_method``
        def my_len(self):
            return 10**6 + 1

        FileSet.__len__ = my_len
        fs = FileSet(tiny)
        assert 10**6 + 1 == len(fs)
        fs, ds, paths = copy_fs(fs, tdir.name)
        instance = fs._instances[-1]
        # Was written with alphanumeric File IDs
        assert "IM00001D" in instance.path

        def my_len(self):
            return 36**6 + 1

        FileSet.__len__ = my_len
        fs = FileSet(tiny)
        assert 36**6 + 1 == len(fs)
        msg = (
            r"pydicom doesn't support writing File-sets with more than "
            r"2176782336 managed instances"
        )
        with pytest.raises(NotImplementedError, match=msg):
            fs.copy(tdir.name)

    def test_additions(self, tiny, ct, tdir):
        """Test that additions get added when copying."""
        fs = FileSet(tiny)
        assert [] == fs.find(PatientID="1CT1")
        fs.add(ct)
        cp, ds, paths = copy_fs(fs, tdir.name)
        assert 51 == len(paths)
        assert (
            ('PT000001', 'ST000000', 'SE000000', 'IM000000')
        ) == paths[-1].parts[-4:]
        assert 51 == len(cp)
        assert not cp.is_staged
        instances = cp.find(PatientID="1CT1")
        assert 1 == len(instances)
        assert ct.SOPInstanceUID == instances[0].SOPInstanceUID

        # Test addition is still staged in original fs
        assert fs.is_staged
        assert 51 == len(fs)
        assert 1 == len(fs._stage['+'])
        assert ct.SOPInstanceUID in fs._stage['+']

    def test_removals(self, tiny, tdir):
        """Test that removals are left out of the copy when copying."""
        fs = FileSet(tiny)
        instance = fs._instances[0]
        uid = instance.SOPInstanceUID
        instances = fs.find(SOPInstanceUID=uid)
        assert 1 == len(instances)
        fs.remove(instance)
        assert fs.is_staged

        cp, ds, paths = copy_fs(fs, tdir.name)
        assert 49 == len(paths)
        assert (
            ('PT000000', 'ST000000', 'SE000000', 'IM000048')
        ) == paths[-1].parts[-4:]
        assert not cp.is_staged
        names = [p.name for p in paths]
        assert 49 == len(names)
        assert "IM000000" in names
        assert "IM000048" in names
        assert "IM000049" not in names
        assert [] == cp.find(SOPInstanceUID=uid)

        # Test removal is still staged in original fs
        assert fs.is_staged
        assert 1 == len(fs._stage['-'])
        assert uid in fs._stage['-']

    def test_additions_removals(self, tiny, ct, tdir):
        """Test copying with additions and removals."""
        mr = dcmread(get_testdata_file("MR_small.dcm"))
        fs = FileSet(tiny)
        assert [] == fs.find(PatientID=ct.PatientID)
        assert [] == fs.find(PatientID=mr.PatientID)
        fs.add(ct)
        fs.add(mr)

        instances = fs._instances[:5]
        for instance in instances:
            matches = fs.find(SOPInstanceUID=instance.SOPInstanceUID)
            assert 1 == len(matches)
            fs.remove(instance)

        assert fs.is_staged
        cp, ds, paths = copy_fs(fs, tdir.name)

        # Test written instances
        parts = [p.parts for p in paths]
        assert 47 == len(parts)
        assert 'IM000000' == parts[0][-1]
        assert 'IM000044' == parts[44][-1]
        for ii in range(45):
            assert ('PT000000', 'ST000000', 'SE000000') == parts[ii][-4:-1]

        assert (
            ('PT000001', 'ST000000', 'SE000000', 'IM000000') == parts[45][-4:]
        )
        assert (
            ('PT000002', 'ST000000', 'SE000000', 'IM000000') == parts[46][-4:]
        )

        # Test copied fileset
        assert not cp.is_staged
        assert 1 == len(cp.find(SOPInstanceUID=ct.SOPInstanceUID))
        assert 1 == len(cp.find(SOPInstanceUID=mr.SOPInstanceUID))
        for instance in instances:
            matches = cp.find(SOPInstanceUID=instance.SOPInstanceUID)
            assert 0 == len(matches)
            assert instance.SOPInstanceUID not in cp._stage['-']

        # Test original fileset
        assert fs.is_staged
        for instance in instances:
            assert instance.SOPInstanceUID in fs._stage['-']
        assert 2 == len(fs._stage['+'])
        assert 1 == len(fs.find(SOPInstanceUID=ct.SOPInstanceUID))
        assert 1 == len(fs.find(SOPInstanceUID=mr.SOPInstanceUID))
2671
2672
# Parameters for test_one_level_record: each entry is
# (Directory Record Type, SOP Class UID). The test expects the written
# DICOMDIR to hold a single record of that type and nothing else.
REFERENCE_1LEVEL = [
    ("HANGING PROTOCOL", HangingProtocolStorage),
    ("IMPLANT", GenericImplantTemplateStorage),
    ("IMPLANT ASSY", ImplantAssemblyTemplateStorage),
    ("IMPLANT GROUP", ImplantTemplateGroupStorage),
    ("PALETTE", ColorPaletteStorage),
]
# Parameters for test_four_level_record, where the record sits at the end
# of a PATIENT -> STUDY -> SERIES -> record type hierarchy
REFERENCE_4LEVEL = [
    # Record type, SOP Class, Modality, and the keyword of an extra element
    # the test sets on the dataset before adding it (None when not needed)
    ("IMAGE", CTImageStorage, "CT", None),
    ("RT DOSE", RTDoseStorage, "RTDOSE", None),
    ("RT STRUCTURE SET", RTStructureSetStorage, "RTSTRUCT", None),
    ("RT PLAN", RTPlanStorage, "RTPLAN", "RTPlanLabel"),
    ("RT TREAT RECORD", RTBeamsTreatmentRecordStorage, "RTRECORD", None),
    ("PRESENTATION", GrayscaleSoftcopyPresentationStateStorage, "PR", None),
    ("WAVEFORM", TwelveLeadECGWaveformStorage, "ECG", None),
    ("SR DOCUMENT", BasicTextSRStorage, "SR", None),
    ("KEY OBJECT DOC", KeyObjectSelectionDocumentStorage, "KO", None),
    ("SPECTROSCOPY", MRSpectroscopyStorage, "MS", None),
    ("RAW DATA", RawDataStorage, "OT", None),
    ("REGISTRATION", SpatialRegistrationStorage, "REG", None),
    ("FIDUCIAL", SpatialFiducialsStorage, "FID", None),
    ("ENCAP DOC", EncapsulatedPDFStorage, "DOC", "EncapsulatedDocument"),
    ("VALUE MAP", RealWorldValueMappingStorage, "RWV", None),
    ("STEREOMETRIC", StereometricRelationshipStorage, "SMR", None),
    ("PLAN", RTBeamsDeliveryInstructionStorage, "PLAN", None),
    ("MEASUREMENT", LensometryMeasurementsStorage, "LEN", None),
    ("SURFACE", SurfaceSegmentationStorage, "LS", None),
    ("SURFACE SCAN", SurfaceScanMeshStorage, "LS", None),
    # NOTE(review): the modality here is the string "None", not None -
    # confirm this is the intended value
    ("TRACT", TractographyResultsStorage, "None", None),
    ("ASSESSMENT", ContentAssessmentResultsStorage, "ASMT", None),
    ("RADIOTHERAPY", CArmPhotonElectronRadiationStorage, "RTRAD", None),
]
2708
2709
@pytest.mark.filterwarnings("ignore:The 'DicomDir'")
@pytest.mark.parametrize("rtype, sop", REFERENCE_1LEVEL)
def test_one_level_record(rtype, sop, dummy, tdir):
    """Test adding instances whose record has no parent records."""
    instance, conditional = dummy
    instance.SOPClassUID = sop

    file_set = FileSet()
    file_set.add(instance)
    matches = file_set.find(SOPInstanceUID=instance.SOPInstanceUID)
    assert len(matches) == 1

    dicomdir, written = write_fs(file_set, tdir.name)
    assert len(written) == 1
    target = written[0]

    # Exactly one directory record is expected, and it must describe the
    # instance that was added
    (record,) = dicomdir.DirectoryRecordSequence
    assert rtype == record.DirectoryRecordType
    assert sop == record.ReferencedSOPClassUIDInFile
    assert instance.SOPInstanceUID == record.ReferencedSOPInstanceUIDInFile
    syntax = instance.file_meta.TransferSyntaxUID
    assert syntax == record.ReferencedTransferSyntaxUIDInFile

    # The referenced File ID is the written path relative to the File-set
    file_id = list(target.relative_to(file_set.path).parts)
    assert file_id == [record.ReferencedFileID]
    assert Dataset(instance) == dcmread(target)

    # Test the 1C elements
    instance.update(conditional)
    second_set = FileSet()
    second_set.add(instance)
2737
2738
@pytest.mark.filterwarnings("ignore:The 'DicomDir'")
@pytest.mark.parametrize("rtype, sop, modality, kw", REFERENCE_4LEVEL)
def test_four_level_record(rtype, sop, modality, kw, dummy, tdir):
    """Test adding instances stored under PATIENT, STUDY and SERIES."""
    instance, conditional = dummy
    instance.SOPClassUID = sop
    instance.Modality = modality

    # Some record types need one more element on the dataset
    extra_values = {
        "RTPlanLabel": "Value",
        "EncapsulatedDocument": b'\x00\x01',
    }
    if kw in extra_values:
        setattr(instance, kw, extra_values[kw])

    file_set = FileSet()
    file_set.add(instance)
    matches = file_set.find(SOPInstanceUID=instance.SOPInstanceUID)
    assert len(matches) == 1

    dicomdir, written = write_fs(file_set, tdir.name)
    assert len(written) == 1
    target = written[0]

    # Exactly four records are expected, ordered from PATIENT down to the leaf
    patient, study, series, record = dicomdir.DirectoryRecordSequence

    assert "PATIENT" == patient.DirectoryRecordType
    assert instance.PatientID == patient.PatientID
    assert instance.PatientName == patient.PatientName

    assert "STUDY" == study.DirectoryRecordType
    assert instance.StudyDate == study.StudyDate
    assert instance.StudyTime == study.StudyTime
    assert instance.StudyInstanceUID == study.StudyInstanceUID

    assert "SERIES" == series.DirectoryRecordType
    assert modality == series.Modality

    assert rtype == record.DirectoryRecordType
    assert sop == record.ReferencedSOPClassUIDInFile
    assert instance.SOPInstanceUID == record.ReferencedSOPInstanceUIDInFile
    syntax = instance.file_meta.TransferSyntaxUID
    assert syntax == record.ReferencedTransferSyntaxUIDInFile

    # The referenced File ID is the written path relative to the File-set
    file_id = list(target.relative_to(file_set.path).parts)
    assert file_id == record.ReferencedFileID
    assert Dataset(instance) == dcmread(target)

    # Test the 1C elements
    instance.update(conditional)
    second_set = FileSet()
    second_set.add(instance)
2780