# -*- coding: utf-8 -*-
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals

import re
import six
import gzip
import zipfile
import tempfile
import warnings
from copy import copy
from itertools import chain
from collections import deque
from .loaders.stream import StreamLoader
from . import exceptions
from . import helpers
from . import config


# Module API

# TODO: merge pick/skip rows logic
class Stream(object):
    """Stream of tabular data.

    This is the main `tabulator` class. It loads a data source, and allows you
    to stream its parsed contents.

    # Arguments

        source (str):
            Path to file as ``<scheme>\\://path/to/file.<format>``.
            If not explicitly set, the scheme (file, http, ...) and
            format (csv, xls, ...) are inferred from the source string.

        headers (Union[int, List[int], List[str]], optional):
            Either a row
            number or a list of row numbers (in the case of multi-line headers) to be
            considered as headers (rows start counting at 1), or the actual
            headers defined as a list of strings. If not set, all rows will be
            treated as containing values.

        scheme (str, optional):
            Scheme for loading the file (file, http, ...).
            If not set, it'll be inferred from `source`.

        format (str, optional):
            File source's format (csv, xls, ...). If not
            set, it'll be inferred from `source`.

        encoding (str, optional):
            Source encoding. If not set, it'll be inferred.

        compression (str, optional):
            Source file compression (zip, ...). If not set, it'll be inferred.

        pick_rows (List[Union[int, str, dict]], optional):
            The same as `skip_rows`, but for picking rows instead of skipping them.

        skip_rows (List[Union[int, str, dict]], optional):
            List of row numbers, strings, and regex patterns (as dicts) to skip.
            If a string is provided, rows whose first cell starts with it are
            skipped, e.g. '#' and '//'.
            To skip only completely blank rows use `{'type'\\: 'preset', 'value'\\: 'blank'}`.
            To provide a regex pattern use `{'type'\\: 'regex', 'value'\\: '^#'}`.
            For example\\: `skip_rows=[1, '# comment', {'type'\\: 'regex', 'value'\\: '^# (regex|comment)'}]`

        pick_fields (List[Union[int, str]], optional):
            When passed, ignores all columns with headers
            that the given list DOES NOT include.

        skip_fields (List[Union[int, str]], optional):
            When passed, ignores all columns with headers
            that the given list includes. If it contains an empty string, columns
            with empty headers will be skipped.

        sample_size (int, optional):
            Controls the number of sample rows used to
            infer properties from the data (headers, encoding, etc.). Set to
            ``0`` to disable sampling, in which case nothing will be inferred
            from the data. Defaults to ``config.DEFAULT_SAMPLE_SIZE``.

        bytes_sample_size (int, optional):
            Same as `sample_size`, but instead
            of a number of rows, controls the number of bytes. Defaults to
            ``config.DEFAULT_BYTES_SAMPLE_SIZE``.

        allow_html (bool, optional):
            Allow the file source to be an HTML page.
            If False, raises ``exceptions.FormatError`` if the loaded file is
            an HTML page. Defaults to False.

        multiline_headers_joiner (str, optional):
            When passed, it's used to join multiline headers
            as `<passed-value>.join(header1_1, header1_2)`.
            Defaults to ' ' (space).

        multiline_headers_duplicates (bool, optional):
            By default tabulator will exclude a cell of a multiline header from joining
            if it's exactly the same as the previously seen value in this field.
            Enabling this option will force the inclusion of duplicates.
            Defaults to False.

        hashing_algorithm (str, optional):
            Hashing algorithm used for the `hash` property.
            It supports: md5, sha1, sha256, sha512.
            Defaults to 'sha256'.

        force_strings (bool, optional):
            When True, casts all data to strings.
            Defaults to False.

        force_parse (bool, optional):
            When True, don't raise an exception when a row
            is malformed; return an empty row instead. Defaults
            to False.

        post_parse (List[function], optional):
            List of generator functions that
            receive a list of rows and headers, process them, and yield
            them (or not). Useful to pre-process the data. Defaults to None.

        custom_loaders (dict, optional):
            Dictionary with keys as scheme names,
            and values as their respective ``Loader`` class implementations.
            Defaults to None.

        custom_parsers (dict, optional):
            Dictionary with keys as format names,
            and values as their respective ``Parser`` class implementations.
            Defaults to None.

        custom_writers (dict, optional):
            Dictionary with keys as writer format
            names, and values as their respective ``Writer`` class
            implementations. Defaults to None.

        **options (Any, optional): Extra options passed to the loaders and parsers.

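    # Example

        A minimal usage sketch (it assumes a local `data.csv` file with a
        header row; adjust the path and options to your source):

            from tabulator import Stream

            with Stream('data.csv', headers=1) as stream:
                print(stream.headers)
                for row in stream.iter():
                    print(row)
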
    """

    # Public

    def __init__(self,
                 source,
                 headers=None,
                 scheme=None,
                 format=None,
                 encoding=None,
                 compression=None,
                 allow_html=False,
                 sample_size=config.DEFAULT_SAMPLE_SIZE,
                 bytes_sample_size=config.DEFAULT_BYTES_SAMPLE_SIZE,
                 ignore_blank_headers=False,
                 ignore_listed_headers=None,
                 ignore_not_listed_headers=None,
                 multiline_headers_joiner=' ',
                 multiline_headers_duplicates=False,
                 hashing_algorithm='sha256',
                 force_strings=False,
                 force_parse=False,
                 pick_columns=None,
                 skip_columns=None,
                 pick_fields=None,
                 skip_fields=None,
                 limit_fields=None,
                 offset_fields=None,
                 pick_rows=None,
                 skip_rows=None,
                 limit_rows=None,
                 offset_rows=None,
                 post_parse=[],
                 custom_loaders={},
                 custom_parsers={},
                 custom_writers={},
                 **options):

        # Translate aliases
        if pick_fields is not None:
            pick_columns = pick_fields
        if skip_fields is not None:
            skip_columns = skip_fields
        if pick_columns is not None:
            ignore_not_listed_headers = pick_columns
        if skip_columns is not None:
            ignore_listed_headers = skip_columns
            if '' in skip_columns:
                ignore_blank_headers = True

        # Set headers
        self.__headers = None
        self.__headers_row = None
        self.__headers_row_last = None
        if isinstance(headers, int):
            self.__headers_row = headers
            self.__headers_row_last = headers
        elif isinstance(headers, (tuple, list)):
            if (len(headers) == 2 and
                    isinstance(headers[0], int) and
                    isinstance(headers[1], int)):
                self.__headers_row = headers[0]
                self.__headers_row_last = headers[1]
            else:
                self.__headers = list(headers)

        # Set pick rows
        self.__pick_rows = pick_rows
        self.__pick_rows_by_numbers = []
        self.__pick_rows_by_patterns = []
        self.__pick_rows_by_comments = []
        self.__pick_rows_by_presets = {}
        for directive in copy(pick_rows or []):
            if isinstance(directive, int):
                self.__pick_rows_by_numbers.append(directive)
            elif isinstance(directive, dict):
                if directive['type'] == 'regex':
                    self.__pick_rows_by_patterns.append(re.compile(directive['value']))
                elif directive['type'] == 'preset' and directive['value'] == 'blank':
                    self.__pick_rows_by_presets['blank'] = True
                else:
                    raise ValueError('Not supported pick rows: %s' % directive)
            else:
                self.__pick_rows_by_comments.append(str(directive))

        # Set skip rows
        self.__skip_rows = skip_rows
        self.__skip_rows_by_numbers = []
        self.__skip_rows_by_patterns = []
        self.__skip_rows_by_comments = []
        self.__skip_rows_by_presets = {}
        for directive in copy(skip_rows or []):
            if isinstance(directive, int):
                self.__skip_rows_by_numbers.append(directive)
            elif isinstance(directive, dict):
                if directive['type'] == 'regex':
                    self.__skip_rows_by_patterns.append(re.compile(directive['value']))
                elif directive['type'] == 'preset' and directive['value'] == 'blank':
                    self.__skip_rows_by_presets['blank'] = True
                else:
                    raise ValueError('Not supported skip rows: %s' % directive)
            else:
                self.__skip_rows_by_comments.append(str(directive))

        # Support for pathlib.Path
        if hasattr(source, 'joinpath'):
            source = str(source)

        # Set attributes
        self.__source = source
        self.__scheme = scheme
        self.__format = format
        self.__encoding = encoding
        self.__compression = compression
        self.__allow_html = allow_html
        self.__sample_size = sample_size
        self.__bytes_sample_size = bytes_sample_size
        self.__ignore_blank_headers = ignore_blank_headers
        self.__ignore_listed_headers = ignore_listed_headers
        self.__ignore_not_listed_headers = ignore_not_listed_headers
        self.__multiline_headers_joiner = multiline_headers_joiner
        self.__multiline_headers_duplicates = multiline_headers_duplicates
        self.__ignored_headers_indexes = []
        self.__hashing_algorithm = hashing_algorithm
        self.__force_strings = force_strings
        self.__force_parse = force_parse
        self.__limit_fields = limit_fields
        self.__offset_fields = offset_fields
        self.__limit_rows = limit_rows
        self.__offset_rows = offset_rows
        self.__post_parse = copy(post_parse)
        self.__custom_loaders = copy(custom_loaders)
        self.__custom_parsers = copy(custom_parsers)
        self.__custom_writers = copy(custom_writers)
        self.__actual_scheme = scheme
        self.__actual_format = format
        self.__actual_encoding = encoding
        self.__actual_compression = compression
        self.__options = options
        self.__sample_extended_rows = []
        self.__field_positions = None
        self.__loader = None
        self.__parser = None
        self.__row_number = 0
        self.__stats = None

    def __enter__(self):
        if self.closed:
            self.open()
        return self

    def __exit__(self, type, value, traceback):
        if not self.closed:
            self.close()

    def __iter__(self):
        return self.iter()

    @property
    def closed(self):
        """Returns True if the underlying stream is closed, False otherwise.

        # Returns
            bool: whether closed

        """
        return not self.__parser or self.__parser.closed

    def open(self):
        """Opens the stream for reading.

        # Raises
            TabulatorException: if an error occurs while opening the stream

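        # Example

            A minimal sketch of manual open/close (it assumes `data.csv`
            exists); using the stream as a context manager does the same
            automatically:

                stream = Stream('data.csv')
                stream.open()
                try:
                    rows = stream.read()
                finally:
                    stream.close()
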
        """
        source = self.__source
        options = copy(self.__options)

        # Programming error assertions
        assert self.__hashing_algorithm in config.SUPPORTED_HASHING_ALGORITHMS

        # Validate compression
        if self.__compression:
            if self.__compression not in config.SUPPORTED_COMPRESSION:
                message = 'Not supported compression "%s"' % self.__compression
                raise exceptions.CompressionError(message)

        # Get scheme and format if not already given
        compression = None
        if self.__scheme is None or self.__format is None:
            detected_scheme, detected_format = helpers.detect_scheme_and_format(source)
            scheme = self.__scheme or detected_scheme
            format = self.__format or detected_format
            # Get compression
            for type in config.SUPPORTED_COMPRESSION:
                if self.__compression == type or detected_format == type:
                    compression = type
        else:
            scheme = self.__scheme
            format = self.__format

        # Initiate loader
        self.__loader = None
        if scheme is not None:
            loader_class = self.__custom_loaders.get(scheme)
            if loader_class is None:
                if scheme not in config.LOADERS:
                    message = 'Scheme "%s" is not supported' % scheme
                    raise exceptions.SchemeError(message)
                loader_path = config.LOADERS[scheme]
                if loader_path:
                    loader_class = helpers.import_attribute(loader_path)
            if loader_class is not None:
                loader_options = helpers.extract_options(options, loader_class.options)
                if compression and 'http_stream' in loader_class.options:
                    loader_options['http_stream'] = False
                self.__loader = loader_class(
                    bytes_sample_size=self.__bytes_sample_size,
                    **loader_options)

        # Zip compression
        if compression == 'zip' and six.PY3:
            source = self.__loader.load(source, mode='b')
            with zipfile.ZipFile(source) as archive:
                name = archive.namelist()[0]
                if 'filename' in options.keys():
                    name = options['filename']
                    del options['filename']
                with archive.open(name) as file:
                    source = tempfile.NamedTemporaryFile(suffix='.' + name)
                    for line in file:
                        source.write(line)
                    source.seek(0)
            # We redefine loader/format/scheme after decompression
            self.__loader = StreamLoader(bytes_sample_size=self.__bytes_sample_size)
            format = self.__format or helpers.detect_scheme_and_format(source.name)[1]
            scheme = 'stream'

        # Gzip compression
        elif compression == 'gz' and six.PY3:
            name = ''
            if isinstance(source, str):
                name = source.replace('.gz', '')
            source = gzip.open(self.__loader.load(source, mode='b'))
            # We redefine loader/format/scheme after decompression
            self.__loader = StreamLoader(bytes_sample_size=self.__bytes_sample_size)
            format = self.__format or helpers.detect_scheme_and_format(name)[1]
            scheme = 'stream'

        # Not supported compression
        elif compression:
            message = 'Compression "%s" is not supported for your Python version'
            raise exceptions.TabulatorException(message % compression)

        # Attach stats to the loader
        if getattr(self.__loader, 'attach_stats', None):
            self.__stats = {'size': 0, 'hash': '', 'hashing_algorithm': self.__hashing_algorithm}
            getattr(self.__loader, 'attach_stats')(self.__stats)

        # Initiate parser
        parser_class = self.__custom_parsers.get(format)
        if parser_class is None:
            if format not in config.PARSERS:
                # If the source doesn't exist, the next line
                # will raise IOError/HTTPError (a not-found error)
                chars = self.__loader.load(source)
                chars.close()
                # Otherwise it's a format error
                message = 'Format "%s" is not supported' % format
                raise exceptions.FormatError(message)
            parser_class = helpers.import_attribute(config.PARSERS[format])
        parser_options = helpers.extract_options(options, parser_class.options)
        self.__parser = parser_class(self.__loader,
                force_parse=self.__force_parse,
                **parser_options)

        # Bad options
        if options:
            message = 'Not supported option(s) "%s" for scheme "%s" and format "%s"'
            message = message % (', '.join(options), scheme, format)
            warnings.warn(message, UserWarning)

        # Open and setup
        self.__parser.open(source, encoding=self.__encoding)
        self.__extract_sample()
        self.__extract_headers()
        if not self.__allow_html:
            self.__detect_html()

        # Set scheme/format/encoding
        self.__actual_scheme = scheme
        self.__actual_format = format
        self.__actual_encoding = self.__parser.encoding
        self.__actual_compression = compression

        return self

    def close(self):
        """Closes the stream.
        """
        self.__parser.close()
        self.__row_number = 0

    def reset(self):
        """Resets the stream pointer to the beginning of the file.
        """
        if self.__row_number > self.__sample_size:
            self.__stats = {'size': 0, 'hash': ''}
            self.__parser.reset()
            self.__extract_sample()
            self.__extract_headers()
        self.__row_number = 0

    @property
    def source(self):
        """Source

        # Returns
            any: stream source

        """
        return self.__source

    @property
    def headers(self):
        """Headers

        # Returns
            str[]/None: headers if available

        """
        return self.__headers

    @headers.setter
    def headers(self, headers):
        """Set headers

        # Arguments
            str[]: headers

        """
        self.__headers = headers

    @property
    def scheme(self):
        """Path's scheme

        # Returns
            str: scheme

        """
        return self.__actual_scheme or 'inline'

    @property
    def format(self):
        """Path's format

        # Returns
            str: format

        """
        return self.__actual_format or 'inline'

    @property
    def encoding(self):
        """Stream's encoding

        # Returns
            str: encoding

        """
        return self.__actual_encoding or 'no'

    @property
    def compression(self):
        """Stream's compression ("no" if no compression)

        # Returns
            str: compression

        """
        return self.__actual_compression or 'no'

    @property
    def fragment(self):
        """Path's fragment

        # Returns
            str: fragment

        """
        if self.__parser:
            return getattr(self.__parser, 'fragment', None)
        return None

    @property
    def dialect(self):
        """Dialect (if available)

        # Returns
            dict/None: dialect

        """
        if self.__parser:
            return getattr(self.__parser, 'dialect', {})
        return None

    @property
    def size(self):
        """Returns the BYTE count of the read chunks if available

        # Returns
            int/None: BYTE count

        """
        if self.__stats:
            return self.__stats['size']

    @property
    def hash(self):
        """Returns the SHA256 (or according to the "hashing_algorithm" parameter)
        hash of the read chunks if available

        # Returns
            str/None: bytes hash

        """
        if self.__stats:
            return self.__stats['hash']

    @property
    def sample(self):
        """Returns the stream's rows used as sample.

        These sample rows are used internally to infer characteristics of the
        source file (e.g. encoding, headers, ...).

        # Returns
            list[]: sample

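        # Example

            A quick sketch (it assumes `data.csv` exists):

                with Stream('data.csv', sample_size=100) as stream:
                    stream.sample  # up to 100 rows buffered for inference
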
        """
        sample = []
        iterator = iter(self.__sample_extended_rows)
        iterator = self.__apply_processors(iterator)
        for row_number, headers, row in iterator:
            sample.append(row)
        return sample

    @property
    def field_positions(self):
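        """Returns the 1-based positions of the fields kept after
        header filtering (ignored columns are excluded).

        # Returns
            list[]: field positions

        """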
        if self.__field_positions is None:
            self.__field_positions = []
            if self.__headers:
                size = len(self.__headers) + len(self.__ignored_headers_indexes)
                for index in range(size):
                    if index not in self.__ignored_headers_indexes:
                        self.__field_positions.append(index + 1)
        return self.__field_positions

    @property
    def hashing_algorithm(self):
        return self.__hashing_algorithm

    def iter(self, keyed=False, extended=False):
        """Iterate over the rows.

        Each row is returned in a format that depends on the arguments `keyed`
        and `extended`. By default, each row is returned as a list of its
        values.

        # Arguments
            keyed (bool, optional):
                When True, each returned row will be a
                `dict` mapping the header name to its value in the current row.
                For example, `[{'name'\\: 'J Smith', 'value'\\: '10'}]`. Ignored if
                ``extended`` is True. Defaults to False.
            extended (bool, optional):
                When True, returns each row as a tuple
                with row number (starts at 1), list of headers, and list of row
                values. For example, `(1, ['name', 'value'], ['J Smith', '10'])`.
                Defaults to False.

        # Raises
            exceptions.TabulatorException: If the stream is closed.

        # Returns
            Iterator[Union[List[Any], Dict[str, Any], Tuple[int, List[str], List[Any]]]]:
                The row itself. The format depends on the values of the `keyed` and
                `extended` arguments.

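        # Example

            A brief sketch (it assumes `data.csv` has a header row):

                with Stream('data.csv', headers=1) as stream:
                    for row in stream.iter(keyed=True):
                        print(row)  # e.g. {'name': 'J Smith', 'value': '10'}
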
        """

        # Error if closed
        if self.closed:
            message = 'Stream is closed. Please call "stream.open()" first.'
            raise exceptions.TabulatorException(message)

        # Create iterator
        iterator = chain(
            self.__sample_extended_rows,
            self.__parser.extended_rows)
        iterator = self.__apply_processors(iterator)

        # Yield rows from iterator
        try:
            count = 0
            for row_number, headers, row in iterator:
                if row_number > self.__row_number:
                    count += 1
                    if self.__limit_rows or self.__offset_rows:
                        offset = self.__offset_rows or 0
                        limit = self.__limit_rows + offset if self.__limit_rows else None
                        if offset and count <= offset:
                            continue
                        if limit and count > limit:
                            break
                    self.__row_number = row_number
                    if extended:
                        yield (row_number, headers, row)
                    elif keyed:
                        yield dict(zip(headers, row))
                    else:
                        yield row
        except UnicodeError as error:
            message = 'Cannot parse the source "%s" using "%s" encoding at "%s"'
            raise exceptions.EncodingError(message % (self.__source, error.encoding, error.start))
        except Exception as error:
            raise exceptions.SourceError(str(error))

    def read(self, keyed=False, extended=False, limit=None):
        """Returns a list of rows.

        # Arguments
            keyed (bool, optional): See :func:`Stream.iter`.
            extended (bool, optional): See :func:`Stream.iter`.
            limit (int, optional):
                Number of rows to return. If None, returns all rows. Defaults to None.

        # Returns
            List[Union[List[Any], Dict[str, Any], Tuple[int, List[str], List[Any]]]]:
                The list of rows. The format depends on the values of the `keyed`
                and `extended` arguments.
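
        # Example

            A short sketch (it assumes `data.csv` has a header row):

                with Stream('data.csv', headers=1) as stream:
                    rows = stream.read(limit=10)  # at most 10 rows
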
        """
        result = []
        rows = self.iter(keyed=keyed, extended=extended)
        for count, row in enumerate(rows, start=1):
            result.append(row)
            if count == limit:
                break
        return result

    def save(self, target, format=None, encoding=None, **options):
        """Save stream to the local filesystem.

        # Arguments
            target (str): Path where to save the stream.
            format (str, optional):
                The format the stream will be saved as. If
                None, detects from the ``target`` path. Defaults to None.
            encoding (str, optional):
                Saved file encoding. Defaults to ``config.DEFAULT_ENCODING``.
            **options: Extra options passed to the writer.

        # Returns
            int/None: count of written rows, if available
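
        # Example

            A minimal sketch (it assumes `data.csv` exists; writes `output.csv`):

                with Stream('data.csv', headers=1) as stream:
                    stream.save('output.csv')
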
        """

        # Get encoding/format
        if encoding is None:
            encoding = config.DEFAULT_ENCODING
        if format is None:
            _, format = helpers.detect_scheme_and_format(target)

        # Prepare writer class
        writer_class = self.__custom_writers.get(format)
        if writer_class is None:
            if format not in config.WRITERS:
                message = 'Format "%s" is not supported' % format
                raise exceptions.FormatError(message)
            writer_class = helpers.import_attribute(config.WRITERS[format])

        # Prepare writer options
        writer_options = helpers.extract_options(options, writer_class.options)
        if options:
            message = 'Not supported options "%s" for format "%s"'
            message = message % (', '.join(options), format)
            raise exceptions.TabulatorException(message)

        # Write data to target
        writer = writer_class(**writer_options)
        return writer.write(self.iter(), target, headers=self.headers, encoding=encoding)

    # Private

    def __extract_sample(self):

        # Sample is not requested
        if not self.__sample_size:
            return

        # Extract sample rows
        self.__sample_extended_rows = []
        for _ in range(self.__sample_size):
            try:
                row_number, headers, row = next(self.__parser.extended_rows)
                if self.__headers_row and self.__headers_row >= row_number:
                    if self.__check_if_row_for_skipping(row_number, headers, row):
                        self.__headers_row += 1
                        self.__headers_row_last += 1
                self.__sample_extended_rows.append((row_number, headers, row))
            except StopIteration:
                break
            except UnicodeError as error:
                message = 'Cannot parse the source "%s" using "%s" encoding at "%s"'
                raise exceptions.EncodingError(message % (self.__source, error.encoding, error.start))
            except Exception as error:
                raise exceptions.SourceError(str(error))

    def __extract_headers(self):

        # Headers row is not set
        if not self.__headers_row:
            return

        # Sample is too short
        if self.__headers_row > self.__sample_size:
            message = 'Headers row (%s) can\'t be more than sample_size (%s)'
            message = message % (self.__headers_row, self.__sample_size)
            raise exceptions.TabulatorException(message)

        # Get headers from data
        last_merged = {}
        keyed_source = False
        for row_number, headers, row in self.__sample_extended_rows:
            keyed_source = keyed_source or headers is not None
            headers = headers if keyed_source else row
            for index, header in enumerate(headers):
                if header is not None:
                    headers[index] = six.text_type(header).strip()
            if row_number == self.__headers_row:
                self.__headers = headers
                last_merged = {index: header for index, header in enumerate(headers)}
            if row_number > self.__headers_row:
                for index in range(0, len(self.__headers)):
                    if len(headers) > index and headers[index] is not None:
                        if not self.__headers[index]:
                            self.__headers[index] = headers[index]
                        else:
                            if (self.__multiline_headers_duplicates or
                                    last_merged.get(index) != headers[index]):
                                self.__headers[index] += (
                                    self.__multiline_headers_joiner + headers[index])
                        last_merged[index] = headers[index]
            if row_number == self.__headers_row_last:
                break

        # Ignore headers
        if (self.__ignore_blank_headers or
                self.__ignore_listed_headers is not None or
                self.__ignore_not_listed_headers is not None):
            self.__ignored_headers_indexes = []
            raw_headers, self.__headers = self.__headers, []
            for index, header in list(enumerate(raw_headers)):
                ignore = False
                # Ignore blank headers
                if header in ['', None]:
                    ignore = True
                # Ignore listed headers
                if self.__ignore_listed_headers is not None:
                    if (header in self.__ignore_listed_headers or
                            index + 1 in self.__ignore_listed_headers):
                        ignore = True
                    # Regex
                    for item in self.__ignore_listed_headers:
                        if isinstance(item, dict) and item.get('type') == 'regex':
                            if bool(re.search(item['value'], header)):
                                ignore = True
                # Ignore not-listed headers
                if self.__ignore_not_listed_headers is not None:
                    if (header not in self.__ignore_not_listed_headers and
                            index + 1 not in self.__ignore_not_listed_headers):
                        ignore = True
                    # Regex
                    for item in self.__ignore_not_listed_headers:
                        if isinstance(item, dict) and item.get('type') == 'regex':
                            if bool(re.search(item['value'], header)):
                                ignore = False
                # Add to the list and skip
                if ignore:
                    self.__ignored_headers_indexes.append(index)
                    continue
                self.__headers.append(header)
            self.__ignored_headers_indexes = list(sorted(self.__ignored_headers_indexes, reverse=True))

        # Limit/offset fields
        if self.__limit_fields or self.__offset_fields:
            ignore = []
            headers = []
            min = self.__offset_fields or 0
            max = self.__limit_fields + min if self.__limit_fields else len(self.__headers)
            for position, header in enumerate(self.__headers, start=1):
                if position <= min:
                    ignore.append(position - 1)
                    continue
                if position > max:
                    ignore.append(position - 1)
                    continue
                headers.append(header)
            for index in ignore:
                if index not in self.__ignored_headers_indexes:
                    self.__ignored_headers_indexes.append(index)
            self.__ignored_headers_indexes = list(sorted(self.__ignored_headers_indexes, reverse=True))
            self.__headers = headers

        # Remove headers from data
        if not keyed_source:
            del self.__sample_extended_rows[:self.__headers_row_last]

        # Stringify headers
        if isinstance(self.__headers, list):
            str_headers = []
            for header in self.__headers:
                str_headers.append(six.text_type(header) if header is not None else '')
            self.__headers = str_headers

    def __detect_html(self):

        # Prepare text
        text = ''
        for row_number, headers, row in self.__sample_extended_rows:
            for value in row:
                if isinstance(value, six.string_types):
                    text += value

        # Detect html content
        html_source = helpers.detect_html(text)
        if html_source:
            message = 'Format has been detected as HTML (not supported)'
            raise exceptions.FormatError(message)

    def __apply_processors(self, iterator):

        # Base processor
        def builtin_processor(extended_rows):
            for row_number, headers, row in extended_rows:

                # Sync headers/row
                if headers != self.__headers:
                    if headers and self.__headers:
                        keyed_row = dict(zip(headers, row))
                        row = [keyed_row.get(header) for header in self.__headers]
                    elif self.__ignored_headers_indexes:
                        row = [value for index, value in enumerate(row) if index not in self.__ignored_headers_indexes]
                    headers = self.__headers

                # Skip rows by numbers/comments
                if self.__check_if_row_for_skipping(row_number, headers, row):
                    continue

                yield (row_number, headers, row)

        # Skip negative rows processor
        def skip_negative_rows(extended_rows):
            '''
            This processor skips rows counted from the end, e.g.
            -1: skip the last row, -2: skip the second-to-last row, etc.
            Rows to skip are taken from Stream.__skip_rows_by_numbers
            '''
            rows_to_skip = [n for n in self.__skip_rows_by_numbers if n < 0]
            buffer_size = abs(min(rows_to_skip))
            # collections.deque - takes O[1] time to push/pop values from any side.
            buffer = deque()

            # use buffer to save last rows
            for row in extended_rows:
                buffer.append(row)
                if len(buffer) > buffer_size:
                    yield buffer.popleft()

            # Now squeeze out the buffer
            n = len(buffer)
            for i, row in enumerate(buffer):
                if i - n not in rows_to_skip:
                    yield row

        # Force values to strings processor
        def force_strings_processor(extended_rows):
            for row_number, headers, row in extended_rows:
                row = list(map(helpers.stringify_value, row))
                yield (row_number, headers, row)

        # Form a processors list
        processors = [builtin_processor]
        # If we have to delete some rows with negative index (counting from the end)
        if [n for n in self.__skip_rows_by_numbers if n < 0]:
            processors.insert(0, skip_negative_rows)
        if self.__post_parse:
            processors += self.__post_parse
        if self.__force_strings:
            processors.append(force_strings_processor)

        # Apply processors to iterator
        for processor in processors:
            iterator = processor(iterator)

        return iterator

    def __check_if_row_for_skipping(self, row_number, headers, row):

        # Pick rows
        if self.__pick_rows:

            # Pick by number
            if row_number in self.__pick_rows_by_numbers:
                return False

            # Get first cell
            cell = row[0] if row else None

            # Handle blank cell/row
            if cell in [None, '']:
                if '' in self.__pick_rows_by_comments:
                    return False
                if self.__pick_rows_by_presets.get('blank'):
                    if not list(filter(lambda cell: cell not in [None, ''], row)):
                        return False
                return True

            # Pick by pattern
            for pattern in self.__pick_rows_by_patterns:
                if bool(pattern.search(cell)):
                    return False

            # Pick by comment
            for comment in filter(None, self.__pick_rows_by_comments):
                if six.text_type(cell).startswith(comment):
                    return False

            # Default
            return True

        # Skip rows
        if self.__skip_rows:

            # Skip by number
            if row_number in self.__skip_rows_by_numbers:
                return True

            # Get first cell
            cell = row[0] if row else None

            # Handle blank cell/row
            if cell in [None, '']:
                if '' in self.__skip_rows_by_comments:
                    return True
                if self.__skip_rows_by_presets.get('blank'):
                    if not list(filter(lambda cell: cell not in [None, ''], row)):
                        return True
                return False

            # Skip by pattern
            for pattern in self.__skip_rows_by_patterns:
                if bool(pattern.search(cell)):
                    return True

            # Skip by comment
            for comment in filter(None, self.__skip_rows_by_comments):
                if six.text_type(cell).startswith(comment):
                    return True

            # Default
            return False

        # No pick/skip
        return False