1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10	"context"
11	"crypto/tls"
12	"errors"
13	"strings"
14	"time"
15
16	"go.mongodb.org/mongo-driver/bson"
17	"go.mongodb.org/mongo-driver/bson/bsoncodec"
18	"go.mongodb.org/mongo-driver/event"
19	"go.mongodb.org/mongo-driver/mongo/options"
20	"go.mongodb.org/mongo-driver/mongo/readconcern"
21	"go.mongodb.org/mongo-driver/mongo/readpref"
22	"go.mongodb.org/mongo-driver/mongo/writeconcern"
23	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
24	"go.mongodb.org/mongo-driver/x/mongo/driver"
25	"go.mongodb.org/mongo-driver/x/mongo/driver/auth"
26	"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
27	"go.mongodb.org/mongo-driver/x/mongo/driver/description"
28	"go.mongodb.org/mongo-driver/x/mongo/driver/ocsp"
29	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
30	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
31	"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
32	"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
33)
34
35const defaultLocalThreshold = 15 * time.Millisecond
36
37var (
38	// keyVaultCollOpts specifies options used to communicate with the key vault collection
39	keyVaultCollOpts = options.Collection().SetReadConcern(readconcern.Majority()).
40				SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
41
42	endSessionsBatchSize = 10000
43)
44
45// Client is a handle representing a pool of connections to a MongoDB deployment. It is safe for concurrent use by
46// multiple goroutines.
47//
48// The Client type opens and closes connections automatically and maintains a pool of idle connections. For
49// connection pool configuration options, see documentation for the ClientOptions type in the mongo/options package.
50type Client struct {
51	id              uuid.UUID
52	topologyOptions []topology.Option
53	deployment      driver.Deployment
54	connString      connstring.ConnString
55	localThreshold  time.Duration
56	retryWrites     bool
57	retryReads      bool
58	clock           *session.ClusterClock
59	readPreference  *readpref.ReadPref
60	readConcern     *readconcern.ReadConcern
61	writeConcern    *writeconcern.WriteConcern
62	registry        *bsoncodec.Registry
63	marshaller      BSONAppender
64	monitor         *event.CommandMonitor
65	sessionPool     *session.Pool
66
67	// client-side encryption fields
68	keyVaultClient *Client
69	keyVaultColl   *Collection
70	mongocryptd    *mcryptClient
71	crypt          *driver.Crypt
72}
73
74// Connect creates a new Client and then initializes it using the Connect method. This is equivalent to calling
75// NewClient followed by Client.Connect.
76//
77// When creating an options.ClientOptions, the order the methods are called matters. Later Set*
78// methods will overwrite the values from previous Set* method invocations. This includes the
79// ApplyURI method. This allows callers to determine the order of precedence for option
80// application. For instance, if ApplyURI is called before SetAuth, the Credential from
81// SetAuth will overwrite the values from the connection string. If ApplyURI is called
82// after SetAuth, then its values will overwrite those from SetAuth.
83//
84// The opts parameter is processed using options.MergeClientOptions, which will overwrite entire
85// option fields of previous options, there is no partial overwriting. For example, if Username is
86// set in the Auth field for the first option, and Password is set for the second but with no
87// Username, after the merge the Username field will be empty.
88//
89// The NewClient function does not do any I/O and returns an error if the given options are invalid.
90// The Client.Connect method starts background goroutines to monitor the state of the deployment and does not do
91// any I/O in the main goroutine to prevent the main goroutine from blocking. Therefore, it will not error if the
92// deployment is down.
93//
94// The Client.Ping method can be used to verify that the deployment is successfully connected and the
95// Client was correctly configured.
96func Connect(ctx context.Context, opts ...*options.ClientOptions) (*Client, error) {
97	c, err := NewClient(opts...)
98	if err != nil {
99		return nil, err
100	}
101	err = c.Connect(ctx)
102	if err != nil {
103		return nil, err
104	}
105	return c, nil
106}
107
108// NewClient creates a new client to connect to a deployment specified by the uri.
109//
110// When creating an options.ClientOptions, the order the methods are called matters. Later Set*
111// methods will overwrite the values from previous Set* method invocations. This includes the
112// ApplyURI method. This allows callers to determine the order of precedence for option
113// application. For instance, if ApplyURI is called before SetAuth, the Credential from
114// SetAuth will overwrite the values from the connection string. If ApplyURI is called
115// after SetAuth, then its values will overwrite those from SetAuth.
116//
117// The opts parameter is processed using options.MergeClientOptions, which will overwrite entire
118// option fields of previous options, there is no partial overwriting. For example, if Username is
119// set in the Auth field for the first option, and Password is set for the second but with no
120// Username, after the merge the Username field will be empty.
121func NewClient(opts ...*options.ClientOptions) (*Client, error) {
122	clientOpt := options.MergeClientOptions(opts...)
123
124	id, err := uuid.New()
125	if err != nil {
126		return nil, err
127	}
128	client := &Client{id: id}
129
130	err = client.configure(clientOpt)
131	if err != nil {
132		return nil, err
133	}
134
135	if client.deployment == nil {
136		client.deployment, err = topology.New(client.topologyOptions...)
137		if err != nil {
138			return nil, replaceErrors(err)
139		}
140	}
141	return client, nil
142}
143
144// Connect initializes the Client by starting background monitoring goroutines.
145// If the Client was created using the NewClient function, this method must be called before a Client can be used.
146//
147// Connect starts background goroutines to monitor the state of the deployment and does not do any I/O in the main
148// goroutine. The Client.Ping method can be used to verify that the connection was created successfully.
149func (c *Client) Connect(ctx context.Context) error {
150	if connector, ok := c.deployment.(driver.Connector); ok {
151		err := connector.Connect()
152		if err != nil {
153			return replaceErrors(err)
154		}
155	}
156
157	if c.mongocryptd != nil {
158		if err := c.mongocryptd.connect(ctx); err != nil {
159			return err
160		}
161	}
162	if c.keyVaultClient != nil {
163		if err := c.keyVaultClient.Connect(ctx); err != nil {
164			return err
165		}
166	}
167
168	var updateChan <-chan description.Topology
169	if subscriber, ok := c.deployment.(driver.Subscriber); ok {
170		sub, err := subscriber.Subscribe()
171		if err != nil {
172			return replaceErrors(err)
173		}
174		updateChan = sub.Updates
175	}
176	c.sessionPool = session.NewPool(updateChan)
177	return nil
178}
179
180// Disconnect closes sockets to the topology referenced by this Client. It will
181// shut down any monitoring goroutines, close the idle connection pool, and will
182// wait until all the in use connections have been returned to the connection
183// pool and closed before returning. If the context expires via cancellation,
184// deadline, or timeout before the in use connections have returned, the in use
185// connections will be closed, resulting in the failure of any in flight read
186// or write operations. If this method returns with no errors, all connections
187// associated with this Client have been closed.
188func (c *Client) Disconnect(ctx context.Context) error {
189	if ctx == nil {
190		ctx = context.Background()
191	}
192
193	c.endSessions(ctx)
194	if c.mongocryptd != nil {
195		if err := c.mongocryptd.disconnect(ctx); err != nil {
196			return err
197		}
198	}
199	if c.keyVaultClient != nil {
200		if err := c.keyVaultClient.Disconnect(ctx); err != nil {
201			return err
202		}
203	}
204	if c.crypt != nil {
205		c.crypt.Close()
206	}
207
208	if disconnector, ok := c.deployment.(driver.Disconnector); ok {
209		return replaceErrors(disconnector.Disconnect(ctx))
210	}
211	return nil
212}
213
214// Ping sends a ping command to verify that the client can connect to the deployment.
215//
216// The rp paramter is used to determine which server is selected for the operation.
217// If it is nil, the client's read preference is used.
218//
219// If the server is down, Ping will try to select a server until the client's server selection timeout expires.
220// This can be configured through the ClientOptions.SetServerSelectionTimeout option when creating a new Client.
221// After the timeout expires, a server selection error is returned.
222//
223// Using Ping reduces application resilience because applications starting up will error if the server is temporarily
224// unavailable or is failing over (e.g. during autoscaling due to a load spike).
225func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error {
226	if ctx == nil {
227		ctx = context.Background()
228	}
229
230	if rp == nil {
231		rp = c.readPreference
232	}
233
234	db := c.Database("admin")
235	res := db.RunCommand(ctx, bson.D{
236		{"ping", 1},
237	}, options.RunCmd().SetReadPreference(rp))
238
239	return replaceErrors(res.Err())
240}
241
242// StartSession starts a new session configured with the given options.
243//
244// If the DefaultReadConcern, DefaultWriteConcern, or DefaultReadPreference options are not set, the client's read
245// concern, write concern, or read preference will be used, respectively.
246func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) {
247	if c.sessionPool == nil {
248		return nil, ErrClientDisconnected
249	}
250
251	sopts := options.MergeSessionOptions(opts...)
252	coreOpts := &session.ClientOptions{
253		DefaultReadConcern:    c.readConcern,
254		DefaultReadPreference: c.readPreference,
255		DefaultWriteConcern:   c.writeConcern,
256	}
257	if sopts.CausalConsistency != nil {
258		coreOpts.CausalConsistency = sopts.CausalConsistency
259	}
260	if sopts.DefaultReadConcern != nil {
261		coreOpts.DefaultReadConcern = sopts.DefaultReadConcern
262	}
263	if sopts.DefaultWriteConcern != nil {
264		coreOpts.DefaultWriteConcern = sopts.DefaultWriteConcern
265	}
266	if sopts.DefaultReadPreference != nil {
267		coreOpts.DefaultReadPreference = sopts.DefaultReadPreference
268	}
269	if sopts.DefaultMaxCommitTime != nil {
270		coreOpts.DefaultMaxCommitTime = sopts.DefaultMaxCommitTime
271	}
272
273	sess, err := session.NewClientSession(c.sessionPool, c.id, session.Explicit, coreOpts)
274	if err != nil {
275		return nil, replaceErrors(err)
276	}
277
278	// Writes are not retryable on standalones, so let operation determine whether to retry
279	sess.RetryWrite = false
280	sess.RetryRead = c.retryReads
281
282	return &sessionImpl{
283		clientSession: sess,
284		client:        c,
285		deployment:    c.deployment,
286	}, nil
287}
288
289func (c *Client) endSessions(ctx context.Context) {
290	if c.sessionPool == nil {
291		return
292	}
293
294	sessionIDs := c.sessionPool.IDSlice()
295	op := operation.NewEndSessions(nil).ClusterClock(c.clock).Deployment(c.deployment).
296		ServerSelector(description.ReadPrefSelector(readpref.PrimaryPreferred())).CommandMonitor(c.monitor).
297		Database("admin").Crypt(c.crypt)
298
299	totalNumIDs := len(sessionIDs)
300	var currentBatch []bsoncore.Document
301	for i := 0; i < totalNumIDs; i++ {
302		currentBatch = append(currentBatch, sessionIDs[i])
303
304		// If we are at the end of a batch or the end of the overall IDs array, execute the operation.
305		if ((i+1)%endSessionsBatchSize) == 0 || i == totalNumIDs-1 {
306			// Ignore all errors when ending sessions.
307			_, marshalVal, err := bson.MarshalValue(currentBatch)
308			if err == nil {
309				_ = op.SessionIDs(marshalVal).Execute(ctx)
310			}
311
312			currentBatch = currentBatch[:0]
313		}
314	}
315}
316
317func (c *Client) configure(opts *options.ClientOptions) error {
318	if err := opts.Validate(); err != nil {
319		return err
320	}
321
322	var connOpts []topology.ConnectionOption
323	var serverOpts []topology.ServerOption
324	var topologyOpts []topology.Option
325
326	// TODO(GODRIVER-814): Add tests for topology, server, and connection related options.
327
328	// ClusterClock
329	c.clock = new(session.ClusterClock)
330
331	// Pass down URI so topology can determine whether or not SRV polling is required
332	topologyOpts = append(topologyOpts, topology.WithURI(func(uri string) string {
333		return opts.GetURI()
334	}))
335
336	// AppName
337	var appName string
338	if opts.AppName != nil {
339		appName = *opts.AppName
340
341		serverOpts = append(serverOpts, topology.WithServerAppName(func(string) string {
342			return appName
343		}))
344	}
345	// Compressors & ZlibLevel
346	var comps []string
347	if len(opts.Compressors) > 0 {
348		comps = opts.Compressors
349
350		connOpts = append(connOpts, topology.WithCompressors(
351			func(compressors []string) []string {
352				return append(compressors, comps...)
353			},
354		))
355
356		for _, comp := range comps {
357			switch comp {
358			case "zlib":
359				connOpts = append(connOpts, topology.WithZlibLevel(func(level *int) *int {
360					return opts.ZlibLevel
361				}))
362			case "zstd":
363				connOpts = append(connOpts, topology.WithZstdLevel(func(level *int) *int {
364					return opts.ZstdLevel
365				}))
366			}
367		}
368
369		serverOpts = append(serverOpts, topology.WithCompressionOptions(
370			func(opts ...string) []string { return append(opts, comps...) },
371		))
372	}
373	// Handshaker
374	var handshaker = func(driver.Handshaker) driver.Handshaker {
375		return operation.NewIsMaster().AppName(appName).Compressors(comps).ClusterClock(c.clock)
376	}
377	// Auth & Database & Password & Username
378	if opts.Auth != nil {
379		cred := &auth.Cred{
380			Username:    opts.Auth.Username,
381			Password:    opts.Auth.Password,
382			PasswordSet: opts.Auth.PasswordSet,
383			Props:       opts.Auth.AuthMechanismProperties,
384			Source:      opts.Auth.AuthSource,
385		}
386		mechanism := opts.Auth.AuthMechanism
387
388		if len(cred.Source) == 0 {
389			switch strings.ToUpper(mechanism) {
390			case auth.MongoDBX509, auth.GSSAPI, auth.PLAIN:
391				cred.Source = "$external"
392			default:
393				cred.Source = "admin"
394			}
395		}
396
397		authenticator, err := auth.CreateAuthenticator(mechanism, cred)
398		if err != nil {
399			return err
400		}
401
402		handshakeOpts := &auth.HandshakeOptions{
403			AppName:       appName,
404			Authenticator: authenticator,
405			Compressors:   comps,
406			ClusterClock:  c.clock,
407		}
408		if mechanism == "" {
409			// Required for SASL mechanism negotiation during handshake
410			handshakeOpts.DBUser = cred.Source + "." + cred.Username
411		}
412		if opts.AuthenticateToAnything != nil && *opts.AuthenticateToAnything {
413			// Authenticate arbiters
414			handshakeOpts.PerformAuthentication = func(serv description.Server) bool {
415				return true
416			}
417		}
418
419		handshaker = func(driver.Handshaker) driver.Handshaker {
420			return auth.Handshaker(nil, handshakeOpts)
421		}
422	}
423	connOpts = append(connOpts, topology.WithHandshaker(handshaker))
424	// ConnectTimeout
425	if opts.ConnectTimeout != nil {
426		serverOpts = append(serverOpts, topology.WithHeartbeatTimeout(
427			func(time.Duration) time.Duration { return *opts.ConnectTimeout },
428		))
429		connOpts = append(connOpts, topology.WithConnectTimeout(
430			func(time.Duration) time.Duration { return *opts.ConnectTimeout },
431		))
432	}
433	// Dialer
434	if opts.Dialer != nil {
435		connOpts = append(connOpts, topology.WithDialer(
436			func(topology.Dialer) topology.Dialer { return opts.Dialer },
437		))
438	}
439	// Direct
440	if opts.Direct != nil && *opts.Direct {
441		topologyOpts = append(topologyOpts, topology.WithMode(
442			func(topology.MonitorMode) topology.MonitorMode { return topology.SingleMode },
443		))
444	}
445	// HeartbeatInterval
446	if opts.HeartbeatInterval != nil {
447		serverOpts = append(serverOpts, topology.WithHeartbeatInterval(
448			func(time.Duration) time.Duration { return *opts.HeartbeatInterval },
449		))
450	}
451	// Hosts
452	hosts := []string{"localhost:27017"} // default host
453	if len(opts.Hosts) > 0 {
454		hosts = opts.Hosts
455	}
456	topologyOpts = append(topologyOpts, topology.WithSeedList(
457		func(...string) []string { return hosts },
458	))
459	// LocalThreshold
460	c.localThreshold = defaultLocalThreshold
461	if opts.LocalThreshold != nil {
462		c.localThreshold = *opts.LocalThreshold
463	}
464	// MaxConIdleTime
465	if opts.MaxConnIdleTime != nil {
466		connOpts = append(connOpts, topology.WithIdleTimeout(
467			func(time.Duration) time.Duration { return *opts.MaxConnIdleTime },
468		))
469	}
470	// MaxPoolSize
471	if opts.MaxPoolSize != nil {
472		serverOpts = append(
473			serverOpts,
474			topology.WithMaxConnections(func(uint64) uint64 { return *opts.MaxPoolSize }),
475		)
476	}
477	// MinPoolSize
478	if opts.MinPoolSize != nil {
479		serverOpts = append(
480			serverOpts,
481			topology.WithMinConnections(func(uint64) uint64 { return *opts.MinPoolSize }),
482		)
483	}
484	// PoolMonitor
485	if opts.PoolMonitor != nil {
486		serverOpts = append(
487			serverOpts,
488			topology.WithConnectionPoolMonitor(func(*event.PoolMonitor) *event.PoolMonitor { return opts.PoolMonitor }),
489		)
490	}
491	// Monitor
492	if opts.Monitor != nil {
493		c.monitor = opts.Monitor
494		connOpts = append(connOpts, topology.WithMonitor(
495			func(*event.CommandMonitor) *event.CommandMonitor { return opts.Monitor },
496		))
497	}
498	// ReadConcern
499	c.readConcern = readconcern.New()
500	if opts.ReadConcern != nil {
501		c.readConcern = opts.ReadConcern
502	}
503	// ReadPreference
504	c.readPreference = readpref.Primary()
505	if opts.ReadPreference != nil {
506		c.readPreference = opts.ReadPreference
507	}
508	// Registry
509	c.registry = bson.DefaultRegistry
510	if opts.Registry != nil {
511		c.registry = opts.Registry
512	}
513	// ReplicaSet
514	if opts.ReplicaSet != nil {
515		topologyOpts = append(topologyOpts, topology.WithReplicaSetName(
516			func(string) string { return *opts.ReplicaSet },
517		))
518	}
519	// RetryWrites
520	c.retryWrites = true // retry writes on by default
521	if opts.RetryWrites != nil {
522		c.retryWrites = *opts.RetryWrites
523	}
524	c.retryReads = true
525	if opts.RetryReads != nil {
526		c.retryReads = *opts.RetryReads
527	}
528	// ServerSelectionTimeout
529	if opts.ServerSelectionTimeout != nil {
530		topologyOpts = append(topologyOpts, topology.WithServerSelectionTimeout(
531			func(time.Duration) time.Duration { return *opts.ServerSelectionTimeout },
532		))
533	}
534	// SocketTimeout
535	if opts.SocketTimeout != nil {
536		connOpts = append(
537			connOpts,
538			topology.WithReadTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }),
539			topology.WithWriteTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }),
540		)
541	}
542	// TLSConfig
543	if opts.TLSConfig != nil {
544		connOpts = append(connOpts, topology.WithTLSConfig(
545			func(*tls.Config) *tls.Config {
546				return opts.TLSConfig
547			},
548		))
549	}
550	// WriteConcern
551	if opts.WriteConcern != nil {
552		c.writeConcern = opts.WriteConcern
553	}
554	// AutoEncryptionOptions
555	if opts.AutoEncryptionOptions != nil {
556		if err := c.configureAutoEncryption(opts.AutoEncryptionOptions); err != nil {
557			return err
558		}
559	}
560
561	// OCSP cache
562	ocspCache := ocsp.NewCache()
563	connOpts = append(
564		connOpts,
565		topology.WithOCSPCache(func(ocsp.Cache) ocsp.Cache { return ocspCache }),
566	)
567
568	// Disable communication with external OCSP responders.
569	if opts.DisableOCSPEndpointCheck != nil {
570		connOpts = append(
571			connOpts,
572			topology.WithDisableOCSPEndpointCheck(func(bool) bool { return *opts.DisableOCSPEndpointCheck }),
573		)
574	}
575
576	serverOpts = append(
577		serverOpts,
578		topology.WithClock(func(*session.ClusterClock) *session.ClusterClock { return c.clock }),
579		topology.WithConnectionOptions(func(...topology.ConnectionOption) []topology.ConnectionOption { return connOpts }),
580	)
581	c.topologyOptions = append(topologyOpts, topology.WithServerOptions(
582		func(...topology.ServerOption) []topology.ServerOption { return serverOpts },
583	))
584
585	// Deployment
586	if opts.Deployment != nil {
587		// topology options: WithSeedlist and WithURI
588		// server options: WithClock and WithConnectionOptions
589		if len(serverOpts) > 2 || len(topologyOpts) > 2 {
590			return errors.New("cannot specify topology or server options with a deployment")
591		}
592		c.deployment = opts.Deployment
593	}
594
595	return nil
596}
597
598func (c *Client) configureAutoEncryption(opts *options.AutoEncryptionOptions) error {
599	if err := c.configureKeyVault(opts); err != nil {
600		return err
601	}
602	if err := c.configureMongocryptd(opts); err != nil {
603		return err
604	}
605	return c.configureCrypt(opts)
606}
607
608func (c *Client) configureKeyVault(opts *options.AutoEncryptionOptions) error {
609	// parse key vault options and create new client if necessary
610	if opts.KeyVaultClientOptions != nil {
611		var err error
612		c.keyVaultClient, err = NewClient(opts.KeyVaultClientOptions)
613		if err != nil {
614			return err
615		}
616	}
617
618	dbName, collName := splitNamespace(opts.KeyVaultNamespace)
619	client := c.keyVaultClient
620	if client == nil {
621		client = c
622	}
623	c.keyVaultColl = client.Database(dbName).Collection(collName, keyVaultCollOpts)
624	return nil
625}
626
627func (c *Client) configureMongocryptd(opts *options.AutoEncryptionOptions) error {
628	var err error
629	c.mongocryptd, err = newMcryptClient(opts)
630	return err
631}
632
633func (c *Client) configureCrypt(opts *options.AutoEncryptionOptions) error {
634	// convert schemas in SchemaMap to bsoncore documents
635	cryptSchemaMap := make(map[string]bsoncore.Document)
636	for k, v := range opts.SchemaMap {
637		schema, err := transformBsoncoreDocument(c.registry, v)
638		if err != nil {
639			return err
640		}
641		cryptSchemaMap[k] = schema
642	}
643
644	// configure options
645	var bypass bool
646	if opts.BypassAutoEncryption != nil {
647		bypass = *opts.BypassAutoEncryption
648	}
649	kr := keyRetriever{coll: c.keyVaultColl}
650	cir := collInfoRetriever{client: c}
651	cryptOpts := &driver.CryptOptions{
652		CollInfoFn:           cir.cryptCollInfo,
653		KeyFn:                kr.cryptKeys,
654		MarkFn:               c.mongocryptd.markCommand,
655		KmsProviders:         opts.KmsProviders,
656		BypassAutoEncryption: bypass,
657		SchemaMap:            cryptSchemaMap,
658	}
659
660	var err error
661	c.crypt, err = driver.NewCrypt(cryptOpts)
662	return err
663}
664
665// validSession returns an error if the session doesn't belong to the client
666func (c *Client) validSession(sess *session.Client) error {
667	if sess != nil && !uuid.Equal(sess.ClientID, c.id) {
668		return ErrWrongClient
669	}
670	return nil
671}
672
673// Database returns a handle for a database with the given name configured with the given DatabaseOptions.
674func (c *Client) Database(name string, opts ...*options.DatabaseOptions) *Database {
675	return newDatabase(c, name, opts...)
676}
677
678// ListDatabases executes a listDatabases command and returns the result.
679//
680// The filter parameter must be a document containing query operators and can be used to select which
681// databases are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include
682// all databases.
683//
684// The opts paramter can be used to specify options for this operation (see the options.ListDatabasesOptions documentation).
685//
686// For more information about the command, see https://docs.mongodb.com/manual/reference/command/listDatabases/.
687func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) (ListDatabasesResult, error) {
688	if ctx == nil {
689		ctx = context.Background()
690	}
691
692	sess := sessionFromContext(ctx)
693
694	err := c.validSession(sess)
695	if sess == nil && c.sessionPool != nil {
696		sess, err = session.NewClientSession(c.sessionPool, c.id, session.Implicit)
697		if err != nil {
698			return ListDatabasesResult{}, err
699		}
700		defer sess.EndSession()
701	}
702
703	err = c.validSession(sess)
704	if err != nil {
705		return ListDatabasesResult{}, err
706	}
707
708	filterDoc, err := transformBsoncoreDocument(c.registry, filter)
709	if err != nil {
710		return ListDatabasesResult{}, err
711	}
712
713	selector := description.CompositeSelector([]description.ServerSelector{
714		description.ReadPrefSelector(readpref.Primary()),
715		description.LatencySelector(c.localThreshold),
716	})
717	selector = makeReadPrefSelector(sess, selector, c.localThreshold)
718
719	ldo := options.MergeListDatabasesOptions(opts...)
720	op := operation.NewListDatabases(filterDoc).
721		Session(sess).ReadPreference(c.readPreference).CommandMonitor(c.monitor).
722		ServerSelector(selector).ClusterClock(c.clock).Database("admin").Deployment(c.deployment).Crypt(c.crypt)
723
724	if ldo.NameOnly != nil {
725		op = op.NameOnly(*ldo.NameOnly)
726	}
727	if ldo.AuthorizedDatabases != nil {
728		op = op.AuthorizedDatabases(*ldo.AuthorizedDatabases)
729	}
730
731	retry := driver.RetryNone
732	if c.retryReads {
733		retry = driver.RetryOncePerCommand
734	}
735	op.Retry(retry)
736
737	err = op.Execute(ctx)
738	if err != nil {
739		return ListDatabasesResult{}, replaceErrors(err)
740	}
741
742	return newListDatabasesResultFromOperation(op.Result()), nil
743}
744
745// ListDatabaseNames executes a listDatabases command and returns a slice containing the names of all of the databases
746// on the server.
747//
748// The filter parameter must be a document containing query operators and can be used to select which databases
749// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
750// databases.
751//
752// The opts parameter can be used to specify options for this operation (see the options.ListDatabasesOptions
753// documentation.)
754//
755// For more information about the command, see https://docs.mongodb.com/manual/reference/command/listDatabases/.
756func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) ([]string, error) {
757	opts = append(opts, options.ListDatabases().SetNameOnly(true))
758
759	res, err := c.ListDatabases(ctx, filter, opts...)
760	if err != nil {
761		return nil, err
762	}
763
764	names := make([]string, 0)
765	for _, spec := range res.Databases {
766		names = append(names, spec.Name)
767	}
768
769	return names, nil
770}
771
772// WithSession creates a new SessionContext from the ctx and sess parameters and uses it to call the fn callback. The
773// SessionContext must be used as the Context parameter for any operations in the fn callback that should be executed
774// under the session.
775//
776// If the ctx parameter already contains a Session, that Session will be replaced with the one provided.
777//
778// Any error returned by the fn callback will be returned without any modifications.
779func WithSession(ctx context.Context, sess Session, fn func(SessionContext) error) error {
780	return fn(NewSessionContext(ctx, sess))
781}
782
783// UseSession creates a new Session and uses it to create a new SessionContext, which is used to call the fn callback.
784// The SessionContext parameter must be used as the Context parameter for any operations in the fn callback that should
785// be executed under a session. After the callback returns, the created Session is ended, meaning that any in-progress
786// transactions started by fn will be aborted even if fn returns an error.
787//
788// If the ctx parameter already contains a Session, that Session will be replaced with the newly created one.
789//
790// Any error returned by the fn callback will be returned without any modifications.
791func (c *Client) UseSession(ctx context.Context, fn func(SessionContext) error) error {
792	return c.UseSessionWithOptions(ctx, options.Session(), fn)
793}
794
795// UseSessionWithOptions operates like UseSession but uses the given SessionOptions to create the Session.
796func (c *Client) UseSessionWithOptions(ctx context.Context, opts *options.SessionOptions, fn func(SessionContext) error) error {
797	defaultSess, err := c.StartSession(opts)
798	if err != nil {
799		return err
800	}
801
802	defer defaultSess.EndSession(ctx)
803	return fn(NewSessionContext(ctx, defaultSess))
804}
805
806// Watch returns a change stream for all changes on the deployment. See
807// https://docs.mongodb.com/manual/changeStreams/ for more information about change streams.
808//
809// The client must be configured with read concern majority or no read concern for a change stream to be created
810// successfully.
811//
812// The pipeline parameter must be an array of documents, each representing a pipeline stage. The pipeline cannot be
813// nil or empty. The stage documents must all be non-nil. See https://docs.mongodb.com/manual/changeStreams/ for a list
814// of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the mongo.Pipeline{}
815// type can be used.
816//
817// The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions
818// documentation).
819func (c *Client) Watch(ctx context.Context, pipeline interface{},
820	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
821	if c.sessionPool == nil {
822		return nil, ErrClientDisconnected
823	}
824
825	csConfig := changeStreamConfig{
826		readConcern:    c.readConcern,
827		readPreference: c.readPreference,
828		client:         c,
829		registry:       c.registry,
830		streamType:     ClientStream,
831		crypt:          c.crypt,
832	}
833
834	return newChangeStream(ctx, csConfig, pipeline, opts...)
835}
836
837// NumberSessionsInProgress returns the number of sessions that have been started for this client but have not been
838// closed (i.e. EndSession has not been called).
839func (c *Client) NumberSessionsInProgress() int {
840	return c.sessionPool.CheckedOut()
841}
842