1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package mongo 8 9import ( 10 "context" 11 "crypto/tls" 12 "errors" 13 "strings" 14 "time" 15 16 "go.mongodb.org/mongo-driver/bson" 17 "go.mongodb.org/mongo-driver/bson/bsoncodec" 18 "go.mongodb.org/mongo-driver/event" 19 "go.mongodb.org/mongo-driver/mongo/options" 20 "go.mongodb.org/mongo-driver/mongo/readconcern" 21 "go.mongodb.org/mongo-driver/mongo/readpref" 22 "go.mongodb.org/mongo-driver/mongo/writeconcern" 23 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" 24 "go.mongodb.org/mongo-driver/x/mongo/driver" 25 "go.mongodb.org/mongo-driver/x/mongo/driver/auth" 26 "go.mongodb.org/mongo-driver/x/mongo/driver/connstring" 27 "go.mongodb.org/mongo-driver/x/mongo/driver/description" 28 "go.mongodb.org/mongo-driver/x/mongo/driver/ocsp" 29 "go.mongodb.org/mongo-driver/x/mongo/driver/operation" 30 "go.mongodb.org/mongo-driver/x/mongo/driver/session" 31 "go.mongodb.org/mongo-driver/x/mongo/driver/topology" 32 "go.mongodb.org/mongo-driver/x/mongo/driver/uuid" 33) 34 35const defaultLocalThreshold = 15 * time.Millisecond 36 37var ( 38 // keyVaultCollOpts specifies options used to communicate with the key vault collection 39 keyVaultCollOpts = options.Collection().SetReadConcern(readconcern.Majority()). 40 SetWriteConcern(writeconcern.New(writeconcern.WMajority())) 41 42 endSessionsBatchSize = 10000 43) 44 45// Client is a handle representing a pool of connections to a MongoDB deployment. It is safe for concurrent use by 46// multiple goroutines. 47// 48// The Client type opens and closes connections automatically and maintains a pool of idle connections. For 49// connection pool configuration options, see documentation for the ClientOptions type in the mongo/options package. 50type Client struct { 51 id uuid.UUID 52 topologyOptions []topology.Option 53 deployment driver.Deployment 54 connString connstring.ConnString 55 localThreshold time.Duration 56 retryWrites bool 57 retryReads bool 58 clock *session.ClusterClock 59 readPreference *readpref.ReadPref 60 readConcern *readconcern.ReadConcern 61 writeConcern *writeconcern.WriteConcern 62 registry *bsoncodec.Registry 63 marshaller BSONAppender 64 monitor *event.CommandMonitor 65 sessionPool *session.Pool 66 67 // client-side encryption fields 68 keyVaultClient *Client 69 keyVaultColl *Collection 70 mongocryptd *mcryptClient 71 crypt *driver.Crypt 72} 73 74// Connect creates a new Client and then initializes it using the Connect method. This is equivalent to calling 75// NewClient followed by Client.Connect. 76// 77// When creating an options.ClientOptions, the order the methods are called matters. Later Set* 78// methods will overwrite the values from previous Set* method invocations. This includes the 79// ApplyURI method. This allows callers to determine the order of precedence for option 80// application. For instance, if ApplyURI is called before SetAuth, the Credential from 81// SetAuth will overwrite the values from the connection string. If ApplyURI is called 82// after SetAuth, then its values will overwrite those from SetAuth. 83// 84// The opts parameter is processed using options.MergeClientOptions, which will overwrite entire 85// option fields of previous options, there is no partial overwriting. For example, if Username is 86// set in the Auth field for the first option, and Password is set for the second but with no 87// Username, after the merge the Username field will be empty. 88// 89// The NewClient function does not do any I/O and returns an error if the given options are invalid. 90// The Client.Connect method starts background goroutines to monitor the state of the deployment and does not do 91// any I/O in the main goroutine to prevent the main goroutine from blocking. Therefore, it will not error if the 92// deployment is down. 93// 94// The Client.Ping method can be used to verify that the deployment is successfully connected and the 95// Client was correctly configured. 96func Connect(ctx context.Context, opts ...*options.ClientOptions) (*Client, error) { 97 c, err := NewClient(opts...) 98 if err != nil { 99 return nil, err 100 } 101 err = c.Connect(ctx) 102 if err != nil { 103 return nil, err 104 } 105 return c, nil 106} 107 108// NewClient creates a new client to connect to a deployment specified by the uri. 109// 110// When creating an options.ClientOptions, the order the methods are called matters. Later Set* 111// methods will overwrite the values from previous Set* method invocations. This includes the 112// ApplyURI method. This allows callers to determine the order of precedence for option 113// application. For instance, if ApplyURI is called before SetAuth, the Credential from 114// SetAuth will overwrite the values from the connection string. If ApplyURI is called 115// after SetAuth, then its values will overwrite those from SetAuth. 116// 117// The opts parameter is processed using options.MergeClientOptions, which will overwrite entire 118// option fields of previous options, there is no partial overwriting. For example, if Username is 119// set in the Auth field for the first option, and Password is set for the second but with no 120// Username, after the merge the Username field will be empty. 121func NewClient(opts ...*options.ClientOptions) (*Client, error) { 122 clientOpt := options.MergeClientOptions(opts...) 123 124 id, err := uuid.New() 125 if err != nil { 126 return nil, err 127 } 128 client := &Client{id: id} 129 130 err = client.configure(clientOpt) 131 if err != nil { 132 return nil, err 133 } 134 135 if client.deployment == nil { 136 client.deployment, err = topology.New(client.topologyOptions...) 137 if err != nil { 138 return nil, replaceErrors(err) 139 } 140 } 141 return client, nil 142} 143 144// Connect initializes the Client by starting background monitoring goroutines. 145// If the Client was created using the NewClient function, this method must be called before a Client can be used. 146// 147// Connect starts background goroutines to monitor the state of the deployment and does not do any I/O in the main 148// goroutine. The Client.Ping method can be used to verify that the connection was created successfully. 149func (c *Client) Connect(ctx context.Context) error { 150 if connector, ok := c.deployment.(driver.Connector); ok { 151 err := connector.Connect() 152 if err != nil { 153 return replaceErrors(err) 154 } 155 } 156 157 if c.mongocryptd != nil { 158 if err := c.mongocryptd.connect(ctx); err != nil { 159 return err 160 } 161 } 162 if c.keyVaultClient != nil { 163 if err := c.keyVaultClient.Connect(ctx); err != nil { 164 return err 165 } 166 } 167 168 var updateChan <-chan description.Topology 169 if subscriber, ok := c.deployment.(driver.Subscriber); ok { 170 sub, err := subscriber.Subscribe() 171 if err != nil { 172 return replaceErrors(err) 173 } 174 updateChan = sub.Updates 175 } 176 c.sessionPool = session.NewPool(updateChan) 177 return nil 178} 179 180// Disconnect closes sockets to the topology referenced by this Client. It will 181// shut down any monitoring goroutines, close the idle connection pool, and will 182// wait until all the in use connections have been returned to the connection 183// pool and closed before returning. If the context expires via cancellation, 184// deadline, or timeout before the in use connections have returned, the in use 185// connections will be closed, resulting in the failure of any in flight read 186// or write operations. If this method returns with no errors, all connections 187// associated with this Client have been closed. 188func (c *Client) Disconnect(ctx context.Context) error { 189 if ctx == nil { 190 ctx = context.Background() 191 } 192 193 c.endSessions(ctx) 194 if c.mongocryptd != nil { 195 if err := c.mongocryptd.disconnect(ctx); err != nil { 196 return err 197 } 198 } 199 if c.keyVaultClient != nil { 200 if err := c.keyVaultClient.Disconnect(ctx); err != nil { 201 return err 202 } 203 } 204 if c.crypt != nil { 205 c.crypt.Close() 206 } 207 208 if disconnector, ok := c.deployment.(driver.Disconnector); ok { 209 return replaceErrors(disconnector.Disconnect(ctx)) 210 } 211 return nil 212} 213 214// Ping sends a ping command to verify that the client can connect to the deployment. 215// 216// The rp paramter is used to determine which server is selected for the operation. 217// If it is nil, the client's read preference is used. 218// 219// If the server is down, Ping will try to select a server until the client's server selection timeout expires. 220// This can be configured through the ClientOptions.SetServerSelectionTimeout option when creating a new Client. 221// After the timeout expires, a server selection error is returned. 222// 223// Using Ping reduces application resilience because applications starting up will error if the server is temporarily 224// unavailable or is failing over (e.g. during autoscaling due to a load spike). 225func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error { 226 if ctx == nil { 227 ctx = context.Background() 228 } 229 230 if rp == nil { 231 rp = c.readPreference 232 } 233 234 db := c.Database("admin") 235 res := db.RunCommand(ctx, bson.D{ 236 {"ping", 1}, 237 }, options.RunCmd().SetReadPreference(rp)) 238 239 return replaceErrors(res.Err()) 240} 241 242// StartSession starts a new session configured with the given options. 243// 244// If the DefaultReadConcern, DefaultWriteConcern, or DefaultReadPreference options are not set, the client's read 245// concern, write concern, or read preference will be used, respectively. 246func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) { 247 if c.sessionPool == nil { 248 return nil, ErrClientDisconnected 249 } 250 251 sopts := options.MergeSessionOptions(opts...) 252 coreOpts := &session.ClientOptions{ 253 DefaultReadConcern: c.readConcern, 254 DefaultReadPreference: c.readPreference, 255 DefaultWriteConcern: c.writeConcern, 256 } 257 if sopts.CausalConsistency != nil { 258 coreOpts.CausalConsistency = sopts.CausalConsistency 259 } 260 if sopts.DefaultReadConcern != nil { 261 coreOpts.DefaultReadConcern = sopts.DefaultReadConcern 262 } 263 if sopts.DefaultWriteConcern != nil { 264 coreOpts.DefaultWriteConcern = sopts.DefaultWriteConcern 265 } 266 if sopts.DefaultReadPreference != nil { 267 coreOpts.DefaultReadPreference = sopts.DefaultReadPreference 268 } 269 if sopts.DefaultMaxCommitTime != nil { 270 coreOpts.DefaultMaxCommitTime = sopts.DefaultMaxCommitTime 271 } 272 273 sess, err := session.NewClientSession(c.sessionPool, c.id, session.Explicit, coreOpts) 274 if err != nil { 275 return nil, replaceErrors(err) 276 } 277 278 // Writes are not retryable on standalones, so let operation determine whether to retry 279 sess.RetryWrite = false 280 sess.RetryRead = c.retryReads 281 282 return &sessionImpl{ 283 clientSession: sess, 284 client: c, 285 deployment: c.deployment, 286 }, nil 287} 288 289func (c *Client) endSessions(ctx context.Context) { 290 if c.sessionPool == nil { 291 return 292 } 293 294 sessionIDs := c.sessionPool.IDSlice() 295 op := operation.NewEndSessions(nil).ClusterClock(c.clock).Deployment(c.deployment). 296 ServerSelector(description.ReadPrefSelector(readpref.PrimaryPreferred())).CommandMonitor(c.monitor). 297 Database("admin").Crypt(c.crypt) 298 299 totalNumIDs := len(sessionIDs) 300 var currentBatch []bsoncore.Document 301 for i := 0; i < totalNumIDs; i++ { 302 currentBatch = append(currentBatch, sessionIDs[i]) 303 304 // If we are at the end of a batch or the end of the overall IDs array, execute the operation. 305 if ((i+1)%endSessionsBatchSize) == 0 || i == totalNumIDs-1 { 306 // Ignore all errors when ending sessions. 307 _, marshalVal, err := bson.MarshalValue(currentBatch) 308 if err == nil { 309 _ = op.SessionIDs(marshalVal).Execute(ctx) 310 } 311 312 currentBatch = currentBatch[:0] 313 } 314 } 315} 316 317func (c *Client) configure(opts *options.ClientOptions) error { 318 if err := opts.Validate(); err != nil { 319 return err 320 } 321 322 var connOpts []topology.ConnectionOption 323 var serverOpts []topology.ServerOption 324 var topologyOpts []topology.Option 325 326 // TODO(GODRIVER-814): Add tests for topology, server, and connection related options. 327 328 // ClusterClock 329 c.clock = new(session.ClusterClock) 330 331 // Pass down URI so topology can determine whether or not SRV polling is required 332 topologyOpts = append(topologyOpts, topology.WithURI(func(uri string) string { 333 return opts.GetURI() 334 })) 335 336 // AppName 337 var appName string 338 if opts.AppName != nil { 339 appName = *opts.AppName 340 341 serverOpts = append(serverOpts, topology.WithServerAppName(func(string) string { 342 return appName 343 })) 344 } 345 // Compressors & ZlibLevel 346 var comps []string 347 if len(opts.Compressors) > 0 { 348 comps = opts.Compressors 349 350 connOpts = append(connOpts, topology.WithCompressors( 351 func(compressors []string) []string { 352 return append(compressors, comps...) 353 }, 354 )) 355 356 for _, comp := range comps { 357 switch comp { 358 case "zlib": 359 connOpts = append(connOpts, topology.WithZlibLevel(func(level *int) *int { 360 return opts.ZlibLevel 361 })) 362 case "zstd": 363 connOpts = append(connOpts, topology.WithZstdLevel(func(level *int) *int { 364 return opts.ZstdLevel 365 })) 366 } 367 } 368 369 serverOpts = append(serverOpts, topology.WithCompressionOptions( 370 func(opts ...string) []string { return append(opts, comps...) }, 371 )) 372 } 373 // Handshaker 374 var handshaker = func(driver.Handshaker) driver.Handshaker { 375 return operation.NewIsMaster().AppName(appName).Compressors(comps).ClusterClock(c.clock) 376 } 377 // Auth & Database & Password & Username 378 if opts.Auth != nil { 379 cred := &auth.Cred{ 380 Username: opts.Auth.Username, 381 Password: opts.Auth.Password, 382 PasswordSet: opts.Auth.PasswordSet, 383 Props: opts.Auth.AuthMechanismProperties, 384 Source: opts.Auth.AuthSource, 385 } 386 mechanism := opts.Auth.AuthMechanism 387 388 if len(cred.Source) == 0 { 389 switch strings.ToUpper(mechanism) { 390 case auth.MongoDBX509, auth.GSSAPI, auth.PLAIN: 391 cred.Source = "$external" 392 default: 393 cred.Source = "admin" 394 } 395 } 396 397 authenticator, err := auth.CreateAuthenticator(mechanism, cred) 398 if err != nil { 399 return err 400 } 401 402 handshakeOpts := &auth.HandshakeOptions{ 403 AppName: appName, 404 Authenticator: authenticator, 405 Compressors: comps, 406 ClusterClock: c.clock, 407 } 408 if mechanism == "" { 409 // Required for SASL mechanism negotiation during handshake 410 handshakeOpts.DBUser = cred.Source + "." + cred.Username 411 } 412 if opts.AuthenticateToAnything != nil && *opts.AuthenticateToAnything { 413 // Authenticate arbiters 414 handshakeOpts.PerformAuthentication = func(serv description.Server) bool { 415 return true 416 } 417 } 418 419 handshaker = func(driver.Handshaker) driver.Handshaker { 420 return auth.Handshaker(nil, handshakeOpts) 421 } 422 } 423 connOpts = append(connOpts, topology.WithHandshaker(handshaker)) 424 // ConnectTimeout 425 if opts.ConnectTimeout != nil { 426 serverOpts = append(serverOpts, topology.WithHeartbeatTimeout( 427 func(time.Duration) time.Duration { return *opts.ConnectTimeout }, 428 )) 429 connOpts = append(connOpts, topology.WithConnectTimeout( 430 func(time.Duration) time.Duration { return *opts.ConnectTimeout }, 431 )) 432 } 433 // Dialer 434 if opts.Dialer != nil { 435 connOpts = append(connOpts, topology.WithDialer( 436 func(topology.Dialer) topology.Dialer { return opts.Dialer }, 437 )) 438 } 439 // Direct 440 if opts.Direct != nil && *opts.Direct { 441 topologyOpts = append(topologyOpts, topology.WithMode( 442 func(topology.MonitorMode) topology.MonitorMode { return topology.SingleMode }, 443 )) 444 } 445 // HeartbeatInterval 446 if opts.HeartbeatInterval != nil { 447 serverOpts = append(serverOpts, topology.WithHeartbeatInterval( 448 func(time.Duration) time.Duration { return *opts.HeartbeatInterval }, 449 )) 450 } 451 // Hosts 452 hosts := []string{"localhost:27017"} // default host 453 if len(opts.Hosts) > 0 { 454 hosts = opts.Hosts 455 } 456 topologyOpts = append(topologyOpts, topology.WithSeedList( 457 func(...string) []string { return hosts }, 458 )) 459 // LocalThreshold 460 c.localThreshold = defaultLocalThreshold 461 if opts.LocalThreshold != nil { 462 c.localThreshold = *opts.LocalThreshold 463 } 464 // MaxConIdleTime 465 if opts.MaxConnIdleTime != nil { 466 connOpts = append(connOpts, topology.WithIdleTimeout( 467 func(time.Duration) time.Duration { return *opts.MaxConnIdleTime }, 468 )) 469 } 470 // MaxPoolSize 471 if opts.MaxPoolSize != nil { 472 serverOpts = append( 473 serverOpts, 474 topology.WithMaxConnections(func(uint64) uint64 { return *opts.MaxPoolSize }), 475 ) 476 } 477 // MinPoolSize 478 if opts.MinPoolSize != nil { 479 serverOpts = append( 480 serverOpts, 481 topology.WithMinConnections(func(uint64) uint64 { return *opts.MinPoolSize }), 482 ) 483 } 484 // PoolMonitor 485 if opts.PoolMonitor != nil { 486 serverOpts = append( 487 serverOpts, 488 topology.WithConnectionPoolMonitor(func(*event.PoolMonitor) *event.PoolMonitor { return opts.PoolMonitor }), 489 ) 490 } 491 // Monitor 492 if opts.Monitor != nil { 493 c.monitor = opts.Monitor 494 connOpts = append(connOpts, topology.WithMonitor( 495 func(*event.CommandMonitor) *event.CommandMonitor { return opts.Monitor }, 496 )) 497 } 498 // ReadConcern 499 c.readConcern = readconcern.New() 500 if opts.ReadConcern != nil { 501 c.readConcern = opts.ReadConcern 502 } 503 // ReadPreference 504 c.readPreference = readpref.Primary() 505 if opts.ReadPreference != nil { 506 c.readPreference = opts.ReadPreference 507 } 508 // Registry 509 c.registry = bson.DefaultRegistry 510 if opts.Registry != nil { 511 c.registry = opts.Registry 512 } 513 // ReplicaSet 514 if opts.ReplicaSet != nil { 515 topologyOpts = append(topologyOpts, topology.WithReplicaSetName( 516 func(string) string { return *opts.ReplicaSet }, 517 )) 518 } 519 // RetryWrites 520 c.retryWrites = true // retry writes on by default 521 if opts.RetryWrites != nil { 522 c.retryWrites = *opts.RetryWrites 523 } 524 c.retryReads = true 525 if opts.RetryReads != nil { 526 c.retryReads = *opts.RetryReads 527 } 528 // ServerSelectionTimeout 529 if opts.ServerSelectionTimeout != nil { 530 topologyOpts = append(topologyOpts, topology.WithServerSelectionTimeout( 531 func(time.Duration) time.Duration { return *opts.ServerSelectionTimeout }, 532 )) 533 } 534 // SocketTimeout 535 if opts.SocketTimeout != nil { 536 connOpts = append( 537 connOpts, 538 topology.WithReadTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }), 539 topology.WithWriteTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }), 540 ) 541 } 542 // TLSConfig 543 if opts.TLSConfig != nil { 544 connOpts = append(connOpts, topology.WithTLSConfig( 545 func(*tls.Config) *tls.Config { 546 return opts.TLSConfig 547 }, 548 )) 549 } 550 // WriteConcern 551 if opts.WriteConcern != nil { 552 c.writeConcern = opts.WriteConcern 553 } 554 // AutoEncryptionOptions 555 if opts.AutoEncryptionOptions != nil { 556 if err := c.configureAutoEncryption(opts.AutoEncryptionOptions); err != nil { 557 return err 558 } 559 } 560 561 // OCSP cache 562 ocspCache := ocsp.NewCache() 563 connOpts = append( 564 connOpts, 565 topology.WithOCSPCache(func(ocsp.Cache) ocsp.Cache { return ocspCache }), 566 ) 567 568 // Disable communication with external OCSP responders. 569 if opts.DisableOCSPEndpointCheck != nil { 570 connOpts = append( 571 connOpts, 572 topology.WithDisableOCSPEndpointCheck(func(bool) bool { return *opts.DisableOCSPEndpointCheck }), 573 ) 574 } 575 576 serverOpts = append( 577 serverOpts, 578 topology.WithClock(func(*session.ClusterClock) *session.ClusterClock { return c.clock }), 579 topology.WithConnectionOptions(func(...topology.ConnectionOption) []topology.ConnectionOption { return connOpts }), 580 ) 581 c.topologyOptions = append(topologyOpts, topology.WithServerOptions( 582 func(...topology.ServerOption) []topology.ServerOption { return serverOpts }, 583 )) 584 585 // Deployment 586 if opts.Deployment != nil { 587 // topology options: WithSeedlist and WithURI 588 // server options: WithClock and WithConnectionOptions 589 if len(serverOpts) > 2 || len(topologyOpts) > 2 { 590 return errors.New("cannot specify topology or server options with a deployment") 591 } 592 c.deployment = opts.Deployment 593 } 594 595 return nil 596} 597 598func (c *Client) configureAutoEncryption(opts *options.AutoEncryptionOptions) error { 599 if err := c.configureKeyVault(opts); err != nil { 600 return err 601 } 602 if err := c.configureMongocryptd(opts); err != nil { 603 return err 604 } 605 return c.configureCrypt(opts) 606} 607 608func (c *Client) configureKeyVault(opts *options.AutoEncryptionOptions) error { 609 // parse key vault options and create new client if necessary 610 if opts.KeyVaultClientOptions != nil { 611 var err error 612 c.keyVaultClient, err = NewClient(opts.KeyVaultClientOptions) 613 if err != nil { 614 return err 615 } 616 } 617 618 dbName, collName := splitNamespace(opts.KeyVaultNamespace) 619 client := c.keyVaultClient 620 if client == nil { 621 client = c 622 } 623 c.keyVaultColl = client.Database(dbName).Collection(collName, keyVaultCollOpts) 624 return nil 625} 626 627func (c *Client) configureMongocryptd(opts *options.AutoEncryptionOptions) error { 628 var err error 629 c.mongocryptd, err = newMcryptClient(opts) 630 return err 631} 632 633func (c *Client) configureCrypt(opts *options.AutoEncryptionOptions) error { 634 // convert schemas in SchemaMap to bsoncore documents 635 cryptSchemaMap := make(map[string]bsoncore.Document) 636 for k, v := range opts.SchemaMap { 637 schema, err := transformBsoncoreDocument(c.registry, v) 638 if err != nil { 639 return err 640 } 641 cryptSchemaMap[k] = schema 642 } 643 644 // configure options 645 var bypass bool 646 if opts.BypassAutoEncryption != nil { 647 bypass = *opts.BypassAutoEncryption 648 } 649 kr := keyRetriever{coll: c.keyVaultColl} 650 cir := collInfoRetriever{client: c} 651 cryptOpts := &driver.CryptOptions{ 652 CollInfoFn: cir.cryptCollInfo, 653 KeyFn: kr.cryptKeys, 654 MarkFn: c.mongocryptd.markCommand, 655 KmsProviders: opts.KmsProviders, 656 BypassAutoEncryption: bypass, 657 SchemaMap: cryptSchemaMap, 658 } 659 660 var err error 661 c.crypt, err = driver.NewCrypt(cryptOpts) 662 return err 663} 664 665// validSession returns an error if the session doesn't belong to the client 666func (c *Client) validSession(sess *session.Client) error { 667 if sess != nil && !uuid.Equal(sess.ClientID, c.id) { 668 return ErrWrongClient 669 } 670 return nil 671} 672 673// Database returns a handle for a database with the given name configured with the given DatabaseOptions. 674func (c *Client) Database(name string, opts ...*options.DatabaseOptions) *Database { 675 return newDatabase(c, name, opts...) 676} 677 678// ListDatabases executes a listDatabases command and returns the result. 679// 680// The filter parameter must be a document containing query operators and can be used to select which 681// databases are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include 682// all databases. 683// 684// The opts paramter can be used to specify options for this operation (see the options.ListDatabasesOptions documentation). 685// 686// For more information about the command, see https://docs.mongodb.com/manual/reference/command/listDatabases/. 687func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) (ListDatabasesResult, error) { 688 if ctx == nil { 689 ctx = context.Background() 690 } 691 692 sess := sessionFromContext(ctx) 693 694 err := c.validSession(sess) 695 if sess == nil && c.sessionPool != nil { 696 sess, err = session.NewClientSession(c.sessionPool, c.id, session.Implicit) 697 if err != nil { 698 return ListDatabasesResult{}, err 699 } 700 defer sess.EndSession() 701 } 702 703 err = c.validSession(sess) 704 if err != nil { 705 return ListDatabasesResult{}, err 706 } 707 708 filterDoc, err := transformBsoncoreDocument(c.registry, filter) 709 if err != nil { 710 return ListDatabasesResult{}, err 711 } 712 713 selector := description.CompositeSelector([]description.ServerSelector{ 714 description.ReadPrefSelector(readpref.Primary()), 715 description.LatencySelector(c.localThreshold), 716 }) 717 selector = makeReadPrefSelector(sess, selector, c.localThreshold) 718 719 ldo := options.MergeListDatabasesOptions(opts...) 720 op := operation.NewListDatabases(filterDoc). 721 Session(sess).ReadPreference(c.readPreference).CommandMonitor(c.monitor). 722 ServerSelector(selector).ClusterClock(c.clock).Database("admin").Deployment(c.deployment).Crypt(c.crypt) 723 724 if ldo.NameOnly != nil { 725 op = op.NameOnly(*ldo.NameOnly) 726 } 727 if ldo.AuthorizedDatabases != nil { 728 op = op.AuthorizedDatabases(*ldo.AuthorizedDatabases) 729 } 730 731 retry := driver.RetryNone 732 if c.retryReads { 733 retry = driver.RetryOncePerCommand 734 } 735 op.Retry(retry) 736 737 err = op.Execute(ctx) 738 if err != nil { 739 return ListDatabasesResult{}, replaceErrors(err) 740 } 741 742 return newListDatabasesResultFromOperation(op.Result()), nil 743} 744 745// ListDatabaseNames executes a listDatabases command and returns a slice containing the names of all of the databases 746// on the server. 747// 748// The filter parameter must be a document containing query operators and can be used to select which databases 749// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all 750// databases. 751// 752// The opts parameter can be used to specify options for this operation (see the options.ListDatabasesOptions 753// documentation.) 754// 755// For more information about the command, see https://docs.mongodb.com/manual/reference/command/listDatabases/. 756func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) ([]string, error) { 757 opts = append(opts, options.ListDatabases().SetNameOnly(true)) 758 759 res, err := c.ListDatabases(ctx, filter, opts...) 760 if err != nil { 761 return nil, err 762 } 763 764 names := make([]string, 0) 765 for _, spec := range res.Databases { 766 names = append(names, spec.Name) 767 } 768 769 return names, nil 770} 771 772// WithSession creates a new SessionContext from the ctx and sess parameters and uses it to call the fn callback. The 773// SessionContext must be used as the Context parameter for any operations in the fn callback that should be executed 774// under the session. 775// 776// If the ctx parameter already contains a Session, that Session will be replaced with the one provided. 777// 778// Any error returned by the fn callback will be returned without any modifications. 779func WithSession(ctx context.Context, sess Session, fn func(SessionContext) error) error { 780 return fn(NewSessionContext(ctx, sess)) 781} 782 783// UseSession creates a new Session and uses it to create a new SessionContext, which is used to call the fn callback. 784// The SessionContext parameter must be used as the Context parameter for any operations in the fn callback that should 785// be executed under a session. After the callback returns, the created Session is ended, meaning that any in-progress 786// transactions started by fn will be aborted even if fn returns an error. 787// 788// If the ctx parameter already contains a Session, that Session will be replaced with the newly created one. 789// 790// Any error returned by the fn callback will be returned without any modifications. 791func (c *Client) UseSession(ctx context.Context, fn func(SessionContext) error) error { 792 return c.UseSessionWithOptions(ctx, options.Session(), fn) 793} 794 795// UseSessionWithOptions operates like UseSession but uses the given SessionOptions to create the Session. 796func (c *Client) UseSessionWithOptions(ctx context.Context, opts *options.SessionOptions, fn func(SessionContext) error) error { 797 defaultSess, err := c.StartSession(opts) 798 if err != nil { 799 return err 800 } 801 802 defer defaultSess.EndSession(ctx) 803 return fn(NewSessionContext(ctx, defaultSess)) 804} 805 806// Watch returns a change stream for all changes on the deployment. See 807// https://docs.mongodb.com/manual/changeStreams/ for more information about change streams. 808// 809// The client must be configured with read concern majority or no read concern for a change stream to be created 810// successfully. 811// 812// The pipeline parameter must be an array of documents, each representing a pipeline stage. The pipeline cannot be 813// nil or empty. The stage documents must all be non-nil. See https://docs.mongodb.com/manual/changeStreams/ for a list 814// of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the mongo.Pipeline{} 815// type can be used. 816// 817// The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions 818// documentation). 819func (c *Client) Watch(ctx context.Context, pipeline interface{}, 820 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) { 821 if c.sessionPool == nil { 822 return nil, ErrClientDisconnected 823 } 824 825 csConfig := changeStreamConfig{ 826 readConcern: c.readConcern, 827 readPreference: c.readPreference, 828 client: c, 829 registry: c.registry, 830 streamType: ClientStream, 831 crypt: c.crypt, 832 } 833 834 return newChangeStream(ctx, csConfig, pipeline, opts...) 835} 836 837// NumberSessionsInProgress returns the number of sessions that have been started for this client but have not been 838// closed (i.e. EndSession has not been called). 839func (c *Client) NumberSessionsInProgress() int { 840 return c.sessionPool.CheckedOut() 841} 842