1# Copyright (c) 2014-2020, Manfred Moitzi
2# License: MIT License
3from typing import Iterable, Any, Sequence, Union, Tuple
4from array import array
5import struct
6from binascii import unhexlify
7from codecs import decode
8
# Type alias for the binary buffer types accepted by ByteStream and BitStream;
# both classes wrap the given buffer in a memoryview.
Bytes = Union[bytes, bytearray, memoryview]
10
11
def hex_strings_to_bytes(data: Iterable[str]) -> bytes:
    """ Decodes every hex string in `data` and returns the decoded chunks
    concatenated in iteration order as a single bytes object.
    """
    return b''.join(unhexlify(chunk) for chunk in data)
18
19
def bytes_to_hexstr(data: bytes) -> str:
    """ Returns `data` as plain hex string: two upper case hex digits per
    byte, without any separators.
    """
    return ''.join(map('{:02X}'.format, data))
23
24
# Two zero bytes: the terminator searched for by
# ByteStream.read_padded_unicode_string().
NULL_NULL = b'\x00\x00'
26
27
class EndOfBufferError(EOFError):
    """ Raised by the stream readers of this module when more data is
    requested than the underlying buffer contains.
    """
30
31
class ByteStream:
    """ Reader for little endian binary data organized as bytes. After every
    read the position is rounded up to the next multiple of `align` bytes
    (4 by default).
    """

    # Created for Proxy Entity Graphic decoding
    def __init__(self, buffer: Bytes, align: int = 4):
        self.buffer = memoryview(buffer)
        # byte offset of the next unread item
        self.index: int = 0
        # size of the padding boundary in bytes
        self._align: int = align

    @property
    def has_data(self) -> bool:
        """ True while the read position is inside the buffer. """
        return len(self.buffer) > self.index

    def align(self, index: int) -> int:
        """ Returns `index` rounded up to the next alignment boundary, an
        already aligned `index` is returned unchanged.
        """
        remainder = index % self._align
        if not remainder:
            return index
        return index + self._align - remainder

    def read_struct(self, fmt: str) -> Any:
        """ Unpack the values described by the struct format string `fmt` at
        the current position and advance to the next aligned position.

        The format is passed to the struct module unchanged: prefix it with
        '<' to read little endian data independent of the native byte order
        of the machine.

        Raises EndOfBufferError if the read position is at or beyond the end
        of the buffer.
        """
        if not self.has_data:
            raise EndOfBufferError('Unexpected end of buffer.')

        start = self.index
        values = struct.unpack_from(fmt, self.buffer, offset=start)
        self.index = self.align(start + struct.calcsize(fmt))
        return values

    def read_float(self):
        """ Read a little endian 64 bit float. """
        value, = self.read_struct('<d')
        return value

    def read_long(self):
        """ Read a little endian unsigned 32 bit integer. """
        value, = self.read_struct('<L')
        return value

    def read_signed_long(self):
        """ Read a little endian signed 32 bit integer. """
        value, = self.read_struct('<l')
        return value

    def read_vertex(self):
        """ Read three little endian 64 bit floats as tuple. """
        return self.read_struct('<3d')

    def read_padded_string(self, encoding: str = 'utf_8') -> str:
        """ PS: Padded String. Reads the bytes up to the next zero byte and
        decodes them by the given `encoding`, the terminating zero byte is
        consumed and the position is aligned afterwards.

        Raises EndOfBufferError if no zero byte exists between the current
        position and the end of the buffer.
        """
        view = self.buffer
        size = len(view)
        first = self.index
        cursor = first
        while cursor < size:
            if view[cursor] == 0:
                self.index = self.align(cursor + 1)
                # noinspection PyTypeChecker
                return decode(view[first:cursor], encoding=encoding)
            cursor += 1
        raise EndOfBufferError(
            'Unexpected end of buffer, did not detect terminating zero byte.')

    def read_padded_unicode_string(self) -> str:
        """ PUS: Padded Unicode String. Scans the buffer in steps of byte
        pairs for a pair of zero bytes and decodes the bytes in front of this
        terminator as 'utf_16_le', the terminator is consumed and the position
        is aligned afterwards.

        Raises EndOfBufferError if no terminating pair of zero bytes exists.
        """
        view = self.buffer
        size = len(view)
        first = self.index
        cursor = first
        while cursor < size:
            if view[cursor:cursor + 2] == NULL_NULL:
                self.index = self.align(cursor + 2)
                # noinspection PyTypeChecker
                return decode(view[first:cursor],
                              encoding='utf_16_le')
            cursor += 2
        raise EndOfBufferError(
            'Unexpected end of buffer, did not detect terminating zero bytes.')
