1import os
2import json
3import petl
4import warnings
5from pathlib import Path
6from copy import deepcopy
7from itertools import zip_longest, chain
8from .exception import FrictionlessException
9from .detector import Detector
10from .metadata import Metadata
11from .layout import Layout
12from .schema import Schema
13from .header import Header
14from .system import system
15from .field import Field
16from .row import Row
17from . import settings
18from . import helpers
19from . import errors
20
21
22# NOTE:
23# Review the situation with describe function removing stats (move to infer?)
24
25
26class Resource(Metadata):
27    """Resource representation.
28
29    API      | Usage
30    -------- | --------
31    Public   | `from frictionless import Resource`
32
    This class is one of the cornerstones of the Frictionless framework.
    It loads a data source, and allows you to stream its parsed contents.
    At the same time, it's a metadata class describing the data.
36
37    ```python
38    with Resource("data/table.csv") as resource:
39        resource.header == ["id", "name"]
40        resource.read_rows() == [
41            {'id': 1, 'name': 'english'},
42            {'id': 2, 'name': '中国人'},
43        ]
44    ```
45
46    Parameters:
47
48        source (any): Source of the resource; can be in various forms.
49            Usually, it's a string as `<scheme>://path/to/file.<format>`.
50            It also can be, for example, an array of data arrays/dictionaries.
51            Or it can be a resource descriptor dict or path.
52
53        descriptor (dict|str): A resource descriptor provided explicitly.
54            Keyword arguments will patch this descriptor if provided.
55
56        name? (str): A Resource name according to the specs.
57            It should be a slugified name of the resource.
58
        title? (str): A Resource title according to the specs.
           It should be a human-oriented title of the resource.
61
        description? (str): A Resource description according to the specs.
           It should be a human-oriented description of the resource.
64
65        mediatype? (str): A mediatype/mimetype of the resource e.g. “text/csv”,
66            or “application/vnd.ms-excel”.  Mediatypes are maintained by the
67            Internet Assigned Numbers Authority (IANA) in a media type registry.
68
69        licenses? (dict[]): The license(s) under which the resource is provided.
70            If omitted it's considered the same as the package's licenses.
71
72        sources? (dict[]): The raw sources for this data resource.
73            It MUST be an array of Source objects.
74            Each Source object MUST have a title and
75            MAY have path and/or email properties.
76
77        profile? (str): A string identifying the profile of this descriptor.
78            For example, `tabular-data-resource`.
79
80        scheme? (str): Scheme for loading the file (file, http, ...).
81            If not set, it'll be inferred from `source`.
82
83        format? (str): File source's format (csv, xls, ...).
84            If not set, it'll be inferred from `source`.
85
86        hashing? (str): An algorithm to hash data.
87            It defaults to 'md5'.
88
89        encoding? (str): Source encoding.
90            If not set, it'll be inferred from `source`.
91
92        innerpath? (str): A path within the compressed file.
93            It defaults to the first file in the archive.
94
95        compression? (str): Source file compression (zip, ...).
96            If not set, it'll be inferred from `source`.
97
98        control? (dict|Control): File control.
99            For more information, please check the Control documentation.
100
101        dialect? (dict|Dialect): Table dialect.
102            For more information, please check the Dialect documentation.
103
104        layout? (dict|Layout): Table layout.
105            For more information, please check the Layout documentation.
106
107        schema? (dict|Schema): Table schema.
108            For more information, please check the Schema documentation.
109
110        stats? (dict): File/table stats.
111            A dict with the following possible properties: hash, bytes, fields, rows.
112
        basepath? (str): A basepath of the resource.
            The fullpath of the resource is `basepath` joined with `path`.
115
116        detector? (Detector): File/table detector.
117            For more information, please check the Detector documentation.
118
119        onerror? (ignore|warn|raise): Behaviour if there is an error.
120            It defaults to 'ignore'. The default mode will ignore all errors
121            on resource level and they should be handled by the user
122            being available in Header and Row objects.
123
124        trusted? (bool): Don't raise an exception on unsafe paths.
125            A path provided as a part of the descriptor considered unsafe
126            if there are path traversing or the path is absolute.
            A path provided as `source` or `path` is always trusted.
128
        package? (Package): The package that owns this resource.
130            It's actual if the resource is part of some data package.
131
132    Raises:
133        FrictionlessException: raise any error that occurs during the process
134    """
135
    def __init__(
        self,
        source=None,
        *,
        descriptor=None,
        # Spec
        name=None,
        title=None,
        description=None,
        mediatype=None,
        licenses=None,
        sources=None,
        profile=None,
        path=None,
        data=None,
        scheme=None,
        format=None,
        hashing=None,
        encoding=None,
        innerpath=None,
        compression=None,
        control=None,
        dialect=None,
        layout=None,
        schema=None,
        stats=None,
        # Extra
        basepath="",
        detector=None,
        onerror="ignore",
        trusted=False,
        package=None,
    ):
        """Initialize the resource from a source and/or a descriptor.

        See the class docstring for the meaning of every parameter.
        """

        # Handle source
        # A "table" source contributes path/data directly; any other source
        # (e.g. a descriptor dict or a path to one) is treated as a descriptor
        if source is not None:
            file = system.create_file(source, basepath=basepath)
            if file.type == "table":
                if path is None:
                    path = file.path
                if data is None:
                    data = file.data
            elif descriptor is None:
                descriptor = source

        # Handle pathlib
        if isinstance(descriptor, Path):
            descriptor = str(descriptor)

        # Handle trusted
        # Without an external descriptor there is no untrusted path input
        if descriptor is None:
            trusted = True

        # Store state
        # Populated by open()/reading and reset by close()
        self.__loader = None
        self.__parser = None
        self.__sample = None
        self.__labels = None
        self.__fragment = None
        self.__header = None
        self.__lookup = None
        self.__byte_stream = None
        self.__text_stream = None
        self.__list_stream = None
        self.__row_stream = None
        self.__row_number = None
        self.__row_position = None
        self.__field_positions = None
        self.__fragment_positions = None

        # Store extra
        self.__basepath = basepath or helpers.parse_basepath(descriptor)
        self.__detector = detector or Detector()
        self.__onerror = onerror
        self.__trusted = trusted
        self.__package = package

        # Store specs
        # setinitial records only non-None values; the descriptor (if any)
        # is merged afterwards by the Metadata constructor below
        self.setinitial("name", name)
        self.setinitial("title", title)
        self.setinitial("description", description)
        self.setinitial("mediatype", mediatype)
        self.setinitial("licenses", licenses)
        self.setinitial("sources", sources)
        self.setinitial("profile", profile)
        self.setinitial("path", path)
        self.setinitial("data", data)
        self.setinitial("scheme", scheme)
        self.setinitial("format", format)
        self.setinitial("hashing", hashing)
        self.setinitial("encoding", encoding)
        self.setinitial("compression", compression)
        self.setinitial("innerpath", innerpath)
        self.setinitial("control", control)
        self.setinitial("dialect", dialect)
        self.setinitial("layout", layout)
        self.setinitial("schema", schema)
        self.setinitial("stats", stats)
        super().__init__(descriptor)

        # NOTE: it will not work if dialect is a path
        # Handle official dialect.header
        # Spec-level `dialect.header: false` is translated to layout.header
        dialect = self.get("dialect")
        if isinstance(dialect, dict):
            header = dialect.pop("header", None)
            if header is False:
                self.setdefault("layout", {})
                self["layout"]["header"] = False

        # Handle official hash/bytes/rows
        # Spec-level top-level stats are moved into the "stats" mapping;
        # a hash prefixed with an algorithm also sets "hashing"
        for name in ["hash", "bytes", "rows"]:
            value = self.pop(name, None)
            if value:
                if name == "hash":
                    hashing, value = helpers.parse_resource_hash(value)
                    if hashing != settings.DEFAULT_HASHING:
                        self["hashing"] = hashing
                self.setdefault("stats", {})
                self["stats"][name] = value

        # Handle deprecated url
        url = self.get("url")
        path = self.get("path")
        if url and not path:
            message = 'Property "url" is deprecated. Please use "path" instead.'
            warnings.warn(message, UserWarning)
            self["path"] = self.pop("url")

        # Handle deprecated compression
        compression = self.get("compression")
        if compression == "no":
            message = 'Compression "no" is deprecated. Please use "" compression.'
            warnings.warn(message, UserWarning)
            self["compression"] = ""
