import os
import json
import petl
import warnings
from pathlib import Path
from copy import deepcopy
from itertools import zip_longest, chain
from .exception import FrictionlessException
from .detector import Detector
from .metadata import Metadata
from .layout import Layout
from .schema import Schema
from .header import Header
from .system import system
from .field import Field
from .row import Row
from . import settings
from . import helpers
from . import errors


# NOTE:
# Review the situation with describe function removing stats (move to infer?)


class Resource(Metadata):
    """Resource representation.

    API      | Usage
    -------- | --------
    Public   | `from frictionless import Resource`

    This class is one of the cornerstones of the Frictionless framework.
    It loads a data source, and allows you to stream its parsed contents.
    At the same time, it's a metadata class data description.

    ```python
    with Resource("data/table.csv") as resource:
        resource.header == ["id", "name"]
        resource.read_rows() == [
            {'id': 1, 'name': 'english'},
            {'id': 2, 'name': '中国人'},
        ]
    ```

    Parameters:

        source (any): Source of the resource; can be in various forms.
            Usually, it's a string as `<scheme>://path/to/file.<format>`.
            It also can be, for example, an array of data arrays/dictionaries.
            Or it can be a resource descriptor dict or path.

        descriptor (dict|str): A resource descriptor provided explicitly.
            Keyword arguments will patch this descriptor if provided.

        name? (str): A Resource name according to the specs.
            It should be a slugified name of the resource.

        title? (str): A Resource title according to the specs.
            It should be a human-oriented title of the resource.

        description? (str): A Resource description according to the specs.
            It should be a human-oriented description of the resource.

        mediatype? (str): A mediatype/mimetype of the resource e.g. "text/csv",
            or "application/vnd.ms-excel". Mediatypes are maintained by the
            Internet Assigned Numbers Authority (IANA) in a media type registry.

        licenses? (dict[]): The license(s) under which the resource is provided.
            If omitted it's considered the same as the package's licenses.

        sources? (dict[]): The raw sources for this data resource.
            It MUST be an array of Source objects.
            Each Source object MUST have a title and
            MAY have path and/or email properties.

        profile? (str): A string identifying the profile of this descriptor.
            For example, `tabular-data-resource`.

        scheme? (str): Scheme for loading the file (file, http, ...).
            If not set, it'll be inferred from `source`.

        format? (str): File source's format (csv, xls, ...).
            If not set, it'll be inferred from `source`.

        hashing? (str): An algorithm to hash data.
            It defaults to 'md5'.

        encoding? (str): Source encoding.
            If not set, it'll be inferred from `source`.

        innerpath? (str): A path within the compressed file.
            It defaults to the first file in the archive.

        compression? (str): Source file compression (zip, ...).
            If not set, it'll be inferred from `source`.

        control? (dict|Control): File control.
            For more information, please check the Control documentation.

        dialect? (dict|Dialect): Table dialect.
            For more information, please check the Dialect documentation.

        layout? (dict|Layout): Table layout.
            For more information, please check the Layout documentation.

        schema? (dict|Schema): Table schema.
            For more information, please check the Schema documentation.

        stats? (dict): File/table stats.
            A dict with the following possible properties: hash, bytes, fields, rows.

        basepath? (str): A basepath of the resource.
            The fullpath of the resource is joined `basepath` and `/path`.

        detector? (Detector): File/table detector.
            For more information, please check the Detector documentation.

        onerror? (ignore|warn|raise): Behaviour if there is an error.
            It defaults to 'ignore'. The default mode will ignore all errors
            on resource level and they should be handled by the user
            being available in Header and Row objects.

        trusted? (bool): Don't raise an exception on unsafe paths.
            A path provided as a part of the descriptor is considered unsafe
            if there is path traversing or the path is absolute.
            A path provided as `source` or `path` is always trusted.

        package? (Package): A package owning this resource.
            It's actual if the resource is part of some data package.

    Raises:
        FrictionlessException: raise any error that occurs during the process
    """

    def __init__(
        self,
        source=None,
        *,
        descriptor=None,
        # Spec
        name=None,
        title=None,
        description=None,
        mediatype=None,
        licenses=None,
        sources=None,
        profile=None,
        path=None,
        data=None,
        scheme=None,
        format=None,
        hashing=None,
        encoding=None,
        innerpath=None,
        compression=None,
        control=None,
        dialect=None,
        layout=None,
        schema=None,
        stats=None,
        # Extra
        basepath="",
        detector=None,
        onerror="ignore",
        trusted=False,
        package=None,
    ):

        # Handle source: a table-like source contributes path/data,
        # anything else is treated as a descriptor
        if source is not None:
            file = system.create_file(source, basepath=basepath)
            if file.type == "table":
                if path is None:
                    path = file.path
                if data is None:
                    data = file.data
            elif descriptor is None:
                descriptor = source

        # Handle pathlib
        if isinstance(descriptor, Path):
            descriptor = str(descriptor)

        # Handle trusted: no descriptor means no untrusted paths can sneak in
        if descriptor is None:
            trusted = True

        # Store state
        self.__loader = None
        self.__parser = None
        self.__sample = None
        self.__labels = None
        self.__fragment = None
        self.__header = None
        self.__lookup = None
        self.__byte_stream = None
        self.__text_stream = None
        self.__list_stream = None
        self.__row_stream = None
        self.__row_number = None
        self.__row_position = None
        self.__field_positions = None
        self.__fragment_positions = None

        # Store extra
        self.__basepath = basepath or helpers.parse_basepath(descriptor)
        self.__detector = detector or Detector()
        self.__onerror = onerror
        self.__trusted = trusted
        self.__package = package

        # Store specs
        self.setinitial("name", name)
        self.setinitial("title", title)
        self.setinitial("description", description)
        self.setinitial("mediatype", mediatype)
        self.setinitial("licenses", licenses)
        self.setinitial("sources", sources)
        self.setinitial("profile", profile)
        self.setinitial("path", path)
        self.setinitial("data", data)
        self.setinitial("scheme", scheme)
        self.setinitial("format", format)
        self.setinitial("hashing", hashing)
        self.setinitial("encoding", encoding)
        self.setinitial("compression", compression)
        self.setinitial("innerpath", innerpath)
        self.setinitial("control", control)
        self.setinitial("dialect", dialect)
        self.setinitial("layout", layout)
        self.setinitial("schema", schema)
        self.setinitial("stats", stats)
        super().__init__(descriptor)

        # NOTE: it will not work if dialect is a path
        # Handle official dialect.header: spec puts "header" on the dialect,
        # while internally it lives on the layout
        dialect = self.get("dialect")
        if isinstance(dialect, dict):
            header = dialect.pop("header", None)
            if header is False:
                self.setdefault("layout", {})
                self["layout"]["header"] = False

        # Handle official hash/bytes/rows: move top-level stats into "stats"
        for name in ["hash", "bytes", "rows"]:
            value = self.pop(name, None)
            if value:
                if name == "hash":
                    hashing, value = helpers.parse_resource_hash(value)
                    if hashing != settings.DEFAULT_HASHING:
                        self["hashing"] = hashing
                self.setdefault("stats", {})
                self["stats"][name] = value

        # Handle deprecated url
        url = self.get("url")
        path = self.get("path")
        if url and not path:
            message = 'Property "url" is deprecated. Please use "path" instead.'
            warnings.warn(message, UserWarning)
            self["path"] = self.pop("url")

        # Handle deprecated compression
        compression = self.get("compression")
        if compression == "no":
            message = 'Compression "no" is deprecated. Please use "" compression.'
            warnings.warn(message, UserWarning)
            self["compression"] = ""

    def __setattr__(self, name, value):
        if name == "basepath":
            self.__basepath = value
        elif name == "detector":
            self.__detector = value
        elif name == "onerror":
            self.__onerror = value
        elif name == "trusted":
            self.__trusted = value
        elif name == "package":
            self.__package = value
        else:
            return super().__setattr__(name, value)
        self.metadata_process()

    def __enter__(self):
        if self.closed:
            self.open()
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def __iter__(self):
        with helpers.ensure_open(self):
            yield from self.__row_stream

    @Metadata.property
    def name(self):
        """
        Returns
            str: resource name
        """
        return self.get("name", self.__file.name)

    @Metadata.property
    def title(self):
        """
        Returns
            str: resource title
        """
        return self.get("title", "")

    @Metadata.property
    def description(self):
        """
        Returns
            str: resource description
        """
        return self.get("description", "")

    @Metadata.property(cache=False, write=False)
    def description_html(self):
        """
        Returns:
            str?: resource description
        """
        return helpers.md_to_html(self.description)

    @Metadata.property
    def description_text(self):
        """
        Returns:
            str: resource description
        """
        return helpers.html_to_text(self.description_html)

    @Metadata.property
    def mediatype(self):
        """
        Returns
            str: resource mediatype
        """
        return self.get("mediatype", "")

    @Metadata.property
    def licenses(self):
        """
        Returns
            dict[]: resource licenses
        """
        licenses = self.get("licenses", [])
        return self.metadata_attach("licenses", licenses)

    @Metadata.property
    def sources(self):
        """
        Returns
            dict[]: resource sources
        """
        sources = self.get("sources", [])
        return self.metadata_attach("sources", sources)

    @Metadata.property
    def profile(self):
        """
        Returns
            str: resource profile
        """
        default = settings.DEFAULT_RESOURCE_PROFILE
        if self.tabular:
            default = settings.DEFAULT_TABULAR_RESOURCE_PROFILE
        return self.get("profile", default)

    @Metadata.property
    def path(self):
        """
        Returns
            str: resource path
        """
        return self.get("path", self.__file.path)

    @Metadata.property
    def data(self):
        """
        Returns
            any[][]?: resource data
        """
        return self.get("data", self.__file.data)

    @Metadata.property
    def scheme(self):
        """
        Returns
            str: resource scheme
        """
        return self.get("scheme", self.__file.scheme).lower()

    @Metadata.property
    def format(self):
        """
        Returns
            str: resource format
        """
        return self.get("format", self.__file.format).lower()

    @Metadata.property
    def hashing(self):
        """
        Returns
            str: resource hashing
        """
        return self.get("hashing", settings.DEFAULT_HASHING).lower()

    @Metadata.property
    def encoding(self):
        """
        Returns
            str: resource encoding
        """
        return self.get("encoding", settings.DEFAULT_ENCODING).lower()

    @Metadata.property
    def innerpath(self):
        """
        Returns
            str: resource compression path
        """
        return self.get("innerpath", self.__file.innerpath)

    @Metadata.property
    def compression(self):
        """
        Returns
            str: resource compression
        """
        return self.get("compression", self.__file.compression).lower()

    @Metadata.property
    def control(self):
        """
        Returns
            Control: resource control
        """
        control = self.get("control")
        if control is None:
            control = system.create_control(self, descriptor=control)
            control = self.metadata_attach("control", control)
        elif isinstance(control, str):
            # A string control is a path to a control descriptor
            control = os.path.join(self.basepath, control)
            control = system.create_control(self, descriptor=control)
            control = self.metadata_attach("control", control)
        return control

    @Metadata.property
    def dialect(self):
        """
        Returns
            Dialect: resource dialect
        """
        dialect = self.get("dialect")
        if dialect is None:
            dialect = system.create_dialect(self, descriptor=dialect)
            dialect = self.metadata_attach("dialect", dialect)
        elif isinstance(dialect, str):
            # A string dialect is a path to a dialect descriptor
            dialect = helpers.join_path(self.basepath, dialect)
            dialect = system.create_dialect(self, descriptor=dialect)
            dialect = self.metadata_attach("dialect", dialect)
        return dialect

    @Metadata.property
    def layout(self):
        """
        Returns:
            Layout: table layout
        """
        layout = self.get("layout")
        if layout is None:
            layout = Layout()
            layout = self.metadata_attach("layout", layout)
        elif isinstance(layout, str):
            layout = Layout(os.path.join(self.basepath, layout))
            layout = self.metadata_attach("layout", layout)
        return layout

    @Metadata.property
    def schema(self):
        """
        Returns
            Schema: resource schema
        """
        schema = self.get("schema")
        if schema is None:
            schema = Schema()
            schema = self.metadata_attach("schema", schema)
        elif isinstance(schema, str):
            schema = Schema(helpers.join_path(self.basepath, schema))
            schema = self.metadata_attach("schema", schema)
        return schema

    # NOTE: updating this Metadata.property creates a huge overhead
    # Once it's fixed we might return to stats updating during reading
    # See: https://github.com/frictionlessdata/frictionless-py/issues/879
    @Metadata.property
    def stats(self):
        """
        Returns
            dict: resource stats
        """
        stats = self.get("stats")
        if stats is None:
            stats = {"hash": "", "bytes": 0}
            if self.tabular:
                stats.update({"fields": 0, "rows": 0})
            stats = self.metadata_attach("stats", stats)
        return stats

    @property
    def buffer(self):
        """File's bytes used as a sample

        These buffer bytes are used to infer characteristics of the
        source file (e.g. encoding, ...).

        Returns:
            bytes?: file buffer
        """
        if self.__parser and self.__parser.loader:
            return self.__parser.loader.buffer
        if self.__loader:
            return self.__loader.buffer

    @property
    def sample(self):
        """Table's lists used as sample.

        These sample rows are used to infer characteristics of the
        source file (e.g. schema, ...).

        Returns:
            list[]?: table sample
        """
        return self.__sample

    @property
    def labels(self):
        """
        Returns:
            str[]?: table labels
        """
        return self.__labels

    @property
    def fragment(self):
        """Table's lists used as fragment.

        These fragment rows are used internally to infer characteristics of the
        source file (e.g. schema, ...).

        Returns:
            list[]?: table fragment
        """
        return self.__fragment

    @property
    def header(self):
        """
        Returns:
            str[]?: table header
        """
        return self.__header

    @Metadata.property(cache=False, write=False)
    def basepath(self):
        """
        Returns
            str: resource basepath
        """
        return self.__file.basepath

    @Metadata.property(cache=False, write=False)
    def fullpath(self):
        """
        Returns
            str: resource fullpath
        """
        return self.__file.fullpath

    @Metadata.property(cache=False, write=False)
    def detector(self):
        """
        Returns
            str: resource detector
        """
        return self.__detector

    @Metadata.property(cache=False, write=False)
    def onerror(self):
        """
        Returns:
            ignore|warn|raise: on error behaviour
        """
        return self.__onerror

    @Metadata.property(cache=False, write=False)
    def trusted(self):
        """
        Returns:
            bool: don't raise an exception on unsafe paths
        """
        return self.__trusted

    @Metadata.property(cache=False, write=False)
    def package(self):
        """
        Returns:
            Package?: parent package
        """
        return self.__package

    @Metadata.property(write=False)
    def memory(self):
        return self.__file.memory

    @Metadata.property(write=False)
    def remote(self):
        return self.__file.remote

    @Metadata.property(write=False)
    def multipart(self):
        return self.__file.multipart

    @Metadata.property(write=False)
    def tabular(self):
        """
        Returns
            bool: if resource is tabular
        """
        if not self.closed:
            return bool(self.__parser)
        try:
            system.create_parser(self)
            return True
        except Exception:
            return False

    @property
    def byte_stream(self):
        """Byte stream in form of a generator

        Yields:
            gen<bytes>?: byte stream
        """
        if not self.closed:
            if not self.__loader:
                self.__loader = system.create_loader(self)
                self.__loader.open()
            return self.__loader.byte_stream

    @property
    def text_stream(self):
        """Text stream in form of a generator

        Yields:
            gen<str[]>?: text stream
        """
        if not self.closed:
            if not self.__loader:
                self.__loader = system.create_loader(self)
                self.__loader.open()
            return self.__loader.text_stream

    @property
    def list_stream(self):
        """List stream in form of a generator

        Yields:
            gen<any[][]>?: list stream
        """
        if self.__parser:
            return self.__parser.list_stream

    @property
    def row_stream(self):
        """Row stream in form of a generator of Row objects

        Yields:
            gen<Row[]>?: row stream
        """
        return self.__row_stream

    # Expand

    def expand(self):
        """Expand metadata"""
        self.setdefault("profile", self.profile)
        self.setdefault("scheme", self.scheme)
        self.setdefault("format", self.format)
        self.setdefault("hashing", self.hashing)
        self.setdefault("encoding", self.encoding)
        self.setdefault("innerpath", self.innerpath)
        self.setdefault("compression", self.compression)
        self.setdefault("control", self.control)
        self.setdefault("dialect", self.dialect)
        self.control.expand()
        self.dialect.expand()
        if self.tabular:
            self.setdefault("layout", self.layout)
            self.setdefault("schema", self.schema)
            self.layout.expand()
            self.schema.expand()

    # Infer

    def infer(self, *, stats=False):
        """Infer metadata

        Parameters:
            stats? (bool): stream file completely and infer stats
        """
        if not self.closed:
            note = "Resource.infer cannot be used on an open resource"
            raise FrictionlessException(errors.ResourceError(note=note))
        with self:
            if not stats:
                self.pop("stats", None)
                return
            # Exhaust the stream so stats get fully populated
            stream = self.row_stream or self.byte_stream
            helpers.pass_through(stream)

    # Open/Close

    def open(self):
        """Open the resource as "io.open" does

        Raises:
            FrictionlessException: any exception that occurs
        """
        self.close()

        # Infer
        self.pop("stats", None)
        self["name"] = self.name
        self["profile"] = self.profile
        self["scheme"] = self.scheme
        self["format"] = self.format
        self["hashing"] = self.hashing
        if self.innerpath:
            self["innerpath"] = self.innerpath
        if self.compression:
            self["compression"] = self.compression
        if self.control:
            self["control"] = self.control
        if self.dialect:
            self["dialect"] = self.dialect
        self["stats"] = self.stats

        # Validate
        if self.metadata_errors:
            error = self.metadata_errors[0]
            raise FrictionlessException(error)

        # Open
        try:

            # Table
            if self.tabular:
                self.__parser = system.create_parser(self)
                self.__parser.open()
                self.__read_detect_layout()
                self.__read_detect_schema()
                self.__read_detect_lookup()
                self.__header = self.__read_header()
                self.__row_stream = self.__read_row_stream()
                return self

            # File
            else:
                self.__loader = system.create_loader(self)
                self.__loader.open()
                return self

        # Error
        except Exception:
            self.close()
            raise

    def close(self):
        """Close the table as "filelike.close" does"""
        if self.__parser:
            self.__parser.close()
            self.__parser = None
        if self.__loader:
            self.__loader.close()
            self.__loader = None

    @property
    def closed(self):
        """Whether the table is closed

        Returns:
            bool: if closed
        """
        return self.__parser is None and self.__loader is None

    # Read

    def read_bytes(self, *, size=None):
        """Read bytes into memory

        Returns:
            any[][]: resource bytes
        """
        if self.memory:
            return b""
        with helpers.ensure_open(self):
            return self.byte_stream.read1(size)

    def read_text(self, *, size=None):
        """Read text into memory

        Returns:
            str: resource text
        """
        if self.memory:
            return ""
        with helpers.ensure_open(self):
            return self.text_stream.read(size)

    def read_data(self, *, size=None):
        """Read data into memory

        Returns:
            any: resource data
        """
        if self.data:
            return self.data
        with helpers.ensure_open(self):
            text = self.read_text()
            data = json.loads(text)
            return data

    def read_lists(self, *, size=None):
        """Read lists into memory

        Returns:
            any[][]: table lists
        """
        with helpers.ensure_open(self):
            lists = []
            for cells in self.list_stream:
                lists.append(cells)
                if size and len(lists) >= size:
                    break
            return lists

    def read_rows(self, *, size=None):
        """Read rows into memory

        Returns:
            Row[]: table rows
        """
        with helpers.ensure_open(self):
            rows = []
            for row in self.row_stream:
                rows.append(row)
                if size and len(rows) >= size:
                    break
            return rows

    def __read_row_stream(self):

        # During row streaming we create a field info structure
        # This structure is optimized and detached version of schema.fields
        # We create all data structures in-advance to share them between rows

        # Create field info
        field_number = 0
        field_info = {"names": [], "objects": [], "positions": [], "mapping": {}}
        iterator = zip_longest(self.schema.fields, self.__field_positions)
        for field, field_position in iterator:
            if field is None:
                break
            field_number += 1
            field_info["names"].append(field.name)
            field_info["objects"].append(field.to_copy())
            field_info["mapping"][field.name] = (field, field_number, field_position)
            if field_position is not None:
                field_info["positions"].append(field_position)

        # Create state
        memory_unique = {}
        memory_primary = {}
        foreign_groups = []
        is_integrity = bool(self.schema.primary_key)
        for field in self.schema.fields:
            if field.constraints.get("unique"):
                memory_unique[field.name] = {}
                is_integrity = True
        if self.__lookup:
            for fk in self.schema.foreign_keys:
                group = {}
                group["sourceName"] = fk["reference"]["resource"]
                group["sourceKey"] = tuple(fk["reference"]["fields"])
                group["targetKey"] = tuple(fk["fields"])
                foreign_groups.append(group)
                is_integrity = True

        # Create iterator: the already-parsed fragment first, then the rest
        iterator = chain(
            zip(self.__fragment_positions, self.__fragment),
            self.__read_list_stream(),
        )

        # Create row stream
        def row_stream():
            self.__row_number = 0
            limit = self.layout.limit_rows
            offset = self.layout.offset_rows or 0
            for row_position, cells in iterator:
                self.__row_position = row_position

                # Handle limit/offset rows
                if offset:
                    offset -= 1
                    continue
                if limit and limit <= self.__row_number:
                    break

                # Create row
                self.__row_number += 1
                row = Row(
                    cells,
                    field_info=field_info,
                    row_position=self.__row_position,
                    row_number=self.__row_number,
                )

                # Unique Error
                if is_integrity and memory_unique:
                    for field_name in memory_unique.keys():
                        cell = row[field_name]
                        if cell is not None:
                            match = memory_unique[field_name].get(cell)
                            memory_unique[field_name][cell] = row.row_position
                            if match:
                                func = errors.UniqueError.from_row
                                note = "the same as in the row at position %s" % match
                                error = func(row, note=note, field_name=field_name)
                                row.errors.append(error)

                # Primary Key Error
                if is_integrity and self.schema.primary_key:
                    cells = tuple(row[name] for name in self.schema.primary_key)
                    if set(cells) == {None}:
                        note = 'cells composing the primary keys are all "None"'
                        error = errors.PrimaryKeyError.from_row(row, note=note)
                        row.errors.append(error)
                    else:
                        match = memory_primary.get(cells)
                        memory_primary[cells] = row.row_position
                        if match:
                            note = "the same as in the row at position %s" % match
                            error = errors.PrimaryKeyError.from_row(row, note=note)
                            row.errors.append(error)

                # Foreign Key Error
                if is_integrity and foreign_groups:
                    for group in foreign_groups:
                        group_lookup = self.__lookup.get(group["sourceName"])
                        if group_lookup:
                            cells = tuple(row[name] for name in group["targetKey"])
                            if set(cells) == {None}:
                                continue
                            match = cells in group_lookup.get(group["sourceKey"], set())
                            if not match:
                                note = "not found in the lookup table"
                                error = errors.ForeignKeyError.from_row(row, note=note)
                                row.errors.append(error)

                # Handle errors
                if self.onerror != "ignore":
                    if not row.valid:
                        error = row.errors[0]
                        if self.onerror == "raise":
                            raise FrictionlessException(error)
                        warnings.warn(error.message, UserWarning)

                # Yield row
                yield row

            # Update stats
            self.stats["rows"] = self.__row_number

        # Return row stream
        return row_stream()

    def __read_header(self):

        # Create header
        header = Header(
            self.__labels,
            fields=self.schema.fields,
            field_positions=self.__field_positions,
            row_positions=self.layout.header_rows,
            ignore_case=not self.layout.header_case,
        )

        # Handle errors
        if not header.valid:
            error = header.errors[0]
            if self.onerror == "warn":
                warnings.warn(error.message, UserWarning)
            elif self.onerror == "raise":
                raise FrictionlessException(error)

        return header

    def __read_list_stream(self):

        # Prepare iterator: skip positions already consumed as the sample
        iterator = (
            (position, cells)
            for position, cells in enumerate(self.__parser.list_stream, start=1)
            if position > len(self.__parser.sample)
        )

        # Stream without filtering
        if not self.layout:
            yield from iterator
            return

        # Stream with filtering
        for row_position, cells in iterator:
            if self.layout.read_filter_rows(cells, row_position=row_position):
                yield row_position, self.layout.read_filter_cells(
                    cells, field_positions=self.__field_positions
                )

    def __read_detect_layout(self):
        sample = self.__parser.sample
        layout = self.detector.detect_layout(sample, layout=self.layout)
        if layout:
            self.layout = layout
        self.__sample = sample

    def __read_detect_schema(self):
        labels, field_positions = self.layout.read_labels(self.sample)
        fragment, fragment_positions = self.layout.read_fragment(self.sample)
        schema = self.detector.detect_schema(fragment, labels=labels, schema=self.schema)
        if schema:
            self.schema = schema
        self.__labels = labels
        self.__fragment = fragment
        self.__field_positions = field_positions
        self.__fragment_positions = fragment_positions
        self.stats["fields"] = len(schema.fields)
        # NOTE: review whether it's a proper place for this fallback to data resource
        if not schema:
            self.profile = "data-resource"

    def __read_detect_lookup(self):
        lookup = {}
        for fk in self.schema.foreign_keys:
            source_name = fk["reference"]["resource"]
            source_key = tuple(fk["reference"]["fields"])
            # An external reference without a package cannot be resolved
            if source_name != "" and not self.__package:
                continue
            if source_name:
                if not self.__package.has_resource(source_name):
                    note = f'Failed to handle a foreign key for resource "{self.name}" as resource "{source_name}" does not exist'
                    raise FrictionlessException(errors.ResourceError(note=note))
                source_res = self.__package.get_resource(source_name)
            else:
                # A self-reference: copy to avoid recursive lookup detection
                source_res = self.to_copy()
            source_res.schema.pop("foreignKeys", None)
            lookup.setdefault(source_name, {})
            if source_key in lookup[source_name]:
                continue
            lookup[source_name][source_key] = set()
            if not source_res:
                continue
            with source_res:
                for row in source_res.row_stream:
                    cells = tuple(row.get(field_name) for field_name in source_key)
                    if set(cells) == {None}:
                        continue
                    lookup[source_name][source_key].add(cells)
        self.__lookup = lookup

    # Write

    def write(self, target=None, **options):
        """Write this resource to the target resource

        Parameters:
            target (any|Resource): target or target resource instance
            **options (dict): Resource constructor options
        """
        native = isinstance(target, Resource)
        target = target.to_copy() if native else Resource(target, **options)
        parser = system.create_parser(target)
        parser.write_row_stream(self.to_copy())
        return target

    # Import/Export

    def to_dict(self):
        """Create a dict from the resource

        Returns
            dict: dict representation
        """
        # Data can be not serializable (generators/functions)
        descriptor = super().to_dict()
        data = descriptor.pop("data", None)
        if isinstance(data, list):
            descriptor["data"] = data
        return descriptor

    def to_copy(self, **options):
        """Create a copy from the resource

        Returns
            Resource: resource copy
        """
        descriptor = self.to_dict()
        return Resource(
            descriptor,
            data=self.data,
            basepath=self.__basepath,
            detector=self.__detector,
            onerror=self.__onerror,
            trusted=self.__trusted,
            package=self.__package,
            **options,
        )

    def to_view(self, type="look", **options):
        """Create a view from the resource

        See PETL's docs for more information:
        https://petl.readthedocs.io/en/stable/util.html#visualising-tables

        Parameters:
            type (look|lookall|see|display|displayall): view's type
            **options (dict): options to be passed to PETL

        Returns
            str: resource's view
        """
        assert type in ["look", "lookall", "see", "display", "displayall"]
        view = str(getattr(self.to_petl(normalize=True), type)(**options))
        return view

    def to_snap(self, *, json=False):
        """Create a snapshot from the resource

        Parameters:
            json (bool): make data types compatible with JSON format

        Returns
            list: resource's data
        """
        snap = []
        with helpers.ensure_open(self):
            snap.append(self.header.to_list())
            for row in self.row_stream:
                snap.append(row.to_list(json=json))
        return snap

    def to_inline(self, *, dialect=None):
        """Helper to export resource as an inline data"""
        target = self.write(Resource(format="inline", dialect=dialect))
        return target.data

    def to_pandas(self, *, dialect=None):
        """Helper to export resource as an Pandas dataframe"""
        target = self.write(Resource(format="pandas", dialect=dialect))
        return target.data

    @staticmethod
    def from_petl(view, **options):
        """Create a resource from PETL view"""
        return Resource(data=view, **options)

    def to_petl(self, normalize=False):
        """Export resource as a PETL table"""
        resource = self.to_copy()

        # Define view
        class ResourceView(petl.Table):
            def __iter__(self):
                with resource:
                    if normalize:
                        yield resource.schema.field_names
                        yield from (row.to_list() for row in resource.row_stream)
                        return
                    if not resource.header.missing:
                        yield resource.header.labels
                    yield from (row.cells for row in resource.row_stream)

        return ResourceView()

    # Metadata

    metadata_duplicate = True
    metadata_Error = errors.ResourceError
    metadata_profile = deepcopy(settings.RESOURCE_PROFILE)
    metadata_profile["properties"]["control"] = {"type": ["string", "object"]}
    metadata_profile["properties"]["dialect"] = {"type": ["string", "object"]}
    metadata_profile["properties"]["layout"] = {"type": ["string", "object"]}
    metadata_profile["properties"]["schema"] = {"type": ["string", "object"]}

    def metadata_process(self):

        # File
        self.__file = system.create_file(
            self.get("data", self.get("path", [])),
            innerpath=self.get("innerpath"),
            basepath=self.__basepath,
        )

        # Control
        control = self.get("control")
        if not isinstance(control, (str, type(None))):
            control = system.create_control(self, descriptor=control)
            dict.__setitem__(self, "control", control)

        # Dialect
        dialect = self.get("dialect")
        if not isinstance(dialect, (str, type(None))):
            dialect = system.create_dialect(self, descriptor=dialect)
            dict.__setitem__(self, "dialect", dialect)

        # Layout
        layout = self.get("layout")
        if not isinstance(layout, (str, type(None), Layout)):
            layout = Layout(layout)
            dict.__setitem__(self, "layout", layout)

        # Schema
        schema = self.get("schema")
        if not isinstance(schema, (str, type(None), Schema)):
            schema = Schema(schema)
            dict.__setitem__(self, "schema", schema)

        # Security: reject traversing/absolute paths unless trusted
        if not self.trusted:
            for name in ["path", "control", "dialect", "schema"]:
                path = self.get(name)
                if not isinstance(path, (str, list)):
                    continue
                path = path if isinstance(path, list) else [path]
                if not all(helpers.is_safe_path(chunk) for chunk in path):
                    note = f'path "{path}" is not safe'
                    error = errors.ResourceError(note=note)
                    raise FrictionlessException(error)

    def metadata_validate(self):
        yield from super().metadata_validate()

        # Control/Dialect
        yield from self.control.metadata_errors
        yield from self.dialect.metadata_errors

        # Layout/Schema
        if self.layout:
            yield from self.layout.metadata_errors
        if self.schema:
            yield from self.schema.metadata_errors

        # Contributors/Sources
        for name in ["contributors", "sources"]:
            for item in self.get(name, []):
                if item.get("email"):
                    field = Field(type="string", format="email")
                    cell = field.read_cell(item.get("email"))[0]
                    if not cell:
                        note = f'property "{name}[].email" is not valid "email"'
                        yield errors.PackageError(note=note)