1"""Copyright 2009 Chris Davis
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7   http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License."""
14
15import os
16import sys
17import pwd
18import errno
19
20from os.path import join, dirname, normpath, exists, isdir
21from optparse import OptionParser
22
23try:
24    from ConfigParser import ConfigParser
25# ConfigParser is renamed to configparser in py3
26except ImportError:
27    from configparser import ConfigParser
28
29from carbon import log, state
30from carbon.database import TimeSeriesDatabase
31from carbon.routers import DatapointRouter
32from carbon.exceptions import CarbonConfigException
33
34from twisted.python import usage
35
36
# Built-in default settings. Every Settings() instance starts from these, and
# Settings.readFrom() uses the *type* of each default to decide how to convert
# the corresponding raw string read from carbon.conf.
defaults = {
    # Process identity and cache/rate limits
    "USER": "",
    "MAX_CACHE_SIZE": float("inf"),
    "MAX_UPDATES_PER_SECOND": 500,
    "MAX_CREATES_PER_MINUTE": float("inf"),
    "MIN_TIMESTAMP_RESOLUTION": 0,
    "MIN_TIMESTAMP_LAG": 0,
    # Listener endpoints
    "LINE_RECEIVER_INTERFACE": "0.0.0.0",
    "LINE_RECEIVER_PORT": 2003,
    "ENABLE_UDP_LISTENER": False,
    "UDP_RECEIVER_INTERFACE": "0.0.0.0",
    "UDP_RECEIVER_PORT": 2003,
    "PICKLE_RECEIVER_INTERFACE": "0.0.0.0",
    "PICKLE_RECEIVER_PORT": 2004,
    "MAX_RECEIVER_CONNECTIONS": float("inf"),
    "CACHE_QUERY_INTERFACE": "0.0.0.0",
    "CACHE_QUERY_PORT": 7002,
    # Logging toggles
    "LOG_UPDATES": True,
    "LOG_CREATES": True,
    "LOG_CACHE_HITS": True,
    "LOG_CACHE_QUEUE_SORTS": True,
    # Storage backend (whisper / ceres)
    "DATABASE": "whisper",
    "WHISPER_AUTOFLUSH": False,
    "WHISPER_SPARSE_CREATE": False,
    "WHISPER_FALLOCATE_CREATE": False,
    "WHISPER_LOCK_WRITES": False,
    "WHISPER_FADVISE_RANDOM": False,
    "CERES_MAX_SLICE_GAP": 80,
    "CERES_NODE_CACHING_BEHAVIOR": "all",
    "CERES_SLICE_CACHING_BEHAVIOR": "latest",
    "CERES_LOCK_WRITES": False,
    # Messaging and queueing
    "MAX_DATAPOINTS_PER_MESSAGE": 500,
    "MAX_AGGREGATION_INTERVALS": 5,
    "FORWARD_ALL": True,
    "MAX_QUEUE_SIZE": 1000,
    "QUEUE_LOW_WATERMARK_PCT": 0.8,
    "TIME_TO_DEFER_SENDING": 0.0001,
    # AMQP
    "ENABLE_AMQP": False,
    "AMQP_METRIC_NAME_IN_BODY": False,
    "AMQP_VERBOSE": False,
    "AMQP_SPEC": None,
    "BIND_PATTERNS": ["#"],
    # Tagging
    "GRAPHITE_URL": "http://127.0.0.1:80",
    "ENABLE_TAGS": True,
    "SKIP_TAGS_FOR_NONTAGGED": True,
    "TAG_UPDATE_INTERVAL": 100,
    "TAG_BATCH_SIZE": 100,
    "TAG_QUEUE_SIZE": 10000,
    "TAG_HASH_FILENAMES": True,
    "TAG_RELAY_NORMALIZED": False,
    # Manhole
    "ENABLE_MANHOLE": False,
    "MANHOLE_INTERFACE": "127.0.0.1",
    "MANHOLE_PORT": 7222,
    "MANHOLE_USER": "",
    "MANHOLE_PUBLIC_KEY": "",
    "MANHOLE_HOST_KEY_DIR": "",
    # Routing and destinations
    "RELAY_METHOD": "rules",
    "DYNAMIC_ROUTER": False,
    "DYNAMIC_ROUTER_MAX_RETRIES": 5,
    "ROUTER_HASH_TYPE": None,
    "REPLICATION_FACTOR": 1,
    "DIVERSE_REPLICAS": True,
    "DESTINATIONS": [],
    "DESTINATION_PROTOCOL": "pickle",
    "DESTINATION_TRANSPORT": "none",
    "DESTINATION_SSL_CA": None,
    "DESTINATION_POOL_REPLICAS": False,
    "USE_FLOW_CONTROL": True,
    "USE_INSECURE_UNPICKLER": False,
    "USE_WHITELIST": False,
    # Internal metrics and cache write behaviour
    "CARBON_METRIC_PREFIX": "carbon",
    "CARBON_METRIC_INTERVAL": 60,
    "CACHE_WRITE_STRATEGY": "sorted",
    "WRITE_BACK_FREQUENCY": None,
    "MIN_RESET_STAT_FLOW": 1000,
    "MIN_RESET_RATIO": 0.9,
    "MIN_RESET_INTERVAL": 121,
    # TCP keepalive
    "TCP_KEEPALIVE": True,
    "TCP_KEEPIDLE": 10,
    "TCP_KEEPINTVL": 30,
    "TCP_KEEPCNT": 2,
    "USE_RATIO_RESET": False,
    # Listener / aggregator logging
    "LOG_LISTENER_CONN_LOST": False,
    "LOG_LISTENER_CONN_SUCCESS": True,
    "LOG_AGGREGATOR_MISSES": True,
    # Rule file names (resolved relative to CONF_DIR elsewhere)
    "AGGREGATION_RULES": "aggregation-rules.conf",
    "REWRITE_RULES": "rewrite-rules.conf",
    "RELAY_RULES": "relay-rules.conf",
    # Miscellaneous
    "ENABLE_LOGROTATION": True,
    "METRIC_CLIENT_IDLE_TIMEOUT": None,
    "CACHE_METRIC_NAMES_MAX": 0,
    "CACHE_METRIC_NAMES_TTL": 0,
    "RAVEN_DSN": None,
    "PICKLE_RECEIVER_MAX_LENGTH": 2 ** 20,
}
132
133
134def _process_alive(pid):
135    if exists("/proc"):
136        return exists("/proc/%d" % pid)
137    else:
138        try:
139            os.kill(int(pid), 0)
140            return True
141        except OSError as err:
142            return err.errno == errno.EPERM
143
144
class OrderedConfigParser(ConfigParser):
  """ConfigParser whose sections() lists sections in file order.

  Hacky workaround to ensure sections are always returned in the order
  they are defined in. Note that this does *not* make any guarantees about
  the order of options within a section or the order in which sections get
  written back to disk on write().
  """
  # Section names in file order. This empty class-level list is only what
  # sections() returns before read(); read() replaces it per instance.
  _ordered_sections = []

  def read(self, path):
    """Parse the single config file *path* and record its section order.

    Only sections the base parser actually registered are reported. This
    excludes the special DEFAULT section and ignores repeated headers.
    Sections the base parser knows about but the simple header scan missed
    are appended at the end, so sections() never hides a parsed section.

    Returns the result of ConfigParser.read (the list of files parsed).
    Raises CarbonConfigException if *path* is missing or not readable.
    """
    # Verifies a file exists *and* is readable
    if not os.access(path, os.R_OK):
      raise CarbonConfigException("Error: Missing config file or wrong perms on %s" % path)

    result = ConfigParser.read(self, path)

    sections = []
    with open(path) as f:
      for line in f:
        line = line.strip()
        if not (line.startswith('[') and line.endswith(']')):
          continue
        name = line[1:-1]
        # has_section() is False for DEFAULT, so it is not listed here,
        # consistent with ConfigParser.sections().
        if name not in sections and ConfigParser.has_section(self, name):
          sections.append(name)

    # Keep any section the parser accepted but the scan above did not match
    # (e.g. a header followed by trailing text).
    for name in ConfigParser.sections(self):
      if name not in sections:
        sections.append(name)

    self._ordered_sections = sections
    return result

  def sections(self):
    """Return the section names in file order (a fresh list each call)."""
    return list(self._ordered_sections)  # return a copy for safety
