1""" 2Sharded cluster fixture for executing JSTests against. 3""" 4 5from __future__ import absolute_import 6 7import copy 8import os.path 9import time 10 11import pymongo 12 13from . import interface 14from . import standalone 15from . import replicaset 16from ... import config 17from ... import core 18from ... import errors 19from ... import logging 20from ... import utils 21 22 23class ShardedClusterFixture(interface.Fixture): 24 """ 25 Fixture which provides JSTests with a sharded cluster to run 26 against. 27 """ 28 29 _CONFIGSVR_REPLSET_NAME = "config-rs" 30 31 def __init__(self, 32 logger, 33 job_num, 34 mongos_executable=None, 35 mongos_options=None, 36 mongod_executable=None, 37 mongod_options=None, 38 dbpath_prefix=None, 39 preserve_dbpath=False, 40 num_shards=1, 41 separate_configsvr=True, 42 enable_sharding=None, 43 auth_options=None): 44 """ 45 Initializes ShardedClusterFixture with the different options to 46 the mongod and mongos processes. 47 """ 48 49 interface.Fixture.__init__(self, logger, job_num) 50 51 if "dbpath" in mongod_options: 52 raise ValueError("Cannot specify mongod_options.dbpath") 53 54 self.mongos_executable = mongos_executable 55 self.mongos_options = utils.default_if_none(mongos_options, {}) 56 self.mongod_executable = mongod_executable 57 self.mongod_options = utils.default_if_none(mongod_options, {}) 58 self.preserve_dbpath = preserve_dbpath 59 self.num_shards = num_shards 60 self.separate_configsvr = separate_configsvr 61 self.enable_sharding = utils.default_if_none(enable_sharding, []) 62 self.auth_options = auth_options 63 64 # Command line options override the YAML configuration. 65 dbpath_prefix = utils.default_if_none(config.DBPATH_PREFIX, dbpath_prefix) 66 dbpath_prefix = utils.default_if_none(dbpath_prefix, config.DEFAULT_DBPATH_PREFIX) 67 self._dbpath_prefix = os.path.join(dbpath_prefix, 68 "job%d" % (self.job_num), 69 config.FIXTURE_SUBDIR) 70 71 self.configsvr = None 72 self.mongos = None 73 self.shards = [] 74 75 def setup(self): 76 if self.separate_configsvr: 77 if self.configsvr is None: 78 self.configsvr = self._new_configsvr() 79 self.configsvr.setup() 80 81 if not self.shards: 82 for i in xrange(self.num_shards): 83 shard = self._new_shard(i) 84 self.shards.append(shard) 85 86 # Start up each of the shards 87 for shard in self.shards: 88 shard.setup() 89 90 def await_ready(self): 91 # Wait for the config server 92 if self.configsvr is not None: 93 self.configsvr.await_ready() 94 95 # Wait for each of the shards 96 for shard in self.shards: 97 shard.await_ready() 98 99 if self.mongos is None: 100 self.mongos = self._new_mongos() 101 102 # Start up the mongos 103 self.mongos.setup() 104 105 # Wait for the mongos 106 self.mongos.await_ready() 107 self.port = self.mongos.port 108 109 client = utils.new_mongo_client(port=self.port) 110 if self.auth_options is not None: 111 auth_db = client[self.auth_options["authenticationDatabase"]] 112 auth_db.authenticate(self.auth_options["username"], 113 password=self.auth_options["password"], 114 mechanism=self.auth_options["authenticationMechanism"]) 115 116 # Inform mongos about each of the shards 117 for shard in self.shards: 118 self._add_shard(client, shard) 119 120 # Enable sharding on each of the specified databases 121 for db_name in self.enable_sharding: 122 self.logger.info("Enabling sharding for '%s' database...", db_name) 123 client.admin.command({"enablesharding": db_name}) 124 125 def teardown(self): 126 """ 127 Shuts down the sharded cluster. 128 """ 129 running_at_start = self.is_running() 130 success = True # Still a success even if nothing is running. 131 132 if not running_at_start: 133 self.logger.info("Sharded cluster was expected to be running in teardown(), but" 134 " wasn't.") 135 136 if self.configsvr is not None: 137 if running_at_start: 138 self.logger.info("Stopping config server...") 139 140 success = self.configsvr.teardown() and success 141 142 if running_at_start: 143 self.logger.info("Successfully terminated the config server.") 144 145 if self.mongos is not None: 146 if running_at_start: 147 self.logger.info("Stopping mongos...") 148 149 success = self.mongos.teardown() and success 150 151 if running_at_start: 152 self.logger.info("Successfully terminated the mongos.") 153 154 if running_at_start: 155 self.logger.info("Stopping shards...") 156 for shard in self.shards: 157 success = shard.teardown() and success 158 if running_at_start: 159 self.logger.info("Successfully terminated all shards.") 160 161 return success 162 163 def is_running(self): 164 """ 165 Returns true if the config server, all shards, and the mongos 166 are all still operating, and false otherwise. 167 """ 168 return (self.configsvr is not None and self.configsvr.is_running() and 169 all(shard.is_running() for shard in self.shards) and 170 self.mongos is not None and self.mongos.is_running()) 171 172 def _new_configsvr(self): 173 """ 174 Returns a replicaset.ReplicaSetFixture configured to be used as 175 the config server of a sharded cluster. 176 """ 177 178 logger_name = "%s:configsvr" % (self.logger.name) 179 mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger) 180 181 mongod_options = copy.deepcopy(self.mongod_options) 182 mongod_options["configsvr"] = "" 183 mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "config") 184 mongod_options["replSet"] = ShardedClusterFixture._CONFIGSVR_REPLSET_NAME 185 mongod_options["storageEngine"] = "wiredTiger" 186 187 return replicaset.ReplicaSetFixture(mongod_logger, 188 self.job_num, 189 mongod_executable=self.mongod_executable, 190 mongod_options=mongod_options, 191 preserve_dbpath=self.preserve_dbpath, 192 num_nodes=3, 193 auth_options=self.auth_options, 194 replset_config_options={"configsvr": True}) 195 196 def _new_shard(self, index): 197 """ 198 Returns a standalone.MongoDFixture configured to be used as a 199 shard in a sharded cluster. 200 """ 201 202 logger_name = "%s:shard%d" % (self.logger.name, index) 203 mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger) 204 205 mongod_options = copy.deepcopy(self.mongod_options) 206 mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "shard%d" % (index)) 207 208 return standalone.MongoDFixture(mongod_logger, 209 self.job_num, 210 mongod_executable=self.mongod_executable, 211 mongod_options=mongod_options, 212 preserve_dbpath=self.preserve_dbpath) 213 214 def _new_mongos(self): 215 """ 216 Returns a _MongoSFixture configured to be used as the mongos for 217 a sharded cluster. 218 """ 219 220 logger_name = "%s:mongos" % (self.logger.name) 221 mongos_logger = logging.loggers.new_logger(logger_name, parent=self.logger) 222 223 mongos_options = copy.deepcopy(self.mongos_options) 224 if self.separate_configsvr: 225 configdb_replset = ShardedClusterFixture._CONFIGSVR_REPLSET_NAME 226 configdb_port = self.configsvr.port 227 mongos_options["configdb"] = "%s/localhost:%d" % (configdb_replset, configdb_port) 228 else: 229 mongos_options["configdb"] = "localhost:%d" % (self.shards[0].port) 230 231 return _MongoSFixture(mongos_logger, 232 self.job_num, 233 mongos_executable=self.mongos_executable, 234 mongos_options=mongos_options) 235 236 def _add_shard(self, client, shard): 237 """ 238 Add the specified program as a shard by executing the addShard 239 command. 240 241 See https://docs.mongodb.org/manual/reference/command/addShard 242 for more details. 243 """ 244 245 self.logger.info("Adding localhost:%d as a shard...", shard.port) 246 client.admin.command({"addShard": "localhost:%d" % (shard.port)}) 247 248 249class _MongoSFixture(interface.Fixture): 250 """ 251 Fixture which provides JSTests with a mongos to connect to. 252 """ 253 254 def __init__(self, 255 logger, 256 job_num, 257 mongos_executable=None, 258 mongos_options=None): 259 260 interface.Fixture.__init__(self, logger, job_num) 261 262 # Command line options override the YAML configuration. 263 self.mongos_executable = utils.default_if_none(config.MONGOS_EXECUTABLE, mongos_executable) 264 265 self.mongos_options = utils.default_if_none(mongos_options, {}).copy() 266 267 self.mongos = None 268 269 def setup(self): 270 if "chunkSize" not in self.mongos_options: 271 self.mongos_options["chunkSize"] = 50 272 273 if "port" not in self.mongos_options: 274 self.mongos_options["port"] = core.network.PortAllocator.next_fixture_port(self.job_num) 275 self.port = self.mongos_options["port"] 276 277 mongos = core.programs.mongos_program(self.logger, 278 executable=self.mongos_executable, 279 **self.mongos_options) 280 try: 281 self.logger.info("Starting mongos on port %d...\n%s", self.port, mongos.as_command()) 282 mongos.start() 283 self.logger.info("mongos started on port %d with pid %d.", self.port, mongos.pid) 284 except: 285 self.logger.exception("Failed to start mongos on port %d.", self.port) 286 raise 287 288 self.mongos = mongos 289 290 def await_ready(self): 291 deadline = time.time() + standalone.MongoDFixture.AWAIT_READY_TIMEOUT_SECS 292 293 # Wait until the mongos is accepting connections. The retry logic is necessary to support 294 # versions of PyMongo <3.0 that immediately raise a ConnectionFailure if a connection cannot 295 # be established. 296 while True: 297 # Check whether the mongos exited for some reason. 298 exit_code = self.mongos.poll() 299 if exit_code is not None: 300 raise errors.ServerFailure("Could not connect to mongos on port %d, process ended" 301 " unexpectedly with code %d." % (self.port, exit_code)) 302 303 try: 304 # Use a shorter connection timeout to more closely satisfy the requested deadline. 305 client = utils.new_mongo_client(self.port, timeout_millis=500) 306 client.admin.command("ping") 307 break 308 except pymongo.errors.ConnectionFailure: 309 remaining = deadline - time.time() 310 if remaining <= 0.0: 311 raise errors.ServerFailure( 312 "Failed to connect to mongos on port %d after %d seconds" 313 % (self.port, standalone.MongoDFixture.AWAIT_READY_TIMEOUT_SECS)) 314 315 self.logger.info("Waiting to connect to mongos on port %d.", self.port) 316 time.sleep(0.1) # Wait a little bit before trying again. 317 318 self.logger.info("Successfully contacted the mongos on port %d.", self.port) 319 320 def teardown(self): 321 running_at_start = self.is_running() 322 success = True # Still a success even if nothing is running. 323 324 if not running_at_start and self.port is not None: 325 self.logger.info("mongos on port %d was expected to be running in teardown(), but" 326 " wasn't." % (self.port)) 327 328 if self.mongos is not None: 329 if running_at_start: 330 self.logger.info("Stopping mongos on port %d with pid %d...", 331 self.port, 332 self.mongos.pid) 333 self.mongos.stop() 334 335 exit_code = self.mongos.wait() 336 success = exit_code == 0 337 338 if running_at_start: 339 self.logger.info("Successfully terminated the mongos on port %d, exited with code" 340 " %d", 341 self.port, 342 exit_code) 343 344 return success 345 346 def is_running(self): 347 return self.mongos is not None and self.mongos.poll() is None 348