1""" 2Master/slave fixture for executing JSTests against. 3""" 4 5from __future__ import absolute_import 6 7import os.path 8 9import pymongo 10 11from . import interface 12from . import standalone 13from ... import config 14from ... import logging 15from ... import utils 16 17 18class MasterSlaveFixture(interface.ReplFixture): 19 """ 20 Fixture which provides JSTests with a master/slave deployment to 21 run against. 22 """ 23 24 def __init__(self, 25 logger, 26 job_num, 27 mongod_executable=None, 28 mongod_options=None, 29 master_options=None, 30 slave_options=None, 31 dbpath_prefix=None, 32 preserve_dbpath=False): 33 34 interface.ReplFixture.__init__(self, logger, job_num) 35 36 if "dbpath" in mongod_options: 37 raise ValueError("Cannot specify mongod_options.dbpath") 38 39 self.mongod_executable = mongod_executable 40 self.mongod_options = utils.default_if_none(mongod_options, {}) 41 self.master_options = utils.default_if_none(master_options, {}) 42 self.slave_options = utils.default_if_none(slave_options, {}) 43 self.preserve_dbpath = preserve_dbpath 44 45 # Command line options override the YAML configuration. 46 dbpath_prefix = utils.default_if_none(config.DBPATH_PREFIX, dbpath_prefix) 47 dbpath_prefix = utils.default_if_none(dbpath_prefix, config.DEFAULT_DBPATH_PREFIX) 48 self._dbpath_prefix = os.path.join(dbpath_prefix, 49 "job%d" % (self.job_num), 50 config.FIXTURE_SUBDIR) 51 52 self.master = None 53 self.slave = None 54 55 def setup(self): 56 if self.master is None: 57 self.master = self._new_mongod_master() 58 self.master.setup() 59 self.port = self.master.port 60 61 if self.slave is None: 62 self.slave = self._new_mongod_slave() 63 self.slave.setup() 64 65 def await_ready(self): 66 self.master.await_ready() 67 self.slave.await_ready() 68 69 # Do a replicated write to ensure that the slave has finished with its initial sync before 70 # starting to run any tests. 71 client = utils.new_mongo_client(self.port) 72 73 # Keep retrying this until it times out waiting for replication. 74 def insert_fn(remaining_secs): 75 remaining_millis = int(round(remaining_secs * 1000)) 76 write_concern = pymongo.WriteConcern(w=2, wtimeout=remaining_millis) 77 coll = client.resmoke.get_collection("await_ready", write_concern=write_concern) 78 coll.insert_one({"awaiting": "ready"}) 79 80 try: 81 self.retry_until_wtimeout(insert_fn) 82 except pymongo.errors.WTimeoutError: 83 self.logger.info("Replication of write operation timed out.") 84 raise 85 86 def teardown(self): 87 running_at_start = self.is_running() 88 success = True # Still a success if nothing is running. 89 90 if not running_at_start: 91 self.logger.info("Master-slave deployment was expected to be running in teardown()," 92 " but wasn't.") 93 94 if self.slave is not None: 95 if running_at_start: 96 self.logger.info("Stopping slave...") 97 98 success = self.slave.teardown() 99 100 if running_at_start: 101 self.logger.info("Successfully stopped slave.") 102 103 if self.master is not None: 104 if running_at_start: 105 self.logger.info("Stopping master...") 106 107 success = self.master.teardown() and success 108 109 if running_at_start: 110 self.logger.info("Successfully stopped master.") 111 112 return success 113 114 def is_running(self): 115 return (self.master is not None and self.master.is_running() and 116 self.slave is not None and self.slave.is_running()) 117 118 def get_primary(self): 119 return self.master 120 121 def get_secondaries(self): 122 return [self.slave] 123 124 def await_repl(self): 125 """ 126 Inserts a document into each database on the master and waits 127 for all write operations to be acknowledged by the master-slave 128 deployment. 129 """ 130 131 client = utils.new_mongo_client(self.port) 132 133 # We verify that each database has replicated to the slave because in the case of an initial 134 # sync, the slave may acknowledge writes to one database before it has finished syncing 135 # others. 136 db_names = client.database_names() 137 self.logger.info("Awaiting replication of inserts to each of the following databases on" 138 " master on port %d: %s", 139 self.port, 140 db_names) 141 142 for db_name in db_names: 143 if db_name == "local": 144 continue # The local database is expected to differ, ignore. 145 146 self.logger.info("Awaiting replication of insert to database %s (w=2, wtimeout=%d min)" 147 " to master on port %d", 148 db_name, 149 interface.ReplFixture.AWAIT_REPL_TIMEOUT_MINS, 150 self.port) 151 152 # Keep retrying this until it times out waiting for replication. 153 def insert_fn(remaining_secs): 154 remaining_millis = int(round(remaining_secs * 1000)) 155 write_concern = pymongo.WriteConcern(w=2, wtimeout=remaining_millis) 156 coll = client[db_name].get_collection("await_repl", write_concern=write_concern) 157 coll.insert_one({"awaiting": "repl"}) 158 159 try: 160 self.retry_until_wtimeout(insert_fn) 161 except pymongo.errors.WTimeoutError: 162 self.logger.info("Replication of write operation timed out.") 163 raise 164 165 self.logger.info("Replication of write operation completed for database %s.", db_name) 166 167 self.logger.info("Finished awaiting replication.") 168 169 def _new_mongod(self, mongod_logger, mongod_options): 170 """ 171 Returns a standalone.MongoDFixture with the specified logger and 172 options. 173 """ 174 return standalone.MongoDFixture(mongod_logger, 175 self.job_num, 176 mongod_executable=self.mongod_executable, 177 mongod_options=mongod_options, 178 preserve_dbpath=self.preserve_dbpath) 179 180 def _new_mongod_master(self): 181 """ 182 Returns a standalone.MongoDFixture configured to be used as the 183 master of a master-slave deployment. 184 """ 185 186 logger_name = "%s:master" % (self.logger.name) 187 mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger) 188 189 mongod_options = self.mongod_options.copy() 190 mongod_options.update(self.master_options) 191 mongod_options["master"] = "" 192 mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "master") 193 return self._new_mongod(mongod_logger, mongod_options) 194 195 def _new_mongod_slave(self): 196 """ 197 Returns a standalone.MongoDFixture configured to be used as the 198 slave of a master-slave deployment. 199 """ 200 201 logger_name = "%s:slave" % (self.logger.name) 202 mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger) 203 204 mongod_options = self.mongod_options.copy() 205 mongod_options.update(self.slave_options) 206 mongod_options["slave"] = "" 207 mongod_options["source"] = "localhost:%d" % (self.port) 208 mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "slave") 209 return self._new_mongod(mongod_logger, mongod_options) 210