1# 2# Copyright (c) 2013-2014, PagerDuty, Inc. <info@pagerduty.com> 3# All rights reserved. 4# 5# Redistribution and use in source and binary forms, with or without 6# modification, are permitted provided that the following conditions are met: 7# 8# * Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# * Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# * Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30 31import json 32import logging 33import time 34 35import pdagent 36from pdagent import http 37from pdagent.constants import HEARTBEAT_URI 38from pdagent.pdthread import RepeatingTask 39from pdagent.thirdparty.six.moves.http_client import HTTPException 40from pdagent.thirdparty.six.moves.urllib.request import Request 41from pdagent.thirdparty.six.moves.urllib.error import URLError, HTTPError 42 43logger = logging.getLogger(__name__) 44 45 46RETRY_GAP_SECS = 10 47HEARTBEAT_MAX_RETRIES = 10 48 49 50class HeartbeatTask(RepeatingTask): 51 52 def __init__( 53 self, 54 heartbeat_interval_secs, 55 agent_id, 56 pd_queue, 57 system_info, 58 source_address='0.0.0.0', 59 ): 60 RepeatingTask.__init__(self, heartbeat_interval_secs, True) 61 self._agent_id = agent_id 62 self._pd_queue = pd_queue 63 self._system_info = system_info 64 # The following variables exist to ease unit testing: 65 self._source_address = source_address 66 self._urllib2 = http 67 self._retry_gap_secs = RETRY_GAP_SECS 68 self._heartbeat_max_retries = HEARTBEAT_MAX_RETRIES 69 70 def tick(self): 71 try: 72 logger.debug("Sending heartbeat") 73 # max time is half an interval 74 retry_time_limit = time.time() + (self.get_interval_secs() // 2) 75 attempt_number = 0 76 while not self.is_stop_invoked(): 77 attempt_number += 1 78 try: 79 heartbeat_data = self._make_heartbeat_data() 80 response_str = self._heartbeat(heartbeat_data) 81 logger.debug("Heartbeat successful!") 82 self._system_info = None # only send in first heartbeat. 83 if response_str: 84 self._process_response(response_str) 85 break 86 except HTTPError as e: 87 # retry for 5xx errors 88 if 500 <= e.getcode() < 600: 89 logger.error( 90 "HTTPError sending heartbeat (will retry): %s" % e 91 ) 92 else: 93 error = HTTPError(e.url, e.code, e.msg, e.hdrs, None) 94 raise error 95 except (URLError, HTTPException) as e: 96 # assumes 2.6 where socket.error is a sub-class of IOError 97 # FIXME: not catching IOError so what does the above mean? 98 logger.error( 99 "Error sending heartbeat (will retry): %s" % e 100 ) 101 logger.debug("Traceback:", exc_info=True) 102 # retry limit checks 103 if time.time() > retry_time_limit: 104 logger.info("Won't retry - time limit reached") 105 break 106 if attempt_number >= self._heartbeat_max_retries: 107 logger.info("Won't retry - attempt count limit reached") 108 break 109 # sleep before retry 110 logger.debug("Sleeping before retry...") 111 for _ in range(self._retry_gap_secs): 112 if self.is_stop_invoked(): 113 break 114 time.sleep(1) 115 else: 116 logger.debug("Retrying...") 117 except: 118 logger.error( 119 "Error sending heartbeat (won't retry):", 120 exc_info=True 121 ) 122 123 def _make_heartbeat_data(self): 124 hb_data = { 125 "agent_id": self._agent_id, 126 "agent_version": pdagent.__version__, 127 "agent_stats": self._pd_queue.get_stats() 128 } 129 if self._system_info: 130 hb_data["system_info"] = self._system_info 131 return hb_data 132 133 def _heartbeat(self, heartbeat_data): 134 # Note that Request here is from urllib2, not self._urllib2. 135 request = Request(HEARTBEAT_URI) 136 request.add_header("Content-Type", "application/json") 137 heartbeat_json_str = json.dumps(heartbeat_data) 138 request.data = heartbeat_json_str 139 response = self._urllib2.urlopen(request, 140 source_address=self._source_address) 141 response_str = response.read() 142 response.close() 143 return response_str 144 145 def _process_response(self, response_str): 146 try: 147 result = json.loads(response_str) 148 except: 149 logger.warning( 150 "Error reading heartbeat response data:", 151 exc_info=True 152 ) 153 else: 154 heartbeat_interval_secs = result.get("heartbeat_interval_secs") 155 if heartbeat_interval_secs: 156 self.set_interval_secs(heartbeat_interval_secs) 157