172
173
class _MissingSettingError(KeyError, AttributeError):
  """Raised when an undefined setting is read as an attribute.

  Subclassing KeyError keeps callers that caught the previous KeyError
  working. Subclassing AttributeError makes getattr(obj, name, default),
  hasattr() and the copy/pickle protocol lookups behave normally.
  """


class Settings(dict):
  """Dictionary of carbon settings with attribute-style read access.

  ``settings.USER`` is equivalent to ``settings["USER"]``. A new instance is
  pre-populated with the module-level ``defaults``. Only *reads* fall through
  to the dict: assigning ``settings.X = ...`` creates a plain instance
  attribute, not a key.
  """

  def __getattr__(self, name):
    # Python only calls __getattr__ after normal attribute lookup failed, so
    # dict methods and real instance attributes still take precedence.
    try:
      return self[name]
    except KeyError:
      raise _MissingSettingError(name)

  def __init__(self):
    dict.__init__(self)
    self.update(defaults)

  def readFrom(self, path, section):
    """Merge the options of *section* from config file *path* into self.

    Option names are upper-cased. Each raw value is converted according to
    the type of the matching entry in ``defaults``:

    * list -- comma-separated items, each stripped; empty items are dropped
      (e.g. a trailing comma or an empty value);
    * bool -- parsed with ConfigParser.getboolean();
    * str  -- kept as the raw string;
    * anything else, or keys with no default -- int if possible, else
      float if possible, else the raw string.

    A missing *section* is silently ignored.
    Raises CarbonConfigException if *path* cannot be read.
    """
    parser = ConfigParser()
    if not parser.read(path):
      raise CarbonConfigException("Failed to read config file %s" % path)

    if not parser.has_section(section):
      return

    for key, value in parser.items(section):
      key = key.upper()

      # Detect type from defaults dict (None when the key has no default).
      valueType = type(defaults[key]) if key in defaults else None

      if valueType is list:
        value = [v.strip() for v in value.split(',') if v.strip()]

      elif valueType is bool:
        # getboolean() applies optionxform, so the upper-cased key still
        # matches the option.
        value = parser.getboolean(section, key)

      elif valueType is not str:
        # Attempt to figure out numeric types automatically. Settings whose
        # default is a string (USER, CARBON_METRIC_PREFIX, ...) are left
        # alone so e.g. a numeric user name is not turned into an int.
        try:
          value = int(value)
        except ValueError:
          try:
            value = float(value)
          except ValueError:
            pass

      self[key] = value
