1"""
2Sharded cluster fixture for executing JSTests against.
3"""
4
5from __future__ import absolute_import
6
7import copy
8import os.path
9import time
10
11import pymongo
12
13from . import interface
14from . import standalone
15from . import replicaset
16from ... import config
17from ... import core
18from ... import errors
19from ... import logging
20from ... import utils
21
22
23class ShardedClusterFixture(interface.Fixture):
24    """
25    Fixture which provides JSTests with a sharded cluster to run
26    against.
27    """
28
29    _CONFIGSVR_REPLSET_NAME = "config-rs"
30
31    def __init__(self,
32                 logger,
33                 job_num,
34                 mongos_executable=None,
35                 mongos_options=None,
36                 mongod_executable=None,
37                 mongod_options=None,
38                 dbpath_prefix=None,
39                 preserve_dbpath=False,
40                 num_shards=1,
41                 separate_configsvr=True,
42                 enable_sharding=None,
43                 auth_options=None):
        """
        Initializes ShardedClusterFixture with the different options for
        the mongod and mongos processes.
        """

        interface.Fixture.__init__(self, logger, job_num)

        if mongod_options is not None and "dbpath" in mongod_options:
            raise ValueError("Cannot specify mongod_options.dbpath")

        self.mongos_executable = mongos_executable
        self.mongos_options = utils.default_if_none(mongos_options, {})
        self.mongod_executable = mongod_executable
        self.mongod_options = utils.default_if_none(mongod_options, {})
        self.preserve_dbpath = preserve_dbpath
        self.num_shards = num_shards
        self.separate_configsvr = separate_configsvr
        self.enable_sharding = utils.default_if_none(enable_sharding, [])
        self.auth_options = auth_options

        # Command line options override the YAML configuration.
        dbpath_prefix = utils.default_if_none(config.DBPATH_PREFIX, dbpath_prefix)
        dbpath_prefix = utils.default_if_none(dbpath_prefix, config.DEFAULT_DBPATH_PREFIX)
        self._dbpath_prefix = os.path.join(dbpath_prefix,
                                           "job%d" % (self.job_num),
                                           config.FIXTURE_SUBDIR)

        self.configsvr = None
        self.mongos = None
        self.shards = []

    def setup(self):
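        # The config server and shard fixtures are created lazily on the first call to setup()
        # and are reused by any subsequent calls.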
        if self.separate_configsvr:
            if self.configsvr is None:
                self.configsvr = self._new_configsvr()
            self.configsvr.setup()

        if not self.shards:
            for i in xrange(self.num_shards):
                shard = self._new_shard(i)
                self.shards.append(shard)

        # Start up each of the shards
        for shard in self.shards:
            shard.setup()

    def await_ready(self):
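        # The config server and shards are awaited before the mongos is started, since the
        # mongos needs a running config server to connect to on startup.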
        # Wait for the config server
        if self.configsvr is not None:
            self.configsvr.await_ready()

        # Wait for each of the shards
        for shard in self.shards:
            shard.await_ready()

        if self.mongos is None:
            self.mongos = self._new_mongos()

        # Start up the mongos
        self.mongos.setup()

        # Wait for the mongos
        self.mongos.await_ready()
        self.port = self.mongos.port

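        # Connect to the mongos and, if auth is enabled, authenticate before issuing the
        # addShard and enablesharding commands below, since they require elevated privileges.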
        client = utils.new_mongo_client(port=self.port)
        if self.auth_options is not None:
            auth_db = client[self.auth_options["authenticationDatabase"]]
            auth_db.authenticate(self.auth_options["username"],
                                 password=self.auth_options["password"],
                                 mechanism=self.auth_options["authenticationMechanism"])

        # Inform mongos about each of the shards
        for shard in self.shards:
            self._add_shard(client, shard)

        # Enable sharding on each of the specified databases
        for db_name in self.enable_sharding:
            self.logger.info("Enabling sharding for '%s' database...", db_name)
            client.admin.command({"enablesharding": db_name})

    def teardown(self):
        """
        Shuts down the sharded cluster.
        """
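        # Every component is torn down even if an earlier teardown reports failure; the overall
        # result is the conjunction of the individual results.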
        running_at_start = self.is_running()
        success = True  # Still a success even if nothing is running.

        if not running_at_start:
            self.logger.info("Sharded cluster was expected to be running in teardown(), but"
                             " wasn't.")

        if self.configsvr is not None:
            if running_at_start:
                self.logger.info("Stopping config server...")

            success = self.configsvr.teardown() and success

            if running_at_start:
                self.logger.info("Successfully terminated the config server.")

        if self.mongos is not None:
            if running_at_start:
                self.logger.info("Stopping mongos...")

            success = self.mongos.teardown() and success

            if running_at_start:
                self.logger.info("Successfully terminated the mongos.")

        if running_at_start:
            self.logger.info("Stopping shards...")
        for shard in self.shards:
            success = shard.teardown() and success
        if running_at_start:
            self.logger.info("Successfully terminated all shards.")

        return success

    def is_running(self):
        """
        Returns true if the config server, all shards, and the mongos
        are all still operating, and false otherwise.
        """
        return (self.configsvr is not None and self.configsvr.is_running() and
                all(shard.is_running() for shard in self.shards) and
                self.mongos is not None and self.mongos.is_running())

    def _new_configsvr(self):
        """
        Returns a replicaset.ReplicaSetFixture configured to be used as
        the config server of a sharded cluster.
        """

        logger_name = "%s:configsvr" % (self.logger.name)
        mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger)

        mongod_options = copy.deepcopy(self.mongod_options)
        mongod_options["configsvr"] = ""
        mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "config")
        mongod_options["replSet"] = ShardedClusterFixture._CONFIGSVR_REPLSET_NAME
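        # Config server replica set (CSRS) members must use the WiredTiger storage engine, so it
        # is forced here regardless of the storage engine the shards run with.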
        mongod_options["storageEngine"] = "wiredTiger"

        return replicaset.ReplicaSetFixture(mongod_logger,
                                            self.job_num,
                                            mongod_executable=self.mongod_executable,
                                            mongod_options=mongod_options,
                                            preserve_dbpath=self.preserve_dbpath,
                                            num_nodes=3,
                                            auth_options=self.auth_options,
                                            replset_config_options={"configsvr": True})

    def _new_shard(self, index):
        """
        Returns a standalone.MongoDFixture configured to be used as a
        shard in a sharded cluster.
        """

        logger_name = "%s:shard%d" % (self.logger.name, index)
        mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger)

        mongod_options = copy.deepcopy(self.mongod_options)
        mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "shard%d" % (index))

        return standalone.MongoDFixture(mongod_logger,
                                        self.job_num,
                                        mongod_executable=self.mongod_executable,
                                        mongod_options=mongod_options,
                                        preserve_dbpath=self.preserve_dbpath)

    def _new_mongos(self):
        """
        Returns a _MongoSFixture configured to be used as the mongos for
        a sharded cluster.
        """

        logger_name = "%s:mongos" % (self.logger.name)
        mongos_logger = logging.loggers.new_logger(logger_name, parent=self.logger)

        mongos_options = copy.deepcopy(self.mongos_options)
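        # Build the --configdb connection string: "<replSetName>/<host>:<port>" when a dedicated
        # config server replica set is used, or the first shard's address otherwise.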
        if self.separate_configsvr:
            configdb_replset = ShardedClusterFixture._CONFIGSVR_REPLSET_NAME
            configdb_port = self.configsvr.port
            mongos_options["configdb"] = "%s/localhost:%d" % (configdb_replset, configdb_port)
        else:
            mongos_options["configdb"] = "localhost:%d" % (self.shards[0].port)

        return _MongoSFixture(mongos_logger,
                              self.job_num,
                              mongos_executable=self.mongos_executable,
                              mongos_options=mongos_options)

    def _add_shard(self, client, shard):
        """
        Add the specified program as a shard by executing the addShard
        command.

        See https://docs.mongodb.org/manual/reference/command/addShard
        for more details.
        """

        self.logger.info("Adding localhost:%d as a shard...", shard.port)
        client.admin.command({"addShard": "localhost:%d" % (shard.port)})


class _MongoSFixture(interface.Fixture):
    """
    Fixture which provides JSTests with a mongos to connect to.
    """

    def __init__(self,
                 logger,
                 job_num,
                 mongos_executable=None,
                 mongos_options=None):

        interface.Fixture.__init__(self, logger, job_num)

        # Command line options override the YAML configuration.
        self.mongos_executable = utils.default_if_none(config.MONGOS_EXECUTABLE, mongos_executable)

        self.mongos_options = utils.default_if_none(mongos_options, {}).copy()

        self.mongos = None

    def setup(self):
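        # Default the chunk size if one wasn't specified in the YAML configuration; mongos
        # interprets the value in MB.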
        if "chunkSize" not in self.mongos_options:
            self.mongos_options["chunkSize"] = 50

        if "port" not in self.mongos_options:
            self.mongos_options["port"] = core.network.PortAllocator.next_fixture_port(self.job_num)
        self.port = self.mongos_options["port"]

        mongos = core.programs.mongos_program(self.logger,
                                              executable=self.mongos_executable,
                                              **self.mongos_options)
        try:
            self.logger.info("Starting mongos on port %d...\n%s", self.port, mongos.as_command())
            mongos.start()
            self.logger.info("mongos started on port %d with pid %d.", self.port, mongos.pid)
        except:
            self.logger.exception("Failed to start mongos on port %d.", self.port)
            raise

        self.mongos = mongos

    def await_ready(self):
        deadline = time.time() + standalone.MongoDFixture.AWAIT_READY_TIMEOUT_SECS

        # Wait until the mongos is accepting connections. The retry logic is necessary to support
        # versions of PyMongo <3.0 that immediately raise a ConnectionFailure if a connection cannot
        # be established.
        while True:
            # Check whether the mongos exited for some reason.
            exit_code = self.mongos.poll()
            if exit_code is not None:
                raise errors.ServerFailure("Could not connect to mongos on port %d, process ended"
                                           " unexpectedly with code %d." % (self.port, exit_code))

            try:
                # Use a shorter connection timeout to more closely satisfy the requested deadline.
                client = utils.new_mongo_client(self.port, timeout_millis=500)
                client.admin.command("ping")
                break
            except pymongo.errors.ConnectionFailure:
                remaining = deadline - time.time()
                if remaining <= 0.0:
                    raise errors.ServerFailure(
                        "Failed to connect to mongos on port %d after %d seconds"
                        % (self.port, standalone.MongoDFixture.AWAIT_READY_TIMEOUT_SECS))

                self.logger.info("Waiting to connect to mongos on port %d.", self.port)
                time.sleep(0.1)  # Wait a little bit before trying again.

        self.logger.info("Successfully contacted the mongos on port %d.", self.port)

    def teardown(self):
        running_at_start = self.is_running()
        success = True  # Still a success even if nothing is running.

        if not running_at_start and self.port is not None:
            self.logger.info("mongos on port %d was expected to be running in teardown(), but"
                             " wasn't.", self.port)

        if self.mongos is not None:
            if running_at_start:
                self.logger.info("Stopping mongos on port %d with pid %d...",
                                 self.port,
                                 self.mongos.pid)
                self.mongos.stop()

            exit_code = self.mongos.wait()
            success = exit_code == 0

            if running_at_start:
                self.logger.info("Successfully terminated the mongos on port %d, exited with code"
                                 " %d",
                                 self.port,
                                 exit_code)

        return success

    def is_running(self):
        return self.mongos is not None and self.mongos.poll() is None