1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10	"context"
11	"crypto/tls"
12	"errors"
13	"strconv"
14	"strings"
15	"time"
16
17	"go.mongodb.org/mongo-driver/bson"
18	"go.mongodb.org/mongo-driver/bson/bsoncodec"
19	"go.mongodb.org/mongo-driver/event"
20	"go.mongodb.org/mongo-driver/mongo/options"
21	"go.mongodb.org/mongo-driver/mongo/readconcern"
22	"go.mongodb.org/mongo-driver/mongo/readpref"
23	"go.mongodb.org/mongo-driver/mongo/writeconcern"
24	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
25	"go.mongodb.org/mongo-driver/x/mongo/driver"
26	"go.mongodb.org/mongo-driver/x/mongo/driver/auth"
27	"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
28	"go.mongodb.org/mongo-driver/x/mongo/driver/description"
29	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
30	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
31	"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
32	"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
33)
34
// Package-level defaults used by the Client implementation.
const (
	// defaultLocalThreshold is the local threshold assigned to a Client when ClientOptions.LocalThreshold is unset.
	defaultLocalThreshold = 15 * time.Millisecond
	// batchSize is the maximum number of session IDs sent in a single endSessions command.
	batchSize = 10000
)
37
// keyVaultCollOpts specifies the options used to communicate with the key vault collection: it is built with
// readconcern.Majority() as the read concern and writeconcern.WMajority() as the write concern.
var keyVaultCollOpts = options.Collection().SetReadConcern(readconcern.Majority()).
	SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