270
    def __setattr__(self, name, value):
        # Route the known public attributes to their private (name-mangled)
        # backing fields and re-run metadata processing afterwards;
        # everything else falls through to the default attribute machinery.
        if name == "basepath":
            self.__basepath = value
        elif name == "detector":
            self.__detector = value
        elif name == "onerror":
            self.__onerror = value
        elif name == "trusted":
            self.__trusted = value
        elif name == "package":
            self.__package = value
        else:
            return super().__setattr__(name, value)
        self.metadata_process()
285
286    def __enter__(self):
287        if self.closed:
288            self.open()
289        return self
290
    def __exit__(self, type, value, traceback):
        """Exit the context manager, always closing the resource."""
        self.close()
293
294    def __iter__(self):
295        with helpers.ensure_open(self):
296            yield from self.__row_stream
297
298    @Metadata.property
299    def name(self):
300        """
301        Returns
302            str: resource name
303        """
304        return self.get("name", self.__file.name)
305
306    @Metadata.property
307    def title(self):
308        """
309        Returns
310            str: resource title
311        """
312        return self.get("title", "")
313
314    @Metadata.property
315    def description(self):
316        """
317        Returns
318            str: resource description
319        """
320        return self.get("description", "")
321
322    @Metadata.property(cache=False, write=False)
323    def description_html(self):
324        """
325        Returns:
326            str?: resource description
327        """
328        return helpers.md_to_html(self.description)
329
330    @Metadata.property
331    def description_text(self):
332        """
333        Returns:
334            str: resource description
335        """
336        return helpers.html_to_text(self.description_html)
337
338    @Metadata.property
339    def mediatype(self):
340        """
341        Returns
342            str: resource mediatype
343        """
344        return self.get("mediatype", "")
345
346    @Metadata.property
347    def licenses(self):
348        """
349        Returns
350            dict[]: resource licenses
351        """
352        licenses = self.get("licenses", [])
353        return self.metadata_attach("licenses", licenses)
354
355    @Metadata.property
356    def sources(self):
357        """
358        Returns
359            dict[]: resource sources
360        """
361        sources = self.get("sources", [])
362        return self.metadata_attach("sources", sources)
363
364    @Metadata.property
365    def profile(self):
366        """
367        Returns
368            str: resource profile
369        """
370        default = settings.DEFAULT_RESOURCE_PROFILE
371        if self.tabular:
372            default = settings.DEFAULT_TABULAR_RESOURCE_PROFILE
373        return self.get("profile", default)
374
375    @Metadata.property
376    def path(self):
377        """
378        Returns
379            str: resource path
380        """
381        return self.get("path", self.__file.path)
382
383    @Metadata.property
384    def data(self):
385        """
386        Returns
387            any[][]?: resource data
388        """
389        return self.get("data", self.__file.data)
390
391    @Metadata.property
392    def scheme(self):
393        """
394        Returns
395            str: resource scheme
396        """
397        return self.get("scheme", self.__file.scheme).lower()
398
399    @Metadata.property
400    def format(self):
401        """
402        Returns
403            str: resource format
404        """
405        return self.get("format", self.__file.format).lower()
406
407    @Metadata.property
408    def hashing(self):
409        """
410        Returns
411            str: resource hashing
412        """
413        return self.get("hashing", settings.DEFAULT_HASHING).lower()
414
415    @Metadata.property
416    def encoding(self):
417        """
418        Returns
419            str: resource encoding
420        """
421        return self.get("encoding", settings.DEFAULT_ENCODING).lower()
422
423    @Metadata.property
424    def innerpath(self):
425        """
426        Returns
427            str: resource compression path
428        """
429        return self.get("innerpath", self.__file.innerpath)
430
431    @Metadata.property
432    def compression(self):
433        """
434        Returns
435            str: resource compression
436        """
437        return self.get("compression", self.__file.compression).lower()
438
439    @Metadata.property
440    def control(self):
441        """
442        Returns
443            Control: resource control
444        """
445        control = self.get("control")
446        if control is None:
447            control = system.create_control(self, descriptor=control)
448            control = self.metadata_attach("control", control)
449        elif isinstance(control, str):
450            control = os.path.join(self.basepath, control)
451            control = system.create_control(self, descriptor=control)
452            control = self.metadata_attach("control", control)
453        return control
454
455    @Metadata.property
456    def dialect(self):
457        """
458        Returns
459            Dialect: resource dialect
460        """
461        dialect = self.get("dialect")
462        if dialect is None:
463            dialect = system.create_dialect(self, descriptor=dialect)
464            dialect = self.metadata_attach("dialect", dialect)
465        elif isinstance(dialect, str):
466            dialect = helpers.join_path(self.basepath, dialect)
467            dialect = system.create_dialect(self, descriptor=dialect)
468            dialect = self.metadata_attach("dialect", dialect)
469        return dialect
470
471    @Metadata.property
472    def layout(self):
473        """
474        Returns:
475            Layout: table layout
476        """
477        layout = self.get("layout")
478        if layout is None:
479            layout = Layout()
480            layout = self.metadata_attach("layout", layout)
481        elif isinstance(layout, str):
482            layout = Layout(os.path.join(self.basepath, layout))
483            layout = self.metadata_attach("layout", layout)
484        return layout
485
486    @Metadata.property
487    def schema(self):
488        """
489        Returns
490            Schema: resource schema
491        """
492        schema = self.get("schema")
493        if schema is None:
494            schema = Schema()
495            schema = self.metadata_attach("schema", schema)
496        elif isinstance(schema, str):
497            schema = Schema(helpers.join_path(self.basepath, schema))
498            schema = self.metadata_attach("schema", schema)
499        return schema
500
    # NOTE: updating this Metadata.property creates a huge overhead
    # Once it's fixed we might return to stats updating during reading
    # See: https://github.com/frictionlessdata/frictionless-py/issues/879