105
106
class BitStream:
    """ Process little endian binary data organized as bit stream.

    Bits are consumed most significant bit first within each byte, multi byte
    integers and floats are assembled least significant byte first.
    """

    # Created for Proxy Entity Graphic decoding and DWG bit stream decoding
    def __init__(self, buffer: 'Bytes', dxfversion: str = 'AC1015',
                 encoding: str = 'cp1252'):
        self.buffer = memoryview(buffer)
        # position of the next unread bit, counted from the buffer start
        self.bit_index: int = 0
        # version tag, compared as string to select version dependent formats
        self.dxfversion = dxfversion
        # code page used by read_text()
        self.encoding = encoding

    @property
    def has_data(self) -> bool:
        """ True while the current bit position is inside the buffer. """
        return self.bit_index >> 3 < len(self.buffer)

    def align(self, count: int) -> None:
        """ Align to byte border: a partially consumed byte is skipped, then
        the position advances to the next byte index which is a multiple of
        `count`.
        """
        byte_index = (self.bit_index >> 3) + bool(self.bit_index & 7)
        modulo = byte_index % count
        if modulo:
            byte_index += count - modulo
        self.bit_index = byte_index << 3

    def skip(self, count: int) -> None:
        """ Skip `count` bits. """
        self.bit_index += count

    def read_bit(self) -> int:
        """ Read one bit from buffer.

        Raises EndOfBufferError if the bit position is beyond the buffer.
        """
        index = self.bit_index
        self.bit_index += 1
        try:
            return 1 if self.buffer[index >> 3] & (0x80 >> (index & 7)) else 0
        except IndexError:
            raise EndOfBufferError('Unexpected end of buffer.')

    def read_bits(self, count: int) -> int:
        """ Read `count` bits from buffer and return them as unsigned integer,
        the first bit read is the most significant bit of the result.

        Raises EndOfBufferError if the buffer does not contain `count` more
        bits, the stream position is not changed in this case.
        """
        if count == 0:
            # nothing to read: do not touch the (maybe exhausted) buffer
            return 0

        index = self.bit_index
        buffer = self.buffer
        # index of next bit after reading `count` bits
        next_bit_index = index + count

        # The last requested bit is stored in byte (next_bit_index - 1) >> 3,
        # which has to be a valid buffer index.
        if (next_bit_index - 1) >> 3 >= len(buffer):
            # not enough data to read all bits
            raise EndOfBufferError('Unexpected end of buffer.')
        self.bit_index = next_bit_index

        test_bit = 0x80 >> (index & 7)
        test_byte_index = index >> 3
        value = 0
        test_byte = buffer[test_byte_index]
        while count > 0:
            value <<= 1
            if test_byte & test_bit:
                value |= 1
            count -= 1
            test_bit >>= 1
            # fetch the next byte only if there are still bits to read
            if not test_bit and count:
                test_bit = 0x80
                test_byte_index += 1
                test_byte = buffer[test_byte_index]
        return value

    def read_unsigned_byte(self) -> int:
        """ Read an unsigned byte (8 bit) from buffer. """
        return self.read_bits(8)

    def read_signed_byte(self) -> int:
        """ Read a signed byte (8 bit) from buffer. """
        value = self.read_bits(8)
        if value & 0x80:
            # two's complement
            return -((~value & 0xff) + 1)
        else:
            return value

    def read_aligned_bytes(self, count: int) -> Sequence[int]:
        """ Returns a view of the next `count` whole bytes and advances the
        stream by `count` bytes. The start is the byte containing the current
        bit position, therefore the stream should be byte aligned.

        Raises EndOfBufferError if less than `count` bytes are available.
        """
        buffer = self.buffer
        start_index = self.bit_index >> 3
        end_index = start_index + count
        if end_index <= len(buffer):
            self.bit_index += count << 3
            return buffer[start_index: end_index]
        else:
            raise EndOfBufferError('Unexpected end of buffer.')

    def read_unsigned_short(self) -> int:
        """ Read an unsigned short (16 bit) from buffer. """
        if self.bit_index & 7:
            s1 = self.read_bits(8)
            s2 = self.read_bits(8)
        else:  # aligned data
            s1, s2 = self.read_aligned_bytes(2)
        return (s2 << 8) + s1

    def read_signed_short(self) -> int:
        """ Read a signed short (16 bit) from buffer. """
        value = self.read_unsigned_short()
        if value & 0x8000:
            # two's complement
            return -((~value & 0xffff) + 1)
        else:
            return value

    def read_unsigned_long(self) -> int:
        """ Read an unsigned long (32 bit) from buffer. """
        if self.bit_index & 7:
            read_bits = self.read_bits
            l1 = read_bits(8)
            l2 = read_bits(8)
            l3 = read_bits(8)
            l4 = read_bits(8)
        else:  # aligned data
            l1, l2, l3, l4 = self.read_aligned_bytes(4)
        return (l4 << 24) + (l3 << 16) + (l2 << 8) + l1

    def read_signed_long(self) -> int:
        """ Read a signed long (32 bit) from buffer. """
        value = self.read_unsigned_long()
        if value & 0x80000000:
            # two's complement
            return -((~value & 0xffffffff) + 1)
        else:
            return value

    def read_float(self) -> float:
        """ Read a little endian 64 bit float from buffer. """
        if self.bit_index & 7:
            read_bits = self.read_bits
            data = bytes(read_bits(8) for _ in range(8))
        else:  # aligned data
            data = bytes(self.read_aligned_bytes(8))
        return struct.unpack('<d', data)[0]

    def read_3_bits(self) -> int:
        """ Read a code of 1 to 3 bits, reading stops at the first 0 bit.
        Returns 0 for the bits 0, 2 for 10, 6 for 110 and 7 for 111.
        """
        bit = self.read_bit()
        if bit:  # 1
            bit = self.read_bit()
            if bit:  # 11
                bit = self.read_bit()
                if bit:
                    return 7  # 111
                else:
                    return 6  # 110
            return 2  # 10
        else:
            return 0  # 0

    def read_bit_short(self, count: int = 1) -> Union[int, Sequence[int]]:
        """ Read `count` bit shorts: a 2 bit code followed by a signed short
        (code 0), an unsigned byte (code 1) or no data for the fixed values
        0 (code 2) and 256 (code 3).

        Returns a single int for `count` == 1, else a tuple of ints.
        """
        def _read():
            bits = self.read_bits(2)
            if bits == 0:
                return self.read_signed_short()
            elif bits == 1:
                return self.read_unsigned_byte()
            elif bits == 2:
                return 0
            else:
                return 256

        if count == 1:
            return _read()
        else:
            return tuple(_read() for _ in range(count))

    def read_bit_long(self, count: int = 1) -> Union[int, Sequence[int]]:
        """ Read `count` bit longs: a 2 bit code followed by a signed long
        (code 0), an unsigned byte (code 1) or no data for the fixed value
        0 (code 2).

        Returns a single int for `count` == 1, else a tuple of ints.
        """
        def _read():
            bits = self.read_bits(2)
            if bits == 0:
                return self.read_signed_long()
            elif bits == 1:
                return self.read_unsigned_byte()
            elif bits == 2:
                return 0
            else:  # not used!
                return 256  # ???

        if count == 1:
            return _read()
        else:
            return tuple(_read() for _ in range(count))

    # LibreDWG: https://github.com/LibreDWG/libredwg/blob/master/src/bits.c
    # Read 1 bitlonglong (compacted uint64_t) for REQUIREDVERSIONS, preview_size.
    # ODA doc bug. ODA say 1-3 bits until the first 0 bit. See 3BLL.
    # The first 3 bits indicate the length l (see paragraph 2.1). Then
    # l bytes follow, which represent the number (the least significant
    # byte is first).
    def read_bit_long_long(self) -> int:
        """ Read a bit long long: a 3 bit length followed by that many bytes,
        least significant byte first.
        """
        value = 0
        shifting = 0
        length = self.read_bits(3)  # or read_3_bits() ?
        while length > 0:
            value += (self.read_unsigned_byte() << shifting)
            length -= 1
            shifting += 8
        return value

    def read_raw_double(self, count: int = 1) -> Union[float, Sequence[float]]:
        """ Read `count` 64 bit floats. Returns a single float for
        `count` == 1, else a tuple of floats.
        """
        if count == 1:
            return self.read_float()
        else:
            return tuple(self.read_float() for _ in range(count))

    def read_bit_double(self, count: int = 1) -> Union[float, Sequence[float]]:
        """ Read `count` bit doubles: a 2 bit code followed by a 64 bit float
        (code 0) or no data for the fixed values 1.0 (code 1) and 0.0
        (code 2).

        Returns a single float for `count` == 1, else a tuple of floats.
        """
        def _read():
            bits = self.read_bits(2)
            if bits == 0:
                return self.read_float()
            elif bits == 1:
                return 1.0
            elif bits == 2:
                return 0.0
            else:  # not used!
                return 0.0

        if count == 1:
            return _read()
        else:
            return tuple(_read() for _ in range(count))

    def read_bit_double_default(
            self, count: int = 1,
            default: float = 0.0) -> Union[float, Sequence[float]]:
        """ Read `count` bit doubles with default: a 2 bit code selects how
        the little endian byte representation of `default` is patched.

        - code 0: no data, the result is `default`
        - code 1: 4 bytes follow and replace the bytes 0-3 of `default`
        - code 2: 6 bytes follow, the first 2 replace the bytes 4-5 and the
          last 4 replace the bytes 0-3 of `default`
        - code 3: a full 64 bit float follows

        Returns a single float for `count` == 1, else a tuple of floats.
        """
        data = struct.pack('<d', default)

        def _read():
            bits = self.read_bits(2)
            if bits == 0:
                return default
            elif bits == 1:
                _data = bytes(self.read_unsigned_byte()
                              for _ in range(4)) + data[4:]
                # unpack() returns a tuple, the float is the only item
                return struct.unpack('<d', _data)[0]
            elif bits == 2:
                _data = bytearray(data)
                _data[4] = self.read_unsigned_byte()
                _data[5] = self.read_unsigned_byte()
                _data[0] = self.read_unsigned_byte()
                _data[1] = self.read_unsigned_byte()
                _data[2] = self.read_unsigned_byte()
                _data[3] = self.read_unsigned_byte()
                return struct.unpack('<d', _data)[0]
            else:
                return self.read_float()

        if count == 1:
            return _read()
        else:
            return tuple(_read() for _ in range(count))

    def read_signed_modular_chars(self) -> int:
        """ Modular characters are a method of storing compressed integer
        values. They consist of a stream of bytes, terminating when the high
        bit (8) of the byte is 0 else another byte follows. Negative numbers
        are indicated by bit 7 set in the last byte.

        """
        shifting = 0
        value = 0
        while True:
            char = self.read_unsigned_byte()
            if char & 0x80:
                # bit 8 set = another char follows
                value |= ((char & 0x7f) << shifting)
                shifting += 7
            else:
                # bit 8 clear = end of modular char
                # bit 7 set = negative number
                value |= ((char & 0x3f) << shifting)
                return -value if char & 0x40 else value

    def read_unsigned_modular_chars(self) -> int:
        """ Modular characters are a method of storing compressed integer
        values. They consist of a stream of bytes, terminating when the high
        bit (8) of the byte is 0 else another byte follows.

        """
        shifting = 0
        value = 0
        while True:
            char = self.read_unsigned_byte()
            value |= ((char & 0x7f) << shifting)
            shifting += 7
            # bit 8 set = another char follows
            if not (char & 0x80):
                return value

    def read_modular_shorts(self) -> int:
        """ Modular shorts are a method of storing compressed unsigned integer
        values. Only 1 or 2 shorts in practical usage (1GB), if the high
        bit (16) of the first short is set another short follows.

        """
        short = self.read_unsigned_short()
        if short & 0x8000:
            return (self.read_unsigned_short() << 15) | (short & 0x7fff)
        else:
            return short

    def read_bit_extrusion(self) -> Tuple[float, float, float]:
        """ Read an extrusion vector: a set flag bit stands for the vector
        (0, 0, 1), else 3 bit doubles follow.
        """
        if self.read_bit():
            return 0.0, 0.0, 1.0
        else:
            return self.read_bit_double(3)

    def read_bit_thickness(self, dxfversion: str = 'AC1015') -> float:
        """ Read a thickness value: for `dxfversion` >= 'AC1015' a set flag
        bit stands for 0.0, else a bit double follows. The version is given
        by the argument, the `dxfversion` attribute of the stream is not
        used.
        """
        if dxfversion >= 'AC1015':
            if self.read_bit():
                return 0.0
        return self.read_bit_double()

    def read_cm_color(self) -> int:
        """ Read a color value stored as bit short. """
        return self.read_bit_short()

    def read_text(self) -> str:
        """ Read a text: a bit short defines the count of bytes, which are
        decoded by the `encoding` of the stream.
        """
        length = self.read_bit_short()
        data = bytes(self.read_unsigned_byte() for _ in range(length))
        return data.decode(encoding=self.encoding)

    def read_text_unicode(self) -> str:
        """ Read an Unicode text: a bit short defines the count of 16 bit
        code units, which are decoded as little endian UTF-16.
        """
        # Unicode text is read from the "string stream" within the object data,
        # see the main Object description section for details.
        length = self.read_bit_short()
        data = bytes(self.read_unsigned_byte() for _ in range(length * 2))
        # Explicit little endian codec: plain 'utf16' would use the native
        # byte order of the machine and interpret leading BOM characters.
        return data.decode(encoding='utf_16_le')

    def read_text_variable(self) -> str:
        """ Read a variable text (TV): code page encoded text for versions
        up to R2004 (AC1018), Unicode text for R2007 (AC1021) and later.
        """
        if self.dxfversion < 'AC1021':  # before R2007
            return self.read_text()
        else:
            return self.read_text_unicode()

    def read_cm_color_cms(self) -> Tuple[int, str, str]:
        """ Returns tuple (rgb, color_name, book_name).

        The names are empty strings if the flags byte following the rgb value
        does not announce them (bit 1: color name, bit 2: book name).
        """
        _ = self.read_bit_short()  # index always 0
        color_name = ''
        book_name = ''
        rgb = self.read_bit_long()
        rc = self.read_unsigned_byte()
        if rc & 1:
            color_name = self.read_text_variable()
        if rc & 2:
            book_name = self.read_text_variable()
        return rgb, color_name, book_name

    def read_cm_color_enc(self) -> Union[int, Tuple[int, int, int, int]]:
        """ Returns color index as int or tuple (rgb, color_handle,
        transparency_type, transparency).

        The tuple is returned if any flag in the high byte of the leading bit
        short is set, tuple items without data in the stream are None.
        """
        flags_and_index = self.read_bit_short()
        flags = flags_and_index >> 8
        index = flags_and_index & 0xff
        if flags:
            rgb = None
            color_handle = None
            transparency_type = None
            transparency = None
            if flags & 0x80:
                # NOTE(review): a bit short carries only 16 bits of data, but
                # the mask keeps 24 bits - TODO confirm against the DWG
                # specification if a bit long (BL) has to be read here.
                rgb = self.read_bit_short() & 0x00ffffff
            if flags & 0x40:
                # read_handle() returns the handle as a single int
                color_handle = self.read_handle()
            if flags & 0x20:
                data = self.read_bit_long()
                transparency_type = data >> 24
                transparency = data & 0xff
            return rgb, color_handle, transparency_type, transparency
        else:
            return index

    def read_object_type(self) -> int:
        """ Read an object type: a 2 bit code followed by an unsigned byte
        (code 0), an unsigned byte to which 0x1f0 is added (code 1) or an
        unsigned short (codes 2 and 3).
        """
        bits = self.read_bits(2)
        if bits == 0:
            return self.read_unsigned_byte()
        elif bits == 1:
            return self.read_unsigned_byte() + 0x1f0
        else:
            return self.read_unsigned_short()

    def read_handle(self, reference: int = 0) -> int:
        """ Returns handle as integer value.

        A handle is stored as 4 bit code, 4 bit length and `length` bytes of
        data, most significant byte first:

        - code 6: `reference` + 1, code 8: `reference` - 1
        - code < 6: the data is the absolute handle
        - code 10: `reference` + data, code 12: `reference` - data

        NOTE(review): all other codes return None as before, callers have to
        be checked before this can be turned into an exception.
        """
        code = self.read_bits(4)
        length = self.read_bits(4)
        if code == 6:
            return reference + 1
        if code == 8:
            return reference - 1

        # Handle bytes are stored most significant byte first (unlike the
        # other multi byte integers of the stream).
        offset = 0
        for _ in range(length):
            offset = (offset << 8) | self.read_unsigned_byte()

        if code < 6:
            return offset
        if code == 10:
            return reference + offset
        if code == 12:
            return reference - offset
        return None

    def read_hex_handle(self, reference: int = 0) -> str:
        """ Returns handle as hex string. """
        return '%X' % self.read_handle(reference)

    def read_code(self, code: str):
        """ Read data from bit stream by data codes defined in the
        ODA reference.

        Raises ValueError for an unknown `code`.
        """
        if code == 'B':
            return self.read_bit()
        elif code == 'RC':
            return self.read_unsigned_byte()
        elif code == 'RS':
            return self.read_signed_short()
        elif code == 'BS':
            return self.read_bit_short()
        elif code == 'RL':
            return self.read_signed_long()
        elif code == 'BL':
            return self.read_bit_long()
        elif code == 'RD':
            return self.read_raw_double()
        elif code == '2RD':
            return self.read_raw_double(2)
        elif code == 'BD':
            return self.read_bit_double()
        elif code == '2BD':
            return self.read_bit_double(2)
        elif code == '3BD':
            return self.read_bit_double(3)
        elif code == 'T':
            return self.read_text()
        elif code == 'TV':
            return self.read_text_variable()
        elif code == 'H':
            return self.read_hex_handle()
        elif code == 'BLL':
            return self.read_bit_long_long()
        elif code == 'CMC':
            return self.read_cm_color()
        raise ValueError(f'Unknown code: {code}')
550