# Copyright (c) 2014-2020, Manfred Moitzi
# License: MIT License
from typing import Iterable, Any, Sequence, Union, Tuple
from array import array
import struct
from binascii import unhexlify
from codecs import decode

Bytes = Union[bytes, bytearray, memoryview]


def hex_strings_to_bytes(data: Iterable[str]) -> bytes:
    """ Returns multiple hex strings `data` as bytes.

    Each string is converted by :func:`binascii.unhexlify` and the results
    are concatenated in iteration order.
    """
    byte_array = array('B')
    for hexstr in data:
        byte_array.extend(unhexlify(hexstr))
    return byte_array.tobytes()


def bytes_to_hexstr(data: bytes) -> str:
    """ Returns `data` bytes as plain upper case hex string, two hex digits
    per byte.
    """
    return ''.join(f'{byte:02X}' for byte in data)


NULL_NULL = b'\x00\x00'


class EndOfBufferError(EOFError):
    """ Raised if a read operation requires more data than the buffer
    contains.
    """
    pass


class ByteStream:
    """ Process little endian binary data organized as bytes, data is padded to
    4 byte boundaries by default.
    """

    # Created for Proxy Entity Graphic decoding
    def __init__(self, buffer: Bytes, align: int = 4):
        self.buffer = memoryview(buffer)
        # byte index of the next read operation
        self.index: int = 0
        # size of the alignment boundary in bytes
        self._align: int = align

    @property
    def has_data(self) -> bool:
        """ Returns ``True`` if the read index is inside the buffer. """
        return self.index < len(self.buffer)

    def align(self, index: int) -> int:
        """ Returns `index` rounded up to the next alignment boundary. """
        modulo = index % self._align
        return index + self._align - modulo if modulo else index

    def read_struct(self, fmt: str) -> Any:
        """ Read data defined by a struct format string. Insert little endian
        format character '<' as first character, if machine has native big
        endian byte order.

        Returns the tuple created by :func:`struct.unpack_from` and advances
        the read index to the next alignment boundary after the data.

        Raises:
            EndOfBufferError: read index is at or beyond the end of the buffer

        """
        if not self.has_data:
            raise EndOfBufferError('Unexpected end of buffer.')

        result = struct.unpack_from(fmt, self.buffer, offset=self.index)
        self.index = self.align(self.index + struct.calcsize(fmt))
        return result

    def read_float(self):
        """ Read a little endian double (8 bytes). """
        return self.read_struct('<d')[0]

    def read_long(self):
        """ Read a little endian unsigned long (4 bytes). """
        return self.read_struct('<L')[0]

    def read_signed_long(self):
        """ Read a little endian signed long (4 bytes). """
        return self.read_struct('<l')[0]

    def read_vertex(self):
        """ Read three little endian doubles as tuple. """
        return self.read_struct('<3d')

    def read_padded_string(self, encoding: str = 'utf_8') -> str:
        """ PS: Padded String. This is a string, terminated with a zero byte.
        The file's text encoding (code page) is used to encode/decode the bytes
        into a string.

        Raises:
            EndOfBufferError: no terminating zero byte found

        """
        buffer = self.buffer
        for end_index in range(self.index, len(buffer)):
            if buffer[end_index] == 0:
                start_index = self.index
                # skip terminating zero byte and padding
                self.index = self.align(end_index + 1)
                # noinspection PyTypeChecker
                return decode(buffer[start_index:end_index], encoding=encoding)
        raise EndOfBufferError(
            'Unexpected end of buffer, did not detect terminating zero byte.')

    def read_padded_unicode_string(self) -> str:
        """ PUS: Padded Unicode String. The bytes are encoded using Unicode
        encoding. The bytes consist of byte pairs and the string is terminated
        by 2 zero bytes.

        Raises:
            EndOfBufferError: no terminating zero byte pair found

        """
        buffer = self.buffer
        # step by 2: only test complete byte pairs
        for end_index in range(self.index, len(buffer), 2):
            if buffer[end_index:end_index + 2] == NULL_NULL:
                start_index = self.index
                # skip terminating zero bytes and padding
                self.index = self.align(end_index + 2)
                # noinspection PyTypeChecker
                return decode(buffer[start_index:end_index],
                              encoding='utf_16_le')
        raise EndOfBufferError(
            'Unexpected end of buffer, did not detect terminating zero bytes.')