504    @Metadata.property
505    def stats(self):
506        """
507        Returns
508            dict: resource stats
509        """
510        stats = self.get("stats")
511        if stats is None:
512            stats = {"hash": "", "bytes": 0}
513            if self.tabular:
514                stats.update({"fields": 0, "rows": 0})
515            stats = self.metadata_attach("stats", stats)
516        return stats
517
518    @property
519    def buffer(self):
520        """File's bytes used as a sample
521
522        These buffer bytes are used to infer characteristics of the
523        source file (e.g. encoding, ...).
524
525        Returns:
526            bytes?: file buffer
527        """
528        if self.__parser and self.__parser.loader:
529            return self.__parser.loader.buffer
530        if self.__loader:
531            return self.__loader.buffer
532
533    @property
534    def sample(self):
535        """Table's lists used as sample.
536
537        These sample rows are used to infer characteristics of the
538        source file (e.g. schema, ...).
539
540        Returns:
541            list[]?: table sample
542        """
543        return self.__sample
544
545    @property
546    def labels(self):
547        """
548        Returns:
549            str[]?: table labels
550        """
551        return self.__labels
552
553    @property
554    def fragment(self):
555        """Table's lists used as fragment.
556
557        These fragment rows are used internally to infer characteristics of the
558        source file (e.g. schema, ...).
559
560        Returns:
561            list[]?: table fragment
562        """
563        return self.__fragment
564
565    @property
566    def header(self):
567        """
568        Returns:
569            str[]?: table header
570        """
571        return self.__header
572
573    @Metadata.property(cache=False, write=False)
574    def basepath(self):
575        """
576        Returns
577            str: resource basepath
578        """
579        return self.__file.basepath
580
581    @Metadata.property(cache=False, write=False)
582    def fullpath(self):
583        """
584        Returns
585            str: resource fullpath
586        """
587        return self.__file.fullpath
588
589    @Metadata.property(cache=False, write=False)
590    def detector(self):
591        """
592        Returns
593            str: resource detector
594        """
595        return self.__detector
596
597    @Metadata.property(cache=False, write=False)
598    def onerror(self):
599        """
600        Returns:
601            ignore|warn|raise: on error bahaviour
602        """
603        return self.__onerror
604
605    @Metadata.property(cache=False, write=False)
606    def trusted(self):
607        """
608        Returns:
609            bool: don't raise an exception on unsafe paths
610        """
611        return self.__trusted
612
613    @Metadata.property(cache=False, write=False)
614    def package(self):
615        """
616        Returns:
617            Package?: parent package
618        """
619        return self.__package
620
    @Metadata.property(write=False)
    def memory(self):
        """
        Returns
            bool: whether the resource is in-memory (delegated to the file)
        """
        return self.__file.memory
