#
# Copyright (c) 2013-2014, PagerDuty, Inc. <info@pagerduty.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the copyright holder nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
"""
A directory-based queue for PagerDuty events.

Consists of two classes:
- PDQEnqueuer, which provides only enqueue functionality.
- PDQueue, which provides dequeue and queue-management functionality.

Notes:
- Designed for multiple processes concurrently using the queue.
- Each entry in the queue is written to a separate file in the
  queue directory.
- Files are named so that sorting by file name yields queue order.
- Concurrent enqueues use exclusive file creation and retries to avoid
  using the same file name.
- Concurrent dequeues are serialized with an exclusive dequeue lock.
- A dequeue holds the exclusive lock until the consume callback
  is done.
- Dequeue never blocks enqueue, and enqueue never blocks dequeue.
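
Example (a minimal usage sketch; the `FileLock` class shown here is an
illustrative assumption, not defined in this module):

    from pdagent.filelock import FileLock  # hypothetical lock class
    import time

    enqueuer = PDQEnqueuer(
        queue_dir="/var/lib/pdagent/queue",
        lock_class=FileLock,
        time_calc=time,
        enqueue_file_mode=0o644,
        default_umask=0o022
    )
    fname, problems = enqueuer.enqueue(
        "my_service_key", '{"event_type": "trigger"}'
    )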
47""" 48 49 50import errno 51import logging 52import os 53 54from .constants import ConsumeEvent, EnqueueWarnings 55from .pdagentutil import ensure_readable_directory, ensure_writable_directory, \ 56 utcnow_isoformat 57 58 59logger = logging.getLogger(__name__) 60 61 62QUEUE_SUBDIRS = ["pdq", "tmp", "suc", "err"] 63 64 65class EmptyQueueError(Exception): 66 pass 67 68 69class PDQueueBase(object): 70 71 def __init__(self, queue_dir, lock_class, time_calc): 72 self.queue_dir = queue_dir 73 self.lock_class = lock_class 74 self.time = time_calc 75 76 def _abspath(self, ftype, fname): 77 return os.path.join(self.queue_dir, ftype, fname) 78 79 80class PDQEnqueuer(PDQueueBase): 81 82 def __init__( 83 self, 84 queue_dir, 85 lock_class, 86 time_calc, 87 enqueue_file_mode, 88 default_umask 89 ): 90 PDQueueBase.__init__(self, queue_dir, lock_class, time_calc) 91 self.enqueue_file_mode = enqueue_file_mode 92 self.default_umask = default_umask 93 94 # Enqueue needs only write access to the 'tmp' and 'pdq' directories 95 ensure_writable_directory(os.path.join(self.queue_dir, "tmp")) 96 ensure_writable_directory(os.path.join(self.queue_dir, "pdq")) 97 98 def enqueue(self, service_key, s): 99 # write to an exclusive temp file 100 _, tmp_fname_abs, tmp_fd, problems = self._open_creat_excl_with_retry( 101 "tmp", 102 "%%d_%s.txt" % service_key 103 ) 104 os.write(tmp_fd, s.encode()) 105 os.close(tmp_fd) 106 # link to an exclusive queue entry file 107 pdq_fname, _ = self._link_with_retry( 108 "pdq", 109 "%%d_%s.txt" % service_key, 110 tmp_fname_abs 111 ) 112 # unlink the temp file 113 os.unlink(tmp_fname_abs) 114 return pdq_fname, problems 115 116 def _open_creat_excl_with_retry(self, ftype, fname_fmt): 117 problems = [] 118 # we're changing the umask globally here, because this is not supposed 119 # to be multi-threaded, and will not cause problems elsewhere. 120 orig_umask = os.umask(self.default_umask) 121 if self.enqueue_file_mode & orig_umask > 0: 122 # current user's umask is very restrictive. 123 problems.append(EnqueueWarnings.UMASK_TOO_RESTRICTIVE) 124 try: 125 n = 0 126 t_microsecs = int(self.time.time() * 1e6) 127 while True: 128 fname = fname_fmt % (t_microsecs + n) 129 fname_abs = self._abspath(ftype, fname) 130 fd = _open_creat_excl(fname_abs, self.enqueue_file_mode) 131 if fd is None: 132 n += 1 133 if n >= 100: 134 raise Exception( 135 "Too many retries! (Last attempted name: %s)" 136 % fname_abs 137 ) 138 else: 139 return fname, fname_abs, fd, problems 140 finally: 141 os.umask(orig_umask) 142 143 def _link_with_retry(self, ftype, fname_fmt, orig_abs): 144 n = 0 145 t_microsecs = int(self.time.time() * 1e6) 146 while True: 147 fname = fname_fmt % (t_microsecs + n) 148 fname_abs = self._abspath(ftype, fname) 149 if _link(orig_abs, fname_abs): 150 return fname, fname_abs 151 else: 152 n += 1 153 if n >= 100: 154 raise Exception( 155 "Too many retries! 


class PDQueue(PDQueueBase):

    def __init__(
        self,
        queue_dir,
        lock_class,
        time_calc,
        event_size_max_bytes,
        backoff_interval,
        retry_limit_for_possible_errors,
        backoff_db,
        counter_db
    ):
        PDQueueBase.__init__(self, queue_dir, lock_class, time_calc)

        for ftype in QUEUE_SUBDIRS:
            d = os.path.join(self.queue_dir, ftype)
            ensure_readable_directory(d)
            ensure_writable_directory(d)

        self._dequeue_lockfile = os.path.join(
            self.queue_dir, "dequeue.lock"
        )

        self.event_size_max_bytes = event_size_max_bytes
        self.backoff_info = _BackoffInfo(
            backoff_db,
            backoff_interval,
            retry_limit_for_possible_errors,
            time_calc
        )
        self.counter_info = _CounterInfo(counter_db, time_calc)

    # Get the list of queued files from the queue directory, in enqueue order.
    def _queued_files(self, ftype="pdq"):
        fnames = sorted(os.listdir(os.path.join(self.queue_dir, ftype)))
        return fnames

    def dequeue(self, consume_func, stop_check_func=lambda: False):
        # process only the first event in the queue.
        self._process_queue(
            lambda events: events[0:1],
            consume_func,
            stop_check_func
        )

    def flush(self, consume_func, stop_check_func):
        # process all events in the queue.
        self._process_queue(
            lambda events: events,
            consume_func,
            stop_check_func
        )

    def _process_queue(
        self,
        filter_events_to_process_func,
        consume_func,
        should_stop_func
    ):

        if not self._queued_files():
            raise EmptyQueueError

        lock = self.lock_class(self._dequeue_lockfile)
        lock.acquire()

        try:
            file_names = self._queued_files()
            if not len(file_names):
                raise EmptyQueueError

            file_names = filter_events_to_process_func(file_names)
            if not len(file_names):
                return

            now = self.time.time()
            err_svc_keys = set()

            self.backoff_info.update()
            for fname in file_names:
                if should_stop_func():
                    break
                try:
                    _, svc_key = _get_event_metadata(fname)
                except _BadFileNameError:
                    logger.warning("Badly named event " + fname)
                    self._unsafe_change_event_type(fname, 'pdq', 'err')
                    continue
                if svc_key not in err_svc_keys and \
                        self.backoff_info.get_current_retry_at(svc_key) <= now:
                    # no back-off; nothing has gone wrong in this pass yet.
                    try:
                        if not self._process_event(
                            fname, consume_func, svc_key
                        ):
                            # this service key is problematic.
                            err_svc_keys.add(svc_key)
                    except StopIteration:
                        # no further processing must be done.
                        logger.info("Not processing any more events this time")
                        break

            self.backoff_info.store()
            self.counter_info.store()
        finally:
            lock.release()
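
    # A consume_func sketch (illustrative only -- the delivery helper and
    # exception type are assumptions, not part of this module). The callback
    # receives the raw event payload and its queue file name, and must return
    # one of the ConsumeEvent codes handled in _process_event below:
    #
    #     def consume_func(data, fname):
    #         try:
    #             send_to_pagerduty(data)  # hypothetical delivery helper
    #             return ConsumeEvent.CONSUMED
    #         except TemporaryOutage:  # hypothetical exception type
    #             return ConsumeEvent.BACKOFF_SVCKEY_NOT_CONSUMED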

    # Returns true if processing can continue for the service key, false if not.
    def _process_event(self, fname, consume_func, svc_key):
        fname_abs = self._abspath("pdq", fname)
        data = None
        if os.path.getsize(fname_abs) <= self.event_size_max_bytes:
            with open(fname_abs) as f:
                data = f.read()

        # ensure that the event is not too large.
        if data is None or len(data) > self.event_size_max_bytes:
            logger.info(
                "Not processing event %s -- it exceeds max-allowed size" %
                fname)
            self._unsafe_change_event_type(fname, 'pdq', 'err')
            self.counter_info.increment_failure()
            return True

        logger.info("Processing event " + fname)
        consume_code = consume_func(data, fname)

        if consume_code == ConsumeEvent.CONSUMED:
            # a failure here means duplicate event sends if the incident key
            # was not specified, i.e. if the event was enqueued in a
            # non-standard manner (e.g. not using the pd* scripts.)
            self._unsafe_change_event_type(fname, 'pdq', 'suc')
            self.counter_info.increment_success()
            return True
        elif consume_code == ConsumeEvent.STOP_ALL:
            # stop processing any more events.
            raise StopIteration
        elif consume_code == ConsumeEvent.BAD_ENTRY:
            self._unsafe_change_event_type(fname, 'pdq', 'err')
            self.counter_info.increment_failure()
            return True
        elif consume_code == ConsumeEvent.BACKOFF_SVCKEY_BAD_ENTRY:
            logger.info("Backing off service key " + svc_key)
            if self.backoff_info.is_threshold_breached(svc_key):
                # time for stricter action -- mark the event as bad.
                logger.info(
                    (
                        "Service key %s breached back-off limit." +
                        " Assuming bad event."
                    ) %
                    svc_key
                )
                self._unsafe_change_event_type(fname, 'pdq', 'err')
                self.counter_info.increment_failure()
                # now that we have handled the bad entry, we'll want to
                # give the other events in this service key a chance, so
                # don't consider the key erroneous.
                return True
            else:
                self.backoff_info.increment(svc_key)
                return False
        elif consume_code == ConsumeEvent.BACKOFF_SVCKEY_NOT_CONSUMED:
            self.backoff_info.increment(svc_key)
            return False
        else:
            raise ValueError(
                "Unsupported dequeue consume code %d" %
                consume_code)

    def resurrect(self, service_key=None):
        # move dead events of the given service key back into the queue.
        errnames = self._queued_files("err")
        count = 0
        for errname in errnames:
            try:
                # even if we don't need to filter by service key,
                # always parse the event file name to check for
                # _BadFileNameError.
                _, svc_key = _get_event_metadata(errname)
                if not service_key or svc_key == service_key:
                    self._unsafe_change_event_type(errname, 'err', 'pdq')
                    count += 1
            except _BadFileNameError:
                # don't resurrect a badly named file.
                logger.warning("Not resurrecting badly named event " + errname)
        return count
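
    # Illustration (not executed): `queue.resurrect("my_key")` moves every
    # err/<time>_my_key.txt file back to pdq/ for another delivery attempt;
    # `queue.cleanup(delete_before_sec=86400)` below then prunes any tmp, suc
    # or err files whose encoded enqueue time is more than a day old.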

    def cleanup(self, delete_before_sec):
        delete_before_time = int(self.time.time()) - delete_before_sec

        def _cleanup_files(ftype):
            fnames = self._queued_files(ftype)
            for fname in fnames:
                try:
                    enqueue_time, _ = _get_event_metadata(fname)
                except _BadFileNameError:
                    logger.info(
                        "Cleanup: ignoring invalid file name %s" % fname)
                else:
                    if enqueue_time < delete_before_time:
                        try:
                            logger.info("Cleanup: removing file %s" % fname)
                            os.remove(self._abspath(ftype, fname))
                        except OSError as e:
                            logger.warning(
                                "Could not clean up %s file %s: %s" %
                                (ftype, fname, str(e))
                            )

        # clean up bad / temp / success files created before delete-before-time.
        _cleanup_files("err")
        _cleanup_files("tmp")
        _cleanup_files("suc")

    def get_stats(
        self,
        detailed_snapshot=False,  # to return success/error status too
        per_service_key_snapshot=False,
        service_key=None  # considered only if per_service_key_snapshot
    ):
        """
        Returns the status of events. Status consists of snapshot stats (based
        on the current queue state) and historical stats (based on persisted
        state.) Some states might be missing if there are no events in those
        states, e.g. if there are no erroneous events, there might be no
        "failed_events" entry.

        Sample data returned:
        {
            "snapshot": {
                "pending_events": {
                    "count": 3,
                    "newest_age_secs": 15,
                    "oldest_age_secs": 40,
                    "service_keys_count": 2
                },
                "succeeded_events": {
                    "count": 3,
                    "newest_age_secs": 5,
                    "oldest_age_secs": 35,
                    "service_keys_count": 2
                },
                "failed_events": {
                    "count": 3,
                    "newest_age_secs": 25,
                    "oldest_age_secs": 45,
                    "service_keys_count": 2
                },
                "throttled_service_keys_count": 1
            },
            "aggregate": {
                "successful_events_count": 20,
                "failed_events_count": 2,
                "started_on": "2014-03-18T20:49:02Z"
            }
        }
        """
        now = self.time.time()

        snapshot_stats = dict()

        def add_stat(queue_file_type, stat_name):
            for fname in self._queued_files(queue_file_type):
                try:
                    metadata = _get_event_metadata(fname)
                except _BadFileNameError:
                    continue
                if per_service_key_snapshot:
                    svc_key = metadata[1]
                    if service_key and (svc_key != service_key):
                        # stats are required only for the given service key.
                        continue
                    if svc_key not in snapshot_stats:
                        # we encountered a new service key.
                        snapshot_stats[svc_key] = dict()
                    stats = snapshot_stats[svc_key]
                else:
                    stats = snapshot_stats
                if stat_name not in stats:
                    stats[stat_name] = SnapshotStats(now)
                stats[stat_name].add_event(metadata)

        add_stat("pdq", "pending_events")
        if detailed_snapshot:
            add_stat("suc", "succeeded_events")
            add_stat("err", "failed_events")

        if per_service_key_snapshot:
            for svc_key in snapshot_stats:
                svc_key_stats = snapshot_stats[svc_key]
                for stat_name in svc_key_stats:
                    svc_key_stats[stat_name] = \
                        svc_key_stats[stat_name].to_dict()
        else:
            for stat_name in snapshot_stats:
                snapshot_stats[stat_name] = snapshot_stats[stat_name].to_dict()

        # if throttle info is required, compute it from pre-loaded info.
        # (we don't want to reload the info if queue processing is underway.)
        if self.backoff_info._current_retry_at:
            throttled_keys = set()
            now = int(self.time.time())
            for key, retry_at in \
                    self.backoff_info._current_retry_at.items():
                if retry_at > now:
                    throttled_keys.add(key)
            snapshot_stats["throttled_service_keys_count"] = len(throttled_keys)

        stats = {
            "snapshot": snapshot_stats
        }

        # historical counter data for completed events (success, failure)
        if self.counter_info._data:
            stats["aggregate"] = self.counter_info._data

        return stats
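
    # Illustration (not executed): with per_service_key_snapshot=True, the
    # "snapshot" value is keyed by service key instead, e.g.
    #     {"my_key": {"pending_events": {"count": 1, ...}}}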

    # This function can move error files back into regular files, so ensure
    # that you have considered any concurrency-related consequences to other
    # queue operations before invoking it.
    def _unsafe_change_event_type(self, event_name, frm, to):
        logger.info("Changing %s type: %s -> %s..." % (event_name, frm, to))
        old_abs = self._abspath(frm, event_name)
        new_abs = self._abspath(to, event_name)
        os.rename(old_abs, new_abs)


def _open_creat_excl(fname_abs, mode):
    try:
        return os.open(fname_abs, os.O_WRONLY | os.O_CREAT | os.O_EXCL, mode)
    except OSError as e:
        if e.errno == errno.EEXIST:
            return None
        else:
            raise


def _link(orig_abs, new_abs):
    try:
        os.link(orig_abs, new_abs)
        return True
    except OSError as e:
        if e.errno == errno.EEXIST:
            return False
        else:
            raise


class _BadFileNameError(Exception):
    pass


def _get_event_metadata(fname):
    if not fname.endswith(".txt"):
        raise _BadFileNameError
    fname = fname[:-4]
    try:
        enqueue_time_microsec_str, service_key = fname.split('_', 1)
        # integer-divide so that the enqueue time is in whole seconds.
        enqueue_time = int(enqueue_time_microsec_str) // (1000 * 1000)
        return enqueue_time, service_key
    except ValueError:
        raise _BadFileNameError
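
# Illustration (not executed): a queue file named
# "1395175701123456_my_key.txt" parses to (1395175701, "my_key"), i.e. the
# enqueue time in whole seconds plus the service key; any other name raises
# _BadFileNameError.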


class _BackoffInfo(object):
    """
    Loads, accesses, modifies and saves back-off info for
    service keys in the queue.
    """

    def __init__(
        self,
        backoff_db,
        backoff_interval,
        retry_limit_for_possible_errors,
        time_calc
    ):
        self._db = backoff_db
        self._backoff_interval = backoff_interval
        self._retry_limit_for_possible_errors = retry_limit_for_possible_errors
        self._time = time_calc
        try:
            data = self._db.get()
        except Exception:
            logger.warning(
                "Unable to load service-key back-off history",
                exc_info=True
            )
            data = None
        if not data:
            # no db yet, or errors during db read.
            data = {
                'attempts': {},
                'next_retries': {}
            }
        self._previous_attempts = {}
        self._current_attempts = data['attempts']
        self._current_retry_at = data['next_retries']
        self.update()

    # returns true if `current-attempts`, or `previous-attempts + 1`,
    # results in a breach of the retry-limit threshold.
    def is_threshold_breached(self, svc_key):
        cur_attempt = self._current_attempts.get(
            svc_key,
            self._previous_attempts.get(svc_key, 0) + 1)
        return cur_attempt > self._retry_limit_for_possible_errors

    # returns the current retry-at time for svc_key, or 0 if not available.
    def get_current_retry_at(self, svc_key):
        return self._current_retry_at.get(svc_key, 0)

    # updates current attempt and retry data based on previous data.
    # Note that this doesn't check for threshold breach because the threshold
    # is not required in all situations (e.g. back-off due to throttling.)
    def increment(self, svc_key):
        logger.info(
            "Retrying events in service key %s after %d sec" %
            (svc_key, self._backoff_interval)
        )

        self._current_attempts[svc_key] = \
            self._previous_attempts.get(svc_key, 0) + 1
        self._current_retry_at[svc_key] = int(
            self._time.time() + self._backoff_interval
        )

    # only retains data that is still valid at the current time.
    def update(self):
        time_now = self._time.time()
        new_attempts = {}
        new_retry_at = {}

        # copy over all still-unexpired current back-offs to the new data.
        for (svc_key, retry_at) in self._current_retry_at.items():
            if retry_at > time_now:
                new_attempts[svc_key] = self._current_attempts.get(svc_key)
                new_retry_at[svc_key] = retry_at

        # we'll still hold on to the previous attempts data so we can use it
        # to compute new current data if required later.
        self._previous_attempts = self._current_attempts
        self._current_attempts = new_attempts
        self._current_retry_at = new_retry_at

    # persists current back-off info.
    def store(self):
        try:
            self._db.set({
                'attempts': self._current_attempts,
                'next_retries': self._current_retry_at
            })
        except Exception:
            logger.warning(
                "Unable to save service-key back-off history",
                exc_info=True)


class _CounterInfo(object):
    """
    Loads, accesses, modifies and saves counters for processed events.
    """

    def __init__(self, counter_db, time_calc):
        self._db = counter_db
        self._data = {}
        self._time = time_calc

        # try to load data.
        try:
            self._data = self._db.get()
        except Exception:
            logger.error("Unable to load counter history", exc_info=True)
        if not self._data:
            self._reset_data()

        # validate that counter values are indeed integers; if not, reset.
        for key in (k for k in self._data if k != "started_on"):
            if not isinstance(self._data[key], int):
                logger.error(
                    "Invalid counter value %s=%s" % (key, self._data[key])
                )
                logger.warning("Resetting counter history")
                self._reset_data()
                break

        # Try to persist the loaded data. If we can't persist it, we'll want
        # to reset the data because we don't know for how long we haven't
        # been able to persist. Instead of updating the currently-loaded old
        # counters, potentially resulting in incorrect values, we'll just
        # consider the persisted data invalid.
        self.store(reset_data_if_failed=True)

    # increments the success count by 1.
    def increment_success(self):
        self._increment("successful_events_count")

    # increments the failure count by 1.
    def increment_failure(self):
        self._increment("failed_events_count")

    # increments the count of the given type by 1.
    def _increment(self, counter_type):
        self._data[counter_type] = self._data.get(counter_type, 0) + 1

    # persists current counter history.
    def store(self, reset_data_if_failed=False):
        try:
            self._db.set(self._data)
        except Exception:
            logger.error("Unable to save counter history", exc_info=True)
            if reset_data_if_failed:
                logger.warning("Resetting counter history")
                self._reset_data()

    def _reset_data(self):
        self._data = {
            "started_on": utcnow_isoformat(self._time)
        }


class SnapshotStats(object):
    """
    Stats based on a snapshot of the queue.
    """

    def __init__(self, time_now):
        self.count = 0
        self.oldest_enqueue_time = None
        self.newest_enqueue_time = None
        self.service_keys = set()
        self._time_now = time_now

    def add_event(self, event_metadata):
        enqueue_time, svc_key = event_metadata

        self.count += 1
        if (not self.oldest_enqueue_time) or \
                enqueue_time < self.oldest_enqueue_time:
            self.oldest_enqueue_time = enqueue_time
        if (not self.newest_enqueue_time) or \
                enqueue_time > self.newest_enqueue_time:
            self.newest_enqueue_time = enqueue_time

        self.service_keys.add(svc_key)

    def to_dict(self):
        if self.count:
            return {
                "count": self.count,
                "oldest_age_secs": int(
                    self._time_now - self.oldest_enqueue_time
                ),
                "newest_age_secs": int(
                    self._time_now - self.newest_enqueue_time
                ),
                "service_keys_count": len(self.service_keys)
            }
        else:
            return {
                "count": self.count
            }
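
# Illustration of the back-off mechanism (not executed): each BACKOFF_SVCKEY_*
# consume code pushes a service key's retry-at time backoff_interval seconds
# into the future. Once a key's attempt count exceeds
# retry_limit_for_possible_errors, the next BACKOFF_SVCKEY_BAD_ENTRY result
# moves the offending event to 'err' instead of backing off again;
# BACKOFF_SVCKEY_NOT_CONSUMED only ever backs off.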