# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2020 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

import io
import json
import logging
import os

import tinydb

from pygeoapi.process.manager.base import BaseManager
from pygeoapi.util import JobStatus

LOGGER = logging.getLogger(__name__)


class TinyDBManager(BaseManager):
    """
    TinyDB Manager

    Stores process job metadata as documents in a TinyDB database.
    Each public operation opens the database via `_connect()` and
    closes it again when done, even if the operation raises.

    NOTE(review): `self.connection`, `self.output_dir` and `self.name`
    are read here but never assigned in this class; presumably they are
    set by `BaseManager.__init__` from `manager_def` -- confirm.
    """

    def __init__(self, manager_def):
        """
        Initialize object

        :param manager_def: manager definition

        :returns: `pygeoapi.process.manager.base.BaseManager`
        """

        super().__init__(manager_def)
        self.is_async = True

    def _connect(self):
        """
        Connect to manager: open the TinyDB database at
        `self.connection` and keep the handle on `self.db`

        :returns: `bool` of status of result
        """

        self.db = tinydb.TinyDB(self.connection)
        return True

    def destroy(self):
        """
        Destroy manager: remove all job documents from the database

        :returns: `bool` status of result
        """

        # every other operation closes the handle when it finishes, so a
        # fresh connection is needed before touching self.db
        self._connect()
        try:
            # TinyDB 4 replaced purge() with truncate(); support both
            if hasattr(self.db, 'truncate'):
                self.db.truncate()
            else:
                self.db.purge()
        finally:
            self.db.close()

        return True

    def get_jobs(self, process_id=None, status=None):
        """
        Get jobs

        :param process_id: process identifier
        :param status: job status (accepted, running, successful,
                       failed, results) (default is all)

        :returns: `list` of jobs (identifier, status, process identifier)
        """

        # NOTE(review): `status` is currently not used for filtering.
        # NOTE(review): without a process_id only the TinyDB document ids
        # are returned, whereas with a process_id the full job documents
        # are returned; kept as-is for backward compatibility.
        self._connect()
        try:
            if process_id is None:
                jobs_list = [doc.doc_id for doc in self.db.all()]
            else:
                query = tinydb.Query()
                jobs_list = self.db.search(query.process_id == process_id)
        finally:
            self.db.close()

        return jobs_list

    def add_job(self, job_metadata):
        """
        Add a job

        :param job_metadata: `dict` of job metadata

        :returns: identifier of added job (TinyDB document id)
        """

        self._connect()
        try:
            doc_id = self.db.insert(job_metadata)
        finally:
            self.db.close()

        return doc_id

    def update_job(self, process_id, job_id, update_dict):
        """
        Updates a job

        :param process_id: process identifier (not used for matching;
                           jobs are matched on `identifier` only)
        :param job_id: job identifier
        :param update_dict: `dict` of property updates

        :returns: `bool` of status result
        """

        self._connect()
        try:
            self.db.update(update_dict, tinydb.where('identifier') == job_id)
        finally:
            self.db.close()

        return True

    def delete_job(self, process_id, job_id):
        """
        Deletes a job, along with its result file if one is recorded

        :param process_id: process identifier
        :param job_id: job identifier

        :returns: `bool` of status result (whether any job was removed)
        """

        # delete result file if present
        job_result = self.get_job(process_id, job_id)
        if job_result:
            location = job_result.get('location', None)
            if location and self.output_dir is not None:
                try:
                    os.remove(location)
                except FileNotFoundError:
                    # an already-missing result file must not prevent
                    # the job record itself from being deleted
                    LOGGER.debug('Result file %s not found', location)

        self._connect()
        try:
            removed = bool(
                self.db.remove(tinydb.where('identifier') == job_id))
        finally:
            self.db.close()

        return removed

    def get_job(self, process_id, job_id):
        """
        Get a single job

        :param process_id: process identifier
        :param job_id: job identifier

        :returns: `dict` of the first matching job, or `None` if no job
                  matches both identifiers
        """

        self._connect()
        try:
            query = tinydb.Query()
            matches = self.db.search(
                (query.process_id == process_id) &
                (query.identifier == job_id))
        finally:
            self.db.close()

        return matches[0] if matches else None

    def get_job_result(self, process_id, job_id):
        """
        Get a job's status, and actual output of executing the process

        :param process_id: process identifier
        :param job_id: job identifier

        :returns: `tuple` of mimetype and raw output; `None` if the job
                  does not exist; `(None,)` if the job is not successful
                  or has no recorded output location
        """

        job_result = self.get_job(process_id, job_id)
        if not job_result:
            # process/job does not exist
            return None

        location = job_result.get('location', None)
        mimetype = job_result.get('mimetype', None)
        # the stored status is the JobStatus member name
        job_status = JobStatus[job_result['status']]

        if job_status != JobStatus.successful:
            # Job is incomplete
            return (None,)
        if not location:
            # Job data was not written for some reason
            # TODO log/raise exception?
            return (None,)

        with io.open(location, 'r', encoding='utf-8') as filehandler:
            result = json.load(filehandler)

        return mimetype, result

    def __repr__(self):
        return '<TinyDBManager> {}'.format(self.name)