624
    @Metadata.property(write=False)
    def remote(self):
        """
        Returns
            bool: whether the resource is remote (delegated to the file)
        """
        return self.__file.remote
628
    @Metadata.property(write=False)
    def multipart(self):
        """
        Returns
            bool: whether the resource is multipart (delegated to the file)
        """
        return self.__file.multipart
632
633    @Metadata.property(write=False)
634    def tabular(self):
635        """
636        Returns
637            bool: if resource is tabular
638        """
639        if not self.closed:
640            return bool(self.__parser)
641        try:
642            system.create_parser(self)
643            return True
644        except Exception:
645            return False
646
647    @property
648    def byte_stream(self):
649        """Byte stream in form of a generator
650
651        Yields:
652            gen<bytes>?: byte stream
653        """
654        if not self.closed:
655            if not self.__loader:
656                self.__loader = system.create_loader(self)
657                self.__loader.open()
658            return self.__loader.byte_stream
659
660    @property
661    def text_stream(self):
662        """Text stream in form of a generator
663
664        Yields:
665            gen<str[]>?: text stream
666        """
667        if not self.closed:
668            if not self.__loader:
669                self.__loader = system.create_loader(self)
670                self.__loader.open()
671            return self.__loader.text_stream
672
673    @property
674    def list_stream(self):
675        """List stream in form of a generator
676
677        Yields:
678            gen<any[][]>?: list stream
679        """
680        if self.__parser:
681            return self.__parser.list_stream
682
683    @property
684    def row_stream(self):
685        """Row stream in form of a generator of Row objects
686
687        Yields:
688            gen<Row[]>?: row stream
689        """
690        return self.__row_stream
691
692    # Expand
693
694    def expand(self):
695        """Expand metadata"""
696        self.setdefault("profile", self.profile)
697        self.setdefault("scheme", self.scheme)
698        self.setdefault("format", self.format)
699        self.setdefault("hashing", self.hashing)
700        self.setdefault("encoding", self.encoding)
701        self.setdefault("innerpath", self.innerpath)
702        self.setdefault("compression", self.compression)
703        self.setdefault("control", self.control)
704        self.setdefault("dialect", self.dialect)
705        self.control.expand()
706        self.dialect.expand()
707        if self.tabular:
708            self.setdefault("layout", self.layout)
709            self.setdefault("schema", self.schema)
710            self.layout.expand()
711            self.schema.expand()
712
713    # Infer
714
715    def infer(self, *, stats=False):
716        """Infer metadata
717
718        Parameters:
719            stats? (bool): stream file completely and infer stats
720        """
721        if not self.closed:
722            note = "Resource.infer canot be used on a open resource"
723            raise FrictionlessException(errors.ResourceError(note=note))
724        with self:
725            if not stats:
726                self.pop("stats", None)
727                return
728            stream = self.row_stream or self.byte_stream
729            helpers.pass_through(stream)
730
731    # Open/Close
732
    def open(self):
        """Open the resource as "io.open" does

        Raises:
            FrictionlessException: any exception that occurs
        """
        # Re-opening is allowed: always start from a closed state
        self.close()

        # Infer
        # Materialize inferable properties into the descriptor; stats are
        # reset because they will be recomputed during reading
        self.pop("stats", None)
        self["name"] = self.name
        self["profile"] = self.profile
        self["scheme"] = self.scheme
        self["format"] = self.format
        self["hashing"] = self.hashing
        if self.innerpath:
            self["innerpath"] = self.innerpath
        if self.compression:
            self["compression"] = self.compression
        if self.control:
            self["control"] = self.control
        if self.dialect:
            self["dialect"] = self.dialect
        self["stats"] = self.stats

        # Validate
        if self.metadata_errors:
            error = self.metadata_errors[0]
            raise FrictionlessException(error)

        # Open
        try:

            # Table
            # For tabular resources: open a parser, run detection, then
            # prepare the header and the row stream
            if self.tabular:
                self.__parser = system.create_parser(self)
                self.__parser.open()
                self.__read_detect_layout()
                self.__read_detect_schema()
                self.__read_detect_lookup()
                self.__header = self.__read_header()
                self.__row_stream = self.__read_row_stream()
                return self

            # File
            # For non-tabular resources only a loader is needed
            else:
                self.__loader = system.create_loader(self)
                self.__loader.open()
                return self

        # Error
        # Never leave the resource half-open
        except Exception:
            self.close()
            raise
