# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2020 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

from datetime import datetime
import io
import json
import logging
from multiprocessing import dummy
import os

from pygeoapi.util import DATETIME_FORMAT, JobStatus

LOGGER = logging.getLogger(__name__)


class BaseManager:
    """generic Manager ABC"""

    def __init__(self, manager_def):
        """
        Initialize object

        :param manager_def: manager definition

        :returns: `pygeoapi.process.manager.base.BaseManager`
        """

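        # core manager properties; 'connection' and 'output_dir' are
        # optional and default to None if absent from the manager definition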
        self.name = manager_def['name']
        self.is_async = False
        self.connection = manager_def.get('connection', None)
        self.output_dir = manager_def.get('output_dir', None)

    def get_jobs(self, process_id=None, status=None):
        """
        Get process jobs, optionally filtered by status

        :param process_id: process identifier
        :param status: job status (accepted, running, successful,
                       failed, results) (default is all)

        :returns: `list` of jobs (identifier, status, process identifier)
        """

        raise NotImplementedError()

    def add_job(self, job_metadata):
        """
        Add a job

        :param job_metadata: `dict` of job metadata

        :returns: `str` added job identifier
        """

        raise NotImplementedError()

    def update_job(self, process_id, job_id, update_dict):
        """
        Updates a job

        :param process_id: process identifier
        :param job_id: job identifier
        :param update_dict: `dict` of property updates

        :returns: `bool` of status result
        """

        raise NotImplementedError()

    def get_job(self, process_id, job_id):
        """
        Get a single job

        :param process_id: process identifier
        :param job_id: job identifier

        :returns: `dict` of job result
        """

        raise NotImplementedError()

    def get_job_result(self, process_id, job_id):
        """
        Returns the actual output from a completed process

        :param process_id: process identifier
        :param job_id: job identifier

        :returns: `tuple` of mimetype and raw output
        """

        raise NotImplementedError()

    def delete_job(self, process_id, job_id):
        """
        Deletes a job and associated results/outputs

        :param process_id: process identifier
        :param job_id: job identifier

        :returns: `bool` of status result
        """

        raise NotImplementedError()

    def _execute_handler_async(self, p, job_id, data_dict):
        """
        This private execution handler executes a process in a background
        thread using `multiprocessing.dummy`

        https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.dummy  # noqa

        :param p: `pygeoapi.process` object
        :param job_id: job identifier
        :param data_dict: `dict` of data parameters

        :returns: tuple of MIME type, None (i.e. initial response payload)
                  and JobStatus.accepted (i.e. initial job status)
        """
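        # multiprocessing.dummy wraps the threading module, so the job runs
        # in a background thread of the current process; results are
        # persisted by the synchronous handler via the job manager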
        _process = dummy.Process(
            target=self._execute_handler_sync,
            args=(p, job_id, data_dict)
        )
        _process.start()
        return 'application/json', None, JobStatus.accepted

    def _execute_handler_sync(self, p, job_id, data_dict):
        """
        Synchronous execution handler

        If the manager has defined `output_dir`, then the result
        will be written to that output directory on disk. There is no
        clean-up of old process outputs.

        :param p: `pygeoapi.process` object
        :param job_id: job identifier
        :param data_dict: `dict` of data parameters

        :returns: tuple of MIME type, response payload and status
        """

        process_id = p.metadata['id']
        current_status = JobStatus.accepted

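        # initial job record; status and progress are updated as the job runs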
        job_metadata = {
            'identifier': job_id,
            'process_id': process_id,
            'job_start_datetime': datetime.utcnow().strftime(
                DATETIME_FORMAT),
            'job_end_datetime': None,
            'status': current_status.value,
            'location': None,
            'mimetype': None,
            'message': 'Job accepted and ready for execution',
            'progress': 5
        }

        self.add_job(job_metadata)

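        # run the process; any exception marks the job as failed and the
        # error is reported through the job metadata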
        try:
            if self.output_dir is not None:
                filename = '{}-{}'.format(p.metadata['id'], job_id)
                job_filename = os.path.join(self.output_dir, filename)
            else:
                job_filename = None

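            # execute the process; the processor returns the output
            # MIME type and the outputs themselves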
            current_status = JobStatus.running
            jfmt, outputs = p.execute(data_dict)

            self.update_job(process_id, job_id, {
                'status': current_status.value,
                'message': 'Writing job output',
                'progress': 95
            })

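            # persist outputs to the configured output directory, if any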
            if self.output_dir is not None:
                LOGGER.debug('writing output to {}'.format(job_filename))
                with io.open(job_filename, 'w', encoding='utf-8') as fh:
                    fh.write(json.dumps(outputs, sort_keys=True, indent=4))

            current_status = JobStatus.successful

            job_update_metadata = {
                'job_end_datetime': datetime.utcnow().strftime(
                    DATETIME_FORMAT),
                'status': current_status.value,
                'location': job_filename,
                'mimetype': jfmt,
                'message': 'Job complete',
                'progress': 100
            }

            self.update_job(process_id, job_id, job_update_metadata)

        except Exception as err:
            # TODO: assess the correct exception type and description to
            # help users
            # NOTE: the specification requires that failed jobs remain
            # retrievable with their error message intact, and that the
            # /results endpoint return the appropriate HTTP error status
            # for failed jobs, rather than a 200 response that merely
            # carries the failure information
            current_status = JobStatus.failed
            code = 'InvalidParameterValue'
            outputs = {
                'code': code,
                'description': 'Error updating job'
            }
            LOGGER.error(err)
            job_metadata = {
                'job_end_datetime': datetime.utcnow().strftime(
                    DATETIME_FORMAT),
                'status': current_status.value,
                'location': None,
                'mimetype': None,
                'message': f'{code}: {outputs["description"]}'
            }

            jfmt = 'application/json'

            self.update_job(process_id, job_id, job_metadata)

        return jfmt, outputs, current_status

    def execute_process(self, p, job_id, data_dict, is_async=False):
        """
        Default process execution handler

        :param p: `pygeoapi.process` object
        :param job_id: job identifier
        :param data_dict: `dict` of data parameters
        :param is_async: `bool` specifying sync or async processing

        :returns: tuple of MIME type, response payload and status
        """

        if not is_async:
            LOGGER.debug('Synchronous execution')
            return self._execute_handler_sync(p, job_id, data_dict)
        else:
            LOGGER.debug('Asynchronous execution')
            return self._execute_handler_async(p, job_id, data_dict)

    def __repr__(self):
        return '<BaseManager> {}'.format(self.name)
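

# -----------------------------------------------------------------
# Illustrative sketch (not part of pygeoapi): a minimal, hypothetical
# in-memory manager showing one way a concrete subclass might
# implement the BaseManager job API.  Real managers would typically
# persist job metadata (for example to a database) rather than keep
# it in a dict.
# -----------------------------------------------------------------
class _InMemoryManager(BaseManager):
    """toy manager keeping job metadata in a dict, for illustration only"""

    def __init__(self, manager_def):
        super().__init__(manager_def)
        self._jobs = {}  # job identifier -> job metadata dict

    def get_jobs(self, process_id=None, status=None):
        # 'status' is compared against the stored status value string
        jobs = list(self._jobs.values())
        if process_id is not None:
            jobs = [j for j in jobs if j['process_id'] == process_id]
        if status is not None:
            jobs = [j for j in jobs if j['status'] == status]
        return jobs

    def add_job(self, job_metadata):
        self._jobs[job_metadata['identifier']] = dict(job_metadata)
        return job_metadata['identifier']

    def update_job(self, process_id, job_id, update_dict):
        if job_id not in self._jobs:
            return False
        self._jobs[job_id].update(update_dict)
        return True

    def get_job(self, process_id, job_id):
        return self._jobs.get(job_id)

    def get_job_result(self, process_id, job_id):
        # a real implementation would read the stored output from
        # self.output_dir; omitted here for brevity
        raise NotImplementedError()

    def delete_job(self, process_id, job_id):
        return self._jobs.pop(job_id, None) is not None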