1"""
2Testing hook that periodically makes the primary of a replica set step down.
3"""
4from __future__ import absolute_import
5
6import collections
7import random
8import sys
9import time
10import threading
11
12import bson
13import pymongo
14import pymongo.errors
15
16from buildscripts.resmokelib import errors
17from buildscripts.resmokelib.testing.hooks import interface
18from buildscripts.resmokelib.testing.fixtures import replicaset
19from buildscripts.resmokelib.testing.fixtures import shardedcluster
20
21
class ContinuousStepdown(interface.CustomBehavior):  # pylint: disable=too-many-instance-attributes
    """Hook that runs a background thread stepping down replica set primaries at intervals."""

    DESCRIPTION = ("Continuous stepdown (steps down the primary of replica sets at regular"
                   " intervals)")

    def __init__(  # pylint: disable=too-many-arguments
            self, hook_logger, fixture, config_stepdown=True, shard_stepdown=True,
            stepdown_interval_ms=8000, kill=False):
        """Initialize the ContinuousStepdown.

        Args:
            hook_logger: the logger instance for this hook.
            fixture: the target fixture (a replica set or sharded cluster).
            config_stepdown: whether to stepdown the CSRS.
            shard_stepdown: whether to stepdown the shard replica sets in a sharded cluster.
            stepdown_interval_ms: the number of milliseconds between stepdowns.
            kill: forwarded unchanged to the stepdown thread created in before_suite().
        """
        interface.CustomBehavior.__init__(self, hook_logger, fixture,
                                          ContinuousStepdown.DESCRIPTION)

        # The replica set fixtures are discovered lazily, the first time before_suite() runs.
        self._rs_fixtures = []
        self._stepdown_thread = None

        self._fixture = fixture
        self._kill = kill
        self._shard_stepdown = shard_stepdown
        self._config_stepdown = config_stepdown
        # The thread works in seconds; the hook is configured in milliseconds.
        self._stepdown_interval_secs = float(stepdown_interval_ms) / 1000

    def before_suite(self, test_report):
        """Collect the target replica sets (once) and start a new stepdown thread."""
        if not self._rs_fixtures:
            self._add_fixture(self._fixture)
        thread = _StepdownThread(self.logger, self._rs_fixtures, self._stepdown_interval_secs,
                                 self._kill)
        self._stepdown_thread = thread
        self.logger.info("Starting the stepdown thread.")
        thread.start()

    def after_suite(self, test_report):
        """Stop the stepdown thread."""
        self.logger.info("Stopping the stepdown thread.")
        self._stepdown_thread.stop()

    def before_test(self, test, test_report):
        """Verify the stepdown thread is alive, then let it resume stepping down."""
        self._check_thread(test, test_report)
        self.logger.info("Resuming the stepdown thread.")
        self._stepdown_thread.resume()

    def after_test(self, test, test_report):
        """Verify the stepdown thread is alive, then pause it until the next test."""
        self._check_thread(test, test_report)
        self.logger.info("Pausing the stepdown thread.")
        self._stepdown_thread.pause()
        self.logger.info("Paused the stepdown thread.")

    def _check_thread(self, test, test_report):
        """Record an error on 'test' and raise StopExecution if the thread is not alive."""
        if self._stepdown_thread.is_alive():
            return

        msg = "The stepdown thread is not running."
        self.logger.error(msg)
        # The exception is raised and caught so that sys.exc_info() describes it when the
        # error is added to the report; it is then re-raised to the caller.
        try:
            raise errors.StopExecution(msg)
        except errors.StopExecution:
            test_report.addError(test, sys.exc_info())
            raise

    def _add_fixture(self, fixture):
        """Register 'fixture', or the replica sets inside it, as a stepdown target.

        Fixtures that are neither a replica set nor a sharded cluster are ignored.

        Raises:
            ValueError: if a replica set fixture does not have 'all_nodes_electable' set.
        """
        if isinstance(fixture, replicaset.ReplicaSetFixture):
            if fixture.all_nodes_electable:
                self._rs_fixtures.append(fixture)
                return
            raise ValueError(
                "The replica sets that are the target of the ContinuousStepdown hook must have"
                " the 'all_nodes_electable' option set.")

        if not isinstance(fixture, shardedcluster.ShardedClusterFixture):
            return

        # Recurse into the pieces of the sharded cluster that were selected for stepdowns.
        if self._shard_stepdown:
            for shard in fixture.shards:
                self._add_fixture(shard)
        if self._config_stepdown:
            self._add_fixture(fixture.configsvr)