787
788    def close(self):
789        """Close the table as "filelike.close" does"""
790        if self.__parser:
791            self.__parser.close()
792            self.__parser = None
793        if self.__loader:
794            self.__loader.close()
795            self.__loader = None
796
797    @property
798    def closed(self):
799        """Whether the table is closed
800
801        Returns:
802            bool: if closed
803        """
804        return self.__parser is None and self.__loader is None
805
806    # Read
807
    def read_bytes(self, *, size=None):
        """Read bytes into memory

        Parameters:
            size? (int): maximum amount to read

        Returns:
            bytes: resource bytes (empty for in-memory resources)
        """
        if self.memory:
            return b""
        with helpers.ensure_open(self):
            return self.byte_stream.read1(size)
818
    def read_text(self, *, size=None):
        """Read text into memory

        Parameters:
            size? (int): maximum amount to read

        Returns:
            str: resource text (empty for in-memory resources)
        """
        if self.memory:
            return ""
        with helpers.ensure_open(self):
            return self.text_stream.read(size)
829
830    def read_data(self, *, size=None):
831        """Read data into memory
832
833        Returns:
834            any: resource data
835        """
836        if self.data:
837            return self.data
838        with helpers.ensure_open(self):
839            text = self.read_text()
840            data = json.loads(text)
841            return data
842
843    def read_lists(self, *, size=None):
844        """Read lists into memory
845
846        Returns:
847            any[][]: table lists
848        """
849        with helpers.ensure_open(self):
850            lists = []
851            for cells in self.list_stream:
852                lists.append(cells)
853                if size and len(lists) >= size:
854                    break
855            return lists
856
857    def read_rows(self, *, size=None):
858        """Read rows into memory
859
860        Returns:
861            Row[]: table rows
862        """
863        with helpers.ensure_open(self):
864            rows = []
865            for row in self.row_stream:
866                rows.append(row)
867                if size and len(rows) >= size:
868                    break
869            return rows
870
871    def __read_row_stream(self):
872
873        # During row streaming we crate a field inf structure
874        # This structure is optimized and detached version of schema.fields
875        # We create all data structures in-advance to share them between rows
876
877        # Create field info
878        field_number = 0
879        field_info = {"names": [], "objects": [], "positions": [], "mapping": {}}
880        iterator = zip_longest(self.schema.fields, self.__field_positions)
881        for field, field_position in iterator:
882            if field is None:
883                break
884            field_number += 1
885            field_info["names"].append(field.name)
886            field_info["objects"].append(field.to_copy())
887            field_info["mapping"][field.name] = (field, field_number, field_position)
888            if field_position is not None:
889                field_info["positions"].append(field_position)
890
891        # Create state
892        memory_unique = {}
893        memory_primary = {}
894        foreign_groups = []
895        is_integrity = bool(self.schema.primary_key)
896        for field in self.schema.fields:
897            if field.constraints.get("unique"):
898                memory_unique[field.name] = {}
899                is_integrity = True
900        if self.__lookup:
901            for fk in self.schema.foreign_keys:
902                group = {}
903                group["sourceName"] = fk["reference"]["resource"]
904                group["sourceKey"] = tuple(fk["reference"]["fields"])
905                group["targetKey"] = tuple(fk["fields"])
906                foreign_groups.append(group)
907                is_integrity = True
908
909        # Create iterator
910        iterator = chain(
911            zip(self.__fragment_positions, self.__fragment),
912            self.__read_list_stream(),
913        )
914
915        # Create row stream
916        def row_stream():
917            self.__row_number = 0
918            limit = self.layout.limit_rows
919            offset = self.layout.offset_rows or 0
920            for row_position, cells in iterator:
921                self.__row_position = row_position
922
923                # Offset/offset rows
924                if offset:
925                    offset -= 1
926                    continue
927                if limit and limit <= self.__row_number:
928                    break
929
930                # Create row
931                self.__row_number += 1
932                row = Row(
933                    cells,
934                    field_info=field_info,
935                    row_position=self.__row_position,
936                    row_number=self.__row_number,
937                )
938
939                # Unique Error
940                if is_integrity and memory_unique:
941                    for field_name in memory_unique.keys():
942                        cell = row[field_name]
943                        if cell is not None:
944                            match = memory_unique[field_name].get(cell)
945                            memory_unique[field_name][cell] = row.row_position
946                            if match:
947                                func = errors.UniqueError.from_row
948                                note = "the same as in the row at position %s" % match
949                                error = func(row, note=note, field_name=field_name)
950                                row.errors.append(error)
951
952                # Primary Key Error
953                if is_integrity and self.schema.primary_key:
954                    cells = tuple(row[name] for name in self.schema.primary_key)
955                    if set(cells) == {None}:
956                        note = 'cells composing the primary keys are all "None"'
957                        error = errors.PrimaryKeyError.from_row(row, note=note)
958                        row.errors.append(error)
959                    else:
960                        match = memory_primary.get(cells)
961                        memory_primary[cells] = row.row_position
962                        if match:
963                            if match:
964                                note = "the same as in the row at position %s" % match
965                                error = errors.PrimaryKeyError.from_row(row, note=note)
966                                row.errors.append(error)
967
968                # Foreign Key Error
969                if is_integrity and foreign_groups:
970                    for group in foreign_groups:
971                        group_lookup = self.__lookup.get(group["sourceName"])
972                        if group_lookup:
973                            cells = tuple(row[name] for name in group["targetKey"])
974                            if set(cells) == {None}:
975                                continue
976                            match = cells in group_lookup.get(group["sourceKey"], set())
977                            if not match:
978                                note = "not found in the lookup table"
979                                error = errors.ForeignKeyError.from_row(row, note=note)
980                                row.errors.append(error)
981
982                # Handle errors
983                if self.onerror != "ignore":
984                    if not row.valid:
985                        error = row.errors[0]
986                        if self.onerror == "raise":
987                            raise FrictionlessException(error)
988                        warnings.warn(error.message, UserWarning)
989
990                # Yield row
991                yield row
992
993            # Update stats
994            self.stats["rows"] = self.__row_number
995
996        # Return row stream
997        return row_stream()
998
999    def __read_header(self):
1000
1001        # Create header
1002        header = Header(
1003            self.__labels,
1004            fields=self.schema.fields,
1005            field_positions=self.__field_positions,
1006            row_positions=self.layout.header_rows,
1007            ignore_case=not self.layout.header_case,
1008        )
1009
1010        # Handle errors
1011        if not header.valid:
1012            error = header.errors[0]
1013            if self.onerror == "warn":
1014                warnings.warn(error.message, UserWarning)
1015            elif self.onerror == "raise":
1016                raise FrictionlessException(error)
1017
1018        return header
1019
1020    def __read_list_stream(self):
1021
1022        # Prepare iterator
1023        iterator = (
1024            (position, cells)
1025            for position, cells in enumerate(self.__parser.list_stream, start=1)
1026            if position > len(self.__parser.sample)
1027        )
1028
1029        # Stream without filtering
1030        if not self.layout:
1031            yield from iterator
1032            return
1033
1034        # Stream with filtering
1035        for row_position, cells in iterator:
1036            if self.layout.read_filter_rows(cells, row_position=row_position):
1037                yield row_position, self.layout.read_filter_cells(
1038                    cells, field_positions=self.__field_positions
1039                )
1040
1041    def __read_detect_layout(self):
1042        sample = self.__parser.sample
1043        layout = self.detector.detect_layout(sample, layout=self.layout)
1044        if layout:
1045            self.layout = layout
1046        self.__sample = sample
1047
1048    def __read_detect_schema(self):
1049        labels, field_positions = self.layout.read_labels(self.sample)
1050        fragment, fragment_positions = self.layout.read_fragment(self.sample)
1051        schema = self.detector.detect_schema(fragment, labels=labels, schema=self.schema)
1052        if schema:
1053            self.schema = schema
1054        self.__labels = labels
1055        self.__fragment = fragment
1056        self.__field_positions = field_positions
1057        self.__fragment_positions = fragment_positions
1058        self.stats["fields"] = len(schema.fields)
1059        # NOTE: review whether it's a proper place for this fallback to data resource
1060        if not schema:
1061            self.profile = "data-resource"
1062
    def __read_detect_lookup(self):
        """Build the foreign-key lookup table used by row integrity checks.

        For every foreign key declared in the schema, materialize the set of
        key tuples present in the referenced resource into ``self.__lookup``
        as ``{source_name: {source_key_tuple: set_of_cell_tuples}}``.

        Raises:
            FrictionlessException: if a referenced resource is missing from
                the package
        """
        lookup = {}
        for fk in self.schema.foreign_keys:
            source_name = fk["reference"]["resource"]
            source_key = tuple(fk["reference"]["fields"])
            # A named (external) reference can only be resolved via a package
            if source_name != "" and not self.__package:
                continue
            if source_name:
                if not self.__package.has_resource(source_name):
                    note = f'Failed to handle a foreign key for resource "{self.name}" as resource "{source_name}" does not exist'
                    raise FrictionlessException(errors.ResourceError(note=note))
                source_res = self.__package.get_resource(source_name)
            else:
                # An empty resource name means a self-reference
                source_res = self.to_copy()
            # Strip nested foreign keys from the source copy —
            # presumably to avoid recursive lookup building; confirm
            source_res.schema.pop("foreignKeys", None)
            lookup.setdefault(source_name, {})
            # Each (source, key) pair is materialized at most once
            if source_key in lookup[source_name]:
                continue
            lookup[source_name][source_key] = set()
            if not source_res:
                continue
            # Stream the source resource and collect its key tuples
            with source_res:
                for row in source_res.row_stream:
                    cells = tuple(row.get(field_name) for field_name in source_key)
                    # Fully-missing keys are excluded from the lookup
                    if set(cells) == {None}:
                        continue
                    lookup[source_name][source_key].add(cells)
        self.__lookup = lookup
