#!/usr/bin/env python3
"""Read-only parser for ADF (Amiga Disk File) floppy disk images.

Handles standard 880K disks formatted with either OFS (Old File System)
or FFS (Fast File System).  The parser walks the boot block, root block,
bitmap blocks and directory hash tables, collecting per-file metadata
and block lists.  Inconsistencies are recorded as warnings rather than
raised, so damaged images can still be inspected.
"""

import os
import sys
import hashlib
from io import BytesIO
from typing import IO, Dict, List, Union

# Geometry of a standard 3.5" DD Amiga floppy:
# 2 sides * 80 cylinders * 11 sectors of 512 bytes.
B_SIZE = 512
B_COUNT = 880 * 2

# Primary block types (first ulong of a block).
T_HEADER = 2
T_DATA = 8
T_LIST = 16

# Secondary block types (last ulong of a block).
ST_ROOT = 1
ST_USERDIR = 2
ST_FILE = 0xFFFFFFFF - 3 + 1  # -3 represented as an unsigned 32-bit value


def char(block: bytes, pos: int) -> str:
    """Return the single character at *pos*, decoded as ISO-8859-1."""
    return block[pos : pos + 1].decode("ISO-8859-1")


def ubyte(block: bytes, pos: int) -> int:
    """Return the unsigned byte at *pos*."""
    return block[pos]


def ulong(block: bytes, pos: int) -> int:
    """Return the big-endian unsigned 32-bit integer at *pos*."""
    a = block[pos]
    b = block[pos + 1]
    c = block[pos + 2]
    d = block[pos + 3]
    return (a << 24) | (b << 16) | (c << 8) | d


def checksum_block(block: bytes) -> int:
    """Compute the standard Amiga block checksum for a 512-byte block.

    The checksum field (bytes 20-23) is treated as zero, all ulongs are
    summed modulo 2**32, and the negated sum is returned, so that a
    block carrying this value sums to zero overall.
    """
    # Clear the checksum field before summing.
    data = block[:20] + b"\0\0\0\0" + block[24:]
    total = 0
    for pos in range(0, B_SIZE, 4):
        total += ulong(data, pos)
        total &= 0xFFFFFFFF
    total = -total
    total &= 0xFFFFFFFF
    return total


def verify_block(data: bytes) -> bool:
    """Return True if all ulongs of the block sum to zero (mod 2**32)."""
    total = 0
    for pos in range(0, B_SIZE, 4):
        total += ulong(data, pos)
        total &= 0xFFFFFFFF
    return total == 0


class Block(object):
    """Thin accessor wrapper around one 512-byte disk block."""

    def __init__(self, data: bytes) -> None:
        self.data = data

    def char(self, pos: int) -> str:
        """Single ISO-8859-1 character at *pos*."""
        return char(self.data, pos)

    def string(self, pos: int, len: int) -> str:
        """ISO-8859-1 string of *len* bytes starting at *pos*."""
        return self.data[pos : pos + len].decode("ISO-8859-1")

    def ubyte(self, pos: int) -> int:
        """Unsigned byte at *pos*."""
        return ubyte(self.data, pos)

    def ulong(self, pos: int) -> int:
        """Big-endian unsigned 32-bit integer at *pos*."""
        return ulong(self.data, pos)


class FileInfo(object):
    """Metadata collected for one file or directory entry."""

    def __init__(self) -> None:
        # Data block numbers, in file order (empty for directories).
        self.block_list: List[int] = []
        # Block number of the file/dir header block (-1 until parsed).
        self.header_block = -1
        self.name = ""
        # Full path; directory paths end with "/".
        self.path = ""
        # FIX: was `Undefined(str)` — typing.Undefined does not exist
        # and made the module fail at import time.
        self.comment = ""
        # Modification time formatted as "days:mins:ticks".
        self.time = ""
        # Protection bits.
        self.mode = 0
        # File size in bytes (0 for directories).
        self.size = 0


