1#
2# Copyright (c) 2013-2014, PagerDuty, Inc. <info@pagerduty.com>
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are met:
7#
8#   * Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#   * Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#   * Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27# POSSIBILITY OF SUCH DAMAGE.
28#
29
30
31import json
32import logging
33import time
34
35import pdagent
36from pdagent import http
37from pdagent.constants import HEARTBEAT_URI
38from pdagent.pdthread import RepeatingTask
39from pdagent.thirdparty.six.moves.http_client import HTTPException
40from pdagent.thirdparty.six.moves.urllib.request import Request
41from pdagent.thirdparty.six.moves.urllib.error import URLError, HTTPError
42
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Default pause between heartbeat attempts within one tick; HeartbeatTask
# sleeps this many times for one second each before retrying.
RETRY_GAP_SECS = 10
# Default cap on the number of heartbeat attempts made within one tick
# (the first attempt counts towards this limit).
HEARTBEAT_MAX_RETRIES = 10
48
49
class HeartbeatTask(RepeatingTask):
    """Repeating task that posts a heartbeat to ``HEARTBEAT_URI``.

    Every tick sends a JSON document holding the agent id, the agent
    version and the queue statistics.  System information is included
    until one heartbeat has succeeded.  Attempts that fail with a 5xx
    response or a connection-level error are retried within the same
    tick, bounded by both a time limit and an attempt limit.
    """

    def __init__(
            self,
            heartbeat_interval_secs,
            agent_id,
            pd_queue,
            system_info,
            source_address='0.0.0.0',
            ):
        """Set up the task.

        heartbeat_interval_secs -- interval handed to ``RepeatingTask``.
        agent_id -- value reported as ``agent_id`` in each heartbeat.
        pd_queue -- object whose ``get_stats()`` result is reported as
            ``agent_stats``.
        system_info -- value reported as ``system_info`` until the first
            successful heartbeat; skipped when falsy.
        source_address -- passed as ``source_address`` to ``urlopen``.
        """
        # NOTE(review): the meaning of the second argument (True) is
        # defined by RepeatingTask, which is not visible in this module.
        RepeatingTask.__init__(self, heartbeat_interval_secs, True)
        self._agent_id = agent_id
        self._pd_queue = pd_queue
        self._system_info = system_info
        # The following variables exist to ease unit testing:
        self._source_address = source_address
        self._urllib2 = http
        self._retry_gap_secs = RETRY_GAP_SECS
        self._heartbeat_max_retries = HEARTBEAT_MAX_RETRIES

    def tick(self):
        """Send one heartbeat, retrying transient failures.

        Retries happen for HTTP 5xx responses and for ``URLError`` /
        ``HTTPException``.  Retrying stops once half the current interval
        has elapsed, once the attempt limit is reached, or once a stop
        has been requested.  Any other error is logged and ends the tick;
        nothing is raised to the caller.
        """
        try:
            logger.debug("Sending heartbeat")
            # max time is half an interval
            retry_time_limit = time.time() + (self.get_interval_secs() // 2)
            attempt_number = 0
            while not self.is_stop_invoked():
                attempt_number += 1
                try:
                    heartbeat_data = self._make_heartbeat_data()
                    response_str = self._heartbeat(heartbeat_data)
                    logger.debug("Heartbeat successful!")
                    self._system_info = None  # only send in first heartbeat.
                    if response_str:
                        self._process_response(response_str)
                    break
                except HTTPError as e:
                    # retry for 5xx errors
                    if 500 <= e.getcode() < 600:
                        logger.error(
                            "HTTPError sending heartbeat (will retry): %s", e
                            )
                    else:
                        # Re-raise a copy built without the response file
                        # object (last argument None); the outer handler
                        # logs it and the tick ends without a retry.
                        error = HTTPError(e.url, e.code, e.msg, e.hdrs, None)
                        raise error
                except (URLError, HTTPException) as e:
                    # FIXME (carried over): the original note assumed
                    # Python 2.6, where socket.error is a sub-class of
                    # IOError, yet IOError itself is not caught here, so a
                    # bare socket error reaches the outer handler and is
                    # not retried.
                    logger.error(
                        "Error sending heartbeat (will retry): %s", e
                        )
                    logger.debug("Traceback:", exc_info=True)
                # retry limit checks
                if time.time() > retry_time_limit:
                    logger.info("Won't retry - time limit reached")
                    break
                if attempt_number >= self._heartbeat_max_retries:
                    logger.info("Won't retry - attempt count limit reached")
                    break
                # sleep before retry, one second at a time so that a stop
                # request is noticed promptly
                logger.debug("Sleeping before retry...")
                for _ in range(self._retry_gap_secs):
                    if self.is_stop_invoked():
                        break
                    time.sleep(1)
                else:
                    logger.debug("Retrying...")
        except Exception:
            # Best effort: a failed heartbeat must not propagate out of the
            # tick.  SystemExit / KeyboardInterrupt are deliberately not
            # swallowed.
            logger.error(
                "Error sending heartbeat (won't retry):",
                exc_info=True
                )

    def _make_heartbeat_data(self):
        """Return the dict that is serialized as the heartbeat body."""
        hb_data = {
            "agent_id": self._agent_id,
            "agent_version": pdagent.__version__,
            "agent_stats": self._pd_queue.get_stats()
            }
        if self._system_info:
            hb_data["system_info"] = self._system_info
        return hb_data

    def _heartbeat(self, heartbeat_data):
        """Send `heartbeat_data` as JSON and return the raw response body.

        Errors raised by ``urlopen`` or by reading the response propagate
        to the caller; the response is closed in either case.
        """
        # Note that Request here is from urllib2, not self._urllib2.
        request = Request(HEARTBEAT_URI)
        request.add_header("Content-Type", "application/json")
        heartbeat_json = json.dumps(heartbeat_data)
        # Python 3 requires the request body to be bytes.  On Python 2 the
        # serialized str already is bytes and is left untouched.
        if not isinstance(heartbeat_json, bytes):
            heartbeat_json = heartbeat_json.encode("utf-8")
        request.data = heartbeat_json
        response = self._urllib2.urlopen(
            request,
            source_address=self._source_address
            )
        try:
            return response.read()
        finally:
            # Close even when read() fails so the connection is not leaked.
            response.close()

    def _process_response(self, response_str):
        """Apply settings carried in the heartbeat response.

        The body is expected to be a JSON object.  When it contains a
        truthy ``heartbeat_interval_secs`` the task interval is updated.
        Unparseable or unexpected bodies are logged and ignored.
        """
        try:
            result = json.loads(response_str)
        except Exception:
            logger.warning(
                "Error reading heartbeat response data:",
                exc_info=True
                )
            return
        if not isinstance(result, dict):
            # Valid JSON but not an object: there is nothing to look up,
            # and the heartbeat itself already succeeded.
            logger.warning(
                "Unexpected heartbeat response data: %r", result
                )
            return
        heartbeat_interval_secs = result.get("heartbeat_interval_secs")
        if heartbeat_interval_secs:
            self.set_interval_secs(heartbeat_interval_secs)
157