41
// Client is a handle representing a pool of connections to a MongoDB deployment. It is safe for concurrent use by
// multiple goroutines.
//
// The Client type opens and closes connections automatically and maintains a pool of idle connections. For
// connection pool configuration options, see documentation for the ClientOptions type in the mongo/options package.
type Client struct {
	id              uuid.UUID                  // generated in NewClient; passed to new sessions and compared in validSession
	topologyOptions []topology.Option          // assembled by configure; passed to topology.New in NewClient
	deployment      driver.Deployment          // ClientOptions.Deployment if set, otherwise the topology built in NewClient
	connString      connstring.ConnString      // NOTE(review): not referenced by the code visible here; confirm usage
	localThreshold  time.Duration              // defaults to defaultLocalThreshold; used for latency-based server selection
	retryWrites     bool                       // defaults to true; copied onto sessions created by StartSession
	retryReads      bool                       // defaults to true; copied onto sessions and consulted by ListDatabases
	clock           *session.ClusterClock      // created in configure; handed to the server options and to operations
	readPreference  *readpref.ReadPref         // defaults to readpref.Primary()
	readConcern     *readconcern.ReadConcern   // defaults to readconcern.New()
	writeConcern    *writeconcern.WriteConcern // nil unless ClientOptions.WriteConcern is set
	registry        *bsoncodec.Registry        // defaults to bson.DefaultRegistry
	marshaller      BSONAppender               // NOTE(review): not referenced by the code visible here; confirm usage
	monitor         *event.CommandMonitor      // ClientOptions.Monitor, if set; attached to operations
	sessionPool     *session.Pool              // created in Connect; nil until then

	// client-side encryption fields
	keyVaultClient *Client       // separate client, created only when KeyVaultClientOptions is set
	keyVaultColl   *Collection   // key vault collection handle built in configureKeyVault
	mongocryptd    *mcryptClient // created in configureMongocryptd from the ExtraOptions
	crypt          *driver.Crypt // created in configureCrypt; closed in Disconnect
}
70
71// Connect creates a new Client and then initializes it using the Connect method. This is equivalent to calling
72// NewClient followed by Client.Connect.
73//
74// When creating an options.ClientOptions, the order the methods are called matters. Later Set*
75// methods will overwrite the values from previous Set* method invocations. This includes the
76// ApplyURI method. This allows callers to determine the order of precedence for option
77// application. For instance, if ApplyURI is called before SetAuth, the Credential from
78// SetAuth will overwrite the values from the connection string. If ApplyURI is called
79// after SetAuth, then its values will overwrite those from SetAuth.
80//
81// The opts parameter is processed using options.MergeClientOptions, which will overwrite entire
82// option fields of previous options, there is no partial overwriting. For example, if Username is
83// set in the Auth field for the first option, and Password is set for the second but with no
84// Username, after the merge the Username field will be empty.
85//
86// The NewClient function does not do any I/O and returns an error if the given options are invalid.
87// The Client.Connect method starts background goroutines to monitor the state of the deployment and does not do
88// any I/O in the main goroutine to prevent the main goroutine from blocking. Therefore, it will not error if the
89// deployment is down.
90//
91// The Client.Ping method can be used to verify that the deployment is successfully connected and the
92// Client was correctly configured.
93func Connect(ctx context.Context, opts ...*options.ClientOptions) (*Client, error) {
94	c, err := NewClient(opts...)
95	if err != nil {
96		return nil, err
97	}
98	err = c.Connect(ctx)
99	if err != nil {
100		return nil, err
101	}
102	return c, nil
103}
104
105// NewClient creates a new client to connect to a deployment specified by the uri.
106//
107// When creating an options.ClientOptions, the order the methods are called matters. Later Set*
108// methods will overwrite the values from previous Set* method invocations. This includes the
109// ApplyURI method. This allows callers to determine the order of precedence for option
110// application. For instance, if ApplyURI is called before SetAuth, the Credential from
111// SetAuth will overwrite the values from the connection string. If ApplyURI is called
112// after SetAuth, then its values will overwrite those from SetAuth.
113//
114// The opts parameter is processed using options.MergeClientOptions, which will overwrite entire
115// option fields of previous options, there is no partial overwriting. For example, if Username is
116// set in the Auth field for the first option, and Password is set for the second but with no
117// Username, after the merge the Username field will be empty.
118func NewClient(opts ...*options.ClientOptions) (*Client, error) {
119	clientOpt := options.MergeClientOptions(opts...)
120
121	id, err := uuid.New()
122	if err != nil {
123		return nil, err
124	}
125	client := &Client{id: id}
126
127	err = client.configure(clientOpt)
128	if err != nil {
129		return nil, err
130	}
131
132	if client.deployment == nil {
133		client.deployment, err = topology.New(client.topologyOptions...)
134		if err != nil {
135			return nil, replaceErrors(err)
136		}
137	}
138	return client, nil
139}
140
141// Connect initializes the Client by starting background monitoring goroutines.
142// If the Client was created using the NewClient function, this method must be called before a Client can be used.
143//
144// Connect starts background goroutines to monitor the state of the deployment and does not do any I/O in the main
145// goroutine. The Client.Ping method can be used to verify that the connection was created successfully.
146func (c *Client) Connect(ctx context.Context) error {
147	if connector, ok := c.deployment.(driver.Connector); ok {
148		err := connector.Connect()
149		if err != nil {
150			return replaceErrors(err)
151		}
152	}
153
154	if c.mongocryptd != nil {
155		if err := c.mongocryptd.connect(ctx); err != nil {
156			return err
157		}
158	}
159	if c.keyVaultClient != nil {
160		if err := c.keyVaultClient.Connect(ctx); err != nil {
161			return err
162		}
163	}
164
165	var updateChan <-chan description.Topology
166	if subscriber, ok := c.deployment.(driver.Subscriber); ok {
167		sub, err := subscriber.Subscribe()
168		if err != nil {
169			return replaceErrors(err)
170		}
171		updateChan = sub.Updates
172	}
173	c.sessionPool = session.NewPool(updateChan)
174	return nil
175}
176
177// Disconnect closes sockets to the topology referenced by this Client. It will
178// shut down any monitoring goroutines, close the idle connection pool, and will
179// wait until all the in use connections have been returned to the connection
180// pool and closed before returning. If the context expires via cancellation,
181// deadline, or timeout before the in use connections have returned, the in use
182// connections will be closed, resulting in the failure of any in flight read
183// or write operations. If this method returns with no errors, all connections
184// associated with this Client have been closed.
185func (c *Client) Disconnect(ctx context.Context) error {
186	if ctx == nil {
187		ctx = context.Background()
188	}
189
190	c.endSessions(ctx)
191	if c.mongocryptd != nil {
192		if err := c.mongocryptd.disconnect(ctx); err != nil {
193			return err
194		}
195	}
196	if c.keyVaultClient != nil {
197		if err := c.keyVaultClient.Disconnect(ctx); err != nil {
198			return err
199		}
200	}
201	if c.crypt != nil {
202		c.crypt.Close()
203	}
204
205	if disconnector, ok := c.deployment.(driver.Disconnector); ok {
206		return replaceErrors(disconnector.Disconnect(ctx))
207	}
208	return nil
209}
210
211// Ping sends a ping command to verify that the client can connect to the deployment.
212//
213// The rp paramter is used to determine which server is selected for the operation.
214// If it is nil, the client's read preference is used.
215//
216// If the server is down, Ping will try to select a server until the client's server selection timeout expires.
217// This can be configured through the ClientOptions.SetServerSelectionTimeout option when creating a new Client.
218// After the timeout expires, a server selection error is returned.
219//
220// Using Ping reduces application resilience because applications starting up will error if the server is temporarily
221// unavailable or is failing over (e.g. during autoscaling due to a load spike).
222func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error {
223	if ctx == nil {
224		ctx = context.Background()
225	}
226
227	if rp == nil {
228		rp = c.readPreference
229	}
230
231	db := c.Database("admin")
232	res := db.RunCommand(ctx, bson.D{
233		{"ping", 1},
234	}, options.RunCmd().SetReadPreference(rp))
235
236	return replaceErrors(res.Err())
237}
238
239// StartSession starts a new session configured with the given options.
240//
241// If the DefaultReadConcern, DefaultWriteConcern, or DefaultReadPreference options are not set, the client's read
242// concern, write concern, or read preference will be used, respectively.
243func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) {
244	if c.sessionPool == nil {
245		return nil, ErrClientDisconnected
246	}
247
248	sopts := options.MergeSessionOptions(opts...)
249	coreOpts := &session.ClientOptions{
250		DefaultReadConcern:    c.readConcern,
251		DefaultReadPreference: c.readPreference,
252		DefaultWriteConcern:   c.writeConcern,
253	}
254	if sopts.CausalConsistency != nil {
255		coreOpts.CausalConsistency = sopts.CausalConsistency
256	}
257	if sopts.DefaultReadConcern != nil {
258		coreOpts.DefaultReadConcern = sopts.DefaultReadConcern
259	}
260	if sopts.DefaultWriteConcern != nil {
261		coreOpts.DefaultWriteConcern = sopts.DefaultWriteConcern
262	}
263	if sopts.DefaultReadPreference != nil {
264		coreOpts.DefaultReadPreference = sopts.DefaultReadPreference
265	}
266	if sopts.DefaultMaxCommitTime != nil {
267		coreOpts.DefaultMaxCommitTime = sopts.DefaultMaxCommitTime
268	}
269
270	sess, err := session.NewClientSession(c.sessionPool, c.id, session.Explicit, coreOpts)
271	if err != nil {
272		return nil, replaceErrors(err)
273	}
274
275	sess.RetryWrite = c.retryWrites
276	sess.RetryRead = c.retryReads
277
278	return &sessionImpl{
279		clientSession: sess,
280		client:        c,
281		deployment:    c.deployment,
282	}, nil
283}
284
285func (c *Client) endSessions(ctx context.Context) {
286	if c.sessionPool == nil {
287		return
288	}
289
290	ids := c.sessionPool.IDSlice()
291	idx, idArray := bsoncore.AppendArrayStart(nil)
292	for i, id := range ids {
293		idDoc, _ := id.MarshalBSON()
294		idArray = bsoncore.AppendDocumentElement(idArray, strconv.Itoa(i), idDoc)
295	}
296	idArray, _ = bsoncore.AppendArrayEnd(idArray, idx)
297
298	op := operation.NewEndSessions(idArray).ClusterClock(c.clock).Deployment(c.deployment).
299		ServerSelector(description.ReadPrefSelector(readpref.PrimaryPreferred())).CommandMonitor(c.monitor).
300		Database("admin").Crypt(c.crypt)
301
302	idx, idArray = bsoncore.AppendArrayStart(nil)
303	totalNumIDs := len(ids)
304	for i := 0; i < totalNumIDs; i++ {
305		idDoc, _ := ids[i].MarshalBSON()
306		idArray = bsoncore.AppendDocumentElement(idArray, strconv.Itoa(i), idDoc)
307		if ((i+1)%batchSize) == 0 || i == totalNumIDs-1 {
308			idArray, _ = bsoncore.AppendArrayEnd(idArray, idx)
309			_ = op.SessionIDs(idArray).Execute(ctx)
310			idArray = idArray[:0]
311			idx = 0
312		}
313	}
314
315}
316
// configure validates opts and translates them into the client's settings. Connection, server, and topology level
// options are accumulated and stored in c.topologyOptions, while client-level values (local threshold, monitor,
// read concern, read preference, registry, retry flags, write concern, cluster clock) are stored directly on c,
// using the defaults assigned below when an option is unset.
//
// It returns an error if validation fails, if the authenticator cannot be created, if auto encryption cannot be
// configured, or if a custom Deployment is combined with topology or server options.
func (c *Client) configure(opts *options.ClientOptions) error {
	if err := opts.Validate(); err != nil {
		return err
	}

	var connOpts []topology.ConnectionOption
	var serverOpts []topology.ServerOption
	var topologyOpts []topology.Option

	// TODO(GODRIVER-814): Add tests for topology, server, and connection related options.

	// AppName
	var appName string
	if opts.AppName != nil {
		appName = *opts.AppName
	}
	// Compressors & ZlibLevel
	var comps []string
	if len(opts.Compressors) > 0 {
		comps = opts.Compressors

		connOpts = append(connOpts, topology.WithCompressors(
			func(compressors []string) []string {
				return append(compressors, comps...)
			},
		))

		// Compression levels are only forwarded for compressors that were actually requested.
		for _, comp := range comps {
			switch comp {
			case "zlib":
				connOpts = append(connOpts, topology.WithZlibLevel(func(level *int) *int {
					return opts.ZlibLevel
				}))
			case "zstd":
				connOpts = append(connOpts, topology.WithZstdLevel(func(level *int) *int {
					return opts.ZstdLevel
				}))
			}
		}

		// Note: the closure parameter below shadows the outer opts; only comps is captured from this scope.
		serverOpts = append(serverOpts, topology.WithCompressionOptions(
			func(opts ...string) []string { return append(opts, comps...) },
		))
	}
	// Handshaker
	// Default handshaker: an isMaster operation carrying the app name and compressors. It is replaced below when
	// credentials are configured.
	var handshaker = func(driver.Handshaker) driver.Handshaker {
		return operation.NewIsMaster().AppName(appName).Compressors(comps)
	}
	// Auth & Database & Password & Username
	if opts.Auth != nil {
		cred := &auth.Cred{
			Username:    opts.Auth.Username,
			Password:    opts.Auth.Password,
			PasswordSet: opts.Auth.PasswordSet,
			Props:       opts.Auth.AuthMechanismProperties,
			Source:      opts.Auth.AuthSource,
		}
		mechanism := opts.Auth.AuthMechanism

		// When no auth source is given, X509, GSSAPI and PLAIN default to "$external"; everything else
		// defaults to "admin".
		if len(cred.Source) == 0 {
			switch strings.ToUpper(mechanism) {
			case auth.MongoDBX509, auth.GSSAPI, auth.PLAIN:
				cred.Source = "$external"
			default:
				cred.Source = "admin"
			}
		}

		authenticator, err := auth.CreateAuthenticator(mechanism, cred)
		if err != nil {
			return err
		}

		handshakeOpts := &auth.HandshakeOptions{
			AppName:       appName,
			Authenticator: authenticator,
			Compressors:   comps,
		}
		if mechanism == "" {
			// Required for SASL mechanism negotiation during handshake
			handshakeOpts.DBUser = cred.Source + "." + cred.Username
		}
		if opts.AuthenticateToAnything != nil && *opts.AuthenticateToAnything {
			// Authenticate arbiters
			handshakeOpts.PerformAuthentication = func(serv description.Server) bool {
				return true
			}
		}

		handshaker = func(driver.Handshaker) driver.Handshaker {
			return auth.Handshaker(nil, handshakeOpts)
		}
	}
	connOpts = append(connOpts, topology.WithHandshaker(handshaker))
	// ConnectTimeout
	// The same value is used as the heartbeat timeout (server option) and the connect timeout (connection option).
	if opts.ConnectTimeout != nil {
		serverOpts = append(serverOpts, topology.WithHeartbeatTimeout(
			func(time.Duration) time.Duration { return *opts.ConnectTimeout },
		))
		connOpts = append(connOpts, topology.WithConnectTimeout(
			func(time.Duration) time.Duration { return *opts.ConnectTimeout },
		))
	}
	// Dialer
	if opts.Dialer != nil {
		connOpts = append(connOpts, topology.WithDialer(
			func(topology.Dialer) topology.Dialer { return opts.Dialer },
		))
	}
	// Direct
	if opts.Direct != nil && *opts.Direct {
		topologyOpts = append(topologyOpts, topology.WithMode(
			func(topology.MonitorMode) topology.MonitorMode { return topology.SingleMode },
		))
	}
	// HeartbeatInterval
	if opts.HeartbeatInterval != nil {
		serverOpts = append(serverOpts, topology.WithHeartbeatInterval(
			func(time.Duration) time.Duration { return *opts.HeartbeatInterval },
		))
	}
	// Hosts
	// The seed list option is always appended, so topologyOpts is never empty after this point.
	hosts := []string{"localhost:27017"} // default host
	if len(opts.Hosts) > 0 {
		hosts = opts.Hosts
	}
	topologyOpts = append(topologyOpts, topology.WithSeedList(
		func(...string) []string { return hosts },
	))
	// LocalThreshold
	c.localThreshold = defaultLocalThreshold
	if opts.LocalThreshold != nil {
		c.localThreshold = *opts.LocalThreshold
	}
	// MaxConnIdleTime
	if opts.MaxConnIdleTime != nil {
		connOpts = append(connOpts, topology.WithIdleTimeout(
			func(time.Duration) time.Duration { return *opts.MaxConnIdleTime },
		))
	}
	// MaxPoolSize
	if opts.MaxPoolSize != nil {
		serverOpts = append(
			serverOpts,
			topology.WithMaxConnections(func(uint64) uint64 { return *opts.MaxPoolSize }),
		)
	}
	// MinPoolSize
	if opts.MinPoolSize != nil {
		serverOpts = append(
			serverOpts,
			topology.WithMinConnections(func(uint64) uint64 { return *opts.MinPoolSize }),
		)
	}
	// PoolMonitor
	if opts.PoolMonitor != nil {
		serverOpts = append(
			serverOpts,
			topology.WithConnectionPoolMonitor(func(*event.PoolMonitor) *event.PoolMonitor { return opts.PoolMonitor }),
		)
	}
	// Monitor
	// The command monitor is kept on the client and also passed down as a connection option.
	if opts.Monitor != nil {
		c.monitor = opts.Monitor
		connOpts = append(connOpts, topology.WithMonitor(
			func(*event.CommandMonitor) *event.CommandMonitor { return opts.Monitor },
		))
	}
	// ReadConcern
	c.readConcern = readconcern.New()
	if opts.ReadConcern != nil {
		c.readConcern = opts.ReadConcern
	}
	// ReadPreference
	c.readPreference = readpref.Primary()
	if opts.ReadPreference != nil {
		c.readPreference = opts.ReadPreference
	}
	// Registry
	c.registry = bson.DefaultRegistry
	if opts.Registry != nil {
		c.registry = opts.Registry
	}
	// ReplicaSet
	if opts.ReplicaSet != nil {
		topologyOpts = append(topologyOpts, topology.WithReplicaSetName(
			func(string) string { return *opts.ReplicaSet },
		))
	}
	// RetryWrites
	c.retryWrites = true // retry writes on by default
	if opts.RetryWrites != nil {
		c.retryWrites = *opts.RetryWrites
	}
	// RetryReads
	c.retryReads = true // retry reads on by default
	if opts.RetryReads != nil {
		c.retryReads = *opts.RetryReads
	}
	// ServerSelectionTimeout
	if opts.ServerSelectionTimeout != nil {
		topologyOpts = append(topologyOpts, topology.WithServerSelectionTimeout(
			func(time.Duration) time.Duration { return *opts.ServerSelectionTimeout },
		))
	}
	// SocketTimeout
	// The socket timeout is applied as both the read and the write timeout.
	if opts.SocketTimeout != nil {
		connOpts = append(
			connOpts,
			topology.WithReadTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }),
			topology.WithWriteTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }),
		)
	}
	// TLSConfig
	if opts.TLSConfig != nil {
		connOpts = append(connOpts, topology.WithTLSConfig(
			func(*tls.Config) *tls.Config {
				return opts.TLSConfig
			},
		))
	}
	// WriteConcern
	// Unlike the read concern and read preference, no default is assigned: c.writeConcern stays nil when unset.
	if opts.WriteConcern != nil {
		c.writeConcern = opts.WriteConcern
	}
	// AutoEncryptionOptions
	if opts.AutoEncryptionOptions != nil {
		if err := c.configureAutoEncryption(opts.AutoEncryptionOptions); err != nil {
			return err
		}
	}

	// ClusterClock
	c.clock = new(session.ClusterClock)

	// These two server options are always appended, so serverOpts holds at least two entries from here on.
	serverOpts = append(
		serverOpts,
		topology.WithClock(func(*session.ClusterClock) *session.ClusterClock { return c.clock }),
		topology.WithConnectionOptions(func(...topology.ConnectionOption) []topology.ConnectionOption { return connOpts }),
	)
	c.topologyOptions = append(topologyOpts, topology.WithServerOptions(
		func(...topology.ServerOption) []topology.ServerOption { return serverOpts },
	))

	// Deployment
	if opts.Deployment != nil {
		// serverOpts always contains the clock and connection options appended above and topologyOpts always
		// contains the seed list, so larger counts mean that server or topology options were also specified.
		if len(serverOpts) > 2 || len(topologyOpts) > 1 {
			return errors.New("cannot specify topology or server options with a deployment")
		}
		c.deployment = opts.Deployment
	}

	return nil
}
570
571func (c *Client) configureAutoEncryption(opts *options.AutoEncryptionOptions) error {
572	if err := c.configureKeyVault(opts); err != nil {
573		return err
574	}
575	if err := c.configureMongocryptd(opts); err != nil {
576		return err
577	}
578	return c.configureCrypt(opts)
579}
580
581func (c *Client) configureKeyVault(opts *options.AutoEncryptionOptions) error {
582	// parse key vault options and create new client if necessary
583	if opts.KeyVaultClientOptions != nil {
584		var err error
585		c.keyVaultClient, err = NewClient(opts.KeyVaultClientOptions)
586		if err != nil {
587			return err
588		}
589	}
590
591	dbName, collName := splitNamespace(opts.KeyVaultNamespace)
592	client := c.keyVaultClient
593	if client == nil {
594		client = c
595	}
596	c.keyVaultColl = client.Database(dbName).Collection(collName, keyVaultCollOpts)
597	return nil
598}
599
600func (c *Client) configureMongocryptd(opts *options.AutoEncryptionOptions) error {
601	var err error
602	c.mongocryptd, err = newMcryptClient(opts.ExtraOptions)
603	return err
604}
605
606func (c *Client) configureCrypt(opts *options.AutoEncryptionOptions) error {
607	// convert schemas in SchemaMap to bsoncore documents
608	cryptSchemaMap := make(map[string]bsoncore.Document)
609	for k, v := range opts.SchemaMap {
610		schema, err := transformBsoncoreDocument(c.registry, v)
611		if err != nil {
612			return err
613		}
614		cryptSchemaMap[k] = schema
615	}
616
617	// configure options
618	var bypass bool
619	if opts.BypassAutoEncryption != nil {
620		bypass = *opts.BypassAutoEncryption
621	}
622	kr := keyRetriever{coll: c.keyVaultColl}
623	cir := collInfoRetriever{client: c}
624	cryptOpts := &driver.CryptOptions{
625		CollInfoFn:           cir.cryptCollInfo,
626		KeyFn:                kr.cryptKeys,
627		MarkFn:               c.mongocryptd.markCommand,
628		KmsProviders:         opts.KmsProviders,
629		BypassAutoEncryption: bypass,
630		SchemaMap:            cryptSchemaMap,
631	}
632
633	var err error
634	c.crypt, err = driver.NewCrypt(cryptOpts)
635	return err
636}
637
638// validSession returns an error if the session doesn't belong to the client
639func (c *Client) validSession(sess *session.Client) error {
640	if sess != nil && !uuid.Equal(sess.ClientID, c.id) {
641		return ErrWrongClient
642	}
643	return nil
644}
645
646// Database returns a handle for a database with the given name configured with the given DatabaseOptions.
647func (c *Client) Database(name string, opts ...*options.DatabaseOptions) *Database {
648	return newDatabase(c, name, opts...)
649}
650
651// ListDatabases executes a listDatabases command and returns the result.
652//
653// The filter parameter must be a document containing query operators and can be used to select which
654// databases are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include
655// all databases.
656//
657// The opts paramter can be used to specify options for this operation (see the options.ListDatabasesOptions documentation).
658//
659// For more information about the command, see https://docs.mongodb.com/manual/reference/command/listDatabases/.
660func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) (ListDatabasesResult, error) {
661	if ctx == nil {
662		ctx = context.Background()
663	}
664
665	sess := sessionFromContext(ctx)
666
667	err := c.validSession(sess)
668	if sess == nil && c.sessionPool != nil {
669		sess, err = session.NewClientSession(c.sessionPool, c.id, session.Implicit)
670		if err != nil {
671			return ListDatabasesResult{}, err
672		}
673		defer sess.EndSession()
674	}
675
676	err = c.validSession(sess)
677	if err != nil {
678		return ListDatabasesResult{}, err
679	}
680
681	filterDoc, err := transformBsoncoreDocument(c.registry, filter)
682	if err != nil {
683		return ListDatabasesResult{}, err
684	}
685
686	selector := description.CompositeSelector([]description.ServerSelector{
687		description.ReadPrefSelector(readpref.Primary()),
688		description.LatencySelector(c.localThreshold),
689	})
690	selector = makeReadPrefSelector(sess, selector, c.localThreshold)
691
692	ldo := options.MergeListDatabasesOptions(opts...)
693	op := operation.NewListDatabases(filterDoc).
694		Session(sess).ReadPreference(c.readPreference).CommandMonitor(c.monitor).
695		ServerSelector(selector).ClusterClock(c.clock).Database("admin").Deployment(c.deployment).Crypt(c.crypt)
696	if ldo.NameOnly != nil {
697		op = op.NameOnly(*ldo.NameOnly)
698	}
699	retry := driver.RetryNone
700	if c.retryReads {
701		retry = driver.RetryOncePerCommand
702	}
703	op.Retry(retry)
704
705	err = op.Execute(ctx)
706	if err != nil {
707		return ListDatabasesResult{}, replaceErrors(err)
708	}
709
710	return newListDatabasesResultFromOperation(op.Result()), nil
711}
712
713// ListDatabaseNames executes a listDatabases command and returns a slice containing the names of all of the databases
714// on the server.
715//
716// The filter parameter must be a document containing query operators and can be used to select which databases
717// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
718// databases.
719//
720// The opts parameter can be used to specify options for this operation (see the options.ListDatabasesOptions
721// documentation.)
722//
723// For more information about the command, see https://docs.mongodb.com/manual/reference/command/listDatabases/.
724func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) ([]string, error) {
725	opts = append(opts, options.ListDatabases().SetNameOnly(true))
726
727	res, err := c.ListDatabases(ctx, filter, opts...)
728	if err != nil {
729		return nil, err
730	}
731
732	names := make([]string, 0)
733	for _, spec := range res.Databases {
734		names = append(names, spec.Name)
735	}
736
737	return names, nil
738}
739
740// WithSession creates a new SessionContext from the ctx and sess parameters and uses it to call the fn callback. The
741// SessionContext must be used as the Context parameter for any operations in the fn callback that should be executed
742// under the session.
743//
744// If the ctx parameter already contains a Session, that Session will be replaced with the one provided.
745//
746// Any error returned by the fn callback will be returned without any modifications.
747func WithSession(ctx context.Context, sess Session, fn func(SessionContext) error) error {
748	return fn(contextWithSession(ctx, sess))
749}
750
751// UseSession creates a new Session and uses it to create a new SessionContext, which is used to call the fn callback.
752// The SessionContext parameter must be used as the Context parameter for any operations in the fn callback that should
753// be executed under a session. After the callback returns, the created Session is ended, meaning that any in-progress
754// transactions started by fn will be aborted even if fn returns an error.
755//
756// If the ctx parameter already contains a Session, that Session will be replaced with the newly created one.
757//
758// Any error returned by the fn callback will be returned without any modifications.
759func (c *Client) UseSession(ctx context.Context, fn func(SessionContext) error) error {
760	return c.UseSessionWithOptions(ctx, options.Session(), fn)
761}
762
763// UseSessionWithOptions operates like UseSession but uses the given SessionOptions to create the Session.
764func (c *Client) UseSessionWithOptions(ctx context.Context, opts *options.SessionOptions, fn func(SessionContext) error) error {
765	defaultSess, err := c.StartSession(opts)
766	if err != nil {
767		return err
768	}
769
770	defer defaultSess.EndSession(ctx)
771
772	sessCtx := sessionContext{
773		Context: context.WithValue(ctx, sessionKey{}, defaultSess),
774		Session: defaultSess,
775	}
776
777	return fn(sessCtx)
778}
779
780// Watch returns a change stream for all changes on the deployment. See
781// https://docs.mongodb.com/manual/changeStreams/ for more information about change streams.
782//
783// The client must be configured with read concern majority or no read concern for a change stream to be created
784// successfully.
785//
786// The pipeline parameter must be an array of documents, each representing a pipeline stage. The pipeline cannot be
787// nil or empty. The stage documents must all be non-nil. See https://docs.mongodb.com/manual/changeStreams/ for a list
788// of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the mongo.Pipeline{}
789// type can be used.
790//
791// The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions
792// documentation).
793func (c *Client) Watch(ctx context.Context, pipeline interface{},
794	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
795	if c.sessionPool == nil {
796		return nil, ErrClientDisconnected
797	}
798
799	csConfig := changeStreamConfig{
800		readConcern:    c.readConcern,
801		readPreference: c.readPreference,
802		client:         c,
803		registry:       c.registry,
804		streamType:     ClientStream,
805	}
806
807	return newChangeStream(ctx, csConfig, pipeline, opts...)
808}
809
810// NumberSessionsInProgress returns the number of sessions that have been started for this client but have not been
811// closed (i.e. EndSession has not been called).
812func (c *Client) NumberSessionsInProgress() int {
813	return c.sessionPool.CheckedOut()
814}
815