import stringcase
from copy import deepcopy
from multiprocessing import Pool
from importlib import import_module
from .metadata import Metadata
from .errors import InquiryError
from .system import system
from .report import Report
from . import settings
from . import helpers


class Inquiry(Metadata):
    """Inquiry representation.

    Parameters:
        descriptor? (str|dict): descriptor

    Raises:
        FrictionlessException: raise any error that occurs during the process

    """

    def __init__(self, descriptor=None, *, tasks=None):
        self.setinitial("tasks", tasks)
        super().__init__(descriptor)

    @property
    def tasks(self):
        """
        Returns:
            dict[]: tasks
        """
        return self["tasks"]

    # Run

    def run(self, *, parallel=False):
        """Run every task of the inquiry and merge the results into one report.

        Parameters:
            parallel? (bool): run the tasks in a `multiprocessing.Pool`
                instead of one after another in this process

        Returns:
            Report: if the inquiry itself has metadata errors, a report with
                only those errors and no tasks; otherwise a report whose
                `tasks` and `errors` are the concatenation of the per-task
                reports' `tasks` and `errors`
        """

        # Create state
        timer = helpers.Timer()

        # Validate inquiry
        if self.metadata_errors:
            return Report(time=timer.time, errors=self.metadata_errors, tasks=[])

        # Validate sequentially
        if not parallel:
            reports = [task.run() for task in self.tasks]

        # Validate in-parallel
        else:
            # Tasks travel to the worker processes as plain descriptors and the
            # reports come back as descriptors, which are rebuilt here
            with Pool() as pool:
                task_descriptors = [task.to_dict() for task in self.tasks]
                report_descriptors = pool.map(run_task_in_parallel, task_descriptors)
            reports = [Report(descriptor) for descriptor in report_descriptors]

        # Return report
        tasks = []
        errors = []
        for report in reports:
            tasks.extend(report.tasks)
            errors.extend(report.errors)
        return Report(time=timer.time, errors=errors, tasks=tasks)

    # Metadata

    metadata_Error = InquiryError
    metadata_profile = deepcopy(settings.INQUIRY_PROFILE)
    metadata_profile["properties"]["tasks"] = {"type": "array"}

    def metadata_process(self):
        """Normalize the `tasks` property.

        Wraps every task item that is not already an `InquiryTask` into one,
        and wraps the list into a `helpers.ControlledList` whose change hook
        re-runs this method.
        """

        # Tasks
        tasks = self.get("tasks")
        if isinstance(tasks, list):
            for index, task in enumerate(tasks):
                if not isinstance(task, InquiryTask):
                    task = InquiryTask(task)
                    # Bypass any overridden __setitem__ so the replacement
                    # does not itself trigger a change notification
                    list.__setitem__(tasks, index, task)
            if not isinstance(tasks, helpers.ControlledList):
                tasks = helpers.ControlledList(tasks)
                tasks.__onchange__(self.metadata_process)
                dict.__setitem__(self, "tasks", tasks)

    def metadata_validate(self):
        """Yield the inquiry's own profile errors, then each task's errors."""
        yield from super().metadata_validate()

        # Tasks
        # A missing or non-list `tasks` value is reported by the profile
        # validation above; iterating it here would raise (KeyError on a
        # missing key, AttributeError on raw non-task items) instead of
        # yielding errors. Only a list has its items converted to
        # InquiryTask by metadata_process, so mirror that condition.
        tasks = self.get("tasks")
        if isinstance(tasks, (list, helpers.ControlledList)):
            for task in tasks:
                yield from task.metadata_errors


class InquiryTask(Metadata):
    """Inquiry task representation.

    Parameters:
        descriptor? (str|dict): descriptor

    Raises:
        FrictionlessException: raise any error that occurs during the process

    """

    def __init__(self, descriptor=None, *, source=None, type=None, **options):
        self.setinitial("source", source)
        self.setinitial("type", type)
        # Extra keyword options are stored under camelCase keys
        for key, value in options.items():
            self.setinitial(stringcase.camelcase(key), value)
        super().__init__(descriptor)

    @property
    def source(self):
        """
        Returns:
            any: source
        """
        return self["source"]

    @property
    def type(self):
        """
        Returns:
            string?: the explicit `type`, or else the type that
                `system.create_file` detects for `source`
        """
        return self.get("type") or system.create_file(self.source).type

    # Run

    def run(self):
        """Validate this task via `frictionless.validate` and return its report.

        The task's properties are passed as keyword options built by
        `helpers.create_options`.
        """
        # Resolved at call time rather than imported at module level,
        # presumably to avoid a circular import with the package root
        validate = import_module("frictionless").validate
        report = validate(**helpers.create_options(self))
        return report

    # Metadata

    metadata_Error = InquiryError
    metadata_profile = settings.INQUIRY_PROFILE["properties"]["tasks"]["items"]


# Internal


def run_task_in_parallel(task_descriptor):
    """Worker entry point for `Inquiry.run(parallel=True)`.

    Rebuilds an `InquiryTask` from its descriptor, runs it, and returns the
    report as a plain descriptor. It is defined at module level so that
    `multiprocessing` can pickle it for the worker processes.
    """
    task = InquiryTask(task_descriptor)
    report = task.run()
    report_descriptor = report.to_dict()
    return report_descriptor