1# cython: profile=False
2# cython: boundscheck=False, initializedcheck=False
3from cython import Py_ssize_t
4import numpy as np
5
6import pandas.io.sas.sas_constants as const
7
# Fixed-width integer aliases used by the buffers and counters in this
# module (declared here rather than cimported from numpy).
ctypedef signed long long   int64_t
ctypedef unsigned char      uint8_t
ctypedef unsigned short     uint16_t
11
12# rle_decompress decompresses data using a Run Length Encoding
13# algorithm.  It is partially documented here:
14#
15# https://cran.r-project.org/package=sas7bdat/vignettes/sas7bdat.pdf
cdef const uint8_t[:] rle_decompress(int result_length, const uint8_t[:] inbuff):
    """Decompress ``inbuff`` with SAS Run Length Encoding into exactly
    ``result_length`` bytes.

    Each chunk starts with a control byte: the high nibble selects the
    command and the low nibble (``end_of_first_byte``) carries part of
    the count.  Raises ValueError on an unknown control byte, an
    unexpected count nibble, or an output-size mismatch.
    """

    cdef:
        uint8_t control_byte, x
        uint8_t[:] result = np.zeros(result_length, np.uint8)
        int rpos = 0
        int i, nbytes, end_of_first_byte
        Py_ssize_t ipos = 0, length = len(inbuff)

    while ipos < length:
        control_byte = inbuff[ipos] & 0xF0
        end_of_first_byte = <int>(inbuff[ipos] & 0x0F)
        ipos += 1

        if control_byte == 0x00:
            # copy (next byte + 64) literal bytes
            if end_of_first_byte != 0:
                raise ValueError("Unexpected non-zero end_of_first_byte")
            nbytes = <int>(inbuff[ipos]) + 64
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = inbuff[ipos]
                rpos += 1
                ipos += 1
        elif control_byte == 0x40:
            # not documented: repeat the byte following the count byte
            # (nibble * 16 + next byte) times
            nbytes = end_of_first_byte * 16
            nbytes += <int>(inbuff[ipos])
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = inbuff[ipos]
                rpos += 1
            ipos += 1
        elif control_byte == 0x60:
            # fill (nibble * 256 + next byte + 17) bytes with spaces
            nbytes = end_of_first_byte * 256 + <int>(inbuff[ipos]) + 17
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = 0x20
                rpos += 1
        elif control_byte == 0x70:
            # fill (nibble * 256 + next byte + 17) bytes with zeros
            nbytes = end_of_first_byte * 256 + <int>(inbuff[ipos]) + 17
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = 0x00
                rpos += 1
        elif control_byte == 0x80:
            # copy (nibble + 1) literal bytes
            nbytes = end_of_first_byte + 1
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0x90:
            # copy (nibble + 17) literal bytes
            nbytes = end_of_first_byte + 17
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0xA0:
            # copy (nibble + 33) literal bytes
            nbytes = end_of_first_byte + 33
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0xB0:
            # copy (nibble + 49) literal bytes
            nbytes = end_of_first_byte + 49
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0xC0:
            # repeat the next byte (nibble + 3) times
            nbytes = end_of_first_byte + 3
            x = inbuff[ipos]
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = x
                rpos += 1
        elif control_byte == 0xD0:
            # fill (nibble + 2) bytes with 0x40 ('@')
            nbytes = end_of_first_byte + 2
            for _ in range(nbytes):
                result[rpos] = 0x40
                rpos += 1
        elif control_byte == 0xE0:
            # fill (nibble + 2) bytes with spaces
            nbytes = end_of_first_byte + 2
            for _ in range(nbytes):
                result[rpos] = 0x20
                rpos += 1
        elif control_byte == 0xF0:
            # fill (nibble + 2) bytes with zeros
            nbytes = end_of_first_byte + 2
            for _ in range(nbytes):
                result[rpos] = 0x00
                rpos += 1
        else:
            raise ValueError(f"unknown control byte: {control_byte}")

    # In py37 cython/clang sees `len(outbuff)` as size_t and not Py_ssize_t
    if <Py_ssize_t>len(result) != <Py_ssize_t>result_length:
        raise ValueError(f"RLE: {len(result)} != {result_length}")

    return np.asarray(result)
