from copy import deepcopy
from multiprocessing import Pool
from importlib import import_module
from .errors import PipelineError, TaskError
from .status import Status, StatusTask
from .metadata import Metadata
from .resource import Resource
from .package import Package
from . import settings
from . import helpers


class Pipeline(Metadata):
    """Pipeline representation.

    Parameters:
        descriptor? (str|dict): pipeline descriptor
        tasks? (dict[]): optional list of task descriptors

    Raises:
        FrictionlessException: raise any error that occurs during the process

    """

    def __init__(self, descriptor, tasks=None):
        self.setinitial("tasks", tasks)
        super().__init__(descriptor)

    @property
    def tasks(self):
        """
        Returns:
            dict[]: tasks
        """
        tasks = self.get("tasks", [])
        return self.metadata_attach("tasks", tasks)

    # Run

    def run(self, *, parallel=False):
        """Run the pipeline.

        Parameters:
            parallel (bool): run the tasks in a multiprocessing pool
                instead of sequentially

        Returns:
            Status: aggregated status of all task runs
        """

        # Create state
        statuses = []
        timer = helpers.Timer()

        # Validate pipeline; a broken descriptor short-circuits the run
        if self.metadata_errors:
            return Status(time=timer.time, errors=self.metadata_errors, tasks=[])

        # Transform sequentially
        if not parallel:
            for task in self.tasks:
                status = task.run()
                statuses.append(status)

        # Transform in-parallel
        # NOTE: tasks are serialized to plain dicts because worker processes
        # cannot receive live Metadata objects reliably
        else:
            with Pool() as pool:
                task_descriptors = [task.to_dict() for task in self.tasks]
                status_descriptors = pool.map(run_task_in_parallel, task_descriptors)
                for status_descriptor in status_descriptors:
                    statuses.append(Status(status_descriptor))

        # Return status
        tasks = []
        errors = []
        for status in statuses:
            tasks.extend(status["tasks"])
            errors.extend(status["errors"])
        # BUGFIX: the collected task errors were previously discarded
        # (errors=[]); propagate them so callers can see task-level failures
        return Status(time=timer.time, errors=errors, tasks=tasks)

    # Metadata

    metadata_Error = PipelineError
    # Deep-copied so the per-class profile tweak below does not mutate
    # the shared settings.PIPELINE_PROFILE
    metadata_profile = deepcopy(settings.PIPELINE_PROFILE)
    metadata_profile["properties"]["tasks"] = {"type": "array"}

    def metadata_process(self):

        # Tasks: coerce plain dicts into PipelineTask objects and wrap the
        # list so later mutations re-trigger this processing
        tasks = self.get("tasks")
        if isinstance(tasks, list):
            for index, task in enumerate(tasks):
                if not isinstance(task, PipelineTask):
                    task = PipelineTask(task)
                    list.__setitem__(tasks, index, task)
            if not isinstance(tasks, helpers.ControlledList):
                tasks = helpers.ControlledList(tasks)
                tasks.__onchange__(self.metadata_process)
                dict.__setitem__(self, "tasks", tasks)

    def metadata_validate(self):
        yield from super().metadata_validate()

        # Tasks: surface each task's own metadata errors
        for task in self.tasks:
            yield from task.metadata_errors


class PipelineTask(Metadata):
    """Pipeline task representation.

    Parameters:
        descriptor? (str|dict): pipeline task descriptor
        source? (dict): resource/package descriptor to transform
        type? (str): "resource" or "package" — selects the source wrapper
        steps? (dict[]): transform steps

    Raises:
        FrictionlessException: raise any error that occurs during the process

    """

    def __init__(self, descriptor=None, *, source=None, type=None, steps=None):
        self.setinitial("source", source)
        self.setinitial("type", type)
        self.setinitial("steps", steps)
        super().__init__(descriptor)

    @property
    def source(self):
        return self["source"]

    @property
    def type(self):
        return self["type"]

    @property
    def steps(self):
        return self["steps"]

    # Run

    def run(self):
        """Run the task.

        Returns:
            Status: status with a single StatusTask for this run
        """
        errors = []
        target = None
        timer = helpers.Timer()
        try:
            # Imported lazily by name to avoid a circular import with the
            # top-level frictionless package
            transform = import_module("frictionless").transform
            target = transform(self.source, type=self.type, steps=self.steps)
        except Exception as exception:
            errors.append(TaskError(note=str(exception)))
        task = StatusTask(time=timer.time, errors=errors, target=target, type=self.type)
        return Status(tasks=[task], time=timer.time, errors=[])

    # Metadata

    metadata_Error = PipelineError
    metadata_profile = settings.PIPELINE_PROFILE["properties"]["tasks"]["items"]

    def metadata_process(self):

        # Source: wrap a raw descriptor in Resource or Package based on type
        source = self.get("source")
        if not isinstance(source, Metadata):
            source = Resource(source) if self.type == "resource" else Package(source)
            dict.__setitem__(self, "source", source)


# Internal


def run_task_in_parallel(task_descriptor):
    """Worker entry point: rebuild and run a task from its plain-dict form."""
    task = PipelineTask(task_descriptor)
    status = task.run()
    status_descriptor = status.to_dict()
    return status_descriptor