1091
1092    # Write
1093
1094    def write(self, target=None, **options):
1095        """Write this resource to the target resource
1096
1097        Parameters:
1098            target (any|Resource): target or target resource instance
1099            **options (dict): Resource constructor options
1100        """
1101        native = isinstance(target, Resource)
1102        target = target.to_copy() if native else Resource(target, **options)
1103        parser = system.create_parser(target)
1104        parser.write_row_stream(self.to_copy())
1105        return target
1106
1107    # Import/Export
1108
1109    def to_dict(self):
1110        """Create a dict from the resource
1111
1112        Returns
1113            dict: dict representation
1114        """
1115        # Data can be not serializable (generators/functions)
1116        descriptor = super().to_dict()
1117        data = descriptor.pop("data", None)
1118        if isinstance(data, list):
1119            descriptor["data"] = data
1120        return descriptor
1121
1122    def to_copy(self, **options):
1123        """Create a copy from the resource
1124
1125        Returns
1126            Resource: resource copy
1127        """
1128        descriptor = self.to_dict()
1129        return Resource(
1130            descriptor,
1131            data=self.data,
1132            basepath=self.__basepath,
1133            detector=self.__detector,
1134            onerror=self.__onerror,
1135            trusted=self.__trusted,
1136            package=self.__package,
1137            **options,
1138        )
1139
1140    def to_view(self, type="look", **options):
1141        """Create a view from the resource
1142
1143        See PETL's docs for more information:
1144        https://petl.readthedocs.io/en/stable/util.html#visualising-tables
1145
1146        Parameters:
1147            type (look|lookall|see|display|displayall): view's type
1148            **options (dict): options to be passed to PETL
1149
1150        Returns
1151            str: resource's view
1152        """
1153        assert type in ["look", "lookall", "see", "display", "displayall"]
1154        view = str(getattr(self.to_petl(normalize=True), type)(**options))
1155        return view
1156
1157    def to_snap(self, *, json=False):
1158        """Create a snapshot from the resource
1159
1160        Parameters:
1161            json (bool): make data types compatible with JSON format
1162
1163        Returns
1164            list: resource's data
1165        """
1166        snap = []
1167        with helpers.ensure_open(self):
1168            snap.append(self.header.to_list())
1169            for row in self.row_stream:
1170                snap.append(row.to_list(json=json))
1171        return snap
1172
1173    def to_inline(self, *, dialect=None):
1174        """Helper to export resource as an inline data"""
1175        target = self.write(Resource(format="inline", dialect=dialect))
1176        return target.data
1177
1178    def to_pandas(self, *, dialect=None):
1179        """Helper to export resource as an Pandas dataframe"""
1180        target = self.write(Resource(format="pandas", dialect=dialect))
1181        return target.data
1182
1183    @staticmethod
1184    def from_petl(view, **options):
1185        """Create a resource from PETL view"""
1186        return Resource(data=view, **options)
1187
1188    def to_petl(self, normalize=False):
1189        """Export resource as a PETL table"""
1190        resource = self.to_copy()
1191
1192        # Define view
1193        class ResourceView(petl.Table):
1194            def __iter__(self):
1195                with resource:
1196                    if normalize:
1197                        yield resource.schema.field_names
1198                        yield from (row.to_list() for row in resource.row_stream)
1199                        return
1200                    if not resource.header.missing:
1201                        yield resource.header.labels
1202                    yield from (row.cells for row in resource.row_stream)
1203
1204        return ResourceView()
1205
1206    # Metadata
1207
    # NOTE(review): presumably makes Metadata deep-copy assigned descriptors — confirm
    metadata_duplicate = True
    # Error class reported for this class's metadata problems
    metadata_Error = errors.ResourceError
    # Extend the standard resource profile: control/dialect/layout/schema may
    # also be given as path strings, not only as inline objects
    metadata_profile = deepcopy(settings.RESOURCE_PROFILE)
    metadata_profile["properties"]["control"] = {"type": ["string", "object"]}
    metadata_profile["properties"]["dialect"] = {"type": ["string", "object"]}
    metadata_profile["properties"]["layout"] = {"type": ["string", "object"]}
    metadata_profile["properties"]["schema"] = {"type": ["string", "object"]}
