1#!/usr/bin/env python3
2
import hashlib
import os
import sys
from io import BytesIO

# NOTE: "Undefined" was removed from this import: it has never existed in the
# standard "typing" module (it was a pre-PEP 484 mypy construct), and
# importing it raises ImportError, preventing the module from loading.
from typing import IO, Dict, List, Union
8
9
10B_SIZE = 512
11B_COUNT = 880 * 2
12T_HEADER = 2
13T_DATA = 8
14T_LIST = 16
15ST_ROOT = 1
16ST_USERDIR = 2
17ST_FILE = 0xFFFFFFFF - 3 + 1
18
19
def char(block: bytes, pos: int) -> str:
    """Return the byte at *pos* decoded as a one-character Latin-1 string.

    An out-of-range *pos* yields an empty slice and thus an empty string.
    """
    piece = block[pos : pos + 1]
    return piece.decode("ISO-8859-1")
22
23
def ubyte(block: bytes, pos: int) -> int:
    """Return the unsigned byte value at *pos*."""
    return block[pos]
26
27
def ulong(block: bytes, pos: int) -> int:
    """Decode the big-endian unsigned 32-bit integer starting at *pos*."""
    value = 0
    for offset in range(4):
        value = (value << 8) | block[pos + offset]
    return value
34
35
def checksum_block(block: bytes) -> int:
    """Compute the AmigaDOS checksum of a block.

    The checksum is the negated (mod 2**32) sum of all big-endian 32-bit
    words, with the stored checksum field (bytes 20-23) treated as zero,
    so that the words of a valid block sum to zero.
    """
    # Blank out the stored checksum field before summing.
    data = block[:20] + b"\0\0\0\0" + block[24:]
    total = 0
    # Generalized from the fixed B_SIZE constant to the actual block length;
    # identical for the standard 512-byte blocks used in this file.
    for pos in range(0, len(data), 4):
        total = (total + int.from_bytes(data[pos : pos + 4], "big")) & 0xFFFFFFFF
    return (-total) & 0xFFFFFFFF
46
47
def verify_block(data: bytes) -> bool:
    """Return True if the block's checksum is consistent.

    A valid block's big-endian 32-bit words (including the stored checksum
    field) sum to zero modulo 2**32.
    """
    total = 0
    # Generalized from the fixed B_SIZE constant to the actual block length;
    # identical for the standard 512-byte blocks used in this file.
    for pos in range(0, len(data), 4):
        total = (total + int.from_bytes(data[pos : pos + 4], "big")) & 0xFFFFFFFF
    return total == 0
54
55
class Block(object):
    """A single 512-byte disk block with big-endian field accessors."""

    def __init__(self, data: bytes) -> None:
        self.data = data

    def char(self, pos: int) -> str:
        """One byte at *pos* decoded as a Latin-1 character."""
        return self.data[pos : pos + 1].decode("ISO-8859-1")

    def string(self, pos: int, len: int) -> str:
        """*len* bytes starting at *pos* decoded as Latin-1 text."""
        return self.data[pos : pos + len].decode("ISO-8859-1")

    def ubyte(self, pos: int) -> int:
        """Unsigned byte value at *pos*."""
        return self.data[pos]

    def ulong(self, pos: int) -> int:
        """Big-endian unsigned 32-bit integer starting at *pos*."""
        value = 0
        for offset in range(4):
            value = (value << 8) | self.data[pos + offset]
        return value
71
72
class FileInfo(object):
    """Metadata for one file or directory entry parsed from an ADF volume.

    All attributes are filled in by the ADFFile parser after construction.
    """

    def __init__(self) -> None:
        # Data block numbers of the file, in file order.
        # BUGFIX: was ``List[int]()`` — instantiating through the typing
        # alias; plain literals are the supported spelling.
        self.block_list: List[int] = []
        # Block number of this entry's header block (-1 until parsed).
        self.header_block = -1
        self.name = ""
        self.path = ""
        # BUGFIX: was ``Undefined(str)`` — a pre-PEP 484 mypy construct that
        # raises NameError at runtime on every construction.
        self.comment = ""
        # Modification time formatted as "days:mins:ticks".
        self.time = ""
        self.mode = 0
        self.size = 0
