1import stringcase
2from copy import deepcopy
3from multiprocessing import Pool
4from importlib import import_module
5from .metadata import Metadata
6from .errors import InquiryError
7from .system import system
8from .report import Report
9from . import settings
10from . import helpers
11
12
class Inquiry(Metadata):
    """Inquiry representation.

    An inquiry holds a list of tasks under its "tasks" key; `run` executes
    every task and merges the resulting reports into one.

    Parameters:
        descriptor? (str|dict): descriptor
        tasks? (list): initial value for the "tasks" key; list items that are
            not `InquiryTask` instances are wrapped into `InquiryTask`

    Raises:
        FrictionlessException: raise any error that occurs during the process

    """

    def __init__(self, descriptor=None, *, tasks=None):
        self.setinitial("tasks", tasks)
        super().__init__(descriptor)

    @property
    def tasks(self):
        """
        Returns:
            dict[]: tasks
        """
        return self["tasks"]

    # Run

    def run(self, *, parallel=False):
        """Run all the tasks and merge their reports

        Parameters:
            parallel? (bool): if true, the tasks are mapped over a
                `multiprocessing.Pool`; otherwise they are run one by one
                in the current process

        Returns:
            Report: a report combining the tasks and errors of every task
                report; if the inquiry has metadata errors, a report with
                only those errors is returned and no task is run
        """

        # Create state
        reports = []
        timer = helpers.Timer()

        # Validate inquiry
        if self.metadata_errors:
            return Report(time=timer.time, errors=self.metadata_errors, tasks=[])

        # Validate sequentially
        if not parallel:
            for task in self.tasks:
                reports.append(task.run())

        # Validate in-parallel
        else:
            # Tasks and reports cross the process boundary as plain descriptors
            task_descriptors = [task.to_dict() for task in self.tasks]
            with Pool() as pool:
                report_descriptors = pool.map(run_task_in_parallel, task_descriptors)
            for report_descriptor in report_descriptors:
                reports.append(Report(report_descriptor))

        # Return report
        tasks = []
        errors = []
        for report in reports:
            tasks.extend(report.tasks)
            errors.extend(report.errors)
        return Report(time=timer.time, errors=errors, tasks=tasks)

    # Metadata

    metadata_Error = InquiryError
    # Work on a copy so the relaxed "tasks" schema below does not leak into
    # the shared profile in settings (which `InquiryTask` reads its own from)
    metadata_profile = deepcopy(settings.INQUIRY_PROFILE)
    metadata_profile["properties"]["tasks"] = {"type": "array"}

    def metadata_process(self):
        """Wrap the raw "tasks" items into `InquiryTask` instances"""

        # Tasks
        tasks = self.get("tasks")
        if isinstance(tasks, list):
            for index, task in enumerate(tasks):
                if not isinstance(task, InquiryTask):
                    task = InquiryTask(task)
                    # Plain `list.__setitem__` is used on purpose; presumably it
                    # avoids re-triggering the list's change hook - TODO confirm
                    list.__setitem__(tasks, index, task)
            if not isinstance(tasks, helpers.ControlledList):
                tasks = helpers.ControlledList(tasks)
                tasks.__onchange__(self.metadata_process)
                dict.__setitem__(self, "tasks", tasks)

    def metadata_validate(self):
        """Yield the inquiry's own metadata errors followed by the tasks' ones"""
        yield from super().metadata_validate()

        # Tasks
        # The raw value is read instead of `self.tasks`: a missing "tasks" key
        # raises KeyError there, and a non-list value is left unwrapped by
        # `metadata_process`, so its items have no `metadata_errors`. Both
        # cases must not crash the validation.
        tasks = self.get("tasks")
        if isinstance(tasks, list):
            for task in tasks:
                if isinstance(task, InquiryTask):
                    yield from task.metadata_errors
96
97
class InquiryTask(Metadata):
    """Inquiry task representation.

    Parameters:
        descriptor? (str|dict): descriptor
        source? (any): initial value for the "source" key
        type? (str): initial value for the "type" key
        **options (dict): extra initial values; every key is converted
            with `stringcase.camelcase` before being stored

    Raises:
        FrictionlessException: raise any error that occurs during the process

    """

    def __init__(self, descriptor=None, *, source=None, type=None, **options):
        # Named arguments go first, then the extra options under camelCase keys
        for name, given in (("source", source), ("type", type)):
            self.setinitial(name, given)
        for name in options:
            self.setinitial(stringcase.camelcase(name), options[name])
        super().__init__(descriptor)

    @property
    def source(self):
        """
        Returns:
            any: source
        """
        return self["source"]

    @property
    def type(self):
        """
        Returns:
            string?: the stored "type" if it is truthy, otherwise the type
                of the file the system creates for the source
        """
        declared = self.get("type")
        if declared:
            return declared
        return system.create_file(self.source).type

    # Run

    def run(self):
        """Validate using this task as the options

        Returns:
            the result of `frictionless.validate` called with the options
            that `helpers.create_options` builds from this task
        """
        # The package is resolved at call time rather than at module import
        frictionless = import_module("frictionless")
        return frictionless.validate(**helpers.create_options(self))

    # Metadata

    metadata_Error = InquiryError
    metadata_profile = settings.INQUIRY_PROFILE["properties"]["tasks"]["items"]
143
144
145# Internal
146
147
def run_task_in_parallel(task_descriptor):
    """Rebuild a task from its descriptor, run it and return the report as a dict

    `Inquiry.run` maps this function over a `multiprocessing.Pool`, so it
    takes and returns plain descriptors.
    """
    return InquiryTask(task_descriptor).run().to_dict()
153