114
115
116# rdc_decompress decompresses data using the Ross Data Compression algorithm:
117#
118# http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/CUJ/1992/9210/ross/ross.htm
cdef const uint8_t[:] rdc_decompress(int result_length, const uint8_t[:] inbuff):
    """Decompress ``inbuff`` using the Ross Data Compression algorithm
    into exactly ``result_length`` bytes.

    Every group of 16 items is preceded by a big-endian 16-bit control
    word: a clear bit means "copy one literal byte", a set bit introduces
    a command nibble (an RLE run, or a back-reference into the output
    produced so far).  Raises ValueError on an unknown command nibble or
    an output-size mismatch.

    Fix vs. previous revision: removed the dead local ``ii`` (an untyped
    Python object that was incremented on every loop iteration but never
    read), which added pure Python overhead to this hot loop.
    """

    cdef:
        uint8_t cmd
        uint16_t ctrl_bits = 0, ctrl_mask = 0, ofs, cnt
        int rpos = 0, k
        uint8_t[:] outbuff = np.zeros(result_length, dtype=np.uint8)
        Py_ssize_t ipos = 0, length = len(inbuff)

    while ipos < length:
        # Advance to the next control bit; refill the 16-bit control
        # word (big-endian) once all 16 bits are consumed.
        ctrl_mask = ctrl_mask >> 1
        if ctrl_mask == 0:
            ctrl_bits = ((<uint16_t>inbuff[ipos] << 8) +
                         <uint16_t>inbuff[ipos + 1])
            ipos += 2
            ctrl_mask = 0x8000

        # Clear bit: copy a single literal byte.
        if ctrl_bits & ctrl_mask == 0:
            outbuff[rpos] = inbuff[ipos]
            ipos += 1
            rpos += 1
            continue

        # Set bit: a command nibble plus a 4-bit count follows.
        cmd = (inbuff[ipos] >> 4) & 0x0F
        cnt = <uint16_t>(inbuff[ipos] & 0x0F)
        ipos += 1

        # short RLE: repeat the next byte (cnt + 3) times
        if cmd == 0:
            cnt += 3
            for k in range(cnt):
                outbuff[rpos + k] = inbuff[ipos]
            rpos += cnt
            ipos += 1

        # long RLE: 12-bit count (+19), then the byte to repeat
        elif cmd == 1:
            cnt += <uint16_t>inbuff[ipos] << 4
            cnt += 19
            ipos += 1
            for k in range(cnt):
                outbuff[rpos + k] = inbuff[ipos]
            rpos += cnt
            ipos += 1

        # long pattern: 12-bit offset (+3) back into the output already
        # written, then an 8-bit count (+16) of bytes to re-copy
        elif cmd == 2:
            ofs = cnt + 3
            ofs += <uint16_t>inbuff[ipos] << 4
            ipos += 1
            cnt = <uint16_t>inbuff[ipos]
            ipos += 1
            cnt += 16
            for k in range(cnt):
                outbuff[rpos + k] = outbuff[rpos - <int>ofs + k]
            rpos += cnt

        # short pattern: cmd itself (3..15) is the byte count; 12-bit
        # offset (+3) back into the output
        elif (cmd >= 3) & (cmd <= 15):
            ofs = cnt + 3
            ofs += <uint16_t>inbuff[ipos] << 4
            ipos += 1
            for k in range(cmd):
                outbuff[rpos + k] = outbuff[rpos - <int>ofs + k]
            rpos += cmd

        else:
            raise ValueError("unknown RDC command")

    # In py37 cython/clang sees `len(outbuff)` as size_t and not Py_ssize_t
    if <Py_ssize_t>len(outbuff) != <Py_ssize_t>result_length:
        raise ValueError(f"RDC: {len(outbuff)} != {result_length}\n")

    return np.asarray(outbuff)
196
197
# Internal type codes stored per column: every SAS column handled here is
# mapped to either a numeric ("decimal") or a character ("string") code.
cdef enum ColumnTypes:
    column_type_decimal = 1
    column_type_string = 2
201
202
# Cache the page-type constants from pandas.io.sas.sas_constants as typed
# C ints, so the page dispatch in Parser.readline() compares plain ints
# instead of doing Python attribute lookups on every row.
cdef:
    int page_meta_type = const.page_meta_type
    int page_mix_types_0 = const.page_mix_types[0]
    int page_mix_types_1 = const.page_mix_types[1]
    int page_data_type = const.page_data_type
    int subheader_pointers_offset = const.subheader_pointers_offset
