#
# Copyright (c) 2013-2014, PagerDuty, Inc. <info@pagerduty.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#   * Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#   * Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#   * Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
"""
A directory-based queue for PagerDuty events.

Consists of two classes:
- PDQEnqueuer, which provides only enqueue functionality.
- PDQueue, which provides dequeue and queue-management functionality.

Notes:
- Designed for multiple processes to use the queue concurrently.
- Each entry in the queue is written to a separate file in the
    queue directory.
- Files are named so that sorting by file name gives queue order.
- Concurrent enqueues use exclusive file creation and retries to avoid
    using the same file name.
- Concurrent dequeues are serialized with an exclusive dequeue lock.
- A dequeue holds the exclusive lock until the consume callback
    is done.
- Dequeues never block enqueues, and enqueues never block dequeues.
"""
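
# On-disk layout (illustrative example). Each event lives in exactly one of
# the subdirectories below, in a file named
# "<enqueue-time-in-microseconds>_<service-key>.txt", so that a lexical sort
# of file names gives queue order:
#
#   <queue_dir>/
#       dequeue.lock    exclusive lock taken while dequeuing
#       tmp/            in-progress enqueue writes
#       pdq/            pending events, e.g. 1395160000123456_mykey.txt
#       suc/            successfully consumed events
#       err/            events that failed permanently (see resurrect())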


import errno
import logging
import os

from .constants import ConsumeEvent, EnqueueWarnings
from .pdagentutil import ensure_readable_directory, ensure_writable_directory, \
    utcnow_isoformat


logger = logging.getLogger(__name__)


QUEUE_SUBDIRS = ["pdq", "tmp", "suc", "err"]


class EmptyQueueError(Exception):
    pass


class PDQueueBase(object):

    def __init__(self, queue_dir, lock_class, time_calc):
        self.queue_dir = queue_dir
        self.lock_class = lock_class
        self.time = time_calc

    def _abspath(self, ftype, fname):
        return os.path.join(self.queue_dir, ftype, fname)


class PDQEnqueuer(PDQueueBase):

    def __init__(
            self,
            queue_dir,
            lock_class,
            time_calc,
            enqueue_file_mode,
            default_umask
            ):
        PDQueueBase.__init__(self, queue_dir, lock_class, time_calc)
        self.enqueue_file_mode = enqueue_file_mode
        self.default_umask = default_umask

        # Enqueue needs only write access to the 'tmp' and 'pdq' directories.
        ensure_writable_directory(os.path.join(self.queue_dir, "tmp"))
        ensure_writable_directory(os.path.join(self.queue_dir, "pdq"))

    def enqueue(self, service_key, s):
        # write the payload to an exclusively-created temp file first, so
        # that a partially-written event is never visible in 'pdq'.
        _, tmp_fname_abs, tmp_fd, problems = self._open_creat_excl_with_retry(
            "tmp",
            "%%d_%s.txt" % service_key
            )
        os.write(tmp_fd, s.encode())
        os.close(tmp_fd)
        # hard-link the finished file to an exclusive queue entry file; the
        # link publishes the fully-written event into the queue atomically.
        pdq_fname, _ = self._link_with_retry(
            "pdq",
            "%%d_%s.txt" % service_key,
            tmp_fname_abs
            )
        # unlink the temp file; the queue entry keeps the data alive.
        os.unlink(tmp_fname_abs)
        return pdq_fname, problems

    def _open_creat_excl_with_retry(self, ftype, fname_fmt):
        problems = []
        # we change the umask process-wide here; this code is not meant to
        # run multi-threaded, so this will not cause problems elsewhere.
        orig_umask = os.umask(self.default_umask)
        if (self.enqueue_file_mode & orig_umask) > 0:
            # current user's umask is very restrictive.
            problems.append(EnqueueWarnings.UMASK_TOO_RESTRICTIVE)
        try:
            n = 0
            t_microsecs = int(self.time.time() * 1e6)
            while True:
                fname = fname_fmt % (t_microsecs + n)
                fname_abs = self._abspath(ftype, fname)
                fd = _open_creat_excl(fname_abs, self.enqueue_file_mode)
                if fd is None:
                    n += 1
                    if n >= 100:
                        raise Exception(
                            "Too many retries! (Last attempted name: %s)"
                            % fname_abs
                            )
                else:
                    return fname, fname_abs, fd, problems
        finally:
            os.umask(orig_umask)
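
    # (When the name computed for the current microsecond timestamp already
    # exists, the retry loop above just bumps the number -- e.g. a collision
    # on "1395160000123456_key1.txt" is retried as "1395160000123457_key1.txt".)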

    def _link_with_retry(self, ftype, fname_fmt, orig_abs):
        n = 0
        t_microsecs = int(self.time.time() * 1e6)
        while True:
            fname = fname_fmt % (t_microsecs + n)
            fname_abs = self._abspath(ftype, fname)
            if _link(orig_abs, fname_abs):
                return fname, fname_abs
            else:
                n += 1
                if n >= 100:
                    raise Exception(
                        "Too many retries! (Last attempted name: %s)"
                        % fname_abs
                        )


class PDQueue(PDQueueBase):

    def __init__(
            self,
            queue_dir,
            lock_class,
            time_calc,
            event_size_max_bytes,
            backoff_interval,
            retry_limit_for_possible_errors,
            backoff_db,
            counter_db
            ):
        PDQueueBase.__init__(self, queue_dir, lock_class, time_calc)

        for ftype in QUEUE_SUBDIRS:
            d = os.path.join(self.queue_dir, ftype)
            ensure_readable_directory(d)
            ensure_writable_directory(d)

        self._dequeue_lockfile = os.path.join(
            self.queue_dir, "dequeue.lock"
            )

        self.event_size_max_bytes = event_size_max_bytes
        self.backoff_info = _BackoffInfo(
            backoff_db,
            backoff_interval,
            retry_limit_for_possible_errors,
            time_calc
            )
        self.counter_info = _CounterInfo(counter_db, time_calc)

    # Get the list of queued files from the queue directory, in enqueue order.
    def _queued_files(self, ftype="pdq"):
        fnames = sorted(os.listdir(os.path.join(self.queue_dir, ftype)))
        return fnames

    def dequeue(self, consume_func, stop_check_func=lambda: False):
        # process only the first event in the queue.
        self._process_queue(
            lambda events: events[0:1],
            consume_func,
            stop_check_func
            )

    def flush(self, consume_func, stop_check_func):
        # process all events in the queue.
        self._process_queue(
            lambda events: events,
            consume_func,
            stop_check_func
            )
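
    # A consume_func receives the event payload and file name, and must
    # return one of the ConsumeEvent codes handled in _process_event below.
    # Illustrative sketch (`send_event` is a hypothetical sender):
    #
    #   def consume_func(data, fname):
    #       outcome = send_event(data)
    #       if outcome == "ok":
    #           return ConsumeEvent.CONSUMED
    #       elif outcome == "throttled":
    #           return ConsumeEvent.BACKOFF_SVCKEY_NOT_CONSUMED
    #       else:
    #           return ConsumeEvent.BAD_ENTRY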

    def _process_queue(
            self,
            filter_events_to_process_func,
            consume_func,
            should_stop_func
            ):

        if not self._queued_files():
            raise EmptyQueueError

        lock = self.lock_class(self._dequeue_lockfile)
        lock.acquire()

        try:
            file_names = self._queued_files()
            if not file_names:
                raise EmptyQueueError

            file_names = filter_events_to_process_func(file_names)
            if not file_names:
                return

            now = self.time.time()
            err_svc_keys = set()

            self.backoff_info.update()
            for fname in file_names:
                if should_stop_func():
                    break
                try:
                    _, svc_key = _get_event_metadata(fname)
                except _BadFileNameError:
                    logger.warning("Badly named event " + fname)
                    self._unsafe_change_event_type(fname, 'pdq', 'err')
                    continue
                if svc_key not in err_svc_keys and \
                        self.backoff_info.get_current_retry_at(svc_key) <= now:
                    # no back-off; nothing has gone wrong in this pass yet.
                    try:
                        if not self._process_event(
                                fname, consume_func, svc_key
                                ):
                            # this service key is problematic.
                            err_svc_keys.add(svc_key)
                    except StopIteration:
                        # no further processing must be done.
                        logger.info("Not processing any more events this time")
                        break

            self.backoff_info.store()
            self.counter_info.store()
        finally:
            lock.release()

    # Returns true if processing can continue for the service key, false if not.
    def _process_event(self, fname, consume_func, svc_key):
        fname_abs = self._abspath("pdq", fname)
        data = None
        if os.path.getsize(fname_abs) <= self.event_size_max_bytes:
            with open(fname_abs) as f:
                data = f.read()

        # ensure that the event is not too large.
        if data is None or len(data) > self.event_size_max_bytes:
            logger.info(
                "Not processing event %s -- it exceeds max-allowed size" %
                fname)
            self._unsafe_change_event_type(fname, 'pdq', 'err')
            self.counter_info.increment_failure()
            return True

        logger.info("Processing event " + fname)
        consume_code = consume_func(data, fname)

        if consume_code == ConsumeEvent.CONSUMED:
            # a failure here means duplicate event sends if the incident key
            # was not specified, i.e. if the event was enqueued in a
            # non-standard manner (e.g. not using the pd* scripts.)
            self._unsafe_change_event_type(fname, 'pdq', 'suc')
            self.counter_info.increment_success()
            return True
        elif consume_code == ConsumeEvent.STOP_ALL:
            # stop processing any more events.
            raise StopIteration
        elif consume_code == ConsumeEvent.BAD_ENTRY:
            self._unsafe_change_event_type(fname, 'pdq', 'err')
            self.counter_info.increment_failure()
            return True
        elif consume_code == ConsumeEvent.BACKOFF_SVCKEY_BAD_ENTRY:
            logger.info("Backing off service key " + svc_key)
            if self.backoff_info.is_threshold_breached(svc_key):
                # time for stricter action -- mark the event as bad.
                logger.info(
                    "Service key %s breached back-off limit. Assuming bad event."
                    % svc_key
                    )
                self._unsafe_change_event_type(fname, 'pdq', 'err')
                self.counter_info.increment_failure()
                # now that we have handled the bad entry, we'll want to
                # give the other events in this service key a chance, so
                # don't consider the key as erroneous.
                return True
            else:
                self.backoff_info.increment(svc_key)
                return False
        elif consume_code == ConsumeEvent.BACKOFF_SVCKEY_NOT_CONSUMED:
            self.backoff_info.increment(svc_key)
            return False
        else:
            raise ValueError(
                "Unsupported dequeue consume code %d" %
                consume_code)

    def resurrect(self, service_key=None):
        # move dead events of the given service key back into the queue.
        errnames = self._queued_files("err")
        count = 0
        for errname in errnames:
            try:
                # even if we don't need to filter by service key, always
                # parse the event file name to check for _BadFileNameError.
                _, svc_key = _get_event_metadata(errname)
                if not service_key or svc_key == service_key:
                    self._unsafe_change_event_type(errname, 'err', 'pdq')
                    count += 1
            except _BadFileNameError:
                # don't resurrect a badly named file.
                logger.warning("Not resurrecting badly named event " + errname)
        return count

    def cleanup(self, delete_before_sec):
        delete_before_time = int(self.time.time()) - delete_before_sec

        def _cleanup_files(ftype):
            fnames = self._queued_files(ftype)
            for fname in fnames:
                try:
                    enqueue_time, _ = _get_event_metadata(fname)
                except _BadFileNameError:
                    logger.info(
                        "Cleanup: ignoring invalid file name %s" % fname)
                else:
                    if enqueue_time < delete_before_time:
                        try:
                            logger.info("Cleanup: removing file %s" % fname)
                            os.remove(self._abspath(ftype, fname))
                        except OSError as e:
                            logger.warning(
                                "Could not clean up %s file %s: %s" %
                                (ftype, fname, str(e))
                                )

        # clean up bad / temp / success files created before delete-before-time.
        _cleanup_files("err")
        _cleanup_files("tmp")
        _cleanup_files("suc")

    def get_stats(
            self,
            detailed_snapshot=False,  # to return success/error status too
            per_service_key_snapshot=False,
            service_key=None    # looked at only if per_service_key_snapshot
            ):
        """
        Returns the status of events. Status consists of snapshot stats (based
        on current queue state) and historical stats (based on persisted
        state.) Some states might be missing if there are no events in those
        states, e.g. if there are no erroneous events, there might be no
        "failed_events" entry.

        Sample data returned:
        {
            "snapshot": {
                "pending_events": {
                    "count": 3,
                    "newest_age_secs": 15,
                    "oldest_age_secs": 40,
                    "service_keys_count": 2
                },
                "succeeded_events": {
                    "count": 3,
                    "newest_age_secs": 5,
                    "oldest_age_secs": 35,
                    "service_keys_count": 2
                },
                "failed_events": {
                    "count": 3,
                    "newest_age_secs": 25,
                    "oldest_age_secs": 45,
                    "service_keys_count": 2
                },
                "throttled_service_keys_count": 1
            },
            "aggregate": {
                "successful_events_count": 20,
                "failed_events_count": 2,
                "started_on": "2014-03-18T20:49:02Z"
            }
        }
        """
        now = self.time.time()

        snapshot_stats = dict()

        def add_stat(queue_file_type, stat_name):
            for fname in self._queued_files(queue_file_type):
                try:
                    metadata = _get_event_metadata(fname)
                except _BadFileNameError:
                    continue
                if per_service_key_snapshot:
                    svc_key = metadata[1]
                    if service_key and (svc_key != service_key):
                        # stats required only for the given service key.
                        continue
                    if svc_key not in snapshot_stats:
                        # we encountered a new service key.
                        snapshot_stats[svc_key] = dict()
                    stats = snapshot_stats[svc_key]
                else:
                    stats = snapshot_stats
                if stat_name not in stats:
                    stats[stat_name] = SnapshotStats(now)
                stats[stat_name].add_event(metadata)

        add_stat("pdq", "pending_events")
        if detailed_snapshot:
            add_stat("suc", "succeeded_events")
            add_stat("err", "failed_events")

        if per_service_key_snapshot:
            for svc_key in snapshot_stats:
                svc_key_stats = snapshot_stats[svc_key]
                for stat_name in svc_key_stats:
                    svc_key_stats[stat_name] = \
                        svc_key_stats[stat_name].to_dict()
        else:
            for stat_name in snapshot_stats:
                snapshot_stats[stat_name] = snapshot_stats[stat_name].to_dict()

        # if throttle info is required, compute it from pre-loaded info.
        # (we don't want to reload the info if queue processing is underway.)
        if self.backoff_info._current_retry_at:
            throttled_keys = set()
            now = int(self.time.time())
            for key, retry_at in \
                    self.backoff_info._current_retry_at.items():
                if retry_at > now:
                    throttled_keys.add(key)
            snapshot_stats["throttled_service_keys_count"] = len(throttled_keys)

        stats = {
            "snapshot": snapshot_stats
            }

        # historical counter data for completed events (success, failure)
        if self.counter_info._data:
            stats["aggregate"] = self.counter_info._data

        return stats

    # This function can move error files back into regular queue files, so
    # ensure that you have considered any concurrency-related consequences
    # to other queue operations before invoking this function.
    def _unsafe_change_event_type(self, event_name, frm, to):
        logger.info("Changing %s type: %s -> %s..." % (event_name, frm, to))
        old_abs = self._abspath(frm, event_name)
        new_abs = self._abspath(to, event_name)
        os.rename(old_abs, new_abs)


def _open_creat_excl(fname_abs, mode):
    try:
        return os.open(fname_abs, os.O_WRONLY | os.O_CREAT | os.O_EXCL, mode)
    except OSError as e:
        if e.errno == errno.EEXIST:
            return None
        else:
            raise


def _link(orig_abs, new_abs):
    try:
        os.link(orig_abs, new_abs)
        return True
    except OSError as e:
        if e.errno == errno.EEXIST:
            return False
        else:
            raise


class _BadFileNameError(Exception):
    pass


def _get_event_metadata(fname):
    if not fname.endswith(".txt"):
        raise _BadFileNameError
    fname = fname[:-4]
    try:
        enqueue_time_microsec_str, service_key = fname.split('_', 1)
        enqueue_time = int(enqueue_time_microsec_str) / (1000 * 1000)
        return enqueue_time, service_key
    except ValueError:
        raise _BadFileNameError
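
# For illustration, a well-formed name parses as:
#   _get_event_metadata("1395160000123456_key1.txt")
#       -> (1395160000.123456, "key1")
# Anything else -- wrong extension, missing separator, non-numeric
# timestamp -- raises _BadFileNameError.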


class _BackoffInfo(object):
    """
    Loads, accesses, modifies and saves back-off info for
    service keys in the queue.
    """

    def __init__(
            self,
            backoff_db,
            backoff_interval,
            retry_limit_for_possible_errors,
            time_calc
            ):
        self._db = backoff_db
        self._backoff_interval = backoff_interval
        self._retry_limit_for_possible_errors = retry_limit_for_possible_errors
        self._time = time_calc
        try:
            data = self._db.get()
        except Exception:
            logger.warning(
                "Unable to load service-key back-off history",
                exc_info=True
                )
            data = None
        if not data:
            # no db yet, or errors during db read.
            data = {
                'attempts': {},
                'next_retries': {}
                }
        self._previous_attempts = {}
        self._current_attempts = data['attempts']
        self._current_retry_at = data['next_retries']
        self.update()

    # returns true if `current-attempts`, or `previous-attempts + 1`,
    # breaches the retry limit.
    def is_threshold_breached(self, svc_key):
        cur_attempt = self._current_attempts.get(
            svc_key,
            self._previous_attempts.get(svc_key, 0) + 1)
        return cur_attempt > self._retry_limit_for_possible_errors

    # returns the current retry-at time for svc_key, or 0 if not available.
    def get_current_retry_at(self, svc_key):
        return self._current_retry_at.get(svc_key, 0)

    # updates current attempt and retry data based on previous data. Note
    # that this doesn't check for threshold breach because the threshold is
    # not relevant in all situations (e.g. back-off due to throttling.)
    def increment(self, svc_key):
        logger.info(
            "Retrying events in service key %s after %d sec" %
            (svc_key, self._backoff_interval)
            )

        self._current_attempts[svc_key] = \
            self._previous_attempts.get(svc_key, 0) + 1
        self._current_retry_at[svc_key] = int(
            self._time.time() + self._backoff_interval
            )

    # only retains data that is still valid at current time.
    def update(self):
        time_now = self._time.time()
        new_attempts = {}
        new_retry_at = {}

        # copy over all still-unexpired current back-offs to new data.
        for (svc_key, retry_at) in self._current_retry_at.items():
            if retry_at > time_now:
                new_attempts[svc_key] = self._current_attempts.get(svc_key)
                new_retry_at[svc_key] = retry_at

        # we'll still hold on to previous attempts data so we can use it to
        # compute new current data if required later.
        self._previous_attempts = self._current_attempts
        self._current_attempts = new_attempts
        self._current_retry_at = new_retry_at
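
    # Worked example (illustrative settings: backoff_interval=60,
    # retry_limit_for_possible_errors=3): each call to increment() schedules
    # the key's next retry ~60s out and bumps its attempt count (1, 2, 3...).
    # For BACKOFF_SVCKEY_BAD_ENTRY, _process_event first consults
    # is_threshold_breached(), which returns true once the attempt count
    # would exceed 3; at that point the offending event is moved to 'err'
    # instead of backing off yet again.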

    # persists current back-off info.
    def store(self):
        try:
            self._db.set({
                'attempts': self._current_attempts,
                'next_retries': self._current_retry_at
                })
        except Exception:
            logger.warning(
                "Unable to save service-key back-off history",
                exc_info=True)


class _CounterInfo(object):
    """
    Loads, accesses, modifies and saves counters for processed events.
    """
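
    # The persisted shape is a flat dict, e.g. (illustrative values):
    #   {
    #       "started_on": "2014-03-18T20:49:02Z",
    #       "successful_events_count": 20,
    #       "failed_events_count": 2
    #   }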

    def __init__(self, counter_db, time_calc):
        self._db = counter_db
        self._data = {}
        self._time = time_calc

        # try to load data.
        try:
            self._data = self._db.get()
        except Exception:
            logger.error("Unable to load counter history", exc_info=True)
        if not self._data:
            self._reset_data()

        # validate that counter values are indeed integers; if not, reset data.
        for key in (k for k in self._data if k != "started_on"):
            if not isinstance(self._data[key], int):
                logger.error(
                    "Invalid counter value %s=%s" % (key, self._data[key])
                    )
                logger.warning("Resetting counter history")
                self._reset_data()
                break

        # Try to persist the loaded data. If we can't persist, we'll want to
        # reset the data because we don't know for how long we haven't been
        # able to persist. Instead of updating the currently-loaded old
        # counters, potentially resulting in incorrect values, we'll just
        # consider the persisted data invalid.
        self.store(reset_data_if_failed=True)

    # increments success count by 1.
    def increment_success(self):
        self._increment("successful_events_count")

    # increments failure count by 1.
    def increment_failure(self):
        self._increment("failed_events_count")

    # increments the count of the given type by 1.
    def _increment(self, counter_type):
        self._data[counter_type] = self._data.get(counter_type, 0) + 1

    # persists current counter history.
    def store(self, reset_data_if_failed=False):
        try:
            self._db.set(self._data)
        except Exception:
            logger.error("Unable to save counter history", exc_info=True)
            if reset_data_if_failed:
                logger.warning("Resetting counter history")
                self._reset_data()

    def _reset_data(self):
        self._data = {
            "started_on": utcnow_isoformat(self._time)
            }


class SnapshotStats(object):
    """
    Stats based on a snapshot of the queue.
    """
    def __init__(self, time_now):
        self.count = 0
        self.oldest_enqueue_time = None
        self.newest_enqueue_time = None
        self.service_keys = set()
        self._time_now = time_now

    def add_event(self, event_metadata):
        enqueue_time, svc_key = event_metadata

        self.count += 1
        if (not self.oldest_enqueue_time) or \
                enqueue_time < self.oldest_enqueue_time:
            self.oldest_enqueue_time = enqueue_time
        if (not self.newest_enqueue_time) or \
                enqueue_time > self.newest_enqueue_time:
            self.newest_enqueue_time = enqueue_time

        self.service_keys.add(svc_key)

    def to_dict(self):
        if self.count:
            return {
                "count": self.count,
                "oldest_age_secs": int(
                    self._time_now - self.oldest_enqueue_time
                    ),
                "newest_age_secs": int(
                    self._time_now - self.newest_enqueue_time
                    ),
                "service_keys_count": len(self.service_keys)
                }
        else:
            return {
                "count": self.count
                }
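
if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the queue implementation proper.
    # Assumptions: a no-op lock and the standard `time` module stand in for
    # the lock_class and time_calc dependencies the agent normally injects,
    # and dict-backed stubs stand in for the persistent back-off and counter
    # stores. Run as a module (e.g. `python -m <package>.pdqueue`) so the
    # relative imports above resolve.
    import shutil
    import tempfile
    import time

    class _NoOpLock(object):
        # stands in for a real inter-process file lock.
        def __init__(self, lockfile):
            self.lockfile = lockfile

        def acquire(self):
            pass

        def release(self):
            pass

    class _MemDB(object):
        # stands in for a persistent store exposing get()/set().
        def __init__(self):
            self._value = None

        def get(self):
            return self._value

        def set(self, value):
            self._value = value

    logging.basicConfig(level=logging.INFO)
    qdir = tempfile.mkdtemp()
    try:
        # create the expected queue sub-directories.
        for subdir in QUEUE_SUBDIRS:
            os.makedirs(os.path.join(qdir, subdir))

        enqueuer = PDQEnqueuer(qdir, _NoOpLock, time, 0o644, 0o022)
        print(enqueuer.enqueue("example_svc_key", '{"description": "hi"}'))

        queue = PDQueue(qdir, _NoOpLock, time, 4096, 60, 3, _MemDB(), _MemDB())

        def consume(data, fname):
            print("consuming %s: %s" % (fname, data))
            return ConsumeEvent.CONSUMED

        queue.flush(consume, lambda: False)
        print(queue.get_stats(detailed_snapshot=True))
    finally:
        shutil.rmtree(qdir)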