1"""
2Master/slave fixture for executing JSTests against.
3"""
4
5from __future__ import absolute_import
6
7import os.path
8
9import pymongo
10
11from . import interface
12from . import standalone
13from ... import config
14from ... import logging
15from ... import utils
16
17
class MasterSlaveFixture(interface.ReplFixture):
    """
    Fixture which provides JSTests with a master/slave deployment to
    run against.

    The deployment is made of two standalone.MongoDFixture instances:
    one started with the "master" option and one started with the
    "slave" option whose "source" option points at the master's port
    on localhost.
    """

    def __init__(self,
                 logger,
                 job_num,
                 mongod_executable=None,
                 mongod_options=None,
                 master_options=None,
                 slave_options=None,
                 dbpath_prefix=None,
                 preserve_dbpath=False):
        """
        Initializes the fixture without starting any processes.

        'mongod_options' are applied to both nodes; 'master_options'
        and 'slave_options' are layered on top of them for the
        respective node. 'mongod_options' must not contain a "dbpath"
        key because each node's dbpath is derived from the dbpath
        prefix, the job number and the node's role; a ValueError is
        raised if it does.
        """

        interface.ReplFixture.__init__(self, logger, job_num)

        # Normalize before validating: 'mongod_options' defaults to None, and a membership test
        # against None would raise a TypeError instead of letting the fixture be constructed.
        mongod_options = utils.default_if_none(mongod_options, {})
        if "dbpath" in mongod_options:
            raise ValueError("Cannot specify mongod_options.dbpath")

        self.mongod_executable = mongod_executable
        self.mongod_options = mongod_options
        self.master_options = utils.default_if_none(master_options, {})
        self.slave_options = utils.default_if_none(slave_options, {})
        self.preserve_dbpath = preserve_dbpath

        # Command line options override the YAML configuration.
        dbpath_prefix = utils.default_if_none(config.DBPATH_PREFIX, dbpath_prefix)
        dbpath_prefix = utils.default_if_none(dbpath_prefix, config.DEFAULT_DBPATH_PREFIX)
        self._dbpath_prefix = os.path.join(dbpath_prefix,
                                           "job%d" % (self.job_num),
                                           config.FIXTURE_SUBDIR)

        # The node fixtures are created lazily by setup().
        self.master = None
        self.slave = None

    def setup(self):
        """
        Creates the master and slave fixtures if they do not exist yet
        and calls setup() on each of them, master first.
        """

        if self.master is None:
            self.master = self._new_mongod_master()
        self.master.setup()

        # The master's port must be recorded before the slave is created because the slave's
        # "source" option is built from self.port.
        self.port = self.master.port

        if self.slave is None:
            self.slave = self._new_mongod_slave()
        self.slave.setup()

    def await_ready(self):
        """
        Waits for both nodes to be ready and then for a write on the
        master to be acknowledged with w=2.

        Re-raises pymongo.errors.WTimeoutError if the write is not
        acknowledged in time.
        """

        self.master.await_ready()
        self.slave.await_ready()

        # Do a replicated write to ensure that the slave has finished with its initial sync before
        # starting to run any tests.
        client = utils.new_mongo_client(self.port)
        self._await_replicated_insert(client, "resmoke", "await_ready", {"awaiting": "ready"})

    def teardown(self):
        """
        Tears down the slave and then the master.

        Returns a true value only if every node that was torn down
        reported success; it is still a success if neither node was
        ever created.
        """

        running_at_start = self.is_running()
        success = True  # Still a success if nothing is running.

        if not running_at_start:
            self.logger.info("Master-slave deployment was expected to be running in teardown(),"
                             " but wasn't.")

        if self.slave is not None:
            if running_at_start:
                self.logger.info("Stopping slave...")

            success = self.slave.teardown()

            # Only claim success in the log when the node's teardown actually reported it.
            if running_at_start:
                if success:
                    self.logger.info("Successfully stopped slave.")
                else:
                    self.logger.info("Failed to stop slave cleanly.")

        if self.master is not None:
            if running_at_start:
                self.logger.info("Stopping master...")

            # The master is always torn down, even if the slave's teardown failed.
            master_success = self.master.teardown()
            success = master_success and success

            if running_at_start:
                if master_success:
                    self.logger.info("Successfully stopped master.")
                else:
                    self.logger.info("Failed to stop master cleanly.")

        return success

    def is_running(self):
        """
        Returns true if both the master and the slave have been created
        and both report that they are running.
        """

        return (self.master is not None and self.master.is_running() and
                self.slave is not None and self.slave.is_running())

    def get_primary(self):
        """
        Returns the master's fixture, or None if setup() has not
        created it yet.
        """

        return self.master

    def get_secondaries(self):
        """
        Returns a single-element list holding the slave's fixture (the
        element is None if setup() has not created it yet).
        """

        return [self.slave]

    def await_repl(self):
        """
        Inserts a document into each database on the master and waits
        for all write operations to be acknowledged by the master-slave
        deployment.

        The "local" database is skipped. Re-raises
        pymongo.errors.WTimeoutError if any write is not acknowledged
        in time.
        """

        client = utils.new_mongo_client(self.port)

        # We verify that each database has replicated to the slave because in the case of an initial
        # sync, the slave may acknowledge writes to one database before it has finished syncing
        # others.
        db_names = client.database_names()
        self.logger.info("Awaiting replication of inserts to each of the following databases on"
                         " master on port %d: %s",
                         self.port,
                         db_names)

        for db_name in db_names:
            if db_name == "local":
                continue  # The local database is expected to differ, ignore.

            self.logger.info("Awaiting replication of insert to database %s (w=2, wtimeout=%d min)"
                             " to master on port %d",
                             db_name,
                             interface.ReplFixture.AWAIT_REPL_TIMEOUT_MINS,
                             self.port)

            self._await_replicated_insert(client, db_name, "await_repl", {"awaiting": "repl"})

            self.logger.info("Replication of write operation completed for database %s.", db_name)

        self.logger.info("Finished awaiting replication.")

    def _await_replicated_insert(self, client, db_name, coll_name, document):
        """
        Inserts a copy of 'document' into 'db_name'.'coll_name' through
        'client' with a write concern of w=2, delegating the retrying
        to retry_until_wtimeout().

        Logs and re-raises pymongo.errors.WTimeoutError if the write is
        not acknowledged in time.
        """

        # Keep retrying this until it times out waiting for replication.
        def insert_fn(remaining_secs):
            remaining_millis = int(round(remaining_secs * 1000))
            write_concern = pymongo.WriteConcern(w=2, wtimeout=remaining_millis)
            coll = client[db_name].get_collection(coll_name, write_concern=write_concern)
            # insert_one() adds an "_id" to the dict it is given, so hand it a fresh copy on
            # every attempt rather than re-inserting the same "_id".
            coll.insert_one(dict(document))

        try:
            self.retry_until_wtimeout(insert_fn)
        except pymongo.errors.WTimeoutError:
            self.logger.info("Replication of write operation timed out.")
            raise

    def _new_mongod(self, mongod_logger, mongod_options):
        """
        Returns a standalone.MongoDFixture with the specified logger and
        options.
        """
        return standalone.MongoDFixture(mongod_logger,
                                        self.job_num,
                                        mongod_executable=self.mongod_executable,
                                        mongod_options=mongod_options,
                                        preserve_dbpath=self.preserve_dbpath)

    def _new_mongod_master(self):
        """
        Returns a standalone.MongoDFixture configured to be used as the
        master of a master-slave deployment.
        """

        logger_name = "%s:master" % (self.logger.name)
        mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger)

        # Work on a copy so the shared options are not mutated; master-specific options take
        # precedence over the shared ones, and the role and dbpath are always set here.
        mongod_options = self.mongod_options.copy()
        mongod_options.update(self.master_options)
        mongod_options["master"] = ""
        mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "master")
        return self._new_mongod(mongod_logger, mongod_options)

    def _new_mongod_slave(self):
        """
        Returns a standalone.MongoDFixture configured to be used as the
        slave of a master-slave deployment.

        Reads self.port, so the master's port must already have been
        recorded (setup() does this before calling this method).
        """

        logger_name = "%s:slave" % (self.logger.name)
        mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger)

        # Work on a copy so the shared options are not mutated; slave-specific options take
        # precedence over the shared ones, and the role, source and dbpath are always set here.
        mongod_options = self.mongod_options.copy()
        mongod_options.update(self.slave_options)
        mongod_options["slave"] = ""
        mongod_options["source"] = "localhost:%d" % (self.port)
        mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "slave")
        return self._new_mongod(mongod_logger, mongod_options)
210