215
216
# Process-wide settings object. Settings() already starts from ``defaults``,
# so no further seeding is needed here; it is later updated in place (for
# example by CarbonCacheOptions.postOptions()).
settings = Settings()
219
220
class CarbonCacheOptions(usage.Options):
    """twistd options shared by the carbon daemons.

    postOptions() loads the program's settings from carbon.conf into the
    module-level ``settings``, adjusts the parent (twistd) options and then
    runs the legacy positional action (start/stop/status) via handleAction().
    """

    optFlags = [
        ["debug", "", "Run in debug mode."],
    ]

    optParameters = [
        ["config", "c", None, "Use the given config file."],
        ["instance", "", "a", "Manage a specific carbon instance."],
        ["logdir", "", None, "Write logs to the given directory."],
        ["whitelist", "", None, "List of metric patterns to allow."],
        ["blacklist", "", None, "List of metric patterns to disallow."],
    ]

    def postOptions(self):
        """Finish option processing after twistd has parsed the command line.

        Mutates the module-level ``settings`` and the parent options
        ``pidfile``, ``umask`` and (when USER is configured) ``uid``/``gid``.
        Exits the process when storage-schemas.conf is missing, when DATABASE
        names no known plugin, or as decided by handleAction().
        """
        program = self.parent.subCommand

        # Use provided pidfile (if any) as default for configuration. If it is
        # missing or set to 'twistd.pid', no value was provided and the twistd
        # default was used.
        pidfile = self.parent["pidfile"]
        if not pidfile or pidfile.endswith("twistd.pid"):
            pidfile = None
        self["pidfile"] = pidfile

        # Enforce a default umask of '022' if none was set.
        if "umask" not in self.parent or self.parent["umask"] is None:
            self.parent["umask"] = 0o022

        # Read extra settings from the configuration file.
        program_settings = read_config(program, self)
        settings.update(program_settings)
        settings["program"] = program

        # Normalize and expand paths ("~" expansion, redundant separators).
        for key in ("STORAGE_DIR", "LOCAL_DATA_DIR", "WHITELISTS_DIR",
                    "PID_DIR", "LOG_DIR", "pidfile"):
            settings[key] = os.path.normpath(os.path.expanduser(settings[key]))

        # Set process uid/gid by changing the parent config, if a user was
        # provided in the configuration file.
        if settings.USER:
            self.parent["uid"], self.parent["gid"] = (
                pwd.getpwnam(settings.USER)[2:4])

        # Set the pidfile in parent config to the value that was computed by
        # C{read_config}.
        self.parent["pidfile"] = settings["pidfile"]

        storage_schemas = join(settings["CONF_DIR"], "storage-schemas.conf")
        if not exists(storage_schemas):
            print("Error: missing required config %s" % storage_schemas)
            sys.exit(1)

        # NOTE(review): an invalid strategy is only reported here; the value
        # stored in settings is left unchanged despite the "defaulting to"
        # wording. Confirm how the cache handles unknown strategies.
        if settings.CACHE_WRITE_STRATEGY not in ('timesorted', 'sorted', 'max', 'naive'):
            log.err("%s is not a valid value for CACHE_WRITE_STRATEGY, defaulting to %s" %
                    (settings.CACHE_WRITE_STRATEGY, defaults['CACHE_WRITE_STRATEGY']))
        else:
            log.msg("Using %s write strategy for cache" % settings.CACHE_WRITE_STRATEGY)

        # Database-specific settings
        database = settings.DATABASE
        if database not in TimeSeriesDatabase.plugins:
            print("No database plugin implemented for '%s'" % database)
            raise SystemExit(1)

        database_class = TimeSeriesDatabase.plugins[database]
        state.database = database_class(settings)

        # Settings has no __setattr__, so this is stored as an instance
        # attribute (not a dict key); read it back via attribute access.
        settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95

        if "action" not in self:
            self["action"] = "start"
        self.handleAction()

        # If we are not running in debug mode or non-daemon mode, then log to a
        # directory, otherwise log output will go to stdout. If parent options
        # are set to log to syslog, then use that instead.
        if not self["debug"]:
            if self.parent.get("syslog", None):
                prefix = "%s-%s[%d]" % (program, self["instance"], os.getpid())
                log.logToSyslog(prefix)
            elif not self.parent["nodaemon"]:
                logdir = settings.LOG_DIR
                if not isdir(logdir):
                    os.makedirs(logdir)
                    if settings.USER:
                        # We have not yet switched to the specified user,
                        # but that user must be able to create files in this
                        # directory.
                        os.chown(logdir, self.parent["uid"], self.parent["gid"])
                log.logToDir(logdir)

        if self["whitelist"] is None:
            self["whitelist"] = join(settings["CONF_DIR"], "whitelist.conf")
        settings["whitelist"] = self["whitelist"]

        if self["blacklist"] is None:
            self["blacklist"] = join(settings["CONF_DIR"], "blacklist.conf")
        settings["blacklist"] = self["blacklist"]

    def parseArgs(self, *action):
        """If an action was provided, store it for further processing."""
        if len(action) == 1:
            self["action"] = action[0]

    def _remove_pidfile(self, pidfile):
        """Delete *pidfile*, printing a warning instead of failing if that is
        not possible."""
        try:
            os.unlink(pidfile)
        except (IOError, OSError):
            # os.unlink raises OSError (not IOError) on Python 2.
            print("Could not remove pidfile %s" % pidfile)

    def _read_pidfile(self, pidfile, read_error_fmt):
        """Return the positive integer pid stored in *pidfile*.

        If the file cannot be opened or read, prints ``read_error_fmt %
        pidfile`` and raises SystemExit(1). If the content is not a positive
        integer, the file is treated as corrupted: it is removed and
        SystemExit(1) is raised. Zero and negative values are rejected
        because os.kill() treats them as process groups.
        """
        try:
            with open(pidfile, 'r') as pf:
                pid = int(pf.read().strip())
        except ValueError:
            # Unparsable content (UnicodeDecodeError is a ValueError too).
            pid = None
        except (IOError, OSError):
            print(read_error_fmt % pidfile)
            raise SystemExit(1)

        if pid is None or pid <= 0:
            print("Failed to parse pid from pidfile %s" % pidfile)
            print("removing corrupted pidfile %s" % pidfile)
            self._remove_pidfile(pidfile)
            raise SystemExit(1)
        return pid

    def handleAction(self):
        """Handle extra argument for backwards-compatibility.

        * C{start} will simply do minimal pid checking and otherwise let twistd
              take over.
        * C{stop} will kill an existing running process if it matches the
              C{pidfile} contents.
        * C{status} will simply report if the process is up or not.
        """
        import signal

        action = self["action"]
        pidfile = self.parent["pidfile"]
        program = settings["program"]
        instance = self["instance"]

        if action == "stop":
            if not exists(pidfile):
                print("Pidfile %s does not exist" % pidfile)
                raise SystemExit(0)
            pid = self._read_pidfile(pidfile, "Could not read pidfile %s")
            print("Sending kill signal to pid %d" % pid)
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError as e:
                if e.errno == errno.ESRCH:
                    print("No process with pid %d running" % pid)
                else:
                    raise

            raise SystemExit(0)

        elif action == "status":
            if not exists(pidfile):
                print("%s (instance %s) is not running" % (program, instance))
                raise SystemExit(1)
            pid = self._read_pidfile(pidfile, "Failed to read pid from %s")

            if _process_alive(pid):
                print("%s (instance %s) is running with pid %d" %
                      (program, instance, pid))
                raise SystemExit(0)
            else:
                print("%s (instance %s) is not running" % (program, instance))
                raise SystemExit(1)

        elif action == "start":
            if exists(pidfile):
                pid = self._read_pidfile(pidfile, "Could not read pidfile %s")
                if _process_alive(pid):
                    print("%s (instance %s) is already running with pid %d" %
                          (program, instance, pid))
                    raise SystemExit(1)
                print("Removing stale pidfile %s" % pidfile)
                self._remove_pidfile(pidfile)
            else:
                # Try to create the PID directory; tolerate it appearing
                # concurrently between the check and makedirs().
                pid_dir = settings["PID_DIR"]
                if not os.path.exists(pid_dir):
                    try:
                        os.makedirs(pid_dir)
                    except OSError as exc:
                        if not (exc.errno == errno.EEXIST and os.path.isdir(pid_dir)):
                            raise

            print("Starting %s (instance %s)" % (program, instance))

        else:
            print("Invalid action '%s'" % action)
            print("Valid actions: start stop status")
            raise SystemExit(1)
