1""" 2Testing hook that periodically makes the primary of a replica set step down. 3""" 4from __future__ import absolute_import 5 6import collections 7import random 8import sys 9import time 10import threading 11 12import bson 13import pymongo 14import pymongo.errors 15 16from buildscripts.resmokelib import errors 17from buildscripts.resmokelib.testing.hooks import interface 18from buildscripts.resmokelib.testing.fixtures import replicaset 19from buildscripts.resmokelib.testing.fixtures import shardedcluster 20 21 22class ContinuousStepdown(interface.CustomBehavior): # pylint: disable=too-many-instance-attributes 23 """Regularly connect to replica sets and send a replSetStepDown command.""" 24 25 DESCRIPTION = ("Continuous stepdown (steps down the primary of replica sets at regular" 26 " intervals)") 27 28 def __init__( # pylint: disable=too-many-arguments 29 self, hook_logger, fixture, config_stepdown=True, shard_stepdown=True, 30 stepdown_interval_ms=8000, kill=False): 31 """Initialize the ContinuousStepdown. 32 33 Args: 34 hook_logger: the logger instance for this hook. 35 fixture: the target fixture (a replica set or sharded cluster). 36 config_stepdown: whether to stepdown the CSRS. 37 shard_stepdown: whether to stepdown the shard replica sets in a sharded cluster. 38 stepdown_interval_ms: the number of milliseconds between stepdowns. 39 """ 40 interface.CustomBehavior.__init__(self, hook_logger, fixture, 41 ContinuousStepdown.DESCRIPTION) 42 43 self._fixture = fixture 44 self._config_stepdown = config_stepdown 45 self._shard_stepdown = shard_stepdown 46 self._stepdown_interval_secs = float(stepdown_interval_ms) / 1000 47 48 self._rs_fixtures = [] 49 self._stepdown_thread = None 50 self._kill = kill 51 52 def before_suite(self, test_report): 53 if not self._rs_fixtures: 54 self._add_fixture(self._fixture) 55 self._stepdown_thread = _StepdownThread(self.logger, self._rs_fixtures, 56 self._stepdown_interval_secs, self._kill) 57 self.logger.info("Starting the stepdown thread.") 58 self._stepdown_thread.start() 59 60 def after_suite(self, test_report): 61 self.logger.info("Stopping the stepdown thread.") 62 self._stepdown_thread.stop() 63 64 def before_test(self, test, test_report): 65 self._check_thread(test, test_report) 66 self.logger.info("Resuming the stepdown thread.") 67 self._stepdown_thread.resume() 68 69 def after_test(self, test, test_report): 70 self._check_thread(test, test_report) 71 self.logger.info("Pausing the stepdown thread.") 72 self._stepdown_thread.pause() 73 self.logger.info("Paused the stepdown thread.") 74 75 def _check_thread(self, test, test_report): 76 if not self._stepdown_thread.is_alive(): 77 msg = "The stepdown thread is not running." 78 self.logger.error(msg) 79 try: 80 raise errors.StopExecution(msg) 81 except errors.StopExecution: 82 test_report.addError(test, sys.exc_info()) 83 raise 84 85 def _add_fixture(self, fixture): 86 if isinstance(fixture, replicaset.ReplicaSetFixture): 87 if not fixture.all_nodes_electable: 88 raise ValueError( 89 "The replica sets that are the target of the ContinuousStepdown hook must have" 90 " the 'all_nodes_electable' option set.") 91 self._rs_fixtures.append(fixture) 92 elif isinstance(fixture, shardedcluster.ShardedClusterFixture): 93 if self._shard_stepdown: 94 for shard_fixture in fixture.shards: 95 self._add_fixture(shard_fixture) 96 if self._config_stepdown: 97 self._add_fixture(fixture.configsvr) 98 99 100class _StepdownThread(threading.Thread): # pylint: disable=too-many-instance-attributes 101 def __init__( # pylint: disable=too-many-arguments 102 self, logger, rs_fixtures, stepdown_interval_secs, kill): 103 """Initialize _StepdownThread.""" 104 threading.Thread.__init__(self, name="StepdownThread") 105 self.daemon = True 106 self.logger = logger 107 self._rs_fixtures = rs_fixtures 108 self._stepdown_interval_secs = stepdown_interval_secs 109 # We set the self._stepdown_duration_secs to a very long time, to ensure that the former 110 # primary will not step back up on its own and the stepdown thread will cause it step up via 111 # replSetStepUp. 112 self._stepdown_duration_secs = 24 * 60 * 60 # 24 hours 113 self._kill = kill 114 115 self._last_exec = time.time() 116 # Event set when the thread has been stopped using the 'stop()' method. 117 self._is_stopped_evt = threading.Event() 118 # Event set when the thread is not paused. 119 self._is_resumed_evt = threading.Event() 120 self._is_resumed_evt.set() 121 # Event set when the thread is not performing stepdowns. 122 self._is_idle_evt = threading.Event() 123 self._is_idle_evt.set() 124 125 self._step_up_stats = collections.Counter() 126 127 def run(self): 128 if not self._rs_fixtures: 129 self.logger.warning("No replica set on which to run stepdowns.") 130 return 131 132 while True: 133 self._pause_if_needed() 134 if self._is_stopped(): 135 break 136 now = time.time() 137 if now - self._last_exec > self._stepdown_interval_secs: 138 self._step_down_all() 139 # Wait until each replica set has a primary, so the test can make progress. 140 self._await_primaries() 141 self._last_exec = time.time() 142 now = time.time() 143 # 'wait_secs' is used to wait 'self._stepdown_interval_secs' from the moment the last 144 # stepdown command was sent. 145 wait_secs = max(0, self._stepdown_interval_secs - (now - self._last_exec)) 146 self._wait(wait_secs) 147 148 def stop(self): 149 """Stops the thread.""" 150 self._is_stopped_evt.set() 151 # Unpause to allow the thread to finish. 152 self.resume() 153 self.join() 154 155 def _is_stopped(self): 156 return self._is_stopped_evt.is_set() 157 158 def pause(self): 159 """Pauses the thread.""" 160 self._is_resumed_evt.clear() 161 # Wait until we are no longer executing stepdowns. 162 self._is_idle_evt.wait() 163 # Wait until we all the replica sets have primaries. 164 self._await_primaries() 165 166 def resume(self): 167 """Resumes the thread.""" 168 self._is_resumed_evt.set() 169 170 self.logger.info( 171 "Current statistics about which nodes have been successfully stepped up: %s", 172 self._step_up_stats) 173 174 def _pause_if_needed(self): 175 # Wait until resume or stop. 176 self._is_resumed_evt.wait() 177 178 def _wait(self, timeout): 179 # Wait until stop or timeout. 180 self._is_stopped_evt.wait(timeout) 181 182 def _await_primaries(self): 183 for fixture in self._rs_fixtures: 184 fixture.get_primary() 185 186 def _step_down_all(self): 187 self._is_idle_evt.clear() 188 for rs_fixture in self._rs_fixtures: 189 self._step_down(rs_fixture) 190 self._is_idle_evt.set() 191 192 # pylint: disable=R0912,R0915 193 def _step_down(self, rs_fixture): 194 try: 195 primary = rs_fixture.get_primary(timeout_secs=self._stepdown_interval_secs) 196 except errors.ServerFailure: 197 # We ignore the ServerFailure exception because it means a primary wasn't available. 198 # We'll try again after self._stepdown_interval_secs seconds. 199 return 200 201 secondaries = rs_fixture.get_secondaries() 202 203 # Check that the fixture is still running before stepping down or killing the primary. 204 # This ensures we still detect some cases in which the fixture has already crashed. 205 if not rs_fixture.is_running(): 206 raise errors.ServerFailure("ReplicaSetFixture expected to be running in" 207 " ContinuousStepdown, but wasn't.") 208 209 if self._kill: 210 self.logger.info("Killing the primary on port %d of replica set '%s'.", primary.port, 211 rs_fixture.replset_name) 212 213 # We send the mongod process the signal to exit but don't immediately wait for it to 214 # exit because clean shutdown may take a while and we want to restore write availability 215 # as quickly as possible. 216 primary.mongod.stop(kill=True) 217 else: 218 self.logger.info("Stepping down the primary on port %d of replica set '%s'.", 219 primary.port, rs_fixture.replset_name) 220 try: 221 client = primary.mongo_client() 222 client.admin.command( 223 bson.SON([ 224 ("replSetStepDown", self._stepdown_duration_secs), 225 ("force", True), 226 ])) 227 except pymongo.errors.AutoReconnect: 228 # AutoReconnect exceptions are expected as connections are closed during stepdown. 229 pass 230 except pymongo.errors.PyMongoError: 231 self.logger.exception( 232 "Error while stepping down the primary on port %d of replica set '%s'.", 233 primary.port, rs_fixture.replset_name) 234 raise 235 236 # We pick an arbitrary secondary to run for election immediately in order to avoid a long 237 # period where the replica set doesn't have write availability. If none of the secondaries 238 # are eligible, or their election attempt fails, then we'll run the replSetStepUp command on 239 # 'primary' to ensure we have write availability sooner than the 240 # self._stepdown_duration_secs duration expires. 241 while secondaries: 242 chosen = random.choice(secondaries) 243 244 self.logger.info("Attempting to step up the secondary on port %d of replica set '%s'.", 245 chosen.port, rs_fixture.replset_name) 246 247 try: 248 client = chosen.mongo_client() 249 client.admin.command("replSetStepUp") 250 break 251 except pymongo.errors.OperationFailure: 252 # OperationFailure exceptions are expected when the election attempt fails due to 253 # not receiving enough votes. This can happen when the 'chosen' secondary's opTime 254 # is behind that of other secondaries. We handle this by attempting to elect a 255 # different secondary. 256 self.logger.info("Failed to step up the secondary on port %d of replica set '%s'.", 257 chosen.port, rs_fixture.replset_name) 258 secondaries.remove(chosen) 259 260 if self._kill: 261 self.logger.info("Waiting for the old primary on port %d of replica set '%s' to exit.", 262 primary.port, rs_fixture.replset_name) 263 264 primary.mongod.wait() 265 266 self.logger.info("Attempting to restart the old primary on port %d of replica set '%s.", 267 primary.port, rs_fixture.replset_name) 268 269 # Restart the mongod on the old primary and wait until we can contact it again. Keep the 270 # original preserve_dbpath to restore after restarting the mongod. 271 original_preserve_dbpath = primary.preserve_dbpath 272 primary.preserve_dbpath = True 273 try: 274 primary.setup() 275 primary.await_ready() 276 finally: 277 primary.preserve_dbpath = original_preserve_dbpath 278 else: 279 # We always run the {replSetFreeze: 0} command to ensure the former primary is electable 280 # in the next round of _step_down(). 281 client = primary.mongo_client() 282 client.admin.command({"replSetFreeze": 0}) 283 284 if not secondaries: 285 # If we failed to step up one of the secondaries, then we run the replSetStepUp to try 286 # and elect the former primary again. This way we don't need to wait 287 # self._stepdown_duration_secs seconds to restore write availability to the cluster. 288 # Since the former primary may have been killed, we need to wait until it has been 289 # restarted by retrying replSetStepUp. 290 retry_time_secs = rs_fixture.AWAIT_REPL_TIMEOUT_MINS * 60 291 retry_start_time = time.time() 292 while True: 293 try: 294 client = primary.mongo_client() 295 client.admin.command("replSetStepUp") 296 break 297 except pymongo.errors.OperationFailure: 298 self._wait(0.2) 299 if time.time() - retry_start_time > retry_time_secs: 300 raise errors.ServerFailure( 301 "The old primary on port {} of replica set {} did not step up in" 302 " {} seconds.".format(client.port, rs_fixture.replset_name, 303 retry_time_secs)) 304 305 # Bump the counter for the chosen secondary to indicate that the replSetStepUp command 306 # executed successfully. 307 key = "{}/{}".format(rs_fixture.replset_name, 308 chosen.get_internal_connection_string() if secondaries else "none") 309 self._step_up_stats[key] += 1 310