"""Copyright 2009 Chris Davis

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""

import os
import sys
import pwd
import errno

from os.path import join, dirname, normpath, exists, isdir
from optparse import OptionParser

# ConfigParser is renamed to configparser in py3
try:
    from ConfigParser import ConfigParser
except ImportError:
    from configparser import ConfigParser

from carbon import log, state
from carbon.database import TimeSeriesDatabase
from carbon.routers import DatapointRouter
from carbon.exceptions import CarbonConfigException

from twisted.python import usage


# Built-in value for every known setting. Besides supplying defaults, the
# *type* of each value here decides how Settings.readFrom parses the option
# of the same name from the config file (list / bool / number-or-string).
defaults = dict(
    USER="",
    MAX_CACHE_SIZE=float('inf'),
    MAX_UPDATES_PER_SECOND=500,
    MAX_CREATES_PER_MINUTE=float('inf'),
    MIN_TIMESTAMP_RESOLUTION=0,
    MIN_TIMESTAMP_LAG=0,
    LINE_RECEIVER_INTERFACE='0.0.0.0',
    LINE_RECEIVER_PORT=2003,
    ENABLE_UDP_LISTENER=False,
    UDP_RECEIVER_INTERFACE='0.0.0.0',
    UDP_RECEIVER_PORT=2003,
    PICKLE_RECEIVER_INTERFACE='0.0.0.0',
    PICKLE_RECEIVER_PORT=2004,
    MAX_RECEIVER_CONNECTIONS=float('inf'),
    CACHE_QUERY_INTERFACE='0.0.0.0',
    CACHE_QUERY_PORT=7002,
    LOG_UPDATES=True,
    LOG_CREATES=True,
    LOG_CACHE_HITS=True,
    LOG_CACHE_QUEUE_SORTS=True,
    DATABASE='whisper',
    WHISPER_AUTOFLUSH=False,
    WHISPER_SPARSE_CREATE=False,
    WHISPER_FALLOCATE_CREATE=False,
    WHISPER_LOCK_WRITES=False,
    WHISPER_FADVISE_RANDOM=False,
    CERES_MAX_SLICE_GAP=80,
    CERES_NODE_CACHING_BEHAVIOR='all',
    CERES_SLICE_CACHING_BEHAVIOR='latest',
    CERES_LOCK_WRITES=False,
    MAX_DATAPOINTS_PER_MESSAGE=500,
    MAX_AGGREGATION_INTERVALS=5,
    FORWARD_ALL=True,
    MAX_QUEUE_SIZE=1000,
    QUEUE_LOW_WATERMARK_PCT=0.8,
    TIME_TO_DEFER_SENDING=0.0001,
    ENABLE_AMQP=False,
    AMQP_METRIC_NAME_IN_BODY=False,
    AMQP_VERBOSE=False,
    AMQP_SPEC=None,
    BIND_PATTERNS=['#'],
    GRAPHITE_URL='http://127.0.0.1:80',
    ENABLE_TAGS=True,
    SKIP_TAGS_FOR_NONTAGGED=True,
    TAG_UPDATE_INTERVAL=100,
    TAG_BATCH_SIZE=100,
    TAG_QUEUE_SIZE=10000,
    TAG_HASH_FILENAMES=True,
    TAG_RELAY_NORMALIZED=False,
    ENABLE_MANHOLE=False,
    MANHOLE_INTERFACE='127.0.0.1',
    MANHOLE_PORT=7222,
    MANHOLE_USER="",
    MANHOLE_PUBLIC_KEY="",
    MANHOLE_HOST_KEY_DIR="",
    RELAY_METHOD='rules',
    DYNAMIC_ROUTER=False,
    DYNAMIC_ROUTER_MAX_RETRIES=5,
    ROUTER_HASH_TYPE=None,
    REPLICATION_FACTOR=1,
    DIVERSE_REPLICAS=True,
    DESTINATIONS=[],
    DESTINATION_PROTOCOL="pickle",
    DESTINATION_TRANSPORT="none",
    DESTINATION_SSL_CA=None,
    DESTINATION_POOL_REPLICAS=False,
    USE_FLOW_CONTROL=True,
    USE_INSECURE_UNPICKLER=False,
    USE_WHITELIST=False,
    CARBON_METRIC_PREFIX='carbon',
    CARBON_METRIC_INTERVAL=60,
    CACHE_WRITE_STRATEGY='sorted',
    WRITE_BACK_FREQUENCY=None,
    MIN_RESET_STAT_FLOW=1000,
    MIN_RESET_RATIO=0.9,
    MIN_RESET_INTERVAL=121,
    TCP_KEEPALIVE=True,
    TCP_KEEPIDLE=10,
    TCP_KEEPINTVL=30,
    TCP_KEEPCNT=2,
    USE_RATIO_RESET=False,
    LOG_LISTENER_CONN_LOST=False,
    LOG_LISTENER_CONN_SUCCESS=True,
    LOG_AGGREGATOR_MISSES=True,
    AGGREGATION_RULES='aggregation-rules.conf',
    REWRITE_RULES='rewrite-rules.conf',
    RELAY_RULES='relay-rules.conf',
    ENABLE_LOGROTATION=True,
    METRIC_CLIENT_IDLE_TIMEOUT=None,
    CACHE_METRIC_NAMES_MAX=0,
    CACHE_METRIC_NAMES_TTL=0,
    RAVEN_DSN=None,
    PICKLE_RECEIVER_MAX_LENGTH=2**20,
)


def _process_alive(pid):
    """Return True if a process with the given pid appears to exist.

    Where /proc is available the check is a simple existence test of
    /proc/<pid>. Otherwise the process is probed with signal 0; EPERM means
    the process exists but belongs to another user, so it counts as alive.
    Any other OSError is treated as "not running".
    """
    # Normalize once so both code paths accept ints or numeric strings.
    pid = int(pid)
    if exists("/proc"):
        return exists("/proc/%d" % pid)
    try:
        os.kill(pid, 0)
    except OSError as err:
        return err.errno == errno.EPERM
    return True


class OrderedConfigParser(ConfigParser):
    """Hacky workaround to ensure sections are always returned in the order
    they are defined in. Note that this does *not* make any guarantees about
    the order of options within a section or the order in which sections get
    written back to disk on write().

    sections() reflects only the most recent file passed to read(); sections
    added programmatically are not included."""
    _ordered_sections = []

    def read(self, path):
        """Parse *path* and remember its section headers in file order.

        Raises:
            CarbonConfigException: if *path* is missing or not readable.
        """
        # Verifies a file exists *and* is readable
        if not os.access(path, os.R_OK):
            raise CarbonConfigException("Error: Missing config file or wrong perms on %s" % path)

        result = ConfigParser.read(self, path)

        # Re-scan the raw file for "[name]" header lines to capture ordering.
        sections = []
        with open(path) as f:
            for line in f:
                line = line.strip()
                if line.startswith('[') and line.endswith(']'):
                    sections.append(line[1:-1])

        # Instance attribute shadows the (empty) class-level default.
        self._ordered_sections = sections
        return result

    def sections(self):
        """Return section names in the order they appear in the file."""
        return list(self._ordered_sections)  # return a copy for safety


class _MissingSettingError(KeyError, AttributeError):
    """Raised when an unknown setting is read as an attribute.

    Subclasses KeyError so existing ``except KeyError`` handlers keep working,
    and AttributeError so ``hasattr``, ``getattr(obj, name, default)`` and the
    ``copy``/``pickle`` protocol probes behave as Python expects.
    """


class Settings(dict):
    """Carbon settings: a dict pre-populated from ``defaults`` whose keys are
    also readable as attributes (``settings.MAX_CACHE_SIZE``)."""

    def __init__(self):
        dict.__init__(self)
        self.update(defaults)

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails; fall back to the
        # dict contents.
        try:
            return self[name]
        except KeyError:
            raise _MissingSettingError(name)

    def readFrom(self, path, section):
        """Merge the options of ``[section]`` from the config file *path*.

        Option names are upper-cased. Each value is coerced according to the
        type of the matching entry in ``defaults``:

        * list  -- comma-separated, items stripped, blank items dropped;
        * bool  -- parsed with ConfigParser.getboolean;
        * other -- converted to int, else float, else kept as a string.

        A missing section is silently ignored.

        Raises:
            CarbonConfigException: if *path* could not be read.
        """
        parser = ConfigParser()
        if not parser.read(path):
            raise CarbonConfigException("Failed to read config file %s" % path)

        if not parser.has_section(section):
            return

        for key, value in parser.items(section):
            key = key.upper()

            # Detect type from defaults dict
            if key in defaults:
                valueType = type(defaults[key])
            else:
                valueType = str

            if valueType is list:
                # Skip blank items so "" or a trailing comma don't produce ''.
                value = [v.strip() for v in value.split(',') if v.strip()]

            elif valueType is bool:
                # ConfigParser passes option names through optionxform
                # (lower-case by default), so the upper-cased key still matches.
                value = parser.getboolean(section, key)

            else:
                # Attempt to figure out numeric types automatically
                try:
                    value = int(value)
                except ValueError:
                    try:
                        value = float(value)
                    except ValueError:
                        pass

            self[key] = value
settings = Settings()
settings.update(defaults)


def _read_pidfile(pidfile, unreadable_msg):
    """Return the integer pid stored in *pidfile*, or exit the process.

    Args:
        pidfile: path of the pidfile to read.
        unreadable_msg: ``%s``-format message (given the path) printed when
            the file cannot be opened or read.

    Raises:
        SystemExit(1): if the file cannot be read, or does not contain an
            integer (in which case removal of the corrupted file is attempted
            first).
    """
    try:
        with open(pidfile, 'r') as pf:
            contents = pf.read()
    except (IOError, OSError):
        print(unreadable_msg % pidfile)
        raise SystemExit(1)

    try:
        return int(contents.strip())
    except ValueError:
        print("Failed to parse pid from pidfile %s" % pidfile)
        try:
            print("removing corrupted pidfile %s" % pidfile)
            os.unlink(pidfile)
        except (IOError, OSError):
            # On py2 os.unlink raises OSError, not IOError.
            print("Could not remove pidfile %s" % pidfile)
        raise SystemExit(1)


class CarbonCacheOptions(usage.Options):
    """twistd sub-command options shared by carbon daemons.

    postOptions loads carbon.conf into the module-level ``settings``,
    performs the start/stop/status action, and configures logging.
    """

    optFlags = [
        ["debug", "", "Run in debug mode."],
    ]

    optParameters = [
        ["config", "c", None, "Use the given config file."],
        ["instance", "", "a", "Manage a specific carbon instance."],
        ["logdir", "", None, "Write logs to the given directory."],
        ["whitelist", "", None, "List of metric patterns to allow."],
        ["blacklist", "", None, "List of metric patterns to disallow."],
    ]

    def postOptions(self):
        """Finalize configuration after twistd has parsed the command line.

        May raise SystemExit (missing storage-schemas.conf, unknown database
        plugin, or any exit performed by handleAction).
        """
        program = self.parent.subCommand

        # Use provided pidfile (if any) as default for configuration. If it's
        # set to 'twistd.pid', that means no value was provided and the default
        # was used.
        pidfile = self.parent["pidfile"]
        if pidfile.endswith("twistd.pid"):
            pidfile = None
        self["pidfile"] = pidfile

        # Enforce a default umask of '022' if none was set.
        if "umask" not in self.parent or self.parent["umask"] is None:
            self.parent["umask"] = 0o022

        # Read extra settings from the configuration file.
        program_settings = read_config(program, self)
        settings.update(program_settings)
        settings["program"] = program

        # Normalize and expand paths
        def cleanpath(path):
            return os.path.normpath(os.path.expanduser(path))

        for key in ("STORAGE_DIR", "LOCAL_DATA_DIR", "WHITELISTS_DIR",
                    "PID_DIR", "LOG_DIR", "pidfile"):
            settings[key] = cleanpath(settings[key])

        # Set process uid/gid by changing the parent config, if a user was
        # provided in the configuration file.
        if settings.USER:
            self.parent["uid"], self.parent["gid"] = (
                pwd.getpwnam(settings.USER)[2:4])

        # Set the pidfile in parent config to the value that was computed by
        # C{read_config}.
        self.parent["pidfile"] = settings["pidfile"]

        storage_schemas = join(settings["CONF_DIR"], "storage-schemas.conf")
        if not exists(storage_schemas):
            print("Error: missing required config %s" % storage_schemas)
            sys.exit(1)

        if settings.CACHE_WRITE_STRATEGY not in ('timesorted', 'sorted', 'max', 'naive'):
            log.err("%s is not a valid value for CACHE_WRITE_STRATEGY, defaulting to %s" %
                    (settings.CACHE_WRITE_STRATEGY, defaults['CACHE_WRITE_STRATEGY']))
            # Actually apply the fallback the message announces.
            settings["CACHE_WRITE_STRATEGY"] = defaults['CACHE_WRITE_STRATEGY']
        else:
            log.msg("Using %s write strategy for cache" % settings.CACHE_WRITE_STRATEGY)

        # Database-specific settings
        database = settings.DATABASE
        if database not in TimeSeriesDatabase.plugins:
            print("No database plugin implemented for '%s'" % database)
            raise SystemExit(1)

        database_class = TimeSeriesDatabase.plugins[database]
        state.database = database_class(settings)

        # Store as a dict item (not an instance attribute) so it is visible
        # through both settings[...] and settings.<name>.
        settings["CACHE_SIZE_LOW_WATERMARK"] = settings.MAX_CACHE_SIZE * 0.95

        if "action" not in self:
            self["action"] = "start"
        self.handleAction()

        # If we are not running in debug mode or non-daemon mode, then log to a
        # directory, otherwise log output will go to stdout. If parent options
        # are set to log to syslog, then use that instead.
        if not self["debug"]:
            if self.parent.get("syslog", None):
                prefix = "%s-%s[%d]" % (program, self["instance"], os.getpid())
                log.logToSyslog(prefix)
            elif not self.parent["nodaemon"]:
                logdir = settings.LOG_DIR
                if not isdir(logdir):
                    os.makedirs(logdir)
                    if settings.USER:
                        # We have not yet switched to the specified user,
                        # but that user must be able to create files in this
                        # directory.
                        os.chown(logdir, self.parent["uid"], self.parent["gid"])
                log.logToDir(logdir)

        if self["whitelist"] is None:
            self["whitelist"] = join(settings["CONF_DIR"], "whitelist.conf")
        settings["whitelist"] = self["whitelist"]

        if self["blacklist"] is None:
            self["blacklist"] = join(settings["CONF_DIR"], "blacklist.conf")
        settings["blacklist"] = self["blacklist"]

    def parseArgs(self, *action):
        """If an action was provided, store it for further processing."""
        if len(action) == 1:
            self["action"] = action[0]

    def handleAction(self):
        """Handle extra argument for backwards-compatibility.

        * C{start} will simply do minimal pid checking and otherwise let twistd
          take over.
        * C{stop} will kill an existing running process if it matches the
          C{pidfile} contents.
        * C{status} will simply report if the process is up or not.

        Every action except a successful C{start} ends with SystemExit.
        """
        action = self["action"]
        pidfile = self.parent["pidfile"]
        program = settings["program"]
        instance = self["instance"]

        if action == "stop":
            if not exists(pidfile):
                print("Pidfile %s does not exist" % pidfile)
                raise SystemExit(0)
            pid = _read_pidfile(pidfile, "Could not read pidfile %s")
            print("Sending kill signal to pid %d" % pid)
            try:
                os.kill(pid, 15)  # SIGTERM
            except OSError as e:
                if e.errno == errno.ESRCH:
                    print("No process with pid %d running" % pid)
                else:
                    raise
            raise SystemExit(0)

        elif action == "status":
            if not exists(pidfile):
                print("%s (instance %s) is not running" % (program, instance))
                raise SystemExit(1)
            pid = _read_pidfile(pidfile, "Failed to read pid from %s")
            if _process_alive(pid):
                print("%s (instance %s) is running with pid %d" %
                      (program, instance, pid))
                raise SystemExit(0)
            print("%s (instance %s) is not running" % (program, instance))
            raise SystemExit(1)

        elif action == "start":
            if exists(pidfile):
                pid = _read_pidfile(pidfile, "Could not read pidfile %s")
                if _process_alive(pid):
                    print("%s (instance %s) is already running with pid %d" %
                          (program, instance, pid))
                    raise SystemExit(1)
                print("Removing stale pidfile %s" % pidfile)
                try:
                    os.unlink(pidfile)
                except (IOError, OSError):
                    print("Could not remove pidfile %s" % pidfile)
            else:
                # Try to create the PID directory
                pid_dir = settings["PID_DIR"]
                if not os.path.exists(pid_dir):
                    try:
                        os.makedirs(pid_dir)
                    except OSError as exc:
                        # Tolerate a concurrent creation of the same directory.
                        if not (exc.errno == errno.EEXIST and os.path.isdir(pid_dir)):
                            raise

            print("Starting %s (instance %s)" % (program, instance))

        else:
            print("Invalid action '%s'" % action)
            print("Valid actions: start stop status")
            raise SystemExit(1)


class CarbonAggregatorOptions(CarbonCacheOptions):
    """Options for carbon-aggregator: adds aggregation and rewrite rule files."""

    optParameters = [
        ["rules", "", None, "Use the given aggregation rules file."],
        ["rewrite-rules", "", None, "Use the given rewrite rules file."],
    ] + CarbonCacheOptions.optParameters

    def postOptions(self):
        """Run the shared setup, then resolve rule-file paths into settings."""
        CarbonCacheOptions.postOptions(self)
        if self["rules"] is None:
            self["rules"] = join(settings["CONF_DIR"], settings['AGGREGATION_RULES'])
        settings["aggregation-rules"] = self["rules"]

        if self["rewrite-rules"] is None:
            self["rewrite-rules"] = join(settings["CONF_DIR"],
                                         settings['REWRITE_RULES'])
        settings["rewrite-rules"] = self["rewrite-rules"]


class CarbonRelayOptions(CarbonCacheOptions):
    """Options for carbon-relay: adds relay/aggregation rule files and
    validates RELAY_METHOD."""

    optParameters = [
        ["rules", "", None, "Use the given relay rules file."],
        ["aggregation-rules", "", None, "Use the given aggregation rules file."],
    ] + CarbonCacheOptions.optParameters

    def postOptions(self):
        """Run the shared setup, resolve rule files, and check the router.

        Raises:
            SystemExit(1): if RELAY_METHOD names no registered router plugin.
        """
        CarbonCacheOptions.postOptions(self)
        if self["rules"] is None:
            self["rules"] = join(settings["CONF_DIR"], settings['RELAY_RULES'])
        settings["relay-rules"] = self["rules"]

        if self["aggregation-rules"] is None:
            self["aggregation-rules"] = join(settings["CONF_DIR"], settings['AGGREGATION_RULES'])
        settings["aggregation-rules"] = self["aggregation-rules"]

        router = settings["RELAY_METHOD"]
        if router not in DatapointRouter.plugins:
            print("In carbon.conf, RELAY_METHOD must be one of %s. "
                  "Invalid value: '%s'" % (', '.join(DatapointRouter.plugins), router))
            raise SystemExit(1)


def get_default_parser(usage="%prog [options] <start|stop|status>"):
    """Create a parser for command line options."""
    parser = OptionParser(usage=usage)
    parser.add_option(
        "--debug", action="store_true",
        help="Run in the foreground, log to stdout")
    parser.add_option(
        "--syslog", action="store_true",
        help="Write logs to syslog")
    parser.add_option(
        "--nodaemon", action="store_true",
        help="Run in the foreground")
    parser.add_option(
        "--profile",
        help="Record performance profile data to the given file")
    parser.add_option(
        "--profiler",
        help="Specify the profiler to use")
    parser.add_option(
        "--pidfile", default=None,
        help="Write pid to the given file")
    parser.add_option(
        "--umask", default=None,
        help="Use the given umask when creating files")
    parser.add_option(
        "--config",
        default=None,
        help="Use the given config file")
    parser.add_option(
        "--whitelist",
        default=None,
        help="Use the given whitelist file")
    parser.add_option(
        "--blacklist",
        default=None,
        help="Use the given blacklist file")
    parser.add_option(
        "--logdir",
        default=None,
        help="Write logs in the given directory")
    parser.add_option(
        "--instance",
        default='a',
        help="Manage a specific carbon instance")
    parser.add_option(
        "--logfile",
        default=None,
        help="Log to a specified file, - for stdout")
    parser.add_option(
        "--logger",
        default=None,
        help="A fully-qualified name to a log observer factory to use for the initial log "
             "observer. Takes precedence over --logfile and --syslog (when available).")
    return parser


def get_parser(name):
    """Return the default parser plus the extra rule options for *name*."""
    parser = get_default_parser()
    if "carbon-aggregator" in name:
        parser.add_option(
            "--rules",
            default=None,
            help="Use the given aggregation rules file.")
        parser.add_option(
            "--rewrite-rules",
            default=None,
            help="Use the given rewrite rules file.")
    elif name == "carbon-relay":
        parser.add_option(
            "--rules",
            default=None,
            help="Use the given relay rules file.")
    return parser


def parse_options(parser, args):
    """
    Parse command line options and print usage message if no arguments were
    provided for the command.

    Exits with status 1 unless the first positional argument is one of
    start, stop or status.
    """
    (options, args) = parser.parse_args(args)

    if not args:
        parser.print_usage()
        raise SystemExit(1)

    if args[0] not in ("start", "stop", "status"):
        parser.print_usage()
        raise SystemExit(1)

    return options, args


def read_config(program, options, **kwargs):
    """
    Read settings for 'program' from configuration file specified by
    'options["config"]', with missing values provided by 'defaults'.

    Args:
        program: program name such as "carbon-cache"; the part after
            "carbon-" selects the config section.
        options: mapping with "config", "instance", "pidfile" and "logdir"
            keys. "config" is filled in when it is None.
        **kwargs: extra defaults; they only apply to names not already in
            ``defaults``. "ROOT_DIR" takes precedence over $GRAPHITE_ROOT.

    Returns:
        A new Settings instance (the module-level ``settings`` is untouched).

    Raises:
        CarbonConfigException: if no root directory is known or the config
            file is missing.
    """
    program_settings = Settings()
    program_settings.update(defaults)

    # Initialize default values if not set yet.
    for name, value in kwargs.items():
        program_settings.setdefault(name, value)

    graphite_root = kwargs.get("ROOT_DIR")
    if graphite_root is None:
        graphite_root = os.environ.get('GRAPHITE_ROOT')
    if graphite_root is None:
        raise CarbonConfigException("Either ROOT_DIR or GRAPHITE_ROOT "
                                    "needs to be provided.")

    # Default config directory to root-relative, unless overridden by the
    # 'GRAPHITE_CONF_DIR' environment variable.
    program_settings.setdefault("CONF_DIR",
                                os.environ.get("GRAPHITE_CONF_DIR",
                                               join(graphite_root, "conf")))
    if options["config"] is None:
        options["config"] = join(program_settings["CONF_DIR"], "carbon.conf")
    else:
        # Set 'CONF_DIR' to the parent directory of the 'carbon.conf' config
        # file.
        program_settings["CONF_DIR"] = dirname(normpath(options["config"]))

    # Storage directory can be overridden by the 'GRAPHITE_STORAGE_DIR'
    # environment variable. It defaults to a path relative to GRAPHITE_ROOT
    # for backwards compatibility though.
    program_settings.setdefault("STORAGE_DIR",
                                os.environ.get("GRAPHITE_STORAGE_DIR",
                                               join(graphite_root, "storage")))

    def update_STORAGE_DIR_deps():
        # By default, everything is written to subdirectories of the storage dir.
        # setdefault: values from the config file always win.
        storage_dir = program_settings["STORAGE_DIR"]
        program_settings.setdefault("PID_DIR", storage_dir)
        program_settings.setdefault("LOG_DIR", join(storage_dir, "log", program))
        program_settings.setdefault("LOCAL_DATA_DIR", join(storage_dir, "whisper"))
        program_settings.setdefault("WHITELISTS_DIR", join(storage_dir, "lists"))

    # Read configuration options from program-specific section.
    section = program[len("carbon-"):]
    config = options["config"]

    if not exists(config):
        raise CarbonConfigException("Error: missing required config %r" % config)

    program_settings.readFrom(config, section)
    program_settings.setdefault("instance", options["instance"])
    update_STORAGE_DIR_deps()

    # If a specific instance of the program is specified, augment the settings
    # with the instance-specific settings and provide sane defaults for
    # optional settings.
    if options["instance"]:
        program_settings.readFrom(config,
                                  "%s:%s" % (section, options["instance"]))
        program_settings["pidfile"] = (
            options["pidfile"] or
            join(program_settings["PID_DIR"], "%s-%s.pid" % (program, options["instance"])))
        program_settings["LOG_DIR"] = (
            options["logdir"] or
            join(program_settings["LOG_DIR"], "%s-%s" % (program, options["instance"])))
    else:
        program_settings["pidfile"] = (
            options["pidfile"] or join(program_settings["PID_DIR"], '%s.pid' % program))
        program_settings["LOG_DIR"] = (options["logdir"] or program_settings["LOG_DIR"])

    # NOTE(review): every key this sets was already set by the call above, so
    # this second call cannot pick up an instance-level STORAGE_DIR override.
    update_STORAGE_DIR_deps()
    return program_settings