453
454
class CarbonAggregatorOptions(CarbonCacheOptions):
    """Command-line options for carbon-aggregator.

    Adds ``--rules`` and ``--rewrite-rules`` on top of the shared options.
    A file not given on the command line defaults to CONF_DIR joined with the
    name configured in AGGREGATION_RULES / REWRITE_RULES. The resolved paths
    are published as settings["aggregation-rules"] / settings["rewrite-rules"].
    """

    optParameters = [
        ["rules", "", None, "Use the given aggregation rules file."],
        ["rewrite-rules", "", None, "Use the given rewrite rules file."],
    ] + CarbonCacheOptions.optParameters

    def postOptions(self):
        CarbonCacheOptions.postOptions(self)

        # (option name, settings key to publish under, key naming the default file)
        rule_files = (
            ("rules", "aggregation-rules", "AGGREGATION_RULES"),
            ("rewrite-rules", "rewrite-rules", "REWRITE_RULES"),
        )
        for option, published_as, default_name_key in rule_files:
            chosen = self[option]
            if chosen is None:
                chosen = join(settings["CONF_DIR"], settings[default_name_key])
                self[option] = chosen
            settings[published_as] = chosen
472
473
class CarbonRelayOptions(CarbonCacheOptions):
    """Command-line options for carbon-relay.

    Adds ``--rules`` (relay rules) and ``--aggregation-rules``, resolving
    missing values against CONF_DIR and RELAY_RULES / AGGREGATION_RULES. It
    then checks that RELAY_METHOD names a registered DatapointRouter plugin
    and exits with status 1 if it does not.
    """

    optParameters = [
        ["rules", "", None, "Use the given relay rules file."],
        ["aggregation-rules", "", None, "Use the given aggregation rules file."],
    ] + CarbonCacheOptions.optParameters

    def postOptions(self):
        CarbonCacheOptions.postOptions(self)

        # (option name, settings key to publish under, key naming the default file)
        rule_files = (
            ("rules", "relay-rules", "RELAY_RULES"),
            ("aggregation-rules", "aggregation-rules", "AGGREGATION_RULES"),
        )
        for option, published_as, default_name_key in rule_files:
            chosen = self[option]
            if chosen is None:
                chosen = join(settings["CONF_DIR"], settings[default_name_key])
                self[option] = chosen
            settings[published_as] = chosen

        router = settings["RELAY_METHOD"]
        if router in DatapointRouter.plugins:
            return
        print("In carbon.conf, RELAY_METHOD must be one of %s. "
              "Invalid value: '%s'" % (', '.join(DatapointRouter.plugins), router))
        raise SystemExit(1)