98
99
class _StepdownThread(threading.Thread):  # pylint: disable=too-many-instance-attributes
    """Daemon thread that repeatedly steps down (or kills) the primary of each replica set.

    The thread can be paused and resumed with pause() and resume(), and is terminated with
    stop().
    """

    def __init__(  # pylint: disable=too-many-arguments
            self, logger, rs_fixtures, stepdown_interval_secs, kill):
        """Initialize _StepdownThread.

        Args:
            logger: the logger used to report progress.
            rs_fixtures: the replica set fixtures whose primaries are stepped down.
            stepdown_interval_secs: the number of seconds between rounds of stepdowns.
            kill: if true, the primary's mongod is stopped with kill=True and restarted
                afterwards instead of being sent the replSetStepDown command.
        """
        threading.Thread.__init__(self, name="StepdownThread")
        self.daemon = True
        self.logger = logger
        self._rs_fixtures = rs_fixtures
        self._stepdown_interval_secs = stepdown_interval_secs
        # We set the self._stepdown_duration_secs to a very long time, to ensure that the former
        # primary will not step back up on its own and the stepdown thread will cause it step up via
        # replSetStepUp.
        self._stepdown_duration_secs = 24 * 60 * 60  # 24 hours
        self._kill = kill

        self._last_exec = time.time()
        # Event set when the thread has been stopped using the 'stop()' method.
        self._is_stopped_evt = threading.Event()
        # Event set when the thread is not paused.
        self._is_resumed_evt = threading.Event()
        self._is_resumed_evt.set()
        # Event set when the thread is not performing stepdowns.
        self._is_idle_evt = threading.Event()
        self._is_idle_evt.set()

        # Counts, per "<replset name>/<node>" key, how often each node was stepped up.
        self._step_up_stats = collections.Counter()

    def run(self):
        """Run rounds of stepdowns every 'stepdown_interval_secs' until stop() is called."""
        if not self._rs_fixtures:
            self.logger.warning("No replica set on which to run stepdowns.")
            return

        while True:
            self._pause_if_needed()
            if self._is_stopped():
                break
            now = time.time()
            if now - self._last_exec > self._stepdown_interval_secs:
                self._step_down_all()
                # Wait until each replica set has a primary, so the test can make progress.
                self._await_primaries()
                self._last_exec = time.time()
            now = time.time()
            # 'wait_secs' is used to wait 'self._stepdown_interval_secs' from the moment the last
            # stepdown command was sent.
            wait_secs = max(0, self._stepdown_interval_secs - (now - self._last_exec))
            self._wait(wait_secs)

    def stop(self):
        """Stop the thread and wait for it to exit."""
        self._is_stopped_evt.set()
        # Unpause to allow the thread to finish.
        self.resume()
        self.join()

    def _is_stopped(self):
        """Return true if stop() has been called."""
        return self._is_stopped_evt.is_set()

    def pause(self):
        """Pause the thread, returning once no stepdown is in progress and primaries exist."""
        self._is_resumed_evt.clear()
        # Wait until we are no longer executing stepdowns.
        self._is_idle_evt.wait()
        # Wait until all the replica sets have primaries.
        self._await_primaries()

    def resume(self):
        """Resume the thread and log the step-up statistics gathered so far."""
        self._is_resumed_evt.set()

        self.logger.info(
            "Current statistics about which nodes have been successfully stepped up: %s",
            self._step_up_stats)

    def _pause_if_needed(self):
        """Block while the thread is paused."""
        # Wait until resume or stop.
        self._is_resumed_evt.wait()

    def _wait(self, timeout):
        """Block for 'timeout' seconds, returning early if the thread is stopped."""
        # Wait until stop or timeout.
        self._is_stopped_evt.wait(timeout)

    def _await_primaries(self):
        """Call get_primary() on every replica set fixture."""
        for fixture in self._rs_fixtures:
            fixture.get_primary()

    def _step_down_all(self):
        """Step down the primary of every replica set, marking the thread busy meanwhile."""
        self._is_idle_evt.clear()
        try:
            for rs_fixture in self._rs_fixtures:
                self._step_down(rs_fixture)
        finally:
            # Always signal idleness again, even when a stepdown raised; otherwise a later
            # pause() would block forever waiting on this event.
            self._is_idle_evt.set()

    def _step_down(self, rs_fixture):
        """Step down (or kill) the primary of 'rs_fixture' and get a node elected again.

        Does nothing if no primary is found within 'stepdown_interval_secs'.

        Raises:
            errors.ServerFailure: if the fixture is not running, or the old primary could not
                be stepped up again in time after every secondary failed to step up.
        """
        try:
            primary = rs_fixture.get_primary(timeout_secs=self._stepdown_interval_secs)
        except errors.ServerFailure:
            # We ignore the ServerFailure exception because it means a primary wasn't available.
            # We'll try again after self._stepdown_interval_secs seconds.
            return

        secondaries = rs_fixture.get_secondaries()

        # Check that the fixture is still running before stepping down or killing the primary.
        # This ensures we still detect some cases in which the fixture has already crashed.
        if not rs_fixture.is_running():
            raise errors.ServerFailure("ReplicaSetFixture expected to be running in"
                                       " ContinuousStepdown, but wasn't.")

        self._demote_primary(rs_fixture, primary)

        # We pick an arbitrary secondary to run for election immediately in order to avoid a long
        # period where the replica set doesn't have write availability. If none of the secondaries
        # are eligible, or their election attempt fails, then we'll run the replSetStepUp command on
        # 'primary' to ensure we have write availability sooner than the
        # self._stepdown_duration_secs duration expires.
        stepped_up = self._step_up_a_secondary(rs_fixture, secondaries)

        self._make_old_primary_electable(rs_fixture, primary)

        if stepped_up is None:
            self._step_up_old_primary(rs_fixture, primary)
            node_name = "none"
        else:
            node_name = stepped_up.get_internal_connection_string()

        # Bump the counter for the chosen secondary to indicate that the replSetStepUp command
        # executed successfully.
        key = "{}/{}".format(rs_fixture.replset_name, node_name)
        self._step_up_stats[key] += 1

    def _demote_primary(self, rs_fixture, primary):
        """Kill 'primary' when in kill mode, otherwise send it a forced replSetStepDown."""
        if self._kill:
            self.logger.info("Killing the primary on port %d of replica set '%s'.", primary.port,
                             rs_fixture.replset_name)

            # We send the mongod process the signal to exit but don't immediately wait for it to
            # exit because clean shutdown may take a while and we want to restore write availability
            # as quickly as possible.
            primary.mongod.stop(kill=True)
            return

        self.logger.info("Stepping down the primary on port %d of replica set '%s'.",
                         primary.port, rs_fixture.replset_name)
        try:
            client = primary.mongo_client()
            client.admin.command(
                bson.SON([
                    ("replSetStepDown", self._stepdown_duration_secs),
                    ("force", True),
                ]))
        except pymongo.errors.AutoReconnect:
            # AutoReconnect exceptions are expected as connections are closed during stepdown.
            pass
        except pymongo.errors.PyMongoError:
            self.logger.exception(
                "Error while stepping down the primary on port %d of replica set '%s'.",
                primary.port, rs_fixture.replset_name)
            raise

    def _step_up_a_secondary(self, rs_fixture, secondaries):
        """Try randomly chosen secondaries until one accepts replSetStepUp.

        Returns the node that stepped up, or None if every candidate failed (or there were no
        candidates).
        """
        # Work on a copy so the list handed in by the caller is left untouched.
        candidates = list(secondaries)
        while candidates:
            chosen = random.choice(candidates)

            self.logger.info("Attempting to step up the secondary on port %d of replica set '%s'.",
                             chosen.port, rs_fixture.replset_name)

            try:
                client = chosen.mongo_client()
                client.admin.command("replSetStepUp")
                return chosen
            except pymongo.errors.OperationFailure:
                # OperationFailure exceptions are expected when the election attempt fails due to
                # not receiving enough votes. This can happen when the 'chosen' secondary's opTime
                # is behind that of other secondaries. We handle this by attempting to elect a
                # different secondary.
                self.logger.info("Failed to step up the secondary on port %d of replica set '%s'.",
                                 chosen.port, rs_fixture.replset_name)
                candidates.remove(chosen)
        return None

    def _make_old_primary_electable(self, rs_fixture, primary):
        """Restart the killed old primary, or unfreeze the stepped-down one."""
        if not self._kill:
            # We always run the {replSetFreeze: 0} command to ensure the former primary is electable
            # in the next round of _step_down().
            client = primary.mongo_client()
            client.admin.command({"replSetFreeze": 0})
            return

        self.logger.info("Waiting for the old primary on port %d of replica set '%s' to exit.",
                         primary.port, rs_fixture.replset_name)

        primary.mongod.wait()

        self.logger.info("Attempting to restart the old primary on port %d of replica set '%s'.",
                         primary.port, rs_fixture.replset_name)

        # Restart the mongod on the old primary and wait until we can contact it again. Keep the
        # original preserve_dbpath to restore after restarting the mongod.
        original_preserve_dbpath = primary.preserve_dbpath
        primary.preserve_dbpath = True
        try:
            primary.setup()
            primary.await_ready()
        finally:
            primary.preserve_dbpath = original_preserve_dbpath

    def _step_up_old_primary(self, rs_fixture, primary):
        """Retry replSetStepUp on the old primary until it succeeds or the timeout expires.

        Raises:
            errors.ServerFailure: if the old primary did not step up within
                rs_fixture.AWAIT_REPL_TIMEOUT_MINS minutes.
        """
        # If we failed to step up one of the secondaries, then we run the replSetStepUp to try
        # and elect the former primary again. This way we don't need to wait
        # self._stepdown_duration_secs seconds to restore write availability to the cluster.
        # Since the former primary may have been killed, we need to wait until it has been
        # restarted by retrying replSetStepUp.
        retry_time_secs = rs_fixture.AWAIT_REPL_TIMEOUT_MINS * 60
        retry_start_time = time.time()
        while True:
            try:
                client = primary.mongo_client()
                client.admin.command("replSetStepUp")
                return
            except pymongo.errors.OperationFailure:
                self._wait(0.2)
            if time.time() - retry_start_time > retry_time_secs:
                # Report the fixture's port: 'client' is a MongoClient, not the fixture.
                raise errors.ServerFailure(
                    "The old primary on port {} of replica set {} did not step up in"
                    " {} seconds.".format(primary.port, rs_fixture.replset_name,
                                          retry_time_secs))
310