1215
1216    def metadata_process(self):
1217
1218        # File
1219        self.__file = system.create_file(
1220            self.get("data", self.get("path", [])),
1221            innerpath=self.get("innerpath"),
1222            basepath=self.__basepath,
1223        )
1224
1225        # Control
1226        control = self.get("control")
1227        if not isinstance(control, (str, type(None))):
1228            control = system.create_control(self, descriptor=control)
1229            dict.__setitem__(self, "control", control)
1230
1231        # Dialect
1232        dialect = self.get("dialect")
1233        if not isinstance(dialect, (str, type(None))):
1234            dialect = system.create_dialect(self, descriptor=dialect)
1235            dict.__setitem__(self, "dialect", dialect)
1236
1237        # Layout
1238        layout = self.get("layout")
1239        if not isinstance(layout, (str, type(None), Layout)):
1240            layout = Layout(layout)
1241            dict.__setitem__(self, "layout", layout)
1242
1243        # Schema
1244        schema = self.get("schema")
1245        if not isinstance(schema, (str, type(None), Schema)):
1246            schema = Schema(schema)
1247            dict.__setitem__(self, "schema", schema)
1248
1249        # Security
1250        if not self.trusted:
1251            for name in ["path", "control", "dialect", "schema"]:
1252                path = self.get(name)
1253                if not isinstance(path, (str, list)):
1254                    continue
1255                path = path if isinstance(path, list) else [path]
1256                if not all(helpers.is_safe_path(chunk) for chunk in path):
1257                    note = f'path "{path}" is not safe'
1258                    error = errors.ResourceError(note=note)
1259                    raise FrictionlessException(error)
1260
1261    def metadata_validate(self):
1262        yield from super().metadata_validate()
1263
1264        # Control/Dialect
1265        yield from self.control.metadata_errors
1266        yield from self.dialect.metadata_errors
1267
1268        # Layout/Schema
1269        if self.layout:
1270            yield from self.layout.metadata_errors
1271        if self.schema:
1272            yield from self.schema.metadata_errors
1273
1274        # Contributors/Sources
1275        for name in ["contributors", "sources"]:
1276            for item in self.get(name, []):
1277                if item.get("email"):
1278                    field = Field(type="string", format="email")
1279                    cell = field.read_cell(item.get("email"))[0]
1280                    if not cell:
1281                        note = f'property "{name}[].email" is not valid "email"'
1282                        yield errors.PackageError(note=note)
1283