1"""Copyright 2009 Chris Davis 2 3Licensed under the Apache License, Version 2.0 (the "License"); 4you may not use this file except in compliance with the License. 5You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9Unless required by applicable law or agreed to in writing, software 10distributed under the License is distributed on an "AS IS" BASIS, 11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12See the License for the specific language governing permissions and 13limitations under the License.""" 14 15import os 16import time 17 18from os.path import exists, dirname, join, sep 19from carbon.util import PluginRegistrar, TaggedSeries 20from carbon import log 21from six import with_metaclass 22 23 24class TimeSeriesDatabase(with_metaclass(PluginRegistrar, object)): 25 "Abstract base class for Carbon database backends." 26 plugins = {} 27 28 "List of supported aggregation methods for the database." 29 aggregationMethods = [] 30 31 def __init__(self, settings): 32 self.graphite_url = settings.GRAPHITE_URL 33 34 def write(self, metric, datapoints): 35 "Persist datapoints in the database for metric." 36 raise NotImplementedError() 37 38 def exists(self, metric): 39 "Return True if the given metric path exists, False otherwise." 40 raise NotImplementedError() 41 42 def create(self, metric, retentions, xfilesfactor, aggregation_method): 43 "Create an entry in the database for metric using options." 44 raise NotImplementedError() 45 46 def getMetadata(self, metric, key): 47 "Lookup metric metadata." 48 raise NotImplementedError() 49 50 def setMetadata(self, metric, key, value): 51 "Modify metric metadata." 52 raise NotImplementedError() 53 54 def getFilesystemPath(self, metric): 55 "Return filesystem path for metric, defaults to None." 56 pass 57 58 def validateArchiveList(self, archiveList): 59 "Validate that the database can handle the given archiveList." 60 pass 61 62 def tag(self, *metrics): 63 from carbon.http import httpRequest 64 65 log.debug("Tagging %s" % ', '.join(metrics), type='tagdb') 66 t = time.time() 67 68 try: 69 httpRequest( 70 self.graphite_url + '/tags/tagMultiSeries', 71 [('path', metric) for metric in metrics] 72 ) 73 log.debug("Tagged %s in %s" % (', '.join(metrics), time.time() - t), type='tagdb') 74 except Exception as err: 75 log.msg("Error tagging %s: %s" % (', '.join(metrics), err), type='tagdb') 76 77 78try: 79 import whisper 80except ImportError: 81 pass 82else: 83 class WhisperDatabase(TimeSeriesDatabase): 84 plugin_name = 'whisper' 85 aggregationMethods = whisper.aggregationMethods 86 87 def __init__(self, settings): 88 super(WhisperDatabase, self).__init__(settings) 89 90 self.data_dir = settings.LOCAL_DATA_DIR 91 self.tag_hash_filenames = settings.TAG_HASH_FILENAMES 92 self.sparse_create = settings.WHISPER_SPARSE_CREATE 93 self.fallocate_create = settings.WHISPER_FALLOCATE_CREATE 94 if settings.WHISPER_AUTOFLUSH: 95 log.msg("Enabling Whisper autoflush") 96 whisper.AUTOFLUSH = True 97 98 if settings.WHISPER_FALLOCATE_CREATE: 99 if whisper.CAN_FALLOCATE: 100 log.msg("Enabling Whisper fallocate support") 101 else: 102 log.err("WHISPER_FALLOCATE_CREATE is enabled but linking failed.") 103 104 if settings.WHISPER_LOCK_WRITES: 105 if whisper.CAN_LOCK: 106 log.msg("Enabling Whisper file locking") 107 whisper.LOCK = True 108 else: 109 log.err("WHISPER_LOCK_WRITES is enabled but import of fcntl module failed.") 110 111 if settings.WHISPER_FADVISE_RANDOM: 112 try: 113 if whisper.CAN_FADVISE: 114 log.msg("Enabling Whisper fadvise_random support") 115 whisper.FADVISE_RANDOM = True 116 else: 117 log.err("WHISPER_FADVISE_RANDOM is enabled but import of ftools module failed.") 118 except AttributeError: 119 log.err("WHISPER_FADVISE_RANDOM is enabled but skipped because it is not compatible " + 120 "with the version of Whisper.") 121 122 def write(self, metric, datapoints): 123 path = self.getFilesystemPath(metric) 124 whisper.update_many(path, datapoints) 125 126 def exists(self, metric): 127 if exists(self.getFilesystemPath(metric)): 128 return True 129 # if we're using hashed filenames and a non-hashed file exists then move it to the new name 130 if self.tag_hash_filenames and exists(self._getFilesystemPath(metric, False)): 131 os.rename(self._getFilesystemPath(metric, False), self.getFilesystemPath(metric)) 132 return True 133 return False 134 135 def create(self, metric, retentions, xfilesfactor, aggregation_method): 136 path = self.getFilesystemPath(metric) 137 directory = dirname(path) 138 try: 139 if not exists(directory): 140 os.makedirs(directory) 141 except OSError as e: 142 log.err("%s" % e) 143 144 whisper.create(path, retentions, xfilesfactor, aggregation_method, 145 self.sparse_create, self.fallocate_create) 146 147 def getMetadata(self, metric, key): 148 if key != 'aggregationMethod': 149 raise ValueError("Unsupported metadata key \"%s\"" % key) 150 151 wsp_path = self.getFilesystemPath(metric) 152 return whisper.info(wsp_path)['aggregationMethod'] 153 154 def setMetadata(self, metric, key, value): 155 if key != 'aggregationMethod': 156 raise ValueError("Unsupported metadata key \"%s\"" % key) 157 158 wsp_path = self.getFilesystemPath(metric) 159 return whisper.setAggregationMethod(wsp_path, value) 160 161 def getFilesystemPath(self, metric): 162 return self._getFilesystemPath(metric, self.tag_hash_filenames) 163 164 def _getFilesystemPath(self, metric, tag_hash_filenames): 165 return join( 166 self.data_dir, 167 TaggedSeries.encode(metric, sep, hash_only=tag_hash_filenames) + '.wsp' 168 ) 169 170 def validateArchiveList(self, archiveList): 171 try: 172 whisper.validateArchiveList(archiveList) 173 except whisper.InvalidConfiguration as e: 174 raise ValueError("%s" % e) 175 176 177try: 178 import ceres 179except ImportError: 180 pass 181else: 182 class CeresDatabase(TimeSeriesDatabase): 183 plugin_name = 'ceres' 184 aggregationMethods = ['average', 'sum', 'last', 'max', 'min'] 185 186 def __init__(self, settings): 187 super(CeresDatabase, self).__init__(settings) 188 189 self.data_dir = settings.LOCAL_DATA_DIR 190 self.tag_hash_filenames = settings.TAG_HASH_FILENAMES 191 ceres.setDefaultNodeCachingBehavior(settings.CERES_NODE_CACHING_BEHAVIOR) 192 ceres.setDefaultSliceCachingBehavior(settings.CERES_SLICE_CACHING_BEHAVIOR) 193 ceres.MAX_SLICE_GAP = int(settings.CERES_MAX_SLICE_GAP) 194 195 if settings.CERES_LOCK_WRITES: 196 if ceres.CAN_LOCK: 197 log.msg("Enabling Ceres file locking") 198 ceres.LOCK_WRITES = True 199 else: 200 log.err("CERES_LOCK_WRITES is enabled but import of fcntl module failed.") 201 202 self.tree = ceres.CeresTree(self.data_dir) 203 204 def encode(self, metric, tag_hash_filenames=None): 205 if tag_hash_filenames is None: 206 tag_hash_filenames = self.tag_hash_filenames 207 return TaggedSeries.encode(metric, hash_only=tag_hash_filenames) 208 209 def write(self, metric, datapoints): 210 self.tree.store(self.encode(metric), datapoints) 211 212 def exists(self, metric): 213 if self.tree.hasNode(self.encode(metric)): 214 return True 215 # if we're using hashed filenames and a non-hashed file exists then move it to the new name 216 if self.tag_hash_filenames and self.tree.hasNode(self.encode(metric, False)): 217 os.rename(self._getFilesystemPath(metric, False), self.getFilesystemPath(metric)) 218 return True 219 return False 220 221 def create(self, metric, retentions, xfilesfactor, aggregation_method): 222 self.tree.createNode(self.encode(metric), 223 retentions=retentions, 224 timeStep=retentions[0][0], 225 xFilesFactor=xfilesfactor, 226 aggregationMethod=aggregation_method) 227 228 def getMetadata(self, metric, key): 229 return self.tree.getNode(self.encode(metric)).readMetadata()[key] 230 231 def setMetadata(self, metric, key, value): 232 node = self.tree.getNode(self.encode(metric)) 233 metadata = node.readMetadata() 234 metadata[key] = value 235 node.writeMetadata(metadata) 236 237 def getFilesystemPath(self, metric): 238 return self._getFilesystemPath(metric, self.tag_hash_filenames) 239 240 def _getFilesystemPath(self, metric, tag_hash_filenames): 241 return self.tree.getFilesystemPath(self.encode(metric, tag_hash_filenames)) 242