1from copy import deepcopy
2from multiprocessing import Pool
3from importlib import import_module
4from .errors import PipelineError, TaskError
5from .status import Status, StatusTask
6from .metadata import Metadata
7from .resource import Resource
8from .package import Package
9from . import settings
10from . import helpers
11
12
class Pipeline(Metadata):
    """Pipeline representation.

    Parameters:
        descriptor? (str|dict): pipeline descriptor
        tasks? (dict[]): optional list of task descriptors

    Raises:
        FrictionlessException: raise any error that occurs during the process

    """

    def __init__(self, descriptor, tasks=None):
        self.setinitial("tasks", tasks)
        super().__init__(descriptor)

    @property
    def tasks(self):
        """
        Returns:
            dict[]: tasks
        """
        tasks = self.get("tasks", [])
        return self.metadata_attach("tasks", tasks)

    # Run

    def run(self, *, parallel=False):
        """Run the pipeline.

        Parameters:
            parallel (bool): if True, run tasks in a multiprocessing pool

        Returns:
            Status: aggregated status over all pipeline tasks
        """

        # Create state
        statuses = []
        timer = helpers.Timer()

        # Validate pipeline
        if self.metadata_errors:
            return Status(time=timer.time, errors=self.metadata_errors, tasks=[])

        # Transform sequentially
        if not parallel:
            for task in self.tasks:
                status = task.run()
                statuses.append(status)

        # Transform in-parallel
        else:
            with Pool() as pool:
                # Tasks/statuses cross the process boundary as plain
                # descriptors so they can be pickled by multiprocessing
                task_descriptors = [task.to_dict() for task in self.tasks]
                status_descriptors = pool.map(run_task_in_parallel, task_descriptors)
                for status_descriptor in status_descriptors:
                    statuses.append(Status(status_descriptor))

        # Return status
        tasks = []
        errors = []
        for status in statuses:
            tasks.extend(status["tasks"])
            errors.extend(status["errors"])
        # Bug fix: propagate the collected errors instead of discarding
        # them with a hard-coded empty list
        return Status(time=timer.time, errors=errors, tasks=tasks)

    # Metadata

    metadata_Error = PipelineError
    metadata_profile = deepcopy(settings.PIPELINE_PROFILE)
    metadata_profile["properties"]["tasks"] = {"type": "array"}

    def metadata_process(self):
        """Normalize the metadata in-place (called by the Metadata machinery)."""

        # Tasks: coerce raw descriptors to PipelineTask and wrap the list
        # in a ControlledList so mutations re-trigger this processing
        tasks = self.get("tasks")
        if isinstance(tasks, list):
            for index, task in enumerate(tasks):
                if not isinstance(task, PipelineTask):
                    task = PipelineTask(task)
                    list.__setitem__(tasks, index, task)
            if not isinstance(tasks, helpers.ControlledList):
                tasks = helpers.ControlledList(tasks)
                tasks.__onchange__(self.metadata_process)
                dict.__setitem__(self, "tasks", tasks)

    def metadata_validate(self):
        """Yield validation errors for the pipeline and its tasks."""
        yield from super().metadata_validate()

        # Tasks
        for task in self.tasks:
            yield from task.metadata_errors
98
99
class PipelineTask(Metadata):
    """Pipeline task representation.

    Parameters:
        descriptor? (str|dict): pipeline task descriptor
        source? (any): transform source
        type? (str): transform type
        steps? (dict[]): transform steps

    Raises:
        FrictionlessException: raise any error that occurs during the process

    """

    def __init__(self, descriptor=None, *, source=None, type=None, steps=None):
        self.setinitial("source", source)
        self.setinitial("type", type)
        self.setinitial("steps", steps)
        super().__init__(descriptor)

    @property
    def source(self):
        """The transform source (a Resource or Package)."""
        return self["source"]

    @property
    def type(self):
        """The transform type."""
        return self["type"]

    @property
    def steps(self):
        """The transform steps."""
        return self["steps"]

    # Run

    def run(self):
        """Run the task.

        Returns:
            Status: a status containing a single task entry
        """
        timer = helpers.Timer()
        collected_errors = []
        result = None
        try:
            # Resolved lazily via import_module — presumably to avoid a
            # circular import at module load time (TODO confirm)
            transform = import_module("frictionless").transform
            result = transform(self.source, type=self.type, steps=self.steps)
        except Exception as exception:
            collected_errors.append(TaskError(note=str(exception)))
        status_task = StatusTask(
            time=timer.time, errors=collected_errors, target=result, type=self.type
        )
        return Status(tasks=[status_task], time=timer.time, errors=[])

    # Metadata

    metadata_Error = PipelineError
    metadata_profile = settings.PIPELINE_PROFILE["properties"]["tasks"]["items"]

    def metadata_process(self):
        """Normalize the metadata in-place (called by the Metadata machinery)."""

        # Source: coerce a raw descriptor to Resource or Package per type
        source = self.get("source")
        if not isinstance(source, Metadata):
            if self.type == "resource":
                source = Resource(source)
            else:
                source = Package(source)
            dict.__setitem__(self, "source", source)
156
157
158# Internal
159
160
def run_task_in_parallel(task_descriptor):
    """Run a single pipeline task inside a worker process.

    Accepts and returns plain dict descriptors so the payload can be
    pickled across the multiprocessing boundary.

    Parameters:
        task_descriptor (dict): a PipelineTask descriptor

    Returns:
        dict: the resulting Status descriptor
    """
    return PipelineTask(task_descriptor).run().to_dict()
166