210
211
cdef class Parser:
    """Fast row-by-row reader for SAS7BDAT pages.

    Wraps a Python-level parser object (presumably pandas'
    ``SAS7BDATReader`` -- confirm against the caller), caching its
    per-column metadata in typed C buffers, and fills the parser's
    pre-allocated ``_byte_chunk`` (numeric data) and ``_string_chunk``
    (string data) arrays one row at a time.
    """

    cdef:
        int column_count
        # Per-column byte lengths, offsets within a row, and type codes
        # (column_type_decimal / column_type_string).
        int64_t[:] lengths
        int64_t[:] offsets
        int64_t[:] column_types
        # Output buffers shared with the Python-level parser.
        uint8_t[:, :] byte_chunk
        object[:, :] string_chunk
        # Raw bytes of the page currently being processed; NULL when no
        # page is loaded.
        char *cached_page
        # Cursors and layout metadata for the current page/chunk/file.
        int current_row_on_page_index
        int current_page_block_count
        int current_page_data_subheader_pointers_len
        int current_page_subheaders_count
        int current_row_in_chunk_index
        int current_row_in_file_index
        int header_length
        int row_length
        int bit_offset
        int subheader_pointer_length
        int current_page_type
        bint is_little_endian
        # C function pointer to the decompressor matching the file's
        # compression scheme; NULL means the data is uncompressed.
        const uint8_t[:] (*decompress)(int result_length, const uint8_t[:] inbuff)
        # The Python-level parser object this Parser mirrors.
        object parser

    def __init__(self, object parser):
        """Snapshot the Python parser's metadata, column types,
        compression scheme, and current read position."""
        cdef:
            int j
            char[:] column_types

        self.parser = parser
        self.header_length = self.parser.header_length
        self.column_count = parser.column_count
        self.lengths = parser.column_data_lengths()
        self.offsets = parser.column_data_offsets()
        self.byte_chunk = parser._byte_chunk
        self.string_chunk = parser._string_chunk
        self.row_length = parser.row_length
        self.bit_offset = self.parser._page_bit_offset
        self.subheader_pointer_length = self.parser._subheader_pointer_length
        self.is_little_endian = parser.byte_order == "<"
        self.column_types = np.empty(self.column_count, dtype='int64')

        # page indicators
        self.update_next_page()

        column_types = parser.column_types()

        # map column types: b'd' -> numeric, b's' -> string
        for j in range(self.column_count):
            if column_types[j] == b'd':
                self.column_types[j] = column_type_decimal
            elif column_types[j] == b's':
                self.column_types[j] = column_type_string
            else:
                raise ValueError(f"unknown column type: {self.parser.columns[j].ctype}")

        # compression: pick the matching decompressor, or NULL for none
        if parser.compression == const.rle_compression:
            self.decompress = rle_decompress
        elif parser.compression == const.rdc_compression:
            self.decompress = rdc_decompress
        else:
            self.decompress = NULL

        # update to current state of the parser
        self.current_row_in_chunk_index = parser._current_row_in_chunk_index
        self.current_row_in_file_index = parser._current_row_in_file_index
        self.current_row_on_page_index = parser._current_row_on_page_index

    def read(self, int nrows):
        """Read up to ``nrows`` rows into the chunk buffers, stopping
        early at end of file, then write the row cursors back to the
        Python-level parser."""
        cdef:
            bint done
            int i

        for _ in range(nrows):
            done = self.readline()
            if done:
                break

        # update the parser
        self.parser._current_row_on_page_index = self.current_row_on_page_index
        self.parser._current_row_in_chunk_index = self.current_row_in_chunk_index
        self.parser._current_row_in_file_index = self.current_row_in_file_index

    cdef bint read_next_page(self):
        # Advance the Python-level parser to the next page.  Returns True
        # at end of file; otherwise refreshes the cached page state.
        cdef done

        done = self.parser._read_next_page()
        if done:
            self.cached_page = NULL
        else:
            self.update_next_page()
        return done

    cdef update_next_page(self):
        # update cached layout data for the current page

        self.cached_page = <char *>self.parser._cached_page
        self.current_row_on_page_index = 0
        self.current_page_type = self.parser._current_page_type
        self.current_page_block_count = self.parser._current_page_block_count
        self.current_page_data_subheader_pointers_len = len(
            self.parser._current_page_data_subheader_pointers
        )
        self.current_page_subheaders_count = self.parser._current_page_subheaders_count

    cdef readline(self):
        # Read one data row into the chunk buffers.  Returns True when
        # end of file is reached before a row could be read, else False.

        cdef:
            int offset, bit_offset, align_correction
            int subheader_pointer_length, mn
            bint done, flag

        bit_offset = self.bit_offset
        subheader_pointer_length = self.subheader_pointer_length

        # If there is no page, go to the end of the header and read a page.
        if self.cached_page == NULL:
            self.parser._path_or_buf.seek(self.header_length)
            done = self.read_next_page()
            if done:
                return True

        # Loop until a data row is read
        while True:
            if self.current_page_type == page_meta_type:
                # Meta page: rows live in subheaders; once the data
                # subheader pointers are exhausted, move to the next page.
                flag = self.current_row_on_page_index >=\
                    self.current_page_data_subheader_pointers_len
                if flag:
                    done = self.read_next_page()
                    if done:
                        return True
                    continue
                current_subheader_pointer = (
                    self.parser._current_page_data_subheader_pointers[
                        self.current_row_on_page_index])
                self.process_byte_array_with_data(
                    current_subheader_pointer.offset,
                    current_subheader_pointer.length)
                return False
            elif (self.current_page_type == page_mix_types_0 or
                    self.current_page_type == page_mix_types_1):
                # Mix page: rows start after the subheader pointers,
                # shifted by an alignment correction (mod 8).
                align_correction = (
                    bit_offset
                    + subheader_pointers_offset
                    + self.current_page_subheaders_count * subheader_pointer_length
                )
                align_correction = align_correction % 8
                offset = bit_offset + align_correction
                offset += subheader_pointers_offset
                offset += self.current_page_subheaders_count * subheader_pointer_length
                offset += self.current_row_on_page_index * self.row_length
                self.process_byte_array_with_data(offset, self.row_length)
                mn = min(self.parser.row_count, self.parser._mix_page_row_count)
                if self.current_row_on_page_index == mn:
                    done = self.read_next_page()
                    if done:
                        return True
                return False
            elif self.current_page_type & page_data_type == page_data_type:
                # Data page: rows are laid out back-to-back after the
                # page header.
                self.process_byte_array_with_data(
                    bit_offset
                    + subheader_pointers_offset
                    + self.current_row_on_page_index * self.row_length,
                    self.row_length,
                )
                flag = self.current_row_on_page_index == self.current_page_block_count
                if flag:
                    done = self.read_next_page()
                    if done:
                        return True
                return False
            else:
                raise ValueError(f"unknown page type: {self.current_page_type}")

    cdef void process_byte_array_with_data(self, int offset, int length):
        # Copy one row, starting at `offset` in the cached page, into the
        # chunk buffers, decompressing it first if needed; then advance
        # all three row cursors.

        cdef:
            Py_ssize_t j
            int s, k, m, jb, js, current_row
            int64_t lngt, start, ct
            const uint8_t[:] source
            int64_t[:] column_types
            int64_t[:] lengths
            int64_t[:] offsets
            uint8_t[:, :] byte_chunk
            object[:, :] string_chunk

        source = np.frombuffer(
            self.cached_page[offset:offset + length], dtype=np.uint8)

        # A stored length shorter than the row length marks a compressed row.
        if self.decompress != NULL and (length < self.row_length):
            source = self.decompress(self.row_length, source)

        current_row = self.current_row_in_chunk_index
        # Hoist attribute lookups out of the per-column loop.
        column_types = self.column_types
        lengths = self.lengths
        offsets = self.offsets
        byte_chunk = self.byte_chunk
        string_chunk = self.string_chunk
        s = 8 * self.current_row_in_chunk_index
        js = 0
        jb = 0
        for j in range(self.column_count):
            lngt = lengths[j]
            if lngt == 0:
                break
            start = offsets[j]
            ct = column_types[j]
            if ct == column_type_decimal:
                # decimal: copy lngt bytes into an 8-byte slot,
                # right-justified on little-endian machines (presumably
                # so truncated doubles decode correctly -- TODO confirm)
                if self.is_little_endian:
                    m = s + 8 - lngt
                else:
                    m = s
                for k in range(lngt):
                    byte_chunk[jb, m + k] = source[start + k]
                jb += 1
            elif column_types[j] == column_type_string:
                # string: strip trailing NUL and space padding
                string_chunk[js, current_row] = np.array(source[start:(
                    start + lngt)]).tobytes().rstrip(b"\x00 ")
                js += 1

        self.current_row_on_page_index += 1
        self.current_row_in_chunk_index += 1
        self.current_row_in_file_index += 1
440