1# -*- coding: utf-8 -*-
2"""ArangoDb result store backend."""
3
4# pylint: disable=W1202,W0703
5
6from __future__ import absolute_import, unicode_literals
7
8import json
9import logging
10from datetime import timedelta
11
12from kombu.utils.encoding import str_t
13from kombu.utils.objects import cached_property
14from kombu.utils.url import _parse_url
15
16from celery.exceptions import ImproperlyConfigured
17
18from .base import KeyValueStoreBackend
19
20try:
21    from pyArango import connection as py_arango_connection
22    from pyArango.theExceptions import AQLQueryError
23except ImportError:
24    py_arango_connection = AQLQueryError = None   # noqa
25
26__all__ = ('ArangoDbBackend',)
27
28
class ArangoDbBackend(KeyValueStoreBackend):
    """ArangoDb result store backend.

    Every result is kept as one document whose ``_key`` is the result key
    and whose ``task`` attribute holds the JSON-decoded value.

    Sample url
    "arangodb://username:password@host:port/database/collection"
    *arangodb_backend_settings* is where the settings are present
    (in the app.conf)
    Settings should contain the host, port, username, password, database name,
    collection name else the default will be chosen.
    Values given in the url take precedence over the settings dict.
    Default database name and collection name is celery.

    Raises
    ------
    celery.exceptions.ImproperlyConfigured:
        if module :pypi:`pyArango` is not available, if
        *arangodb_backend_settings* is not a dict, or if the url path
        contains more than ``database/collection``.

    """

    host = '127.0.0.1'
    port = '8529'
    database = 'celery'
    collection = 'celery'
    username = None
    password = None
    # protocol is not supported in backend url (http is taken as default)
    http_protocol = 'http'

    # Use str as arangodb key not bytes
    key_t = str_t

    def __init__(self, url=None, *args, **kwargs):
        """Parse the url or load the settings from settings object.

        Precedence for every setting is: value from *url*, then the
        ``arangodb_backend_settings`` dict, then the class level default.
        """
        super(ArangoDbBackend, self).__init__(*args, **kwargs)

        if py_arango_connection is None:
            raise ImproperlyConfigured(
                'You need to install the pyArango library to use the '
                'ArangoDb backend.',
            )

        self.url = url

        if url is None:
            host = port = database = collection = username = password = None
        else:
            (
                _schema, host, port, username, password,
                database_collection, _query
            ) = _parse_url(url)
            database, collection = self._split_database_collection(
                database_collection
            )

        config = self.app.conf.get('arangodb_backend_settings', None)
        if config is not None:
            if not isinstance(config, dict):
                raise ImproperlyConfigured(
                    'ArangoDb backend settings should be grouped in a dict',
                )
        else:
            config = {}

        self.host = host or config.get('host', self.host)
        self.port = int(port or config.get('port', self.port))
        self.http_protocol = config.get('http_protocol', self.http_protocol)
        self.database = database or config.get('database', self.database)
        self.collection = \
            collection or config.get('collection', self.collection)
        self.username = username or config.get('username', self.username)
        self.password = password or config.get('password', self.password)
        self.arangodb_url = "{http_protocol}://{host}:{port}".format(
            http_protocol=self.http_protocol, host=self.host, port=self.port
        )
        # Created lazily by the ``connection`` property.
        self._connection = None

    @staticmethod
    def _split_database_collection(path):
        """Split the url path into a ``(database, collection)`` tuple.

        A missing part is returned as :const:`None` so that the caller can
        fall back to the configured or default value.

        Raises
        ------
        celery.exceptions.ImproperlyConfigured:
            if the path has more than two ``/`` separated segments.

        """
        if not path:
            return None, None
        parts = path.strip('/').split('/')
        if len(parts) > 2:
            raise ImproperlyConfigured(
                'ArangoDb backend url path should be of the form '
                '"database/collection", got {0!r}'.format(path),
            )
        database = parts[0] or None
        collection = parts[1] if len(parts) == 2 else None
        return database, collection or None

    @property
    def connection(self):
        """Connect to the arangodb server (once, on first access)."""
        if self._connection is None:
            self._connection = py_arango_connection.Connection(
                arangoURL=self.arangodb_url, username=self.username,
                password=self.password
            )
        return self._connection

    @property
    def db(self):
        """Database Object to the given database."""
        return self.connection[self.database]

    @cached_property
    def expires_delta(self):
        """Result expiry time as a :class:`~datetime.timedelta`."""
        return timedelta(seconds=self.expires)

    def _run_query(self, query, bind_vars):
        """Log and execute an AQL *query* with *bind_vars*.

        Values are always passed as bind parameters instead of being
        formatted into the query text, so keys or results that contain
        quotes cannot break (or inject into) the query.
        """
        logging.debug('%s bindVars=%r', query, bind_vars)
        return self.db.AQLQuery(query, bindVars=bind_vars)

    def get(self, key):
        """Return the value stored under *key* as a JSON string.

        Returns :const:`None` if there is no such document or if the
        query fails (the error is logged, not raised).
        """
        try:
            query = self._run_query(
                'RETURN DOCUMENT(@@collection, @key).task',
                {'@collection': self.collection, 'key': key},
            )
            result = query.response["result"][0]
            if result is None:
                return None
            # The value is stored decoded; hand it back serialized.
            return json.dumps(result)
        except AQLQueryError as aql_err:
            logging.error(aql_err)
            return None
        except Exception as err:
            logging.error(err)
            return None

    def set(self, key, value):
        """Store *value* in the task attribute of the doc with _key *key*.

        *value* must be a JSON document; it is decoded before being stored.
        An existing document with the same key is replaced, so repeated
        writes for one key keep only the latest value.
        Errors are logged, not raised.
        """
        try:
            self._run_query(
                'UPSERT { _key: @key } '
                'INSERT { _key: @key, task: @task } '
                'REPLACE { task: @task } IN @@collection',
                {
                    '@collection': self.collection,
                    'key': key,
                    'task': json.loads(value),
                },
            )
        except AQLQueryError as aql_err:
            logging.error(aql_err)
        except Exception as err:
            logging.error(err)

    def mget(self, keys):
        """Return the values for *keys*, in order, as JSON strings.

        Missing documents yield :const:`None`.  If the query fails the
        error is logged and a list of :const:`None` of the same length as
        *keys* is returned.
        """
        keys = list(keys)
        try:
            query = self._run_query(
                'FOR k IN @keys RETURN DOCUMENT(@@collection, k).task',
                {'@collection': self.collection, 'keys': keys},
            )
            results = []
            while True:
                results.extend(query.response['result'])
                try:
                    # nextBatch signals the last batch with StopIteration.
                    query.nextBatch()
                except StopIteration:
                    break
        except AQLQueryError as aql_err:
            logging.error(aql_err)
            return [None] * len(keys)
        except Exception as err:
            logging.error(err)
            return [None] * len(keys)
        return [
            result if result is None else json.dumps(result)
            for result in results
        ]

    def delete(self, key):
        """Remove the document stored under *key* (errors are logged)."""
        try:
            self._run_query(
                'REMOVE { _key: @key } IN @@collection',
                {'@collection': self.collection, 'key': key},
            )
        except AQLQueryError as aql_err:
            logging.error(aql_err)
        except Exception as err:
            logging.error(err)

    def cleanup(self):
        """Delete expired meta-data.

        Does nothing when ``self.expires`` is unset or zero: without an
        expiry time no result may be considered expired.
        """
        if not self.expires:
            return
        remove_before = (self.app.now() - self.expires_delta).isoformat()
        try:
            self._run_query(
                'FOR item IN @@collection '
                'FILTER item.task.date_done < @remove_before '
                'REMOVE item IN @@collection',
                {
                    '@collection': self.collection,
                    'remove_before': remove_before,
                },
            )
        except AQLQueryError as aql_err:
            logging.error(aql_err)
        except Exception as err:
            logging.error(err)
235