1# -*- coding: utf-8 -*- 2"""ArangoDb result store backend.""" 3 4# pylint: disable=W1202,W0703 5 6from __future__ import absolute_import, unicode_literals 7 8import json 9import logging 10from datetime import timedelta 11 12from kombu.utils.encoding import str_t 13from kombu.utils.objects import cached_property 14from kombu.utils.url import _parse_url 15 16from celery.exceptions import ImproperlyConfigured 17 18from .base import KeyValueStoreBackend 19 20try: 21 from pyArango import connection as py_arango_connection 22 from pyArango.theExceptions import AQLQueryError 23except ImportError: 24 py_arango_connection = AQLQueryError = None # noqa 25 26__all__ = ('ArangoDbBackend',) 27 28 29class ArangoDbBackend(KeyValueStoreBackend): 30 """ArangoDb backend. 31 32 Sample url 33 "arangodb://username:password@host:port/database/collection" 34 *arangodb_backend_settings* is where the settings are present 35 (in the app.conf) 36 Settings should contain the host, port, username, password, database name, 37 collection name else the default will be chosen. 38 Default database name and collection name is celery. 39 40 Raises 41 ------ 42 celery.exceptions.ImproperlyConfigured: 43 if module :pypi:`pyArango` is not available. 44 45 """ 46 47 host = '127.0.0.1' 48 port = '8529' 49 database = 'celery' 50 collection = 'celery' 51 username = None 52 password = None 53 # protocol is not supported in backend url (http is taken as default) 54 http_protocol = 'http' 55 56 # Use str as arangodb key not bytes 57 key_t = str_t 58 59 def __init__(self, url=None, *args, **kwargs): 60 """Parse the url or load the settings from settings object.""" 61 super(ArangoDbBackend, self).__init__(*args, **kwargs) 62 63 if py_arango_connection is None: 64 raise ImproperlyConfigured( 65 'You need to install the pyArango library to use the ' 66 'ArangoDb backend.', 67 ) 68 69 self.url = url 70 71 if url is None: 72 host = port = database = collection = username = password = None 73 else: 74 ( 75 _schema, host, port, username, password, 76 database_collection, _query 77 ) = _parse_url(url) 78 if database_collection is None: 79 database = collection = None 80 else: 81 database, collection = database_collection.split('/') 82 83 config = self.app.conf.get('arangodb_backend_settings', None) 84 if config is not None: 85 if not isinstance(config, dict): 86 raise ImproperlyConfigured( 87 'ArangoDb backend settings should be grouped in a dict', 88 ) 89 else: 90 config = {} 91 92 self.host = host or config.get('host', self.host) 93 self.port = int(port or config.get('port', self.port)) 94 self.http_protocol = config.get('http_protocol', self.http_protocol) 95 self.database = database or config.get('database', self.database) 96 self.collection = \ 97 collection or config.get('collection', self.collection) 98 self.username = username or config.get('username', self.username) 99 self.password = password or config.get('password', self.password) 100 self.arangodb_url = "{http_protocol}://{host}:{port}".format( 101 http_protocol=self.http_protocol, host=self.host, port=self.port 102 ) 103 self._connection = None 104 105 @property 106 def connection(self): 107 """Connect to the arangodb server.""" 108 if self._connection is None: 109 self._connection = py_arango_connection.Connection( 110 arangoURL=self.arangodb_url, username=self.username, 111 password=self.password 112 ) 113 return self._connection 114 115 @property 116 def db(self): 117 """Database Object to the given database.""" 118 return self.connection[self.database] 119 120 @cached_property 121 def expires_delta(self): 122 return timedelta(seconds=self.expires) 123 124 def get(self, key): 125 try: 126 logging.debug( 127 'RETURN DOCUMENT("{collection}/{key}").task'.format( 128 collection=self.collection, key=key 129 ) 130 ) 131 query = self.db.AQLQuery( 132 'RETURN DOCUMENT("{collection}/{key}").task'.format( 133 collection=self.collection, key=key 134 ) 135 ) 136 result = query.response["result"][0] 137 if result is None: 138 return None 139 return json.dumps(result) 140 except AQLQueryError as aql_err: 141 logging.error(aql_err) 142 return None 143 except Exception as err: 144 logging.error(err) 145 return None 146 147 def set(self, key, value): 148 """Insert a doc with value into task attribute and _key as key.""" 149 try: 150 logging.debug( 151 'INSERT {{ task: {task}, _key: "{key}" }} INTO {collection}' 152 .format( 153 collection=self.collection, key=key, task=value 154 ) 155 ) 156 self.db.AQLQuery( 157 'INSERT {{ task: {task}, _key: "{key}" }} INTO {collection}' 158 .format( 159 collection=self.collection, key=key, task=value 160 ) 161 ) 162 except AQLQueryError as aql_err: 163 logging.error(aql_err) 164 except Exception as err: 165 logging.error(err) 166 167 def mget(self, keys): 168 try: 169 json_keys = json.dumps(keys) 170 logging.debug( 171 """ 172 FOR key in {keys} 173 RETURN DOCUMENT(CONCAT("{collection}/", key).task 174 """.format( 175 collection=self.collection, keys=json_keys 176 ) 177 ) 178 query = self.db.AQLQuery( 179 """ 180 FOR key in {keys} 181 RETURN DOCUMENT(CONCAT("{collection}/", key).task 182 """.format( 183 collection=self.collection, keys=json_keys 184 ) 185 ) 186 results = [] 187 while True: 188 results.extend(query.response['result']) 189 query.nextBatch() 190 except StopIteration: 191 values = [ 192 result if result is None else json.dumps(result) 193 for result in results 194 ] 195 return values 196 except AQLQueryError as aql_err: 197 logging.error(aql_err) 198 return [None] * len(keys) 199 except Exception as err: 200 logging.error(err) 201 return [None] * len(keys) 202 203 def delete(self, key): 204 try: 205 logging.debug( 206 'REMOVE {{ _key: "{key}" }} IN {collection}'.format( 207 key=key, collection=self.collection 208 ) 209 ) 210 self.db.AQLQuery( 211 'REMOVE {{ _key: "{key}" }} IN {collection}'.format( 212 key=key, collection=self.collection 213 ) 214 ) 215 except AQLQueryError as aql_err: 216 logging.error(aql_err) 217 except Exception as err: 218 logging.error(err) 219 220 def cleanup(self): 221 """Delete expired meta-data.""" 222 remove_before = (self.app.now() - self.expires_delta).isoformat() 223 try: 224 query = ( 225 'FOR item IN {collection} ' 226 'FILTER item.task.date_done < "{remove_before}" ' 227 'REMOVE item IN {collection}' 228 ).format(collection=self.collection, remove_before=remove_before) 229 logging.debug(query) 230 self.db.AQLQuery(query) 231 except AQLQueryError as aql_err: 232 logging.error(aql_err) 233 except Exception as err: 234 logging.error(err) 235