496
497
def get_default_parser(usage="%prog [options] <start|stop|status>"):
    """Build the optparse parser shared by the carbon command-line scripts.

    *usage* is passed unchanged to OptionParser. The options are registered
    in a fixed order from the table below; entries that list ``default``
    set it explicitly, the rest use optparse's own default handling.
    """
    option_table = (
        ("--debug", {"action": "store_true",
                     "help": "Run in the foreground, log to stdout"}),
        ("--syslog", {"action": "store_true",
                      "help": "Write logs to syslog"}),
        ("--nodaemon", {"action": "store_true",
                        "help": "Run in the foreground"}),
        ("--profile", {"help": "Record performance profile data to the given file"}),
        ("--profiler", {"help": "Specify the profiler to use"}),
        ("--pidfile", {"default": None,
                       "help": "Write pid to the given file"}),
        ("--umask", {"default": None,
                     "help": "Use the given umask when creating files"}),
        ("--config", {"default": None,
                      "help": "Use the given config file"}),
        ("--whitelist", {"default": None,
                         "help": "Use the given whitelist file"}),
        ("--blacklist", {"default": None,
                         "help": "Use the given blacklist file"}),
        ("--logdir", {"default": None,
                      "help": "Write logs in the given directory"}),
        ("--instance", {"default": 'a',
                        "help": "Manage a specific carbon instance"}),
        ("--logfile", {"default": None,
                       "help": "Log to a specified file, - for stdout"}),
        ("--logger", {"default": None,
                      "help": "A fully-qualified name to a log observer factory to use for the initial log "
                              "observer. Takes precedence over --logfile and --syslog (when available)."}),
    )

    parser = OptionParser(usage=usage)
    for flag, settings_for_flag in option_table:
        parser.add_option(flag, **settings_for_flag)
    return parser
