1# -*- coding: utf-8 -*- 2from __future__ import division 3from __future__ import print_function 4from __future__ import absolute_import 5from __future__ import unicode_literals 6 7import re 8import six 9import gzip 10import zipfile 11import tempfile 12import warnings 13from copy import copy 14from itertools import chain 15from collections import deque 16from .loaders.stream import StreamLoader 17from . import exceptions 18from . import helpers 19from . import config 20 21 22# Module API 23 24# TODO: merge pick/skip rows logic 25class Stream(object): 26 """Stream of tabular data. 27 28 This is the main `tabulator` class. It loads a data source, and allows you 29 to stream its parsed contents. 30 31 # Arguments 32 33 source (str): 34 Path to file as ``<scheme>\\://path/to/file.<format>``. 35 If not explicitly set, the scheme (file, http, ...) and 36 format (csv, xls, ...) are inferred from the source string. 37 38 headers (Union[int, List[int], List[str]], optional): 39 Either a row 40 number or list of row numbers (in case of multi-line headers) to be 41 considered as headers (rows start counting at 1), or the actual 42 headers defined a list of strings. If not set, all rows will be 43 treated as containing values. 44 45 scheme (str, optional): 46 Scheme for loading the file (file, http, ...). 47 If not set, it'll be inferred from `source`. 48 49 format (str, optional): 50 File source's format (csv, xls, ...). If not 51 set, it'll be inferred from `source`. inferred 52 53 encoding (str, optional): 54 Source encoding. If not set, it'll be inferred. 55 56 compression (str, optional): 57 Source file compression (zip, ...). If not set, it'll be inferred. 58 59 pick_rows (List[Union[int, str, dict]], optional): 60 The same as `skip_rows` but it's for picking rows instead of skipping. 61 62 skip_rows (List[Union[int, str, dict]], optional): 63 List of row numbers, strings and regex patterns as dicts to skip. 64 If a string, it'll skip rows that their first cells begin with it e.g. '#' and '//'. 65 To skip only completely blank rows use `{'type'\\: 'preset', 'value'\\: 'blank'}` 66 To provide a regex pattern use `{'type'\\: 'regex', 'value'\\: '^#'}` 67 For example\\: `skip_rows=[1, '# comment', {'type'\\: 'regex', 'value'\\: '^# (regex|comment)'}]` 68 69 pick_fields (List[Union[int, str]], optional): 70 When passed, ignores all columns with headers 71 that the given list DOES NOT include 72 73 skip_fields (List[Union[int, str]], optional): 74 When passed, ignores all columns with headers 75 that the given list includes. If it contains an empty string it will skip 76 empty headers 77 78 sample_size (int, optional): 79 Controls the number of sample rows used to 80 infer properties from the data (headers, encoding, etc.). Set to 81 ``0`` to disable sampling, in which case nothing will be inferred 82 from the data. Defaults to ``config.DEFAULT_SAMPLE_SIZE``. 83 84 bytes_sample_size (int, optional): 85 Same as `sample_size`, but instead 86 of number of rows, controls number of bytes. Defaults to 87 ``config.DEFAULT_BYTES_SAMPLE_SIZE``. 88 89 allow_html (bool, optional): 90 Allow the file source to be an HTML page. 91 If False, raises ``exceptions.FormatError`` if the loaded file is 92 an HTML page. Defaults to False. 93 94 multiline_headers_joiner (str, optional): 95 When passed, it's used to join multiline headers 96 as `<passed-value>.join(header1_1, header1_2)` 97 Defaults to ' ' (space). 98 99 multiline_headers_duplicates (bool, optional): 100 By default tabulator will exclude a cell of a miltilne header from joining 101 if it's exactly the same as the previous seen value in this field. 102 Enabling this option will force duplicates inclusion 103 Defaults to False. 104 105 hashing_algorithm (func, optional): 106 It supports: md5, sha1, sha256, sha512 107 Defaults to sha256 108 109 force_strings (bool, optional): 110 When True, casts all data to strings. 111 Defaults to False. 112 113 force_parse (bool, optional): 114 When True, don't raise exceptions when 115 parsing malformed rows, simply returning an empty value. Defaults 116 to False. 117 118 post_parse (List[function], optional): 119 List of generator functions that 120 receives a list of rows and headers, processes them, and yields 121 them (or not). Useful to pre-process the data. Defaults to None. 122 123 custom_loaders (dict, optional): 124 Dictionary with keys as scheme names, 125 and values as their respective ``Loader`` class implementations. 126 Defaults to None. 127 128 custom_parsers (dict, optional): 129 Dictionary with keys as format names, 130 and values as their respective ``Parser`` class implementations. 131 Defaults to None. 132 133 custom_loaders (dict, optional): 134 Dictionary with keys as writer format 135 names, and values as their respective ``Writer`` class 136 implementations. Defaults to None. 137 138 **options (Any, optional): Extra options passed to the loaders and parsers. 139 140 """ 141 142 # Public 143 144 def __init__(self, 145 source, 146 headers=None, 147 scheme=None, 148 format=None, 149 encoding=None, 150 compression=None, 151 allow_html=False, 152 sample_size=config.DEFAULT_SAMPLE_SIZE, 153 bytes_sample_size=config.DEFAULT_BYTES_SAMPLE_SIZE, 154 ignore_blank_headers=False, 155 ignore_listed_headers=None, 156 ignore_not_listed_headers=None, 157 multiline_headers_joiner=' ', 158 multiline_headers_duplicates=False, 159 hashing_algorithm='sha256', 160 force_strings=False, 161 force_parse=False, 162 pick_columns=None, 163 skip_columns=None, 164 pick_fields=None, 165 skip_fields=None, 166 limit_fields=None, 167 offset_fields=None, 168 pick_rows=None, 169 skip_rows=None, 170 limit_rows=None, 171 offset_rows=None, 172 post_parse=[], 173 custom_loaders={}, 174 custom_parsers={}, 175 custom_writers={}, 176 **options): 177 178 # Translate aliases 179 if pick_fields is not None: 180 pick_columns = pick_fields 181 if skip_fields is not None: 182 skip_columns = skip_fields 183 if pick_columns is not None: 184 ignore_not_listed_headers = pick_columns 185 if skip_columns is not None: 186 ignore_listed_headers = skip_columns 187 if '' in skip_columns: 188 ignore_blank_headers = True 189 190 # Set headers 191 self.__headers = None 192 self.__headers_row = None 193 self.__headers_row_last = None 194 if isinstance(headers, int): 195 self.__headers_row = headers 196 self.__headers_row_last = headers 197 elif isinstance(headers, (tuple, list)): 198 if (len(headers) == 2 and 199 isinstance(headers[0], int) and 200 isinstance(headers[1], int)): 201 self.__headers_row = headers[0] 202 self.__headers_row_last = headers[1] 203 else: 204 self.__headers = list(headers) 205 206 # Set pick rows 207 self.__pick_rows = pick_rows 208 self.__pick_rows_by_numbers = [] 209 self.__pick_rows_by_patterns = [] 210 self.__pick_rows_by_comments = [] 211 self.__pick_rows_by_presets = {} 212 for directive in copy(pick_rows or []): 213 if isinstance(directive, int): 214 self.__pick_rows_by_numbers.append(directive) 215 elif isinstance(directive, dict): 216 if directive['type'] == 'regex': 217 self.__pick_rows_by_patterns.append(re.compile(directive['value'])) 218 elif directive['type'] == 'preset' and directive['value'] == 'blank': 219 self.__pick_rows_by_presets['blank'] = True 220 else: 221 raise ValueError('Not supported pick rows: %s' % directive) 222 else: 223 self.__pick_rows_by_comments.append(str(directive)) 224 225 # Set skip rows 226 self.__skip_rows = skip_rows 227 self.__skip_rows_by_numbers = [] 228 self.__skip_rows_by_patterns = [] 229 self.__skip_rows_by_comments = [] 230 self.__skip_rows_by_presets = {} 231 for directive in copy(skip_rows or []): 232 if isinstance(directive, int): 233 self.__skip_rows_by_numbers.append(directive) 234 elif isinstance(directive, dict): 235 if directive['type'] == 'regex': 236 self.__skip_rows_by_patterns.append(re.compile(directive['value'])) 237 elif directive['type'] == 'preset' and directive['value'] == 'blank': 238 self.__skip_rows_by_presets['blank'] = True 239 else: 240 raise ValueError('Not supported skip rows: %s' % directive) 241 else: 242 self.__skip_rows_by_comments.append(str(directive)) 243 244 # Support for pathlib.Path 245 if hasattr(source, 'joinpath'): 246 source = str(source) 247 248 # Set attributes 249 self.__source = source 250 self.__scheme = scheme 251 self.__format = format 252 self.__encoding = encoding 253 self.__compression = compression 254 self.__allow_html = allow_html 255 self.__sample_size = sample_size 256 self.__bytes_sample_size = bytes_sample_size 257 self.__ignore_blank_headers = ignore_blank_headers 258 self.__ignore_listed_headers = ignore_listed_headers 259 self.__ignore_not_listed_headers = ignore_not_listed_headers 260 self.__multiline_headers_joiner = multiline_headers_joiner 261 self.__multiline_headers_duplicates = multiline_headers_duplicates 262 self.__ignored_headers_indexes = [] 263 self.__hashing_algorithm = hashing_algorithm 264 self.__force_strings = force_strings 265 self.__force_parse = force_parse 266 self.__limit_fields = limit_fields 267 self.__offset_fields = offset_fields 268 self.__limit_rows = limit_rows 269 self.__offset_rows = offset_rows 270 self.__post_parse = copy(post_parse) 271 self.__custom_loaders = copy(custom_loaders) 272 self.__custom_parsers = copy(custom_parsers) 273 self.__custom_writers = copy(custom_writers) 274 self.__actual_scheme = scheme 275 self.__actual_format = format 276 self.__actual_encoding = encoding 277 self.__actual_compression = compression 278 self.__options = options 279 self.__sample_extended_rows = [] 280 self.__field_positions = None 281 self.__loader = None 282 self.__parser = None 283 self.__row_number = 0 284 self.__stats = None 285 286 def __enter__(self): 287 if self.closed: 288 self.open() 289 return self 290 291 def __exit__(self, type, value, traceback): 292 if not self.closed: 293 self.close() 294 295 def __iter__(self): 296 return self.iter() 297 298 @property 299 def closed(self): 300 """Returns True if the underlying stream is closed, False otherwise. 301 302 # Returns 303 bool: whether closed 304 305 """ 306 return not self.__parser or self.__parser.closed 307 308 def open(self): 309 """Opens the stream for reading. 310 311 # Raises: 312 TabulatorException: if an error 313 314 """ 315 source = self.__source 316 options = copy(self.__options) 317 318 # Programming error assertions 319 assert self.__hashing_algorithm in config.SUPPORTED_HASHING_ALGORITHMS 320 321 # Validate compression 322 if self.__compression: 323 if self.__compression not in config.SUPPORTED_COMPRESSION: 324 message = 'Not supported compression "%s"' % self.__compression 325 raise exceptions.CompressionError(message) 326 327 # Get scheme and format if not already given 328 compression = None 329 if self.__scheme is None or self.__format is None: 330 detected_scheme, detected_format = helpers.detect_scheme_and_format(source) 331 scheme = self.__scheme or detected_scheme 332 format = self.__format or detected_format 333 # Get compression 334 for type in config.SUPPORTED_COMPRESSION: 335 if self.__compression == type or detected_format == type: 336 compression = type 337 else: 338 scheme = self.__scheme 339 format = self.__format 340 341 # Initiate loader 342 self.__loader = None 343 if scheme is not None: 344 loader_class = self.__custom_loaders.get(scheme) 345 if loader_class is None: 346 if scheme not in config.LOADERS: 347 message = 'Scheme "%s" is not supported' % scheme 348 raise exceptions.SchemeError(message) 349 loader_path = config.LOADERS[scheme] 350 if loader_path: 351 loader_class = helpers.import_attribute(loader_path) 352 if loader_class is not None: 353 loader_options = helpers.extract_options(options, loader_class.options) 354 if compression and 'http_stream' in loader_class.options: 355 loader_options['http_stream'] = False 356 self.__loader = loader_class( 357 bytes_sample_size=self.__bytes_sample_size, 358 **loader_options) 359 360 # Zip compression 361 if compression == 'zip' and six.PY3: 362 source = self.__loader.load(source, mode='b') 363 with zipfile.ZipFile(source) as archive: 364 name = archive.namelist()[0] 365 if 'filename' in options.keys(): 366 name = options['filename'] 367 del options['filename'] 368 with archive.open(name) as file: 369 source = tempfile.NamedTemporaryFile(suffix='.' + name) 370 for line in file: 371 source.write(line) 372 source.seek(0) 373 # We redefine loader/format/schema after decompression 374 self.__loader = StreamLoader(bytes_sample_size=self.__bytes_sample_size) 375 format = self.__format or helpers.detect_scheme_and_format(source.name)[1] 376 scheme = 'stream' 377 378 # Gzip compression 379 elif compression == 'gz' and six.PY3: 380 name = '' 381 if isinstance(source, str): 382 name = source.replace('.gz', '') 383 source = gzip.open(self.__loader.load(source, mode='b')) 384 # We redefine loader/format/schema after decompression 385 self.__loader = StreamLoader(bytes_sample_size=self.__bytes_sample_size) 386 format = self.__format or helpers.detect_scheme_and_format(name)[1] 387 scheme = 'stream' 388 389 # Not supported compression 390 elif compression: 391 message = 'Compression "%s" is not supported for your Python version' 392 raise exceptions.TabulatorException(message % compression) 393 394 # Attach stats to the loader 395 if getattr(self.__loader, 'attach_stats', None): 396 self.__stats = {'size': 0, 'hash': '', 'hashing_algorithm': self.__hashing_algorithm} 397 getattr(self.__loader, 'attach_stats')(self.__stats) 398 399 # Initiate parser 400 parser_class = self.__custom_parsers.get(format) 401 if parser_class is None: 402 if format not in config.PARSERS: 403 # If not existent it's a not-found error 404 # Next line will raise IOError/HTTPError 405 chars = self.__loader.load(source) 406 chars.close() 407 # Otherwise it's a format error 408 message = 'Format "%s" is not supported' % format 409 raise exceptions.FormatError(message) 410 parser_class = helpers.import_attribute(config.PARSERS[format]) 411 parser_options = helpers.extract_options(options, parser_class.options) 412 self.__parser = parser_class(self.__loader, 413 force_parse=self.__force_parse, 414 **parser_options) 415 416 # Bad options 417 if options: 418 message = 'Not supported option(s) "%s" for scheme "%s" and format "%s"' 419 message = message % (', '.join(options), scheme, format) 420 warnings.warn(message, UserWarning) 421 422 # Open and setup 423 self.__parser.open(source, encoding=self.__encoding) 424 self.__extract_sample() 425 self.__extract_headers() 426 if not self.__allow_html: 427 self.__detect_html() 428 429 # Set scheme/format/encoding 430 self.__actual_scheme = scheme 431 self.__actual_format = format 432 self.__actual_encoding = self.__parser.encoding 433 self.__actual_compression = compression 434 435 return self 436 437 def close(self): 438 """Closes the stream. 439 """ 440 self.__parser.close() 441 self.__row_number = 0 442 443 def reset(self): 444 """Resets the stream pointer to the beginning of the file. 445 """ 446 if self.__row_number > self.__sample_size: 447 self.__stats = {'size': 0, 'hash': ''} 448 self.__parser.reset() 449 self.__extract_sample() 450 self.__extract_headers() 451 self.__row_number = 0 452 453 @property 454 def source(self): 455 """Source 456 457 # Returns 458 any: stream source 459 460 """ 461 return self.__source 462 463 @property 464 def headers(self): 465 """Headers 466 467 # Returns 468 str[]/None: headers if available 469 470 """ 471 return self.__headers 472 473 @headers.setter 474 def headers(self, headers): 475 """Set headers 476 477 # Arguments 478 str[]: headers 479 480 """ 481 self.__headers = headers 482 483 @property 484 def scheme(self): 485 """Path's scheme 486 487 # Returns 488 str: scheme 489 490 """ 491 return self.__actual_scheme or 'inline' 492 493 @property 494 def format(self): 495 """Path's format 496 497 # Returns 498 str: format 499 500 """ 501 return self.__actual_format or 'inline' 502 503 @property 504 def encoding(self): 505 """Stream's encoding 506 507 # Returns 508 str: encoding 509 510 """ 511 return self.__actual_encoding or 'no' 512 513 @property 514 def compression(self): 515 """Stream's compression ("no" if no compression) 516 517 # Returns 518 str: compression 519 520 """ 521 return self.__actual_compression or 'no' 522 523 @property 524 def fragment(self): 525 """Path's fragment 526 527 # Returns 528 str: fragment 529 530 """ 531 if self.__parser: 532 return getattr(self.__parser, 'fragment', None) 533 return None 534 535 @property 536 def dialect(self): 537 """Dialect (if available) 538 539 # Returns 540 dict/None: dialect 541 542 """ 543 if self.__parser: 544 return getattr(self.__parser, 'dialect', {}) 545 return None 546 547 @property 548 def size(self): 549 """Returns the BYTE count of the read chunks if available 550 551 # Returns 552 int/None: BYTE count 553 554 """ 555 if self.__stats: 556 return self.__stats['size'] 557 558 @property 559 def hash(self): 560 """Returns the SHA256 (or according to the "hashing_algorithm" parameter) 561 hash of the read chunks if available 562 563 # Returns 564 str/None: bytes hash 565 566 """ 567 if self.__stats: 568 return self.__stats['hash'] 569 570 @property 571 def sample(self): 572 """Returns the stream's rows used as sample. 573 574 These sample rows are used internally to infer characteristics of the 575 source file (e.g. encoding, headers, ...). 576 577 # Returns 578 list[]: sample 579 580 """ 581 sample = [] 582 iterator = iter(self.__sample_extended_rows) 583 iterator = self.__apply_processors(iterator) 584 for row_number, headers, row in iterator: 585 sample.append(row) 586 return sample 587 588 @property 589 def field_positions(self): 590 if self.__field_positions is None: 591 self.__field_positions = [] 592 if self.__headers: 593 size = len(self.__headers) + len(self.__ignored_headers_indexes) 594 for index in range(size): 595 if index not in self.__ignored_headers_indexes: 596 self.__field_positions.append(index + 1) 597 return self.__field_positions 598 599 @property 600 def hashing_algorithm(self): 601 return self.__hashing_algorithm 602 603 def iter(self, keyed=False, extended=False): 604 """Iterate over the rows. 605 606 Each row is returned in a format that depends on the arguments `keyed` 607 and `extended`. By default, each row is returned as list of their 608 values. 609 610 # Arguments 611 keyed (bool, optional): 612 When True, each returned row will be a 613 `dict` mapping the header name to its value in the current row. 614 For example, `[{'name'\\: 'J Smith', 'value'\\: '10'}]`. Ignored if 615 ``extended`` is True. Defaults to False. 616 extended (bool, optional): 617 When True, returns each row as a tuple 618 with row number (starts at 1), list of headers, and list of row 619 values. For example, `(1, ['name', 'value'], ['J Smith', '10'])`. 620 Defaults to False. 621 622 # Raises 623 exceptions.TabulatorException: If the stream is closed. 624 625 # Returns 626 Iterator[Union[List[Any], Dict[str, Any], Tuple[int, List[str], List[Any]]]]: 627 The row itself. The format depends on the values of `keyed` and 628 `extended` arguments. 629 630 """ 631 632 # Error if closed 633 if self.closed: 634 message = 'Stream is closed. Please call "stream.open()" first.' 635 raise exceptions.TabulatorException(message) 636 637 # Create iterator 638 iterator = chain( 639 self.__sample_extended_rows, 640 self.__parser.extended_rows) 641 iterator = self.__apply_processors(iterator) 642 643 # Yield rows from iterator 644 try: 645 count = 0 646 for row_number, headers, row in iterator: 647 if row_number > self.__row_number: 648 count += 1 649 if self.__limit_rows or self.__offset_rows: 650 offset = self.__offset_rows or 0 651 limit = self.__limit_rows + offset if self.__limit_rows else None 652 if offset and count <= offset: 653 continue 654 if limit and count > limit: 655 break 656 self.__row_number = row_number 657 if extended: 658 yield (row_number, headers, row) 659 elif keyed: 660 yield dict(zip(headers, row)) 661 else: 662 yield row 663 except UnicodeError as error: 664 message = 'Cannot parse the source "%s" using "%s" encoding at "%s"' 665 raise exceptions.EncodingError(message % (self.__source, error.encoding, error.start)) 666 except Exception as error: 667 raise exceptions.SourceError(str(error)) 668 669 def read(self, keyed=False, extended=False, limit=None): 670 """Returns a list of rows. 671 672 # Arguments 673 keyed (bool, optional): See :func:`Stream.iter`. 674 extended (bool, optional): See :func:`Stream.iter`. 675 limit (int, optional): 676 Number of rows to return. If None, returns all rows. Defaults to None. 677 678 # Returns 679 List[Union[List[Any], Dict[str, Any], Tuple[int, List[str], List[Any]]]]: 680 The list of rows. The format depends on the values of `keyed` 681 and `extended` arguments. 682 """ 683 result = [] 684 rows = self.iter(keyed=keyed, extended=extended) 685 for count, row in enumerate(rows, start=1): 686 result.append(row) 687 if count == limit: 688 break 689 return result 690 691 def save(self, target, format=None, encoding=None, **options): 692 """Save stream to the local filesystem. 693 694 # Arguments 695 target (str): Path where to save the stream. 696 format (str, optional): 697 The format the stream will be saved as. If 698 None, detects from the ``target`` path. Defaults to None. 699 encoding (str, optional): 700 Saved file encoding. Defaults to ``config.DEFAULT_ENCODING``. 701 **options: Extra options passed to the writer. 702 703 # Returns 704 count (int?): Written rows count if available 705 """ 706 707 # Get encoding/format 708 if encoding is None: 709 encoding = config.DEFAULT_ENCODING 710 if format is None: 711 _, format = helpers.detect_scheme_and_format(target) 712 713 # Prepare writer class 714 writer_class = self.__custom_writers.get(format) 715 if writer_class is None: 716 if format not in config.WRITERS: 717 message = 'Format "%s" is not supported' % format 718 raise exceptions.FormatError(message) 719 writer_class = helpers.import_attribute(config.WRITERS[format]) 720 721 # Prepare writer options 722 writer_options = helpers.extract_options(options, writer_class.options) 723 if options: 724 message = 'Not supported options "%s" for format "%s"' 725 message = message % (', '.join(options), format) 726 raise exceptions.TabulatorException(message) 727 728 # Write data to target 729 writer = writer_class(**writer_options) 730 return writer.write(self.iter(), target, headers=self.headers, encoding=encoding) 731 732 # Private 733 734 def __extract_sample(self): 735 736 # Sample is not requested 737 if not self.__sample_size: 738 return 739 740 # Extract sample rows 741 self.__sample_extended_rows = [] 742 for _ in range(self.__sample_size): 743 try: 744 row_number, headers, row = next(self.__parser.extended_rows) 745 if self.__headers_row and self.__headers_row >= row_number: 746 if self.__check_if_row_for_skipping(row_number, headers, row): 747 self.__headers_row += 1 748 self.__headers_row_last += 1 749 self.__sample_extended_rows.append((row_number, headers, row)) 750 except StopIteration: 751 break 752 except UnicodeError as error: 753 message = 'Cannot parse the source "%s" using "%s" encoding at "%s"' 754 raise exceptions.EncodingError(message % (self.__source, error.encoding, error.start)) 755 except Exception as error: 756 raise exceptions.SourceError(str(error)) 757 758 def __extract_headers(self): 759 760 # Heders row is not set 761 if not self.__headers_row: 762 return 763 764 # Sample is too short 765 if self.__headers_row > self.__sample_size: 766 message = 'Headers row (%s) can\'t be more than sample_size (%s)' 767 message = message % (self.__headers_row, self.__sample_size) 768 raise exceptions.TabulatorException(message) 769 770 # Get headers from data 771 last_merged = {} 772 keyed_source = False 773 for row_number, headers, row in self.__sample_extended_rows: 774 keyed_source = keyed_source or headers is not None 775 headers = headers if keyed_source else row 776 for index, header in enumerate(headers): 777 if header is not None: 778 headers[index] = six.text_type(header).strip() 779 if row_number == self.__headers_row: 780 self.__headers = headers 781 last_merged = {index: header for index, header in enumerate(headers)} 782 if row_number > self.__headers_row: 783 for index in range(0, len(self.__headers)): 784 if len(headers) > index and headers[index] is not None: 785 if not self.__headers[index]: 786 self.__headers[index] = headers[index] 787 else: 788 if (self.__multiline_headers_duplicates or 789 last_merged.get(index) != headers[index]): 790 self.__headers[index] += ( 791 self.__multiline_headers_joiner + headers[index]) 792 last_merged[index] = headers[index] 793 if row_number == self.__headers_row_last: 794 break 795 796 # Ignore headers 797 if (self.__ignore_blank_headers or 798 self.__ignore_listed_headers is not None or 799 self.__ignore_not_listed_headers is not None): 800 self.__ignored_headers_indexes = [] 801 raw_headers, self.__headers = self.__headers, [] 802 for index, header in list(enumerate(raw_headers)): 803 ignore = False 804 # Ignore blank headers 805 if header in ['', None]: 806 ignore = True 807 # Ignore listed headers 808 if self.__ignore_listed_headers is not None: 809 if (header in self.__ignore_listed_headers or 810 index + 1 in self.__ignore_listed_headers): 811 ignore = True 812 # Regex 813 for item in self.__ignore_listed_headers: 814 if isinstance(item, dict) and item.get('type') == 'regex': 815 if bool(re.search(item['value'], header)): 816 ignore = True 817 # Ignore not-listed headers 818 if self.__ignore_not_listed_headers is not None: 819 if (header not in self.__ignore_not_listed_headers and 820 index + 1 not in self.__ignore_not_listed_headers): 821 ignore = True 822 # Regex 823 for item in self.__ignore_not_listed_headers: 824 if isinstance(item, dict) and item.get('type') == 'regex': 825 if bool(re.search(item['value'], header)): 826 ignore = False 827 # Add to the list and skip 828 if ignore: 829 self.__ignored_headers_indexes.append(index) 830 continue 831 self.__headers.append(header) 832 self.__ignored_headers_indexes = list(sorted(self.__ignored_headers_indexes, reverse=True)) 833 834 # Limit/offset fields 835 if self.__limit_fields or self.__offset_fields: 836 ignore = [] 837 headers = [] 838 min = self.__offset_fields or 0 839 max = self.__limit_fields + min if self.__limit_fields else len(self.__headers) 840 for position, header in enumerate(self.__headers, start=1): 841 if position <= min: 842 ignore.append(position - 1) 843 continue 844 if position > max: 845 ignore.append(position - 1) 846 continue 847 headers.append(header) 848 for index in ignore: 849 if index not in self.__ignored_headers_indexes: 850 self.__ignored_headers_indexes.append(index) 851 self.__ignored_headers_indexes = list(sorted(self.__ignored_headers_indexes, reverse=True)) 852 self.__headers = headers 853 854 # Remove headers from data 855 if not keyed_source: 856 del self.__sample_extended_rows[:self.__headers_row_last] 857 858 # Stringify headers 859 if isinstance(self.__headers, list): 860 str_headers = [] 861 for header in self.__headers: 862 str_headers.append(six.text_type(header) if header is not None else '') 863 self.__headers = str_headers 864 865 def __detect_html(self): 866 867 # Prepare text 868 text = '' 869 for row_number, headers, row in self.__sample_extended_rows: 870 for value in row: 871 if isinstance(value, six.string_types): 872 text += value 873 874 # Detect html content 875 html_source = helpers.detect_html(text) 876 if html_source: 877 message = 'Format has been detected as HTML (not supported)' 878 raise exceptions.FormatError(message) 879 880 def __apply_processors(self, iterator): 881 882 # Base processor 883 def builtin_processor(extended_rows): 884 for row_number, headers, row in extended_rows: 885 886 # Sync headers/row 887 if headers != self.__headers: 888 if headers and self.__headers: 889 keyed_row = dict(zip(headers, row)) 890 row = [keyed_row.get(header) for header in self.__headers] 891 elif self.__ignored_headers_indexes: 892 row = [value for index, value in enumerate(row) if index not in self.__ignored_headers_indexes] 893 headers = self.__headers 894 895 # Skip rows by numbers/comments 896 if self.__check_if_row_for_skipping(row_number, headers, row): 897 continue 898 899 yield (row_number, headers, row) 900 901 # Skip nagative rows processor 902 def skip_negative_rows(extended_rows): 903 ''' 904 This processor will skip rows which counts from the end, e.g. 905 -1: skip last row, -2: skip pre-last row, etc. 906 Rows to skip are taken from Stream.__skip_rows_by_numbers 907 ''' 908 rows_to_skip = [n for n in self.__skip_rows_by_numbers if n < 0] 909 buffer_size = abs(min(rows_to_skip)) 910 # collections.deque - takes O[1] time to push/pop values from any side. 911 buffer = deque() 912 913 # use buffer to save last rows 914 for row in extended_rows: 915 buffer.append(row) 916 if len(buffer) > buffer_size: 917 yield buffer.popleft() 918 919 # Now squeeze out the buffer 920 n = len(buffer) 921 for i, row in enumerate(buffer): 922 if i - n not in rows_to_skip: 923 yield row 924 925 # Force values to strings processor 926 def force_strings_processor(extended_rows): 927 for row_number, headers, row in extended_rows: 928 row = list(map(helpers.stringify_value, row)) 929 yield (row_number, headers, row) 930 931 # Form a processors list 932 processors = [builtin_processor] 933 # if we have to delete some rows with negative index (counting from the end) 934 if [n for n in self.__skip_rows_by_numbers if n < 0]: 935 processors.insert(0, skip_negative_rows) 936 if self.__post_parse: 937 processors += self.__post_parse 938 if self.__force_strings: 939 processors.append(force_strings_processor) 940 941 # Apply processors to iterator 942 for processor in processors: 943 iterator = processor(iterator) 944 945 return iterator 946 947 def __check_if_row_for_skipping(self, row_number, headers, row): 948 949 # Pick rows 950 if self.__pick_rows: 951 952 # Skip by number 953 if row_number in self.__pick_rows_by_numbers: 954 return False 955 956 # Get first cell 957 cell = row[0] if row else None 958 959 # Handle blank cell/row 960 if cell in [None, '']: 961 if '' in self.__pick_rows_by_comments: 962 return False 963 if self.__pick_rows_by_presets.get('blank'): 964 if not list(filter(lambda cell: cell not in [None, ''], row)): 965 return False 966 return True 967 968 # Pick by pattern 969 for pattern in self.__pick_rows_by_patterns: 970 if bool(pattern.search(cell)): 971 return False 972 973 # Pick by comment 974 for comment in filter(None, self.__pick_rows_by_comments): 975 if six.text_type(cell).startswith(comment): 976 return False 977 978 # Default 979 return True 980 981 # Skip rows 982 if self.__skip_rows: 983 984 # Skip by number 985 if row_number in self.__skip_rows_by_numbers: 986 return True 987 988 # Get first cell 989 cell = row[0] if row else None 990 991 # Handle blank cell/row 992 if cell in [None, '']: 993 if '' in self.__skip_rows_by_comments: 994 return True 995 if self.__skip_rows_by_presets.get('blank'): 996 if not list(filter(lambda cell: cell not in [None, ''], row)): 997 return True 998 return False 999 1000 # Skip by pattern 1001 for pattern in self.__skip_rows_by_patterns: 1002 if bool(pattern.search(cell)): 1003 return True 1004 1005 # Skip by comment 1006 for comment in filter(None, self.__skip_rows_by_comments): 1007 if six.text_type(cell).startswith(comment): 1008 return True 1009 1010 # Default 1011 return False 1012 1013 # No pick/skip 1014 return False 1015