1"""Copyright 2009 Chris Davis
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7   http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License."""
14
import errno
import os
import time

from os.path import exists, dirname, join, sep

from six import with_metaclass

from carbon import log
from carbon.util import PluginRegistrar, TaggedSeries
22
23
24class TimeSeriesDatabase(with_metaclass(PluginRegistrar, object)):
25  "Abstract base class for Carbon database backends."
26  plugins = {}
27
28  "List of supported aggregation methods for the database."
29  aggregationMethods = []
30
31  def __init__(self, settings):
32    self.graphite_url = settings.GRAPHITE_URL
33
34  def write(self, metric, datapoints):
35    "Persist datapoints in the database for metric."
36    raise NotImplementedError()
37
38  def exists(self, metric):
39    "Return True if the given metric path exists, False otherwise."
40    raise NotImplementedError()
41
42  def create(self, metric, retentions, xfilesfactor, aggregation_method):
43    "Create an entry in the database for metric using options."
44    raise NotImplementedError()
45
46  def getMetadata(self, metric, key):
47    "Lookup metric metadata."
48    raise NotImplementedError()
49
50  def setMetadata(self, metric, key, value):
51    "Modify metric metadata."
52    raise NotImplementedError()
53
54  def getFilesystemPath(self, metric):
55    "Return filesystem path for metric, defaults to None."
56    pass
57
58  def validateArchiveList(self, archiveList):
59    "Validate that the database can handle the given archiveList."
60    pass
61
62  def tag(self, *metrics):
63    from carbon.http import httpRequest
64
65    log.debug("Tagging %s" % ', '.join(metrics), type='tagdb')
66    t = time.time()
67
68    try:
69      httpRequest(
70        self.graphite_url + '/tags/tagMultiSeries',
71        [('path', metric) for metric in metrics]
72      )
73      log.debug("Tagged %s in %s" % (', '.join(metrics), time.time() - t), type='tagdb')
74    except Exception as err:
75      log.msg("Error tagging %s: %s" % (', '.join(metrics), err), type='tagdb')
76
77
78try:
79  import whisper
80except ImportError:
81  pass
82else:
83  class WhisperDatabase(TimeSeriesDatabase):
84    plugin_name = 'whisper'
85    aggregationMethods = whisper.aggregationMethods
86
87    def __init__(self, settings):
88      super(WhisperDatabase, self).__init__(settings)
89
90      self.data_dir = settings.LOCAL_DATA_DIR
91      self.tag_hash_filenames = settings.TAG_HASH_FILENAMES
92      self.sparse_create = settings.WHISPER_SPARSE_CREATE
93      self.fallocate_create = settings.WHISPER_FALLOCATE_CREATE
94      if settings.WHISPER_AUTOFLUSH:
95        log.msg("Enabling Whisper autoflush")
96        whisper.AUTOFLUSH = True
97
98      if settings.WHISPER_FALLOCATE_CREATE:
99        if whisper.CAN_FALLOCATE:
100          log.msg("Enabling Whisper fallocate support")
101        else:
102          log.err("WHISPER_FALLOCATE_CREATE is enabled but linking failed.")
103
104      if settings.WHISPER_LOCK_WRITES:
105        if whisper.CAN_LOCK:
106          log.msg("Enabling Whisper file locking")
107          whisper.LOCK = True
108        else:
109          log.err("WHISPER_LOCK_WRITES is enabled but import of fcntl module failed.")
110
111      if settings.WHISPER_FADVISE_RANDOM:
112        try:
113          if whisper.CAN_FADVISE:
114            log.msg("Enabling Whisper fadvise_random support")
115            whisper.FADVISE_RANDOM = True
116          else:
117            log.err("WHISPER_FADVISE_RANDOM is enabled but import of ftools module failed.")
118        except AttributeError:
119          log.err("WHISPER_FADVISE_RANDOM is enabled but skipped because it is not compatible " +
120                  "with the version of Whisper.")
121
122    def write(self, metric, datapoints):
123      path = self.getFilesystemPath(metric)
124      whisper.update_many(path, datapoints)
125
126    def exists(self, metric):
127      if exists(self.getFilesystemPath(metric)):
128        return True
129      # if we're using hashed filenames and a non-hashed file exists then move it to the new name
130      if self.tag_hash_filenames and exists(self._getFilesystemPath(metric, False)):
131        os.rename(self._getFilesystemPath(metric, False), self.getFilesystemPath(metric))
132        return True
133      return False
134
135    def create(self, metric, retentions, xfilesfactor, aggregation_method):
136      path = self.getFilesystemPath(metric)
137      directory = dirname(path)
138      try:
139        if not exists(directory):
140          os.makedirs(directory)
141      except OSError as e:
142        log.err("%s" % e)
143
144      whisper.create(path, retentions, xfilesfactor, aggregation_method,
145                     self.sparse_create, self.fallocate_create)
146
147    def getMetadata(self, metric, key):
148      if key != 'aggregationMethod':
149        raise ValueError("Unsupported metadata key \"%s\"" % key)
150
151      wsp_path = self.getFilesystemPath(metric)
152      return whisper.info(wsp_path)['aggregationMethod']
153
154    def setMetadata(self, metric, key, value):
155      if key != 'aggregationMethod':
156        raise ValueError("Unsupported metadata key \"%s\"" % key)
157
158      wsp_path = self.getFilesystemPath(metric)
159      return whisper.setAggregationMethod(wsp_path, value)
160
161    def getFilesystemPath(self, metric):
162      return self._getFilesystemPath(metric, self.tag_hash_filenames)
163
164    def _getFilesystemPath(self, metric, tag_hash_filenames):
165      return join(
166        self.data_dir,
167        TaggedSeries.encode(metric, sep, hash_only=tag_hash_filenames) + '.wsp'
168      )
169
170    def validateArchiveList(self, archiveList):
171      try:
172        whisper.validateArchiveList(archiveList)
173      except whisper.InvalidConfiguration as e:
174        raise ValueError("%s" % e)
175
176
177try:
178  import ceres
179except ImportError:
180  pass
181else:
182  class CeresDatabase(TimeSeriesDatabase):
183    plugin_name = 'ceres'
184    aggregationMethods = ['average', 'sum', 'last', 'max', 'min']
185
186    def __init__(self, settings):
187      super(CeresDatabase, self).__init__(settings)
188
189      self.data_dir = settings.LOCAL_DATA_DIR
190      self.tag_hash_filenames = settings.TAG_HASH_FILENAMES
191      ceres.setDefaultNodeCachingBehavior(settings.CERES_NODE_CACHING_BEHAVIOR)
192      ceres.setDefaultSliceCachingBehavior(settings.CERES_SLICE_CACHING_BEHAVIOR)
193      ceres.MAX_SLICE_GAP = int(settings.CERES_MAX_SLICE_GAP)
194
195      if settings.CERES_LOCK_WRITES:
196        if ceres.CAN_LOCK:
197          log.msg("Enabling Ceres file locking")
198          ceres.LOCK_WRITES = True
199        else:
200          log.err("CERES_LOCK_WRITES is enabled but import of fcntl module failed.")
201
202      self.tree = ceres.CeresTree(self.data_dir)
203
204    def encode(self, metric, tag_hash_filenames=None):
205      if tag_hash_filenames is None:
206        tag_hash_filenames = self.tag_hash_filenames
207      return TaggedSeries.encode(metric, hash_only=tag_hash_filenames)
208
209    def write(self, metric, datapoints):
210      self.tree.store(self.encode(metric), datapoints)
211
212    def exists(self, metric):
213      if self.tree.hasNode(self.encode(metric)):
214        return True
215      # if we're using hashed filenames and a non-hashed file exists then move it to the new name
216      if self.tag_hash_filenames and self.tree.hasNode(self.encode(metric, False)):
217        os.rename(self._getFilesystemPath(metric, False), self.getFilesystemPath(metric))
218        return True
219      return False
220
221    def create(self, metric, retentions, xfilesfactor, aggregation_method):
222      self.tree.createNode(self.encode(metric),
223                           retentions=retentions,
224                           timeStep=retentions[0][0],
225                           xFilesFactor=xfilesfactor,
226                           aggregationMethod=aggregation_method)
227
228    def getMetadata(self, metric, key):
229      return self.tree.getNode(self.encode(metric)).readMetadata()[key]
230
231    def setMetadata(self, metric, key, value):
232      node = self.tree.getNode(self.encode(metric))
233      metadata = node.readMetadata()
234      metadata[key] = value
235      node.writeMetadata(metadata)
236
237    def getFilesystemPath(self, metric):
238      return self._getFilesystemPath(metric, self.tag_hash_filenames)
239
240    def _getFilesystemPath(self, metric, tag_hash_filenames):
241      return self.tree.getFilesystemPath(self.encode(metric, tag_hash_filenames))
242