# cython: profile=False
# cython: boundscheck=False, initializedcheck=False
from cython import Py_ssize_t
import numpy as np

import pandas.io.sas.sas_constants as const

ctypedef signed long long int64_t
ctypedef unsigned char uint8_t
ctypedef unsigned short uint16_t

# rle_decompress decompresses data using a Run Length Encoding
# algorithm.  It is partially documented here:
#
# https://cran.r-project.org/package=sas7bdat/vignettes/sas7bdat.pdf
cdef const uint8_t[:] rle_decompress(int result_length, const uint8_t[:] inbuff):

    cdef:
        uint8_t control_byte, x
        uint8_t[:] result = np.zeros(result_length, np.uint8)
        int rpos = 0
        int i, nbytes, end_of_first_byte
        Py_ssize_t ipos = 0, length = len(inbuff)

    while ipos < length:
        # The high nibble of each control byte selects a command; the low
        # nibble is a length argument whose meaning depends on the command.
        control_byte = inbuff[ipos] & 0xF0
        end_of_first_byte = <int>(inbuff[ipos] & 0x0F)
        ipos += 1

        if control_byte == 0x00:
            # copy the next (inbuff[ipos] + 64) literal bytes
            if end_of_first_byte != 0:
                raise ValueError("Unexpected non-zero end_of_first_byte")
            nbytes = <int>(inbuff[ipos]) + 64
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = inbuff[ipos]
                rpos += 1
                ipos += 1
        elif control_byte == 0x40:
            # not documented: repeat the byte after the count byte
            # (end_of_first_byte * 16 + inbuff[ipos]) times
            nbytes = end_of_first_byte * 16
            nbytes += <int>(inbuff[ipos])
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = inbuff[ipos]
                rpos += 1
            ipos += 1
        elif control_byte == 0x60:
            # insert a run of blanks (0x20)
            nbytes = end_of_first_byte * 256 + <int>(inbuff[ipos]) + 17
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = 0x20
                rpos += 1
        elif control_byte == 0x70:
            # insert a run of null bytes
            nbytes = end_of_first_byte * 256 + <int>(inbuff[ipos]) + 17
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = 0x00
                rpos += 1
        elif control_byte == 0x80:
            # copy 1..16 literal bytes
            nbytes = end_of_first_byte + 1
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0x90:
            # copy 17..32 literal bytes
            nbytes = end_of_first_byte + 17
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0xA0:
            # copy 33..48 literal bytes
            nbytes = end_of_first_byte + 33
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0xB0:
            # copy 49..64 literal bytes
            nbytes = end_of_first_byte + 49
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0xC0:
            # repeat the next byte (end_of_first_byte + 3) times
            nbytes = end_of_first_byte + 3
            x = inbuff[ipos]
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = x
                rpos += 1
        elif control_byte == 0xD0:
            # insert (end_of_first_byte + 2) '@' bytes (0x40)
            nbytes = end_of_first_byte + 2
            for _ in range(nbytes):
                result[rpos] = 0x40
                rpos += 1
        elif control_byte == 0xE0:
            # insert (end_of_first_byte + 2) blanks (0x20)
            nbytes = end_of_first_byte + 2
            for _ in range(nbytes):
                result[rpos] = 0x20
                rpos += 1
        elif control_byte == 0xF0:
            # insert (end_of_first_byte + 2) null bytes
            nbytes = end_of_first_byte + 2
            for _ in range(nbytes):
                result[rpos] = 0x00
                rpos += 1
        else:
            raise ValueError(f"unknown control byte: {control_byte}")

    # In py37 cython/clang sees `len(result)` as size_t and not Py_ssize_t
    if <Py_ssize_t>len(result) != <Py_ssize_t>result_length:
        raise ValueError(f"RLE: {len(result)} != {result_length}")

    return np.asarray(result)
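
# A worked example of the scheme above (illustrative only; rle_decompress
# is a cdef function, so it is callable only from within this module): in
# the two-byte input b"\xC2\x41", control_byte is 0xC0 and
# end_of_first_byte is 2, so the following byte 0x41 ("A") is repeated
# 2 + 3 = 5 times:
#
#     buf = np.frombuffer(b"\xC2\x41", dtype=np.uint8)
#     rle_decompress(5, buf)  # five 0x41 bytes, i.e. b"AAAAA"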


# rdc_decompress decompresses data using the Ross Data Compression algorithm:
#
# http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/CUJ/1992/9210/ross/ross.htm
cdef const uint8_t[:] rdc_decompress(int result_length, const uint8_t[:] inbuff):

    cdef:
        uint8_t cmd
        uint16_t ctrl_bits = 0, ctrl_mask = 0, ofs, cnt
        int rpos = 0, k
        uint8_t[:] outbuff = np.zeros(result_length, dtype=np.uint8)
        Py_ssize_t ipos = 0, length = len(inbuff)

    while ipos < length:
        # Refill the 16-bit control word when its mask is exhausted; each
        # control bit says whether the next item is a literal byte (0) or
        # a compression command (1).
        ctrl_mask = ctrl_mask >> 1
        if ctrl_mask == 0:
            ctrl_bits = ((<uint16_t>inbuff[ipos] << 8) +
                         <uint16_t>inbuff[ipos + 1])
            ipos += 2
            ctrl_mask = 0x8000

        if ctrl_bits & ctrl_mask == 0:
            outbuff[rpos] = inbuff[ipos]
            ipos += 1
            rpos += 1
            continue

        cmd = (inbuff[ipos] >> 4) & 0x0F
        cnt = <uint16_t>(inbuff[ipos] & 0x0F)
        ipos += 1

        # short RLE
        if cmd == 0:
            cnt += 3
            for k in range(cnt):
                outbuff[rpos + k] = inbuff[ipos]
            rpos += cnt
            ipos += 1

        # long RLE
        elif cmd == 1:
            cnt += <uint16_t>inbuff[ipos] << 4
            cnt += 19
            ipos += 1
            for k in range(cnt):
                outbuff[rpos + k] = inbuff[ipos]
            rpos += cnt
            ipos += 1

        # long pattern
        elif cmd == 2:
            ofs = cnt + 3
            ofs += <uint16_t>inbuff[ipos] << 4
            ipos += 1
            cnt = <uint16_t>inbuff[ipos]
            ipos += 1
            cnt += 16
            for k in range(cnt):
                outbuff[rpos + k] = outbuff[rpos - <int>ofs + k]
            rpos += cnt

        # short pattern
        elif 3 <= cmd <= 15:
            ofs = cnt + 3
            ofs += <uint16_t>inbuff[ipos] << 4
            ipos += 1
            for k in range(cmd):
                outbuff[rpos + k] = outbuff[rpos - <int>ofs + k]
            rpos += cmd

        else:
            raise ValueError("unknown RDC command")

    # In py37 cython/clang sees `len(outbuff)` as size_t and not Py_ssize_t
    if <Py_ssize_t>len(outbuff) != <Py_ssize_t>result_length:
        raise ValueError(f"RDC: {len(outbuff)} != {result_length}")

    return np.asarray(outbuff)
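
# A worked example of the control-bit scheme (illustrative only, since
# rdc_decompress is a cdef function): in the input b"\x80\x00\x02\x41",
# the first two bytes form the control word 0x8000, whose set high bit
# marks the next item as a command.  That item, 0x02, has command nibble
# 0 ("short RLE") and count nibble 2, so the following byte 0x41 ("A") is
# repeated 2 + 3 = 5 times:
#
#     buf = np.frombuffer(b"\x80\x00\x02\x41", dtype=np.uint8)
#     rdc_decompress(5, buf)  # five 0x41 bytes, i.e. b"AAAAA"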


cdef enum ColumnTypes:
    column_type_decimal = 1
    column_type_string = 2


# Module-level copies of the page-type constants, typed for fast comparison.
cdef:
    int page_meta_type = const.page_meta_type
    int page_mix_types_0 = const.page_mix_types[0]
    int page_mix_types_1 = const.page_mix_types[1]
    int page_data_type = const.page_data_type
    int subheader_pointers_offset = const.subheader_pointers_offset


cdef class Parser:

    cdef:
        int column_count
        int64_t[:] lengths
        int64_t[:] offsets
        int64_t[:] column_types
        uint8_t[:, :] byte_chunk
        object[:, :] string_chunk
        char *cached_page
        int current_row_on_page_index
        int current_page_block_count
        int current_page_data_subheader_pointers_len
        int current_page_subheaders_count
        int current_row_in_chunk_index
        int current_row_in_file_index
        int header_length
        int row_length
        int bit_offset
        int subheader_pointer_length
        int current_page_type
        bint is_little_endian
        const uint8_t[:] (*decompress)(int result_length, const uint8_t[:] inbuff)
        object parser

    def __init__(self, object parser):
        cdef:
            int j
            char[:] column_types

        self.parser = parser
        self.header_length = self.parser.header_length
        self.column_count = parser.column_count
        self.lengths = parser.column_data_lengths()
        self.offsets = parser.column_data_offsets()
        self.byte_chunk = parser._byte_chunk
        self.string_chunk = parser._string_chunk
        self.row_length = parser.row_length
        self.bit_offset = self.parser._page_bit_offset
        self.subheader_pointer_length = self.parser._subheader_pointer_length
        self.is_little_endian = parser.byte_order == "<"
        self.column_types = np.empty(self.column_count, dtype='int64')

        # page indicators
        self.update_next_page()

        column_types = parser.column_types()

        # map column types
        for j in range(self.column_count):
            if column_types[j] == b'd':
                self.column_types[j] = column_type_decimal
            elif column_types[j] == b's':
                self.column_types[j] = column_type_string
            else:
                raise ValueError(
                    f"unknown column type: {self.parser.columns[j].ctype}")

        # compression
        if parser.compression == const.rle_compression:
            self.decompress = rle_decompress
        elif parser.compression == const.rdc_compression:
            self.decompress = rdc_decompress
        else:
            self.decompress = NULL

        # update to current state of the parser
        self.current_row_in_chunk_index = parser._current_row_in_chunk_index
        self.current_row_in_file_index = parser._current_row_in_file_index
        self.current_row_on_page_index = parser._current_row_on_page_index

    def read(self, int nrows):
        cdef:
            bint done

        for _ in range(nrows):
            done = self.readline()
            if done:
                break

        # update the parser
        self.parser._current_row_on_page_index = self.current_row_on_page_index
        self.parser._current_row_in_chunk_index = self.current_row_in_chunk_index
        self.parser._current_row_in_file_index = self.current_row_in_file_index

    cdef bint read_next_page(self):
        cdef bint done

        done = self.parser._read_next_page()
        if done:
            self.cached_page = NULL
        else:
            self.update_next_page()
        return done

    cdef update_next_page(self):
        # update data for the current page
        self.cached_page = <char *>self.parser._cached_page
        self.current_row_on_page_index = 0
        self.current_page_type = self.parser._current_page_type
        self.current_page_block_count = self.parser._current_page_block_count
        self.current_page_data_subheader_pointers_len = len(
            self.parser._current_page_data_subheader_pointers
        )
        self.current_page_subheaders_count = self.parser._current_page_subheaders_count
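
    # A sketch of the page layouts readline() below must handle (this
    # summarizes the offset arithmetic in the method; it is not an
    # authoritative description of the sas7bdat format):
    #
    #   meta pages:  rows live at arbitrary offsets recorded in the page's
    #                data subheader pointers, one pointer per row.
    #   mix pages:   subheader pointers come first, then rows packed back to
    #                back, with the first row aligned to an 8-byte boundary.
    #   data pages:  rows packed back to back right after the pointer area.
    #
    # In every case a row is handed to process_byte_array_with_data(), and
    # True is returned once the last page is exhausted.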

    cdef readline(self):

        cdef:
            int offset, bit_offset, align_correction
            int subheader_pointer_length, mn
            bint done, flag

        bit_offset = self.bit_offset
        subheader_pointer_length = self.subheader_pointer_length

        # If there is no page, go to the end of the header and read a page.
        if self.cached_page == NULL:
            self.parser._path_or_buf.seek(self.header_length)
            done = self.read_next_page()
            if done:
                return True

        # Loop until a data row is read
        while True:
            if self.current_page_type == page_meta_type:
                flag = (self.current_row_on_page_index >=
                        self.current_page_data_subheader_pointers_len)
                if flag:
                    done = self.read_next_page()
                    if done:
                        return True
                    continue
                current_subheader_pointer = (
                    self.parser._current_page_data_subheader_pointers[
                        self.current_row_on_page_index])
                self.process_byte_array_with_data(
                    current_subheader_pointer.offset,
                    current_subheader_pointer.length)
                return False
            elif (self.current_page_type == page_mix_types_0 or
                  self.current_page_type == page_mix_types_1):
                # Align the first row of a mix page to an 8-byte boundary.
                align_correction = (
                    bit_offset
                    + subheader_pointers_offset
                    + self.current_page_subheaders_count * subheader_pointer_length
                )
                align_correction = align_correction % 8
                offset = bit_offset + align_correction
                offset += subheader_pointers_offset
                offset += self.current_page_subheaders_count * subheader_pointer_length
                offset += self.current_row_on_page_index * self.row_length
                self.process_byte_array_with_data(offset, self.row_length)
                mn = min(self.parser.row_count, self.parser._mix_page_row_count)
                if self.current_row_on_page_index == mn:
                    done = self.read_next_page()
                    if done:
                        return True
                return False
            elif self.current_page_type & page_data_type == page_data_type:
                self.process_byte_array_with_data(
                    bit_offset
                    + subheader_pointers_offset
                    + self.current_row_on_page_index * self.row_length,
                    self.row_length,
                )
                flag = self.current_row_on_page_index == self.current_page_block_count
                if flag:
                    done = self.read_next_page()
                    if done:
                        return True
                return False
            else:
                raise ValueError(f"unknown page type: {self.current_page_type}")

    cdef void process_byte_array_with_data(self, int offset, int length):

        cdef:
            Py_ssize_t j
            int s, k, m, jb, js, current_row
            int64_t lngt, start, ct
            const uint8_t[:] source
            int64_t[:] column_types
            int64_t[:] lengths
            int64_t[:] offsets
            uint8_t[:, :] byte_chunk
            object[:, :] string_chunk

        source = np.frombuffer(
            self.cached_page[offset:offset + length], dtype=np.uint8)

        # A row shorter than row_length must be a compressed row.
        if self.decompress != NULL and (length < self.row_length):
            source = self.decompress(self.row_length, source)

        current_row = self.current_row_in_chunk_index
        column_types = self.column_types
        lengths = self.lengths
        offsets = self.offsets
        byte_chunk = self.byte_chunk
        string_chunk = self.string_chunk
        s = 8 * self.current_row_in_chunk_index
        js = 0
        jb = 0
        for j in range(self.column_count):
            lngt = lengths[j]
            if lngt == 0:
                break
            start = offsets[j]
            ct = column_types[j]
            if ct == column_type_decimal:
                # decimal: right-align the value in its 8-byte slot on
                # little-endian machines
                if self.is_little_endian:
                    m = s + 8 - lngt
                else:
                    m = s
                for k in range(lngt):
                    byte_chunk[jb, m + k] = source[start + k]
                jb += 1
            elif ct == column_type_string:
                # string: strip trailing nulls and blanks
                string_chunk[js, current_row] = np.array(
                    source[start:start + lngt]).tobytes().rstrip(b"\x00 ")
                js += 1

        self.current_row_on_page_index += 1
        self.current_row_in_chunk_index += 1
        self.current_row_in_file_index += 1
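
# Usage sketch (illustrative, not part of this module's API; the flow below
# paraphrases the pandas-internal SAS7BDATReader in pandas/io/sas/sas7bdat.py
# and may differ between pandas versions):
#
#     p = Parser(reader)  # reader: a SAS7BDATReader instance
#     p.read(nrows)       # fills reader._byte_chunk and reader._string_chunk
#
# after which the reader assembles the filled chunks into a DataFrame.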