83
84
class ADFFile(object):
    """Parser for 880K Amiga Disk File (ADF) floppy images.

    The image is split into 512-byte blocks; the boot block, root block,
    allocation bitmap and directory/file header blocks are parsed into a
    zipfile-like API (namelist / getinfo / open / read).  Inconsistencies
    found while parsing are collected in ``self.warnings`` rather than
    raised.
    """

    # Boot block flag bits (low bits of byte 3 of block 0).
    FFS_FLAG = 1
    INTL_ONLY_FLAG = 2
    DIRC_AND_INTL_FLAG = 4

    def __init__(self, obj: Union[str, bytes, IO[bytes]]) -> None:
        """Load an ADF image from a file path, raw bytes, or a binary stream."""
        if isinstance(obj, str):
            with open(obj, "rb") as f:
                data = f.read()
        elif isinstance(obj, bytes):
            data = obj
        else:
            data = obj.read()
        assert isinstance(data, bytes)
        # A standard double-density image is exactly B_SIZE * B_COUNT bytes.
        assert len(data) == B_SIZE * B_COUNT
        self.blocks: List[Block] = [
            Block(data[i : i + B_SIZE])
            for i in range(0, B_SIZE * B_COUNT, B_SIZE)
        ]
        assert len(self.blocks) == B_COUNT
        # For each block, human-readable usage descriptions; used to
        # cross-check the allocation bitmap against actual usage.
        self.block_usage: List[List[str]] = [[] for _ in range(B_COUNT)]
        self.dos = False
        self.ofs = False
        self.ffs = False
        self.warnings: List[str] = []
        # Lower-cased path -> FileInfo for every file and directory.
        self.file_map: Dict[str, FileInfo] = {}
        self.root_block_number = 880
        self.bitmap_pages: List[int] = []
        self._parse()

    def root_block(self) -> Block:
        """Return the root directory block."""
        return self.blocks[self.root_block_number]

    def _parse(self) -> None:
        """Parse the whole image; a no-op for non-DOS (track-loaded) disks."""
        b = self.blocks[0]
        if b.char(0) == "D" and b.char(1) == "O" and b.char(2) == "S":
            self.dos = True
        else:
            return
        flags = b.ubyte(3)
        self.ffs = flags & self.FFS_FLAG != 0
        self.ofs = not self.ffs

        self.root_block_number = b.ulong(8)
        self.bitmap_pages = []
        if self.root_block_number != 880:
            self.warnings.append(
                "Root block is at position {0}, "
                "not 880".format(self.root_block_number)
            )
            self.root_block_number = 880
            self.warnings.append("Trying 880 anyway...")

        self._parse_root_block()
        self.block_usage[self.root_block_number].append("root block")

        # Cross-check the allocation bitmap against observed block usage.
        for i, usage in enumerate(self.block_usage):
            if "used" in usage and len(usage) < 2:
                self.warnings.append(
                    "block {0} marked as used but no actual usage".format(i)
                )
            if "free" in usage and len(usage) != 1:
                # BUGFIX: this message previously read "marked as used"
                # (copy-paste from the branch above); this branch reports
                # blocks the bitmap marks as *free* that are in use.
                self.warnings.append(
                    "block {0} marked as free but used for {1}".format(
                        i, repr(usage)
                    )
                )

    def _parse_root_block(self) -> None:
        """Parse the root block: timestamps, bitmap pointers, volume name."""
        b = self.root_block()
        block_type = b.ulong(0)
        secondary_type = b.ulong(B_SIZE - 4)
        if block_type != T_HEADER:
            self.warnings.append("root block does not have T_HEADER type")
        if secondary_type != ST_ROOT:
            self.warnings.append(
                "root block does not have ST_ROOT secondary " "type"
            )

        # Timestamps are stored as three longs: days, minutes, ticks.
        days = b.ulong(B_SIZE - 92)
        mins = b.ulong(B_SIZE - 88)
        ticks = b.ulong(B_SIZE - 84)
        self.root_mtime = "{0:05d}:{1:04d}:{2:03d}".format(days, mins, ticks)

        days = b.ulong(B_SIZE - 40)
        mins = b.ulong(B_SIZE - 36)
        ticks = b.ulong(B_SIZE - 32)
        self.disk_mtime = "{0:05d}:{1:04d}:{2:03d}".format(days, mins, ticks)

        days = b.ulong(B_SIZE - 28)
        mins = b.ulong(B_SIZE - 24)
        ticks = b.ulong(B_SIZE - 20)
        self.disk_ctime = "{0:05d}:{1:04d}:{2:03d}".format(days, mins, ticks)

        bm_flag = b.ulong(B_SIZE - 200)
        if bm_flag != 0xFFFFFFFF:
            self.warnings.append(
                "bm_flag != 0xffffffff ({0:8x})".format(bm_flag)
            )
            if bm_flag == 0:
                self.warnings.append("bm_flag is ZERO")
        # Up to 25 bitmap block pointers follow the bm_flag field.
        for i in range(25):
            self.bitmap_pages.append(b.ulong(B_SIZE - 196 + 4 * i))
        self._parse_used_blocks()

        name_len = b.ubyte(B_SIZE - 80)
        self.volume_name = b.string(B_SIZE - 79, name_len)
        self._parse_directory_content("", self.root_block_number)
        if not verify_block(b.data):
            self.warnings.append("bad root checksum")

    def _parse_used_blocks(self) -> None:
        """Read the free-block bitmap and record free/used for each block."""
        for i, block_number in enumerate(self.bitmap_pages):
            if block_number:
                if block_number >= B_COUNT:
                    self.warnings.append(
                        "invalid block number {0} in bitmap pages".format(
                            block_number
                        )
                    )
                    continue
                self.block_usage[block_number].append("bitmap block")

            if i == 0:
                if not block_number:
                    self.warnings.append("no bitmap block for disk")
                    continue
            else:
                # A DD disk needs only the first bitmap block; any further
                # pointers are noted but not parsed.
                if block_number:
                    self.warnings.append(
                        "unexpected additional bitmap block..."
                    )
                continue

            b = self.blocks[block_number]
            if not verify_block(b.data):
                self.warnings.append(
                    "bitmap block checksum is invalid (" "ignoring this block)"
                )
                continue
            # 1758 = B_COUNT - 2: the two boot blocks are never mapped, so
            # bit 0 corresponds to block 2.  A set bit means "free".
            for bit_index in range(1758):
                long_index = bit_index // 32
                bit = bit_index % 32
                ul = b.ulong(4 + long_index * 4)
                free = ul & (1 << bit)
                if free:
                    self.block_usage[2 + bit_index].append("free")
                else:
                    self.block_usage[2 + bit_index].append("used")

    def _parse_directory_content(self, path: str, block_number: int) -> None:
        """Walk a directory's hash table and parse each chained entry."""
        d_b = self.blocks[block_number]
        d_secondary_type = d_b.ulong(B_SIZE - 4)
        if d_secondary_type == ST_ROOT:
            ht_size = d_b.ulong(12)
            assert ht_size == (B_SIZE // 4) - 56

        for i in range((B_SIZE // 4) - 56):
            entry = d_b.ulong(24 + i * 4)
            # Each hash slot starts a linked chain of header blocks.
            while entry:
                if entry >= B_COUNT:
                    self.warnings.append(
                        "directory entry refers to an invalid block "
                        "number {0}".format(entry)
                    )
                    entry = 0
                    continue
                b = self.blocks[entry]
                block_type = b.ulong(0)
                secondary_type = b.ulong(B_SIZE - 4)
                parent = b.ulong(B_SIZE - 12)
                assert block_type == T_HEADER
                assert secondary_type in [ST_USERDIR, ST_FILE]
                assert parent == block_number

                if secondary_type == ST_USERDIR:
                    self._parse_directory(path, entry)
                elif secondary_type == ST_FILE:
                    self._parse_file(path, entry)
                else:
                    raise Exception("neither ST_USERDIR or ST_FILE")

                # Follow the hash chain to the next entry.
                entry = b.ulong(B_SIZE - 16)

    def _parse_file(self, path: str, block_number: int) -> None:
        """Parse a file header block, its OFS data chain and extension blocks."""
        b = self.blocks[block_number]
        header_key = b.ulong(4)
        assert header_key == block_number

        file_info = FileInfo()
        file_info.header_block = block_number

        name_len = b.ubyte(B_SIZE - 80)
        file_info.name = b.string(B_SIZE - 79, name_len)
        file_info.path = path + file_info.name
        file_key = file_info.path.lower()
        if file_key in self.file_map:
            self.warnings.append(
                "duplicate entries for file name " "{0}".format(file_key)
            )
        self.file_map[file_key] = file_info

        self.block_usage[block_number].append(
            "header block for file " + file_info.path
        )
        if not verify_block(b.data):
            self.warnings.append(
                "bad checksum for header for " + file_info.path
            )

        file_info.mode = b.ulong(B_SIZE - 192)
        file_info.size = b.ulong(B_SIZE - 188)
        comment_len = b.ubyte(B_SIZE - 184)
        file_info.comment = b.string(B_SIZE - 183, comment_len)

        days = b.ulong(B_SIZE - 92)
        mins = b.ulong(B_SIZE - 88)
        ticks = b.ulong(B_SIZE - 84)
        file_info.time = "{0:05d}:{1:04d}:{2:03d}".format(days, mins, ticks)

        # On OFS disks the data blocks form a linked list starting at offset
        # 16 of the header; each data block carries its own mini-header.
        first_data = b.ulong(16)
        next_data = first_data
        file_blocks_1: List[int] = []
        ofs_accum_size = 0
        k = 0

        while next_data and self.ofs:
            file_blocks_1.append(next_data)
            data_b = self.blocks[next_data]
            data_type = data_b.ulong(0)
            assert data_type == T_DATA
            header_key = data_b.ulong(4)
            if header_key != block_number:
                self.warnings.append("header_key != block_number")

            seq_num = data_b.ulong(8)
            if seq_num != len(file_blocks_1):
                self.warnings.append(
                    "block {0:04d} expected seq_num {1} (found {2}) for "
                    "file {3}".format(
                        next_data, len(file_blocks_1), seq_num, file_info.path
                    )
                )
            data_size = data_b.ulong(12)
            ofs_accum_size += data_size

            if not verify_block(data_b.data):
                self.warnings.append(
                    "block {1:04d} - bad checksum for data block #{0} for "
                    "file {2}".format(k + 1, next_data, file_info.path)
                )

            next_data = data_b.ulong(16)
            k += 1

        # The block numbers are also tabulated in the header block and any
        # chained extension blocks; collect them independently so they can
        # be cross-checked against the OFS chain above.
        file_blocks_2: List[int] = []
        extension = block_number
        k = 0
        while extension:
            ext_b = self.blocks[extension]
            ext_type = ext_b.ulong(0)
            ext_secondary_type = ext_b.ulong(B_SIZE - 4)
            if extension != block_number:
                assert ext_type == T_LIST
            assert ext_secondary_type == ST_FILE
            high_seq = ext_b.ulong(8)
            # The block table grows downward from the end of the block.
            for i in range(high_seq):
                file_blocks_2.append(ext_b.ulong(B_SIZE - 204 - i * 4))

            parent = ext_b.ulong(B_SIZE - 12)
            if k > 0 and parent != block_number:
                self.warnings.append(
                    "block {3:04d} - expected parent {0} (found {1} for OFS "
                    "extension block #{2} for file {4}".format(
                        block_number, parent, k, extension, file_info.path
                    )
                )

            if not verify_block(ext_b.data):
                self.warnings.append(
                    "block {2:04d} - bad checksum for extension block #{0} "
                    "for file {1}".format(k, file_info.path, extension)
                )
            extension = ext_b.ulong(B_SIZE - 8)
            if extension:
                self.block_usage[extension].append(
                    "file extension block #{0} for file {1}".format(
                        k, file_info.path
                    )
                )
            k += 1

        if self.ofs:
            if file_blocks_1 != file_blocks_2:
                self.warnings.append(
                    "block list mismatch (extension vs OFS headers) for "
                    "file {0}".format(file_info.path)
                )
            if len(file_blocks_1) != len(file_blocks_2):
                self.warnings.append(
                    "block list length mismatch (extension {0} vs OFS "
                    "headers {1}) for file {2}".format(
                        len(file_blocks_2), len(file_blocks_1), file_info.path
                    )
                )
            if ofs_accum_size != file_info.size:
                self.warnings.append(
                    "file size mismatch (file header {0} vs OFS data block "
                    "headers {1}) for file {2}".format(
                        file_info.size, ofs_accum_size, file_info.path
                    )
                )

        file_info.block_list = file_blocks_2

        for i, bn in enumerate(file_blocks_2):
            # (Removed a stray unused third .format() argument here.)
            self.block_usage[bn].append(
                "data block #{0} for file {1}".format(i + 1, file_info.path)
            )

    def _parse_directory(self, path: str, block_number: int) -> None:
        """Parse a user directory header block, then recurse into it."""
        b = self.blocks[block_number]
        header_key = b.ulong(4)
        assert header_key == block_number

        file_info = FileInfo()
        file_info.header_block = block_number

        name_len = b.ubyte(B_SIZE - 80)
        file_info.name = b.string(B_SIZE - 79, name_len)
        file_info.path = path + file_info.name + "/"
        file_key = file_info.path.lower()
        # NOTE(review): this compares the key without its trailing slash, so
        # it flags a *file* with the same name as this directory -- confirm
        # whether directory-vs-directory duplicates should also be checked.
        if file_key[:-1] in self.file_map:
            self.warnings.append(
                "Duplicate entries for file name " "{0}".format(file_key)
            )
        self.file_map[file_key] = file_info

        self.block_usage[block_number].append(
            "header block for file " + file_info.path
        )
        if not verify_block(b.data):
            self.warnings.append(
                "bad checksum for header for " + file_info.path
            )

        file_info.mode = b.ulong(B_SIZE - 192)
        file_info.size = 0
        comment_len = b.ubyte(B_SIZE - 184)
        file_info.comment = b.string(B_SIZE - 183, comment_len)

        days = b.ulong(B_SIZE - 92)
        mins = b.ulong(B_SIZE - 88)
        ticks = b.ulong(B_SIZE - 84)
        file_info.time = "{0:05d}:{1:04d}:{2:03d}".format(days, mins, ticks)

        self._parse_directory_content(file_info.path, block_number)

    def namelist(self) -> List[str]:
        """Return the paths of all entries, sorted case-insensitively."""
        return [self.file_map[key].path for key in sorted(self.file_map)]

    def getinfo(self, name: str) -> FileInfo:
        """Return the FileInfo for *name* (case-insensitive lookup)."""
        return self.file_map[name.lower()]

    def open(self, name: str, mode: str = "r") -> BytesIO:
        """Return a BytesIO over the file's content; only mode "r" is valid."""
        assert mode == "r"
        return BytesIO(self.read(name))

    def read(self, name: str) -> bytes:
        """Return the content of *name* by concatenating its data blocks."""
        # BUGFIX: removed two leftover debug print() calls that dumped the
        # whole file_map on every read.
        file_info = self.file_map[name.lower()]
        bytes_left = file_info.size
        data: List[bytes] = []
        for block_number in file_info.block_list:
            # OFS data blocks carry a 24-byte header; FFS blocks are pure data.
            start_index = 0 if self.ffs else 24
            max_bsize = B_SIZE - start_index
            read_size = min(bytes_left, max_bsize)
            block_data = self.blocks[block_number].data[
                start_index : start_index + read_size
            ]
            bytes_left -= len(block_data)
            data.append(block_data)
        assert bytes_left == 0
        return b"".join(data)
494
495
def main() -> None:
    """CLI: list the ADF image given as argv[1]; dump any further names.

    For each extra argument, the named file is extracted to "<name>.dump"
    in the current directory.
    """
    adf = ADFFile(sys.argv[1])
    print("")
    print(os.path.basename(sys.argv[1]))
    print("Volume:", adf.volume_name, "(OFS)" if adf.ofs else "(FFS)")
    print("")
    for name in adf.namelist():
        assert isinstance(name, str)
        print(name)
        # Directories end with "/" and have no content to hash.
        if not name.endswith("/"):
            data = adf.open(name).read()
            print(" ", len(data), "bytes")
            print(" ", hashlib.sha1(data).hexdigest())
    print("")
    for warning in adf.warnings:
        print("WARNING:", warning)
    for name in sys.argv[2:]:
        data = adf.open(name).read()
        with open(name + ".dump", "wb") as f:
            f.write(data)
            print("wrote " + name + ".dump")
528
529
# Run the command-line interface when executed as a script.
if __name__ == "__main__":
    main()
532