# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2020 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

from datetime import datetime
import io
import json
import logging
from multiprocessing import dummy
import os

from pygeoapi.util import DATETIME_FORMAT, JobStatus

LOGGER = logging.getLogger(__name__)


class BaseManager:
    """generic Manager ABC"""

    def __init__(self, manager_def):
        """
        Initialize object

        :param manager_def: manager definition

        :returns: `pygeoapi.process.manager.base.BaseManager`
        """

        self.name = manager_def['name']
        # Subclasses that support background execution set this to True
        self.is_async = False
        self.connection = manager_def.get('connection', None)
        self.output_dir = manager_def.get('output_dir', None)

    def get_jobs(self, process_id=None, status=None):
        """
        Get process jobs, optionally filtered by status

        :param process_id: process identifier
        :param status: job status (accepted, running, successful,
                       failed, results) (default is all)

        :returns: `list` of jobs (identifier, status, process identifier)
        :raises NotImplementedError: must be provided by a concrete manager
        """

        raise NotImplementedError()

    def add_job(self, job_metadata):
        """
        Add a job

        :param job_metadata: `dict` of job metadata

        :returns: `str` added job identifier
        :raises NotImplementedError: must be provided by a concrete manager
        """

        raise NotImplementedError()

    def update_job(self, process_id, job_id, update_dict):
        """
        Updates a job

        :param process_id: process identifier
        :param job_id: job identifier
        :param update_dict: `dict` of property updates

        :returns: `bool` of status result
        :raises NotImplementedError: must be provided by a concrete manager
        """

        raise NotImplementedError()

    def get_job(self, process_id, job_id):
        """
        Get a single job

        :param process_id: process identifier
        :param job_id: job identifier

        :returns: `dict` of job result
        :raises NotImplementedError: must be provided by a concrete manager
        """

        raise NotImplementedError()

    def get_job_result(self, process_id, job_id):
        """
        Returns the actual output from a completed process

        :param process_id: process identifier
        :param job_id: job identifier

        :returns: `tuple` of mimetype and raw output
        :raises NotImplementedError: must be provided by a concrete manager
        """

        raise NotImplementedError()

    def delete_job(self, process_id, job_id):
        """
        Deletes a job and associated results/outputs

        :param process_id: process identifier
        :param job_id: job identifier

        :returns: `bool` of status result
        :raises NotImplementedError: must be provided by a concrete manager
        """

        raise NotImplementedError()

    def _execute_handler_async(self, p, job_id, data_dict):
        """
        This private execution handler executes a process in a background
        thread using `multiprocessing.dummy`

        https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.dummy  # noqa

        :param p: `pygeoapi.process` object
        :param job_id: job identifier
        :param data_dict: `dict` of data parameters

        :returns: tuple of MIME type, None (i.e. initial response payload)
                  and JobStatus.accepted (i.e. initial job status)
        """
        _process = dummy.Process(
            target=self._execute_handler_sync,
            args=(p, job_id, data_dict)
        )
        _process.start()
        # Respond immediately; the background thread persists progress
        # through update_job() as the job advances
        return 'application/json', None, JobStatus.accepted

    def _execute_handler_sync(self, p, job_id, data_dict):
        """
        Synchronous execution handler

        If the manager has defined `output_dir`, then the result
        will be written to disk
        output store. There is no clean-up of old process outputs.

        :param p: `pygeoapi.process` object
        :param job_id: job identifier
        :param data_dict: `dict` of data parameters

        :returns: tuple of MIME type, response payload and status
        """

        process_id = p.metadata['id']
        current_status = JobStatus.accepted

        job_metadata = {
            'identifier': job_id,
            'process_id': process_id,
            'job_start_datetime': datetime.utcnow().strftime(
                DATETIME_FORMAT),
            'job_end_datetime': None,
            'status': current_status.value,
            'location': None,
            'mimetype': None,
            'message': 'Job accepted and ready for execution',
            'progress': 5
        }

        self.add_job(job_metadata)

        try:
            if self.output_dir is not None:
                filename = '{}-{}'.format(p.metadata['id'], job_id)
                job_filename = os.path.join(self.output_dir, filename)
            else:
                job_filename = None

            current_status = JobStatus.running
            jfmt, outputs = p.execute(data_dict)

            self.update_job(process_id, job_id, {
                'status': current_status.value,
                'message': 'Writing job output',
                'progress': 95
            })

            if self.output_dir is not None:
                LOGGER.debug('writing output to {}'.format(job_filename))
                with io.open(job_filename, 'w', encoding='utf-8') as fh:
                    fh.write(json.dumps(outputs, sort_keys=True, indent=4))

            current_status = JobStatus.successful

            job_update_metadata = {
                'job_end_datetime': datetime.utcnow().strftime(
                    DATETIME_FORMAT),
                'status': current_status.value,
                'location': job_filename,
                'mimetype': jfmt,
                'message': 'Job complete',
                'progress': 100
            }

            self.update_job(process_id, job_id, job_update_metadata)

        except Exception as err:
            # TODO assess correct exception type and description to help users
            # NOTE, the /results endpoint should return the error HTTP status
            # for jobs that failed, the specification says that failing jobs
            # must still be able to be retrieved with their error message
            # intact, and the correct HTTP error status at the /results
            # endpoint, even if the /result endpoint correctly returns the
            # failure information (i.e. what one might assume is a 200
            # response).
            current_status = JobStatus.failed
            code = 'InvalidParameterValue'
            outputs = {
                'code': code,
                'description': 'Error updating job'
            }
            LOGGER.error(err)
            job_metadata = {
                'job_end_datetime': datetime.utcnow().strftime(
                    DATETIME_FORMAT),
                'status': current_status.value,
                'location': None,
                'mimetype': None,
                'message': f'{code}: {outputs["description"]}'
            }

            jfmt = 'application/json'

            self.update_job(process_id, job_id, job_metadata)

        return jfmt, outputs, current_status

    def execute_process(self, p, job_id, data_dict, is_async=False):
        """
        Default process execution handler

        :param p: `pygeoapi.process` object
        :param job_id: job identifier
        :param data_dict: `dict` of data parameters
        :param is_async: `bool` specifying sync or async processing.

        :returns: tuple of MIME type, response payload and status
        """

        if not is_async:
            LOGGER.debug('Synchronous execution')
            return self._execute_handler_sync(p, job_id, data_dict)
        else:
            LOGGER.debug('Asynchronous execution')
            return self._execute_handler_async(p, job_id, data_dict)

    def __repr__(self):
        return '<BaseManager> {}'.format(self.name)