class BitStream:
    """ Process little endian binary data organized as bit stream. """

    # Created for Proxy Entity Graphic decoding and DWG bit stream decoding
    def __init__(self, buffer: Bytes, dxfversion: str = 'AC1015',
                 encoding: str = 'cp1252'):
        self.buffer = memoryview(buffer)
        # index of the next bit to read, bit 0 is the most significant bit
        # of the first byte
        self.bit_index: int = 0
        self.dxfversion = dxfversion
        # text encoding used by read_text()
        self.encoding = encoding

    @property
    def has_data(self) -> bool:
        """ Returns ``True`` if the bit index is inside the buffer. """
        return self.bit_index >> 3 < len(self.buffer)

    def align(self, count: int) -> None:
        """ Align to byte border.

        Moves the bit index to the start of the next byte whose index is a
        multiple of `count`; a partially consumed byte is skipped.
        """
        # round up to the next full byte if inside of a byte
        byte_index = (self.bit_index >> 3) + bool(self.bit_index & 7)
        modulo = byte_index % count
        if modulo:
            byte_index += count - modulo
        self.bit_index = byte_index << 3

    def skip(self, count: int) -> None:
        """ Skip `count` bits. """
        self.bit_index += count

    def read_bit(self) -> int:
        """ Read one bit from buffer.

        Raises:
            EndOfBufferError: bit index is beyond the end of the buffer

        """
        index = self.bit_index
        self.bit_index += 1
        try:
            return 1 if self.buffer[index >> 3] & (0x80 >> (index & 7)) else 0
        except IndexError:
            raise EndOfBufferError('Unexpected end of buffer.')

    def read_bits(self, count) -> int:
        """ Read `count` bits from buffer, the first bit read is the most
        significant bit of the returned value.

        Raises:
            EndOfBufferError: not enough data to read all bits, the bit index
                is not changed in this case

        """
        index = self.bit_index
        buffer = self.buffer
        # index of next bit after reading `count` bits
        next_bit_index = index + count

        # The byte index of the last requested bit has to be a valid buffer
        # index, therefore the test is ">=" and not ">".
        if count > 0 and (next_bit_index - 1) >> 3 >= len(buffer):
            # not enough data to read all bits
            raise EndOfBufferError('Unexpected end of buffer.')
        self.bit_index = next_bit_index
        if count <= 0:
            # nothing to read, do not touch the buffer
            return 0

        test_bit = 0x80 >> (index & 7)
        test_byte_index = index >> 3
        value = 0
        test_byte = buffer[test_byte_index]
        while count > 0:
            value <<= 1
            if test_byte & test_bit:
                value |= 1
            count -= 1
            test_bit >>= 1
            if not test_bit and count:
                # advance to the next byte only if more bits are required
                test_bit = 0x80
                test_byte_index += 1
                test_byte = buffer[test_byte_index]
        return value

    def read_unsigned_byte(self) -> int:
        """ Read an unsigned byte (8 bit) from buffer. """
        return self.read_bits(8)

    def read_signed_byte(self) -> int:
        """ Read a signed byte (8 bit) from buffer. """
        value = self.read_bits(8)
        if value & 0x80:
            # two's complement
            return value - 0x100
        return value

    def read_aligned_bytes(self, count: int) -> Sequence[int]:
        """ Returns `count` bytes starting at the byte of the current bit
        index as slice of the buffer (no copy).

        The caller has to ensure that the bit index is located at a byte
        border, the low 3 bits of the bit index are ignored for the slice.

        Raises:
            EndOfBufferError: not enough data to read `count` bytes

        """
        buffer = self.buffer
        start_index = self.bit_index >> 3
        end_index = start_index + count
        if end_index > len(buffer):
            raise EndOfBufferError('Unexpected end of buffer.')
        self.bit_index += count << 3
        return buffer[start_index: end_index]

    def read_unsigned_short(self) -> int:
        """ Read an unsigned short (16 bit) from buffer. """
        if self.bit_index & 7:
            s1 = self.read_bits(8)
            s2 = self.read_bits(8)
        else:  # aligned data
            s1, s2 = self.read_aligned_bytes(2)
        # little endian: least significant byte first
        return (s2 << 8) + s1

    def read_signed_short(self) -> int:
        """ Read a signed short (16 bit) from buffer. """
        value = self.read_unsigned_short()
        if value & 0x8000:
            # two's complement
            return value - 0x10000
        return value

    def read_unsigned_long(self) -> int:
        """ Read an unsigned long (32 bit) from buffer. """
        if self.bit_index & 7:
            read_bits = self.read_bits
            l1 = read_bits(8)
            l2 = read_bits(8)
            l3 = read_bits(8)
            l4 = read_bits(8)
        else:  # aligned data
            l1, l2, l3, l4 = self.read_aligned_bytes(4)
        # little endian: least significant byte first
        return (l4 << 24) + (l3 << 16) + (l2 << 8) + l1

    def read_signed_long(self) -> int:
        """ Read a signed long (32 bit) from buffer. """
        value = self.read_unsigned_long()
        if value & 0x80000000:
            # two's complement
            return value - 0x100000000
        return value

    def read_float(self) -> float:
        """ Read a little endian double (64 bit) from buffer. """
        if self.bit_index & 7:
            read_bits = self.read_bits
            data = bytes(read_bits(8) for _ in range(8))
        else:  # aligned data
            data = bytes(self.read_aligned_bytes(8))
        return struct.unpack('<d', data)[0]

    def read_3_bits(self) -> int:
        """ Read 1 to 3 bits until the first 0 bit, returns 0, 2, 6 or 7 for
        the bit sequences 0, 10, 110 and 111.
        """
        bit = self.read_bit()
        if bit:  # 1
            bit = self.read_bit()
            if bit:  # 11
                bit = self.read_bit()
                if bit:
                    return 7  # 111
                else:
                    return 6  # 110
            return 2  # 10
        else:
            return 0  # 0

    def read_bit_short(self, count: int = 1) -> Union[int, Sequence[int]]:
        """ Read `count` bit shorts, returns a single value for `count` == 1
        else a tuple of values.

        A 2-bit code selects the storage: 0 = signed short follows,
        1 = unsigned byte follows, 2 = value 0, 3 = value 256.
        """

        def _read():
            bits = self.read_bits(2)
            if bits == 0:
                return self.read_signed_short()
            elif bits == 1:
                return self.read_unsigned_byte()
            elif bits == 2:
                return 0
            else:
                return 256

        if count == 1:
            return _read()
        else:
            return tuple(_read() for _ in range(count))

    def read_bit_long(self, count: int = 1) -> Union[int, Sequence[int]]:
        """ Read `count` bit longs, returns a single value for `count` == 1
        else a tuple of values.

        A 2-bit code selects the storage: 0 = signed long follows,
        1 = unsigned byte follows, 2 = value 0, 3 = not used (returns 256).
        """

        def _read():
            bits = self.read_bits(2)
            if bits == 0:
                return self.read_signed_long()
            elif bits == 1:
                return self.read_unsigned_byte()
            elif bits == 2:
                return 0
            else:  # not used!
                return 256  # ???

        if count == 1:
            return _read()
        else:
            return tuple(_read() for _ in range(count))

    # LibreDWG: https://github.com/LibreDWG/libredwg/blob/master/src/bits.c
    # Read 1 bitlonglong (compacted uint64_t) for REQUIREDVERSIONS, preview_size.
    # ODA doc bug. ODA say 1-3 bits until the first 0 bit. See 3BLL.
    # The first 3 bits indicate the length l (see paragraph 2.1). Then
    # l bytes follow, which represent the number (the least significant
    # byte is first).
    def read_bit_long_long(self) -> int:
        """ Read a bit long long: a 3-bit length followed by that many bytes,
        least significant byte first.
        """
        value = 0
        shifting = 0
        length = self.read_bits(3)  # or read_3_bits() ?
        while length > 0:
            value += (self.read_unsigned_byte() << shifting)
            length -= 1
            shifting += 8
        return value

    def read_raw_double(self, count: int = 1) -> Union[float, Sequence[float]]:
        """ Read `count` raw doubles, returns a single value for `count` == 1
        else a tuple of values.
        """
        if count == 1:
            return self.read_float()
        else:
            return tuple(self.read_float() for _ in range(count))

    def read_bit_double(self, count: int = 1) -> Union[float, Sequence[float]]:
        """ Read `count` bit doubles, returns a single value for `count` == 1
        else a tuple of values.

        A 2-bit code selects the storage: 0 = double follows, 1 = value 1.0,
        2 = value 0.0, 3 = not used (returns 0.0).
        """

        def _read():
            bits = self.read_bits(2)
            if bits == 0:
                return self.read_float()
            elif bits == 1:
                return 1.0
            elif bits == 2:
                return 0.0
            else:  # not used!
                return 0.0

        if count == 1:
            return _read()
        else:
            return tuple(_read() for _ in range(count))

    def read_bit_double_default(
            self, count: int = 1, default=0.0) -> Union[float, Sequence[float]]:
        """ Read `count` bit doubles with default value, returns a single
        value for `count` == 1 else a tuple of values.

        A 2-bit code selects the storage:

        - 0: the value is `default`
        - 1: 4 bytes follow, they replace the first 4 bytes of the little
          endian representation of `default`
        - 2: 6 bytes follow, the first 2 bytes replace bytes 4 and 5, the
          last 4 bytes replace the first 4 bytes of `default`
        - 3: a full double follows

        """
        data = struct.pack('<d', default)

        def _read():
            bits = self.read_bits(2)
            if bits == 0:
                return default
            elif bits == 1:
                _data = bytes(self.read_unsigned_byte()
                              for _ in range(4)) + data[4:]
                # unpack() returns a tuple, the float is the first item
                return struct.unpack('<d', _data)[0]
            elif bits == 2:
                _data = bytearray(data)
                # read order is significant: bytes 4, 5 are stored first
                _data[4] = self.read_unsigned_byte()
                _data[5] = self.read_unsigned_byte()
                _data[0] = self.read_unsigned_byte()
                _data[1] = self.read_unsigned_byte()
                _data[2] = self.read_unsigned_byte()
                _data[3] = self.read_unsigned_byte()
                return struct.unpack('<d', _data)[0]
            else:
                return self.read_float()

        if count == 1:
            return _read()
        else:
            return tuple(_read() for _ in range(count))

    def read_signed_modular_chars(self) -> int:
        """ Modular characters are a method of storing compressed integer
        values. They consist of a stream of bytes, terminating when the high
        bit (8) of the byte is 0 else another byte follows. Negative numbers
        are indicated by bit 7 set in the last byte.

        """
        shifting = 0
        value = 0
        while True:
            char = self.read_unsigned_byte()
            if char & 0x80:
                # bit 8 set = another char follows
                value |= ((char & 0x7f) << shifting)
                shifting += 7
            else:
                # bit 8 clear = end of modular char
                # bit 7 set = negative number
                value |= ((char & 0x3f) << shifting)
                return -value if char & 0x40 else value

    def read_unsigned_modular_chars(self) -> int:
        """ Modular characters are a method of storing compressed integer
        values. They consist of a stream of bytes, terminating when the high
        bit (8) of the byte is 0 else another byte follows.

        """
        shifting = 0
        value = 0
        while True:
            char = self.read_unsigned_byte()
            value |= ((char & 0x7f) << shifting)
            shifting += 7
            # bit 8 set = another char follows
            if not (char & 0x80):
                return value

    def read_modular_shorts(self) -> int:
        """ Modular shorts are a method of storing compressed unsigned integer
        values. Only 1 or 2 shorts in practical usage (1GB), if the high
        bit (16) of the first short is set another short follows.

        """
        short = self.read_unsigned_short()
        if short & 0x8000:
            return (self.read_unsigned_short() << 15) | (short & 0x7fff)
        else:
            return short

    def read_bit_extrusion(self) -> Tuple[float, float, float]:
        """ Read an extrusion vector: a single set bit represents the default
        extrusion (0, 0, 1), else 3 bit doubles follow.
        """
        if self.read_bit():
            return 0.0, 0.0, 1.0
        else:
            return self.read_bit_double(3)

    def read_bit_thickness(self, dxfversion='AC1015') -> float:
        """ Read a thickness value: for `dxfversion` >= 'AC1015' a single set
        bit represents the thickness 0.0, else a bit double follows.

        The argument `dxfversion` is used, not the attribute of the stream.
        """
        if dxfversion >= 'AC1015':
            if self.read_bit():
                return 0.0
        return self.read_bit_double()

    def read_cm_color(self) -> int:
        """ Read a color index stored as bit short. """
        return self.read_bit_short()

    def read_text(self) -> str:
        """ Read a text: a bit short as length followed by length bytes,
        decoded by the encoding of the stream.
        """
        length = self.read_bit_short()
        data = bytes(self.read_unsigned_byte() for _ in range(length))
        return data.decode(encoding=self.encoding)

    def read_text_unicode(self) -> str:
        """ Read a unicode text: a bit short as character count followed by
        2 bytes per character, decoded as little endian UTF-16.
        """
        # Unicode text is read from the "string stream" within the object data,
        # see the main Object description section for details.
        length = self.read_bit_short()
        data = bytes(self.read_unsigned_byte() for _ in range(length * 2))
        # Explicit little endian decoding: 'utf16' without a BOM depends on
        # the native byte order of the machine.
        return data.decode(encoding='utf_16_le')

    def read_text_variable(self) -> str:
        """ Read a text in the format defined by the DXF version of the
        stream: unicode text for 'AC1018' and later, else encoded text.
        """
        if self.dxfversion < 'AC1018':  # R2004
            return self.read_text()
        else:
            return self.read_text_unicode()

    def read_cm_color_cms(self) -> Tuple[int, str, str]:
        """ Returns tuple (rgb, color_name, book_name). """
        _ = self.read_bit_short()  # index always 0
        color_name = ''
        book_name = ''
        rgb = self.read_bit_long()
        rc = self.read_unsigned_byte()
        if rc & 1:
            color_name = self.read_text_variable()
        if rc & 2:
            book_name = self.read_text_variable()
        return rgb, color_name, book_name

    def read_cm_color_enc(self) -> Union[int, Tuple[int, int, int, int]]:
        """ Returns color index as int or tuple (rgb, color_handle,
        transparency_type, transparency).

        The tuple is returned if any flag in the high byte of the leading bit
        short is set, tuple items without data in the stream are ``None``.
        """
        flags_and_index = self.read_bit_short()
        flags = flags_and_index >> 8
        index = flags_and_index & 0xff
        if flags:
            rgb = None
            color_handle = None
            transparency_type = None
            transparency = None
            if flags & 0x80:
                # NOTE(review): a bit short stores 16 bits at most, but the
                # value is masked to 24 bits - confirm against the ODA
                # reference if a bit long is required here.
                rgb = self.read_bit_short() & 0x00ffffff
            if flags & 0x40:
                # read_handle() returns a single int
                color_handle = self.read_handle()
            if flags & 0x20:
                data = self.read_bit_long()
                transparency_type = data >> 24
                transparency = data & 0xff
            return rgb, color_handle, transparency_type, transparency
        else:
            return index

    def read_object_type(self) -> int:
        """ Read an object type: a 2-bit code selects the storage, 0 = byte
        follows, 1 = byte + 0x1f0 follows, else an unsigned short follows.
        """
        bits = self.read_bits(2)
        if bits == 0:
            return self.read_unsigned_byte()
        elif bits == 1:
            return self.read_unsigned_byte() + 0x1f0
        else:
            return self.read_unsigned_short()

    def read_handle(self, reference: int = 0) -> int:
        """ Returns handle as integer value.

        The handle is stored as 4-bit code, 4-bit length and length bytes of
        little endian offset data. Codes less than 6 return the offset as
        absolute handle, codes 6, 8, 10 and 12 return a handle relative to
        `reference`. Other codes are not handled and return ``None``.
        """
        code = self.read_bits(4)
        length = self.read_bits(4)
        if code == 6:
            return reference + 1
        if code == 8:
            return reference - 1

        # fill a zero initialized 64-bit little endian value
        data = bytearray(b'\x00\x00\x00\x00\x00\x00\x00\x00')
        for index in range(length):
            data[index] = self.read_unsigned_byte()
        offset = struct.unpack('<Q', data)[0]

        if code < 6:
            return offset
        else:
            if code == 10:
                return reference + offset
            if code == 12:
                return reference - offset

    def read_hex_handle(self, reference: int = 0) -> str:
        """ Returns handle as hex string. """
        return '%X' % self.read_handle(reference)

    def read_code(self, code: str):
        """ Read data from bit stream by data codes defined in the
        ODA reference.

        Raises:
            ValueError: unknown `code`

        """
        if code == 'B':
            return self.read_bit()
        elif code == 'RC':
            return self.read_unsigned_byte()
        elif code == 'RS':
            return self.read_signed_short()
        elif code == 'BS':
            return self.read_bit_short()
        elif code == 'RL':
            return self.read_signed_long()
        elif code == 'BL':
            return self.read_bit_long()
        elif code == 'RD':
            return self.read_raw_double()
        elif code == '2RD':
            return self.read_raw_double(2)
        elif code == 'BD':
            return self.read_bit_double()
        elif code == '2BD':
            return self.read_bit_double(2)
        elif code == '3BD':
            return self.read_bit_double(3)
        elif code == 'T':
            return self.read_text()
        elif code == 'TV':
            return self.read_text_variable()
        elif code == 'H':
            return self.read_hex_handle()
        elif code == 'BLL':
            return self.read_bit_long_long()
        elif code == 'CMC':
            return self.read_cm_color()
        raise ValueError(f'Unknown code: {code}')