class ADFFile(object):
    """Parsed ADF disk image with a zipfile-like read interface."""

    # Boot block flag bits (byte 3 of the boot block).
    FFS_FLAG = 1
    INTL_ONLY_FLAG = 2
    DIRC_AND_INTL_FLAG = 4

    def __init__(self, obj: Union[str, bytes, IO[bytes]]) -> None:
        """Load an image from a file path, raw bytes, or binary stream."""
        if isinstance(obj, str):
            with open(obj, "rb") as f:
                data = f.read()
        elif isinstance(obj, bytes):
            data = obj
        else:
            data = obj.read()
        assert isinstance(data, bytes)
        assert len(data) == B_SIZE * B_COUNT
        self.blocks: List[Block] = []
        for i in range(0, B_SIZE * B_COUNT, B_SIZE):
            self.blocks.append(Block(data[i : i + B_SIZE]))
        assert len(self.blocks) == B_COUNT
        # Per-block list of usage descriptions, used for cross-checks
        # against the allocation bitmap.
        self.block_usage: List[List[str]] = [[] for _ in range(B_COUNT)]
        self.dos = False
        self.ofs = False
        self.ffs = False
        self.warnings: List[str] = []
        # Lower-cased path -> FileInfo.
        self.file_map: Dict[str, FileInfo] = {}
        self.root_block_number = 880
        self.bitmap_pages: List[int] = []
        self._parse()

    def root_block(self) -> Block:
        return self.blocks[self.root_block_number]

    def _parse(self) -> None:
        """Parse the boot block and, for DOS disks, the whole volume."""
        b = self.blocks[0]
        if b.char(0) == "D" and b.char(1) == "O" and b.char(2) == "S":
            self.dos = True
        else:
            # Not an AmigaDOS volume (e.g. NDOS game disk); nothing to do.
            return
        flags = b.ubyte(3)
        self.ffs = flags & self.FFS_FLAG != 0
        self.ofs = not self.ffs

        self.root_block_number = b.ulong(8)
        self.bitmap_pages = []
        if self.root_block_number != 880:
            self.warnings.append(
                "Root block is at position {0}, "
                "not 880".format(self.root_block_number)
            )
            # The root of a standard 880K disk is always block 880.
            self.root_block_number = 880
            self.warnings.append("Trying 880 anyway...")

        self._parse_root_block()
        self.block_usage[self.root_block_number].append("root block")

        # Cross-check the allocation bitmap against observed usage.
        for i, usage in enumerate(self.block_usage):
            if "used" in usage:
                if len(usage) < 2:
                    self.warnings.append(
                        "block {0} marked as used but no actual usage".format(
                            i
                        )
                    )
            if "free" in usage:
                if len(usage) != 1:
                    # FIX: message previously said "marked as used".
                    self.warnings.append(
                        "block {0} marked as free but used for {1}".format(
                            i, repr(usage)
                        )
                    )

    def _parse_root_block(self) -> None:
        """Parse the root block: timestamps, bitmap pointers, volume name."""
        b = self.root_block()
        type = b.ulong(0)
        secondary_type = b.ulong(B_SIZE - 4)
        if type != T_HEADER:
            self.warnings.append("root block does not have T_HEADER type")
        if secondary_type != ST_ROOT:
            self.warnings.append(
                "root block does not have ST_ROOT secondary " "type"
            )

        # Root directory modification time.
        days = b.ulong(B_SIZE - 92)
        mins = b.ulong(B_SIZE - 88)
        ticks = b.ulong(B_SIZE - 84)
        self.root_mtime = "{0:05d}:{1:04d}:{2:03d}".format(days, mins, ticks)

        # Volume modification time.
        days = b.ulong(B_SIZE - 40)
        mins = b.ulong(B_SIZE - 36)
        ticks = b.ulong(B_SIZE - 32)
        self.disk_mtime = "{0:05d}:{1:04d}:{2:03d}".format(days, mins, ticks)

        # Volume creation time.
        days = b.ulong(B_SIZE - 28)
        mins = b.ulong(B_SIZE - 24)
        ticks = b.ulong(B_SIZE - 20)
        self.disk_ctime = "{0:05d}:{1:04d}:{2:03d}".format(days, mins, ticks)

        # bm_flag == 0xffffffff means the bitmap is valid.
        bm_flag = b.ulong(B_SIZE - 200)
        if bm_flag != 0xFFFFFFFF:
            self.warnings.append(
                "bm_flag != 0xffffffff ({0:8x})".format(bm_flag)
            )
            if bm_flag == 0:
                self.warnings.append("bm_flag is ZERO")
        # 25 bitmap block pointers follow the flag.
        for i in range(25):
            self.bitmap_pages.append(b.ulong(B_SIZE - 196 + 4 * i))
        self._parse_used_blocks()

        name_len = b.ubyte(B_SIZE - 80)
        self.volume_name = b.string(B_SIZE - 79, name_len)
        self._parse_directory_content("", self.root_block_number)
        if not verify_block(b.data):
            self.warnings.append("bad root checksum")

    def _parse_used_blocks(self) -> None:
        """Decode the allocation bitmap into per-block free/used marks."""
        for i in range(len(self.bitmap_pages)):
            block_number = self.bitmap_pages[i]
            if block_number:
                if block_number >= B_COUNT:
                    self.warnings.append(
                        "invalid block number {0} in bitmap pages".format(
                            block_number
                        )
                    )
                    continue
                self.block_usage[block_number].append("bitmap block")

            if i == 0:
                if not block_number:
                    self.warnings.append("no bitmap block for disk")
                    continue
            else:
                # A floppy needs only one bitmap block; skip the rest.
                if block_number:
                    self.warnings.append(
                        "unexpected additional bitmap block..."
                    )
                continue

            b = self.blocks[block_number]
            if not verify_block(b.data):
                self.warnings.append(
                    "bitmap block checksum is invalid (" "ignoring this block)"
                )
                continue
            # 1758 mappable blocks: blocks 0-1 (boot) are never mapped,
            # so bit 0 corresponds to block 2.  A set bit means free.
            for bit_index in range(1758):
                long_index = bit_index // 32
                bit = bit_index % 32
                ul = b.ulong(4 + long_index * 4)
                free = ul & (1 << bit)
                if free:
                    self.block_usage[2 + bit_index].append("free")
                else:
                    self.block_usage[2 + bit_index].append("used")

    def _parse_directory_content(self, path: str, block_number: int) -> None:
        """Walk a directory's hash table and parse each chained entry."""
        d_b = self.blocks[block_number]
        d_secondary_type = d_b.ulong(B_SIZE - 4)
        if d_secondary_type == ST_ROOT:
            ht_size = d_b.ulong(12)
            assert ht_size == (B_SIZE // 4) - 56

        for i in range((B_SIZE // 4) - 56):
            entry = d_b.ulong(24 + i * 4)
            # Each hash slot heads a chain linked via hash_chain
            # (offset B_SIZE - 16) in each entry's header block.
            while entry:
                if entry >= B_COUNT:
                    self.warnings.append(
                        "directory entry refers to an invalid block "
                        "number {0}".format(entry)
                    )
                    entry = 0
                    continue
                b = self.blocks[entry]
                type = b.ulong(0)
                secondary_type = b.ulong(B_SIZE - 4)
                parent = b.ulong(B_SIZE - 12)
                assert type == T_HEADER
                assert secondary_type in [ST_USERDIR, ST_FILE]
                assert parent == block_number

                if secondary_type == ST_USERDIR:
                    self._parse_directory(path, entry)
                elif secondary_type == ST_FILE:
                    self._parse_file(path, entry)
                else:
                    raise Exception("neither ST_USERDIR or ST_FILE")

                entry = b.ulong(B_SIZE - 16)

    def _parse_file(self, path: str, block_number: int) -> None:
        """Parse a file header block and collect its data block list.

        For OFS volumes the data block chain embedded in the data
        blocks themselves is followed as well and cross-checked against
        the block list stored in the header/extension blocks.
        """
        b = self.blocks[block_number]
        header_key = b.ulong(4)
        assert header_key == block_number

        file_info = FileInfo()
        file_info.header_block = block_number

        name_len = b.ubyte(B_SIZE - 80)
        file_info.name = b.string(B_SIZE - 79, name_len)
        file_info.path = path + file_info.name
        # AmigaDOS file names are case-insensitive; key on lower case.
        file_key = file_info.path.lower()
        if file_key in self.file_map:
            self.warnings.append(
                "duplicate entries for file name " "{0}".format(file_key)
            )
        self.file_map[file_key] = file_info

        self.block_usage[block_number].append(
            "header block for file " + file_info.path
        )
        if not verify_block(b.data):
            self.warnings.append(
                "bad checksum for header for " + file_info.path
            )

        file_info.mode = b.ulong(B_SIZE - 192)
        file_info.size = b.ulong(B_SIZE - 188)
        comment_len = b.ubyte(B_SIZE - 184)
        file_info.comment = b.string(B_SIZE - 183, comment_len)

        days = b.ulong(B_SIZE - 92)
        mins = b.ulong(B_SIZE - 88)
        ticks = b.ulong(B_SIZE - 84)
        file_info.time = "{0:05d}:{1:04d}:{2:03d}".format(days, mins, ticks)

        # Pass 1 (OFS only): follow the chain of data block headers.
        first_data = b.ulong(16)
        next_data = first_data
        file_blocks_1: List[int] = []
        ofs_accum_size = 0
        k = 0

        while next_data and self.ofs:
            file_blocks_1.append(next_data)
            data_b = self.blocks[next_data]
            data_type = data_b.ulong(0)
            assert data_type == T_DATA
            header_key = data_b.ulong(4)
            if header_key != block_number:
                self.warnings.append("header_key != block_number")

            seq_num = data_b.ulong(8)
            if seq_num != len(file_blocks_1):
                self.warnings.append(
                    "block {0:04d} expected seq_num {1} (found {2}) for "
                    "file {3}".format(
                        next_data, len(file_blocks_1), seq_num, file_info.path
                    )
                )
            data_size = data_b.ulong(12)
            ofs_accum_size += data_size

            if not verify_block(data_b.data):
                self.warnings.append(
                    "block {1:04d} - bad checksum for data block #{0} for "
                    "file {2}".format(k + 1, next_data, file_info.path)
                )

            next_data = data_b.ulong(16)
            k += 1

        # Pass 2: collect block numbers from the header block and any
        # chained extension (T_LIST) blocks.
        file_blocks_2: List[int] = []
        extension = block_number
        k = 0
        while extension:
            ext_b = self.blocks[extension]
            ext_type = ext_b.ulong(0)
            ext_secondary_type = ext_b.ulong(B_SIZE - 4)
            if extension != block_number:
                assert ext_type == T_LIST
                assert ext_secondary_type == ST_FILE
            high_seq = ext_b.ulong(8)
            # The block table grows downwards from B_SIZE - 204.
            for i in range(high_seq):
                file_blocks_2.append(ext_b.ulong(B_SIZE - 204 - i * 4))

            parent = ext_b.ulong(B_SIZE - 12)
            if k > 0 and parent != block_number:
                self.warnings.append(
                    "block {3:04d} - expected parent {0} (found {1} for OFS "
                    "extension block #{2} for file {4}".format(
                        block_number, parent, k, extension, file_info.path
                    )
                )

            if not verify_block(ext_b.data):
                self.warnings.append(
                    "block {2:04d} - bad checksum for extension block #{0} "
                    "for file {1}".format(k, file_info.path, extension)
                )
            extension = ext_b.ulong(B_SIZE - 8)
            if extension:
                self.block_usage[extension].append(
                    "file extension block #{0} for file {1}".format(
                        k, file_info.path
                    )
                )
            k += 1

        # Cross-check the two views of the file (OFS only).
        if self.ofs:
            if file_blocks_1 != file_blocks_2:
                self.warnings.append(
                    "block list mismatch (extension vs OFS headers) for "
                    "file {0}".format(file_info.path)
                )
            if len(file_blocks_1) != len(file_blocks_2):
                self.warnings.append(
                    "block list length mismatch (extension {0} vs OFS "
                    "headers {1}) for file {2}".format(
                        len(file_blocks_2), len(file_blocks_1), file_info.path
                    )
                )
            if ofs_accum_size != file_info.size:
                self.warnings.append(
                    "file size mismatch (file header {0} vs OFS data block "
                    "headers {1}) for file {2}".format(
                        file_info.size, ofs_accum_size, file_info.path
                    )
                )

        file_info.block_list = file_blocks_2

        for i, bn in enumerate(file_blocks_2):
            self.block_usage[bn].append(
                "data block #{0} for file {1}".format(
                    i + 1, file_info.path
                )
            )

    def _parse_directory(self, path: str, block_number: int) -> None:
        """Parse a user directory header block and recurse into it."""
        b = self.blocks[block_number]
        header_key = b.ulong(4)
        assert header_key == block_number

        file_info = FileInfo()
        file_info.header_block = block_number

        name_len = b.ubyte(B_SIZE - 80)
        file_info.name = b.string(B_SIZE - 79, name_len)
        file_info.path = path + file_info.name + "/"
        file_key = file_info.path.lower()
        # Strip the trailing "/" so a clash with a plain file is caught.
        if file_key[:-1] in self.file_map:
            self.warnings.append(
                "Duplicate entries for file name " "{0}".format(file_key)
            )
        self.file_map[file_key] = file_info

        self.block_usage[block_number].append(
            "header block for file " + file_info.path
        )
        if not verify_block(b.data):
            self.warnings.append(
                "bad checksum for header for " + file_info.path
            )

        file_info.mode = b.ulong(B_SIZE - 192)
        file_info.size = 0
        comment_len = b.ubyte(B_SIZE - 184)
        file_info.comment = b.string(B_SIZE - 183, comment_len)

        days = b.ulong(B_SIZE - 92)
        mins = b.ulong(B_SIZE - 88)
        ticks = b.ulong(B_SIZE - 84)
        file_info.time = "{0:05d}:{1:04d}:{2:03d}".format(days, mins, ticks)

        self._parse_directory_content(file_info.path, block_number)

    def namelist(self) -> List[str]:
        """Return all file/directory paths, sorted case-insensitively."""
        names: List[str] = []
        keys = sorted(self.file_map.keys())
        for key in keys:
            names.append(self.file_map[key].path)
        return names

    def getinfo(self, name: str) -> FileInfo:
        """Return the FileInfo for *name* (case-insensitive)."""
        name = name.lower()
        return self.file_map[name]

    def open(self, name: str, mode: str = "r") -> BytesIO:
        """Return a BytesIO with the file's contents (read-only)."""
        assert mode == "r"
        return BytesIO(self.read(name))

    def read(self, name: str) -> bytes:
        """Return the contents of *name* as bytes.

        Raises KeyError if the file does not exist.
        """
        name = name.lower()
        file_info = self.file_map[name]
        bytes_left = file_info.size
        data: List[bytes] = []
        for block_number in file_info.block_list:
            if self.ffs:
                # FFS data blocks are pure data.
                start_index = 0
            else:
                # OFS data blocks carry a 24-byte header.
                start_index = 24
            max_bsize = B_SIZE - start_index
            read_size = min(bytes_left, max_bsize)
            block_data = self.blocks[block_number].data[
                start_index : start_index + read_size
            ]
            bytes_left -= len(block_data)
            data.append(block_data)
        assert bytes_left == 0
        return b"".join(data)


def main() -> None:
    """List an ADF image's contents; optionally dump named files."""
    adf = ADFFile(sys.argv[1])
    print("")
    print(os.path.basename(sys.argv[1]))
    print("Volume:", adf.volume_name, "(OFS)" if adf.ofs else "(FFS)")
    print("")
    for name in adf.namelist():
        assert isinstance(name, str)
        print(name)
        if not name.endswith("/"):
            data = adf.open(name).read()
            print(" ", len(data), "bytes")
            print(" ", hashlib.sha1(data).hexdigest())
    print("")
    for warning in adf.warnings:
        print("WARNING:", warning)
    for name in sys.argv[2:]:
        data = adf.open(name).read()
        with open(name + ".dump", "wb") as f:
            f.write(data)
        print("wrote " + name + ".dump")


if __name__ == "__main__":
    main()