552
553
def get_parser(name):
    """Return the default parser extended with program-specific options.

    Any *name* containing "carbon-aggregator" gains ``--rules`` and
    ``--rewrite-rules``. Exactly "carbon-relay" gains ``--rules``. Other
    programs get the default parser unchanged.
    """
    parser = get_default_parser()

    if "carbon-aggregator" in name:
        extra_options = (
            ("--rules", "Use the given aggregation rules file."),
            ("--rewrite-rules", "Use the given rewrite rules file."),
        )
    elif name == "carbon-relay":
        extra_options = (
            ("--rules", "Use the given relay rules file."),
        )
    else:
        extra_options = ()

    for flag, description in extra_options:
        parser.add_option(flag, default=None, help=description)
    return parser
571
572
def parse_options(parser, args):
    """Parse *args* with *parser* and validate the positional action.

    Returns the ``(options, args)`` pair produced by ``parser.parse_args``.
    If no positional argument was given, or the first one is not "start",
    "stop" or "status", the usage line is printed and SystemExit(1) is
    raised.
    """
    options, positional = parser.parse_args(args)
    if positional and positional[0] in ("start", "stop", "status"):
        return options, positional
    parser.print_usage()
    raise SystemExit(1)
589
590
def read_config(program, options, **kwargs):
    """
    Read settings for 'program' from configuration file specified by
    'options["config"]', with missing values provided by 'defaults'.

    'program' is expected to look like "carbon-<section>"; the part after
    "carbon-" names the carbon.conf section to read. If options["instance"]
    is set, the "<section>:<instance>" section is read on top of it.

    'options' is read for "config", "instance", "pidfile" and "logdir". Note
    that options["config"] is written back when it was None.

    Extra keyword arguments only fill in keys that are not already present.
    Keys that exist in 'defaults' are therefore not overridden by them.
    ROOT_DIR (or the GRAPHITE_ROOT environment variable) is required.

    Returns a new Settings instance (this does not modify the module-level
    'settings'). Raises CarbonConfigException if no root directory is
    available or the config file does not exist.
    """
    # Local name shadows the module-level 'settings' inside this function.
    # Settings() already applies 'defaults'; the update below is redundant.
    settings = Settings()
    settings.update(defaults)

    # Initialize default values if not set yet.
    for name, value in kwargs.items():
        settings.setdefault(name, value)

    graphite_root = kwargs.get("ROOT_DIR")
    if graphite_root is None:
        graphite_root = os.environ.get('GRAPHITE_ROOT')
    if graphite_root is None:
        raise CarbonConfigException("Either ROOT_DIR or GRAPHITE_ROOT "
                                    "needs to be provided.")

    # Default config directory to root-relative, unless overridden by the
    # 'GRAPHITE_CONF_DIR' environment variable.
    settings.setdefault("CONF_DIR",
                        os.environ.get("GRAPHITE_CONF_DIR",
                                       join(graphite_root, "conf")))
    if options["config"] is None:
        options["config"] = join(settings["CONF_DIR"], "carbon.conf")
    else:
        # Set 'CONF_DIR' to the parent directory of the 'carbon.conf' config
        # file.
        settings["CONF_DIR"] = dirname(normpath(options["config"]))

    # Storage directory can be overridden by the 'GRAPHITE_STORAGE_DIR'
    # environment variable. It defaults to a path relative to GRAPHITE_ROOT
    # for backwards compatibility though.
    settings.setdefault("STORAGE_DIR",
                        os.environ.get("GRAPHITE_STORAGE_DIR",
                                       join(graphite_root, "storage")))

    def update_STORAGE_DIR_deps():
        # By default, everything is written to subdirectories of the storage dir.
        # setdefault() only fills keys that are still missing, so values set
        # via kwargs or carbon.conf are kept.
        settings.setdefault(
            "PID_DIR", settings["STORAGE_DIR"])
        settings.setdefault(
            "LOG_DIR", join(settings["STORAGE_DIR"], "log", program))
        settings.setdefault(
            "LOCAL_DATA_DIR", join(settings["STORAGE_DIR"], "whisper"))
        settings.setdefault(
            "WHITELISTS_DIR", join(settings["STORAGE_DIR"], "lists"))

    # Read configuration options from program-specific section.
    section = program[len("carbon-"):]
    config = options["config"]

    if not exists(config):
        raise CarbonConfigException("Error: missing required config %r" % config)

    settings.readFrom(config, section)
    settings.setdefault("instance", options["instance"])
    # Derive the storage-relative directories from the main section's
    # STORAGE_DIR. NOTE(review): a STORAGE_DIR set only in the instance
    # section below will not re-derive these, since they are already set.
    update_STORAGE_DIR_deps()

    # If a specific instance of the program is specified, augment the settings
    # with the instance-specific settings and provide sane defaults for
    # optional settings.
    if options["instance"]:
        settings.readFrom(config,
                          "%s:%s" % (section, options["instance"]))
        # An explicit --pidfile/--logdir wins; otherwise derive per-instance
        # names such as "<program>-<instance>.pid" and "<LOG_DIR>/<program>-<instance>".
        settings["pidfile"] = (
            options["pidfile"] or
            join(settings["PID_DIR"], "%s-%s.pid" % (program, options["instance"])))
        settings["LOG_DIR"] = (
            options["logdir"] or
            join(settings["LOG_DIR"], "%s-%s" % (program, options["instance"])))
    else:
        settings["pidfile"] = (
            options["pidfile"] or join(settings["PID_DIR"], '%s.pid' % program))
        settings["LOG_DIR"] = (options["logdir"] or settings["LOG_DIR"])

    # All four keys were already filled by the earlier call and nothing
    # removes them, so this second call appears to be a no-op.
    update_STORAGE_DIR_deps()
    return settings
670