// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongo

import (
	"context"
	"crypto/tls"
	"errors"
	"strconv"
	"strings"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/bsoncodec"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/mongo/readconcern"
	"go.mongodb.org/mongo-driver/mongo/readpref"
	"go.mongodb.org/mongo-driver/mongo/writeconcern"
	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
	"go.mongodb.org/mongo-driver/x/mongo/driver"
	"go.mongodb.org/mongo-driver/x/mongo/driver/auth"
	"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
	"go.mongodb.org/mongo-driver/x/mongo/driver/description"
	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
	"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
	"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
)

// defaultLocalThreshold is the local threshold a Client uses when ClientOptions.LocalThreshold is not set.
const defaultLocalThreshold = 15 * time.Millisecond

// batchSize is the number of session IDs after which endSessions executes the endSessions operation and starts a new
// batch.
const batchSize = 10000

// keyVaultCollOpts specifies options used to communicate with the key vault collection
var keyVaultCollOpts = options.Collection().SetReadConcern(readconcern.Majority()).
	SetWriteConcern(writeconcern.New(writeconcern.WMajority()))

// Client is a handle representing a pool of connections to a MongoDB deployment. It is safe for concurrent use by
// multiple goroutines.
//
// The Client type opens and closes connections automatically and maintains a pool of idle connections. For
// connection pool configuration options, see documentation for the ClientOptions type in the mongo/options package.
type Client struct {
	// id is generated by NewClient; it is attached to sessions created by this Client and compared against a
	// session's ClientID in validSession.
	id uuid.UUID

	// topologyOptions is assembled by configure and passed to topology.New when no Deployment option is supplied.
	topologyOptions []topology.Option

	// deployment is either the ClientOptions.Deployment value or the topology created by NewClient.
	deployment driver.Deployment

	// connString is not read or written by the code in this part of the file.
	connString connstring.ConnString

	// localThreshold is defaultLocalThreshold unless ClientOptions.LocalThreshold is set; it is used to build latency
	// selectors.
	localThreshold time.Duration

	// retryWrites and retryReads default to true in configure and are copied onto sessions started by StartSession.
	retryWrites bool

	retryReads bool

	// clock is created in configure and shared with the topology's servers and with operations.
	clock *session.ClusterClock

	// readPreference defaults to readpref.Primary() in configure.
	readPreference *readpref.ReadPref

	// readConcern defaults to readconcern.New() in configure.
	readConcern *readconcern.ReadConcern

	// writeConcern is only set when ClientOptions.WriteConcern is provided; otherwise it stays nil.
	writeConcern *writeconcern.WriteConcern

	// registry defaults to bson.DefaultRegistry in configure.
	registry *bsoncodec.Registry

	// marshaller is not read or written by the code in this part of the file.
	marshaller BSONAppender

	// monitor is the command monitor taken from ClientOptions.Monitor, if any.
	monitor *event.CommandMonitor

	// sessionPool is created by the Connect method; it is nil until Connect has been called.
	sessionPool *session.Pool

	// client-side encryption fields

	// keyVaultClient is created by configureKeyVault only when KeyVaultClientOptions is set.
	keyVaultClient *Client

	// keyVaultColl is the key vault collection handle, obtained from keyVaultClient or from this Client itself.
	keyVaultColl *Collection

	// mongocryptd is created by configureMongocryptd from AutoEncryptionOptions.ExtraOptions.
	mongocryptd *mcryptClient

	// crypt is created by configureCrypt and closed in Disconnect.
	crypt *driver.Crypt
}

// Connect creates a new Client and then initializes it using the Connect method. This is equivalent to calling
// NewClient followed by Client.Connect.
//
// When creating an options.ClientOptions, the order the methods are called matters. Later Set*
// methods will overwrite the values from previous Set* method invocations. This includes the
// ApplyURI method. This allows callers to determine the order of precedence for option
// application. For instance, if ApplyURI is called before SetAuth, the Credential from
// SetAuth will overwrite the values from the connection string. If ApplyURI is called
// after SetAuth, then its values will overwrite those from SetAuth.
//
// The opts parameter is processed using options.MergeClientOptions, which will overwrite entire
// option fields of previous options, there is no partial overwriting. For example, if Username is
// set in the Auth field for the first option, and Password is set for the second but with no
// Username, after the merge the Username field will be empty.
//
// The NewClient function does not do any I/O and returns an error if the given options are invalid.
87// The Client.Connect method starts background goroutines to monitor the state of the deployment and does not do 88// any I/O in the main goroutine to prevent the main goroutine from blocking. Therefore, it will not error if the 89// deployment is down. 90// 91// The Client.Ping method can be used to verify that the deployment is successfully connected and the 92// Client was correctly configured. 93func Connect(ctx context.Context, opts ...*options.ClientOptions) (*Client, error) { 94 c, err := NewClient(opts...) 95 if err != nil { 96 return nil, err 97 } 98 err = c.Connect(ctx) 99 if err != nil { 100 return nil, err 101 } 102 return c, nil 103} 104 105// NewClient creates a new client to connect to a deployment specified by the uri. 106// 107// When creating an options.ClientOptions, the order the methods are called matters. Later Set* 108// methods will overwrite the values from previous Set* method invocations. This includes the 109// ApplyURI method. This allows callers to determine the order of precedence for option 110// application. For instance, if ApplyURI is called before SetAuth, the Credential from 111// SetAuth will overwrite the values from the connection string. If ApplyURI is called 112// after SetAuth, then its values will overwrite those from SetAuth. 113// 114// The opts parameter is processed using options.MergeClientOptions, which will overwrite entire 115// option fields of previous options, there is no partial overwriting. For example, if Username is 116// set in the Auth field for the first option, and Password is set for the second but with no 117// Username, after the merge the Username field will be empty. 118func NewClient(opts ...*options.ClientOptions) (*Client, error) { 119 clientOpt := options.MergeClientOptions(opts...) 
120 121 id, err := uuid.New() 122 if err != nil { 123 return nil, err 124 } 125 client := &Client{id: id} 126 127 err = client.configure(clientOpt) 128 if err != nil { 129 return nil, err 130 } 131 132 if client.deployment == nil { 133 client.deployment, err = topology.New(client.topologyOptions...) 134 if err != nil { 135 return nil, replaceErrors(err) 136 } 137 } 138 return client, nil 139} 140 141// Connect initializes the Client by starting background monitoring goroutines. 142// If the Client was created using the NewClient function, this method must be called before a Client can be used. 143// 144// Connect starts background goroutines to monitor the state of the deployment and does not do any I/O in the main 145// goroutine. The Client.Ping method can be used to verify that the connection was created successfully. 146func (c *Client) Connect(ctx context.Context) error { 147 if connector, ok := c.deployment.(driver.Connector); ok { 148 err := connector.Connect() 149 if err != nil { 150 return replaceErrors(err) 151 } 152 } 153 154 if c.mongocryptd != nil { 155 if err := c.mongocryptd.connect(ctx); err != nil { 156 return err 157 } 158 } 159 if c.keyVaultClient != nil { 160 if err := c.keyVaultClient.Connect(ctx); err != nil { 161 return err 162 } 163 } 164 165 var updateChan <-chan description.Topology 166 if subscriber, ok := c.deployment.(driver.Subscriber); ok { 167 sub, err := subscriber.Subscribe() 168 if err != nil { 169 return replaceErrors(err) 170 } 171 updateChan = sub.Updates 172 } 173 c.sessionPool = session.NewPool(updateChan) 174 return nil 175} 176 177// Disconnect closes sockets to the topology referenced by this Client. It will 178// shut down any monitoring goroutines, close the idle connection pool, and will 179// wait until all the in use connections have been returned to the connection 180// pool and closed before returning. 
If the context expires via cancellation, 181// deadline, or timeout before the in use connections have returned, the in use 182// connections will be closed, resulting in the failure of any in flight read 183// or write operations. If this method returns with no errors, all connections 184// associated with this Client have been closed. 185func (c *Client) Disconnect(ctx context.Context) error { 186 if ctx == nil { 187 ctx = context.Background() 188 } 189 190 c.endSessions(ctx) 191 if c.mongocryptd != nil { 192 if err := c.mongocryptd.disconnect(ctx); err != nil { 193 return err 194 } 195 } 196 if c.keyVaultClient != nil { 197 if err := c.keyVaultClient.Disconnect(ctx); err != nil { 198 return err 199 } 200 } 201 if c.crypt != nil { 202 c.crypt.Close() 203 } 204 205 if disconnector, ok := c.deployment.(driver.Disconnector); ok { 206 return replaceErrors(disconnector.Disconnect(ctx)) 207 } 208 return nil 209} 210 211// Ping sends a ping command to verify that the client can connect to the deployment. 212// 213// The rp paramter is used to determine which server is selected for the operation. 214// If it is nil, the client's read preference is used. 215// 216// If the server is down, Ping will try to select a server until the client's server selection timeout expires. 217// This can be configured through the ClientOptions.SetServerSelectionTimeout option when creating a new Client. 218// After the timeout expires, a server selection error is returned. 219// 220// Using Ping reduces application resilience because applications starting up will error if the server is temporarily 221// unavailable or is failing over (e.g. during autoscaling due to a load spike). 
222func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error { 223 if ctx == nil { 224 ctx = context.Background() 225 } 226 227 if rp == nil { 228 rp = c.readPreference 229 } 230 231 db := c.Database("admin") 232 res := db.RunCommand(ctx, bson.D{ 233 {"ping", 1}, 234 }, options.RunCmd().SetReadPreference(rp)) 235 236 return replaceErrors(res.Err()) 237} 238 239// StartSession starts a new session configured with the given options. 240// 241// If the DefaultReadConcern, DefaultWriteConcern, or DefaultReadPreference options are not set, the client's read 242// concern, write concern, or read preference will be used, respectively. 243func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) { 244 if c.sessionPool == nil { 245 return nil, ErrClientDisconnected 246 } 247 248 sopts := options.MergeSessionOptions(opts...) 249 coreOpts := &session.ClientOptions{ 250 DefaultReadConcern: c.readConcern, 251 DefaultReadPreference: c.readPreference, 252 DefaultWriteConcern: c.writeConcern, 253 } 254 if sopts.CausalConsistency != nil { 255 coreOpts.CausalConsistency = sopts.CausalConsistency 256 } 257 if sopts.DefaultReadConcern != nil { 258 coreOpts.DefaultReadConcern = sopts.DefaultReadConcern 259 } 260 if sopts.DefaultWriteConcern != nil { 261 coreOpts.DefaultWriteConcern = sopts.DefaultWriteConcern 262 } 263 if sopts.DefaultReadPreference != nil { 264 coreOpts.DefaultReadPreference = sopts.DefaultReadPreference 265 } 266 if sopts.DefaultMaxCommitTime != nil { 267 coreOpts.DefaultMaxCommitTime = sopts.DefaultMaxCommitTime 268 } 269 270 sess, err := session.NewClientSession(c.sessionPool, c.id, session.Explicit, coreOpts) 271 if err != nil { 272 return nil, replaceErrors(err) 273 } 274 275 sess.RetryWrite = c.retryWrites 276 sess.RetryRead = c.retryReads 277 278 return &sessionImpl{ 279 clientSession: sess, 280 client: c, 281 deployment: c.deployment, 282 }, nil 283} 284 285func (c *Client) endSessions(ctx context.Context) { 286 if 
c.sessionPool == nil { 287 return 288 } 289 290 ids := c.sessionPool.IDSlice() 291 idx, idArray := bsoncore.AppendArrayStart(nil) 292 for i, id := range ids { 293 idDoc, _ := id.MarshalBSON() 294 idArray = bsoncore.AppendDocumentElement(idArray, strconv.Itoa(i), idDoc) 295 } 296 idArray, _ = bsoncore.AppendArrayEnd(idArray, idx) 297 298 op := operation.NewEndSessions(idArray).ClusterClock(c.clock).Deployment(c.deployment). 299 ServerSelector(description.ReadPrefSelector(readpref.PrimaryPreferred())).CommandMonitor(c.monitor). 300 Database("admin").Crypt(c.crypt) 301 302 idx, idArray = bsoncore.AppendArrayStart(nil) 303 totalNumIDs := len(ids) 304 for i := 0; i < totalNumIDs; i++ { 305 idDoc, _ := ids[i].MarshalBSON() 306 idArray = bsoncore.AppendDocumentElement(idArray, strconv.Itoa(i), idDoc) 307 if ((i+1)%batchSize) == 0 || i == totalNumIDs-1 { 308 idArray, _ = bsoncore.AppendArrayEnd(idArray, idx) 309 _ = op.SessionIDs(idArray).Execute(ctx) 310 idArray = idArray[:0] 311 idx = 0 312 } 313 } 314 315} 316 317func (c *Client) configure(opts *options.ClientOptions) error { 318 if err := opts.Validate(); err != nil { 319 return err 320 } 321 322 var connOpts []topology.ConnectionOption 323 var serverOpts []topology.ServerOption 324 var topologyOpts []topology.Option 325 326 // TODO(GODRIVER-814): Add tests for topology, server, and connection related options. 327 328 // AppName 329 var appName string 330 if opts.AppName != nil { 331 appName = *opts.AppName 332 } 333 // Compressors & ZlibLevel 334 var comps []string 335 if len(opts.Compressors) > 0 { 336 comps = opts.Compressors 337 338 connOpts = append(connOpts, topology.WithCompressors( 339 func(compressors []string) []string { 340 return append(compressors, comps...) 
341 }, 342 )) 343 344 for _, comp := range comps { 345 switch comp { 346 case "zlib": 347 connOpts = append(connOpts, topology.WithZlibLevel(func(level *int) *int { 348 return opts.ZlibLevel 349 })) 350 case "zstd": 351 connOpts = append(connOpts, topology.WithZstdLevel(func(level *int) *int { 352 return opts.ZstdLevel 353 })) 354 } 355 } 356 357 serverOpts = append(serverOpts, topology.WithCompressionOptions( 358 func(opts ...string) []string { return append(opts, comps...) }, 359 )) 360 } 361 // Handshaker 362 var handshaker = func(driver.Handshaker) driver.Handshaker { 363 return operation.NewIsMaster().AppName(appName).Compressors(comps) 364 } 365 // Auth & Database & Password & Username 366 if opts.Auth != nil { 367 cred := &auth.Cred{ 368 Username: opts.Auth.Username, 369 Password: opts.Auth.Password, 370 PasswordSet: opts.Auth.PasswordSet, 371 Props: opts.Auth.AuthMechanismProperties, 372 Source: opts.Auth.AuthSource, 373 } 374 mechanism := opts.Auth.AuthMechanism 375 376 if len(cred.Source) == 0 { 377 switch strings.ToUpper(mechanism) { 378 case auth.MongoDBX509, auth.GSSAPI, auth.PLAIN: 379 cred.Source = "$external" 380 default: 381 cred.Source = "admin" 382 } 383 } 384 385 authenticator, err := auth.CreateAuthenticator(mechanism, cred) 386 if err != nil { 387 return err 388 } 389 390 handshakeOpts := &auth.HandshakeOptions{ 391 AppName: appName, 392 Authenticator: authenticator, 393 Compressors: comps, 394 } 395 if mechanism == "" { 396 // Required for SASL mechanism negotiation during handshake 397 handshakeOpts.DBUser = cred.Source + "." 
+ cred.Username 398 } 399 if opts.AuthenticateToAnything != nil && *opts.AuthenticateToAnything { 400 // Authenticate arbiters 401 handshakeOpts.PerformAuthentication = func(serv description.Server) bool { 402 return true 403 } 404 } 405 406 handshaker = func(driver.Handshaker) driver.Handshaker { 407 return auth.Handshaker(nil, handshakeOpts) 408 } 409 } 410 connOpts = append(connOpts, topology.WithHandshaker(handshaker)) 411 // ConnectTimeout 412 if opts.ConnectTimeout != nil { 413 serverOpts = append(serverOpts, topology.WithHeartbeatTimeout( 414 func(time.Duration) time.Duration { return *opts.ConnectTimeout }, 415 )) 416 connOpts = append(connOpts, topology.WithConnectTimeout( 417 func(time.Duration) time.Duration { return *opts.ConnectTimeout }, 418 )) 419 } 420 // Dialer 421 if opts.Dialer != nil { 422 connOpts = append(connOpts, topology.WithDialer( 423 func(topology.Dialer) topology.Dialer { return opts.Dialer }, 424 )) 425 } 426 // Direct 427 if opts.Direct != nil && *opts.Direct { 428 topologyOpts = append(topologyOpts, topology.WithMode( 429 func(topology.MonitorMode) topology.MonitorMode { return topology.SingleMode }, 430 )) 431 } 432 // HeartbeatInterval 433 if opts.HeartbeatInterval != nil { 434 serverOpts = append(serverOpts, topology.WithHeartbeatInterval( 435 func(time.Duration) time.Duration { return *opts.HeartbeatInterval }, 436 )) 437 } 438 // Hosts 439 hosts := []string{"localhost:27017"} // default host 440 if len(opts.Hosts) > 0 { 441 hosts = opts.Hosts 442 } 443 topologyOpts = append(topologyOpts, topology.WithSeedList( 444 func(...string) []string { return hosts }, 445 )) 446 // LocalThreshold 447 c.localThreshold = defaultLocalThreshold 448 if opts.LocalThreshold != nil { 449 c.localThreshold = *opts.LocalThreshold 450 } 451 // MaxConIdleTime 452 if opts.MaxConnIdleTime != nil { 453 connOpts = append(connOpts, topology.WithIdleTimeout( 454 func(time.Duration) time.Duration { return *opts.MaxConnIdleTime }, 455 )) 456 } 457 // 
MaxPoolSize 458 if opts.MaxPoolSize != nil { 459 serverOpts = append( 460 serverOpts, 461 topology.WithMaxConnections(func(uint64) uint64 { return *opts.MaxPoolSize }), 462 ) 463 } 464 // MinPoolSize 465 if opts.MinPoolSize != nil { 466 serverOpts = append( 467 serverOpts, 468 topology.WithMinConnections(func(uint64) uint64 { return *opts.MinPoolSize }), 469 ) 470 } 471 // PoolMonitor 472 if opts.PoolMonitor != nil { 473 serverOpts = append( 474 serverOpts, 475 topology.WithConnectionPoolMonitor(func(*event.PoolMonitor) *event.PoolMonitor { return opts.PoolMonitor }), 476 ) 477 } 478 // Monitor 479 if opts.Monitor != nil { 480 c.monitor = opts.Monitor 481 connOpts = append(connOpts, topology.WithMonitor( 482 func(*event.CommandMonitor) *event.CommandMonitor { return opts.Monitor }, 483 )) 484 } 485 // ReadConcern 486 c.readConcern = readconcern.New() 487 if opts.ReadConcern != nil { 488 c.readConcern = opts.ReadConcern 489 } 490 // ReadPreference 491 c.readPreference = readpref.Primary() 492 if opts.ReadPreference != nil { 493 c.readPreference = opts.ReadPreference 494 } 495 // Registry 496 c.registry = bson.DefaultRegistry 497 if opts.Registry != nil { 498 c.registry = opts.Registry 499 } 500 // ReplicaSet 501 if opts.ReplicaSet != nil { 502 topologyOpts = append(topologyOpts, topology.WithReplicaSetName( 503 func(string) string { return *opts.ReplicaSet }, 504 )) 505 } 506 // RetryWrites 507 c.retryWrites = true // retry writes on by default 508 if opts.RetryWrites != nil { 509 c.retryWrites = *opts.RetryWrites 510 } 511 c.retryReads = true 512 if opts.RetryReads != nil { 513 c.retryReads = *opts.RetryReads 514 } 515 // ServerSelectionTimeout 516 if opts.ServerSelectionTimeout != nil { 517 topologyOpts = append(topologyOpts, topology.WithServerSelectionTimeout( 518 func(time.Duration) time.Duration { return *opts.ServerSelectionTimeout }, 519 )) 520 } 521 // SocketTimeout 522 if opts.SocketTimeout != nil { 523 connOpts = append( 524 connOpts, 525 
topology.WithReadTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }), 526 topology.WithWriteTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }), 527 ) 528 } 529 // TLSConfig 530 if opts.TLSConfig != nil { 531 connOpts = append(connOpts, topology.WithTLSConfig( 532 func(*tls.Config) *tls.Config { 533 return opts.TLSConfig 534 }, 535 )) 536 } 537 // WriteConcern 538 if opts.WriteConcern != nil { 539 c.writeConcern = opts.WriteConcern 540 } 541 // AutoEncryptionOptions 542 if opts.AutoEncryptionOptions != nil { 543 if err := c.configureAutoEncryption(opts.AutoEncryptionOptions); err != nil { 544 return err 545 } 546 } 547 548 // ClusterClock 549 c.clock = new(session.ClusterClock) 550 551 serverOpts = append( 552 serverOpts, 553 topology.WithClock(func(*session.ClusterClock) *session.ClusterClock { return c.clock }), 554 topology.WithConnectionOptions(func(...topology.ConnectionOption) []topology.ConnectionOption { return connOpts }), 555 ) 556 c.topologyOptions = append(topologyOpts, topology.WithServerOptions( 557 func(...topology.ServerOption) []topology.ServerOption { return serverOpts }, 558 )) 559 560 // Deployment 561 if opts.Deployment != nil { 562 if len(serverOpts) > 2 || len(topologyOpts) > 1 { 563 return errors.New("cannot specify topology or server options with a deployment") 564 } 565 c.deployment = opts.Deployment 566 } 567 568 return nil 569} 570 571func (c *Client) configureAutoEncryption(opts *options.AutoEncryptionOptions) error { 572 if err := c.configureKeyVault(opts); err != nil { 573 return err 574 } 575 if err := c.configureMongocryptd(opts); err != nil { 576 return err 577 } 578 return c.configureCrypt(opts) 579} 580 581func (c *Client) configureKeyVault(opts *options.AutoEncryptionOptions) error { 582 // parse key vault options and create new client if necessary 583 if opts.KeyVaultClientOptions != nil { 584 var err error 585 c.keyVaultClient, err = NewClient(opts.KeyVaultClientOptions) 586 if err 
!= nil { 587 return err 588 } 589 } 590 591 dbName, collName := splitNamespace(opts.KeyVaultNamespace) 592 client := c.keyVaultClient 593 if client == nil { 594 client = c 595 } 596 c.keyVaultColl = client.Database(dbName).Collection(collName, keyVaultCollOpts) 597 return nil 598} 599 600func (c *Client) configureMongocryptd(opts *options.AutoEncryptionOptions) error { 601 var err error 602 c.mongocryptd, err = newMcryptClient(opts.ExtraOptions) 603 return err 604} 605 606func (c *Client) configureCrypt(opts *options.AutoEncryptionOptions) error { 607 // convert schemas in SchemaMap to bsoncore documents 608 cryptSchemaMap := make(map[string]bsoncore.Document) 609 for k, v := range opts.SchemaMap { 610 schema, err := transformBsoncoreDocument(c.registry, v) 611 if err != nil { 612 return err 613 } 614 cryptSchemaMap[k] = schema 615 } 616 617 // configure options 618 var bypass bool 619 if opts.BypassAutoEncryption != nil { 620 bypass = *opts.BypassAutoEncryption 621 } 622 kr := keyRetriever{coll: c.keyVaultColl} 623 cir := collInfoRetriever{client: c} 624 cryptOpts := &driver.CryptOptions{ 625 CollInfoFn: cir.cryptCollInfo, 626 KeyFn: kr.cryptKeys, 627 MarkFn: c.mongocryptd.markCommand, 628 KmsProviders: opts.KmsProviders, 629 BypassAutoEncryption: bypass, 630 SchemaMap: cryptSchemaMap, 631 } 632 633 var err error 634 c.crypt, err = driver.NewCrypt(cryptOpts) 635 return err 636} 637 638// validSession returns an error if the session doesn't belong to the client 639func (c *Client) validSession(sess *session.Client) error { 640 if sess != nil && !uuid.Equal(sess.ClientID, c.id) { 641 return ErrWrongClient 642 } 643 return nil 644} 645 646// Database returns a handle for a database with the given name configured with the given DatabaseOptions. 647func (c *Client) Database(name string, opts ...*options.DatabaseOptions) *Database { 648 return newDatabase(c, name, opts...) 649} 650 651// ListDatabases executes a listDatabases command and returns the result. 
652// 653// The filter parameter must be a document containing query operators and can be used to select which 654// databases are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include 655// all databases. 656// 657// The opts paramter can be used to specify options for this operation (see the options.ListDatabasesOptions documentation). 658// 659// For more information about the command, see https://docs.mongodb.com/manual/reference/command/listDatabases/. 660func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) (ListDatabasesResult, error) { 661 if ctx == nil { 662 ctx = context.Background() 663 } 664 665 sess := sessionFromContext(ctx) 666 667 err := c.validSession(sess) 668 if sess == nil && c.sessionPool != nil { 669 sess, err = session.NewClientSession(c.sessionPool, c.id, session.Implicit) 670 if err != nil { 671 return ListDatabasesResult{}, err 672 } 673 defer sess.EndSession() 674 } 675 676 err = c.validSession(sess) 677 if err != nil { 678 return ListDatabasesResult{}, err 679 } 680 681 filterDoc, err := transformBsoncoreDocument(c.registry, filter) 682 if err != nil { 683 return ListDatabasesResult{}, err 684 } 685 686 selector := description.CompositeSelector([]description.ServerSelector{ 687 description.ReadPrefSelector(readpref.Primary()), 688 description.LatencySelector(c.localThreshold), 689 }) 690 selector = makeReadPrefSelector(sess, selector, c.localThreshold) 691 692 ldo := options.MergeListDatabasesOptions(opts...) 693 op := operation.NewListDatabases(filterDoc). 694 Session(sess).ReadPreference(c.readPreference).CommandMonitor(c.monitor). 
695 ServerSelector(selector).ClusterClock(c.clock).Database("admin").Deployment(c.deployment).Crypt(c.crypt) 696 if ldo.NameOnly != nil { 697 op = op.NameOnly(*ldo.NameOnly) 698 } 699 retry := driver.RetryNone 700 if c.retryReads { 701 retry = driver.RetryOncePerCommand 702 } 703 op.Retry(retry) 704 705 err = op.Execute(ctx) 706 if err != nil { 707 return ListDatabasesResult{}, replaceErrors(err) 708 } 709 710 return newListDatabasesResultFromOperation(op.Result()), nil 711} 712 713// ListDatabaseNames executes a listDatabases command and returns a slice containing the names of all of the databases 714// on the server. 715// 716// The filter parameter must be a document containing query operators and can be used to select which databases 717// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all 718// databases. 719// 720// The opts parameter can be used to specify options for this operation (see the options.ListDatabasesOptions 721// documentation.) 722// 723// For more information about the command, see https://docs.mongodb.com/manual/reference/command/listDatabases/. 724func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) ([]string, error) { 725 opts = append(opts, options.ListDatabases().SetNameOnly(true)) 726 727 res, err := c.ListDatabases(ctx, filter, opts...) 728 if err != nil { 729 return nil, err 730 } 731 732 names := make([]string, 0) 733 for _, spec := range res.Databases { 734 names = append(names, spec.Name) 735 } 736 737 return names, nil 738} 739 740// WithSession creates a new SessionContext from the ctx and sess parameters and uses it to call the fn callback. The 741// SessionContext must be used as the Context parameter for any operations in the fn callback that should be executed 742// under the session. 743// 744// If the ctx parameter already contains a Session, that Session will be replaced with the one provided. 
745// 746// Any error returned by the fn callback will be returned without any modifications. 747func WithSession(ctx context.Context, sess Session, fn func(SessionContext) error) error { 748 return fn(contextWithSession(ctx, sess)) 749} 750 751// UseSession creates a new Session and uses it to create a new SessionContext, which is used to call the fn callback. 752// The SessionContext parameter must be used as the Context parameter for any operations in the fn callback that should 753// be executed under a session. After the callback returns, the created Session is ended, meaning that any in-progress 754// transactions started by fn will be aborted even if fn returns an error. 755// 756// If the ctx parameter already contains a Session, that Session will be replaced with the newly created one. 757// 758// Any error returned by the fn callback will be returned without any modifications. 759func (c *Client) UseSession(ctx context.Context, fn func(SessionContext) error) error { 760 return c.UseSessionWithOptions(ctx, options.Session(), fn) 761} 762 763// UseSessionWithOptions operates like UseSession but uses the given SessionOptions to create the Session. 764func (c *Client) UseSessionWithOptions(ctx context.Context, opts *options.SessionOptions, fn func(SessionContext) error) error { 765 defaultSess, err := c.StartSession(opts) 766 if err != nil { 767 return err 768 } 769 770 defer defaultSess.EndSession(ctx) 771 772 sessCtx := sessionContext{ 773 Context: context.WithValue(ctx, sessionKey{}, defaultSess), 774 Session: defaultSess, 775 } 776 777 return fn(sessCtx) 778} 779 780// Watch returns a change stream for all changes on the deployment. See 781// https://docs.mongodb.com/manual/changeStreams/ for more information about change streams. 782// 783// The client must be configured with read concern majority or no read concern for a change stream to be created 784// successfully. 
785// 786// The pipeline parameter must be an array of documents, each representing a pipeline stage. The pipeline cannot be 787// nil or empty. The stage documents must all be non-nil. See https://docs.mongodb.com/manual/changeStreams/ for a list 788// of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the mongo.Pipeline{} 789// type can be used. 790// 791// The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions 792// documentation). 793func (c *Client) Watch(ctx context.Context, pipeline interface{}, 794 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) { 795 if c.sessionPool == nil { 796 return nil, ErrClientDisconnected 797 } 798 799 csConfig := changeStreamConfig{ 800 readConcern: c.readConcern, 801 readPreference: c.readPreference, 802 client: c, 803 registry: c.registry, 804 streamType: ClientStream, 805 } 806 807 return newChangeStream(ctx, csConfig, pipeline, opts...) 808} 809 810// NumberSessionsInProgress returns the number of sessions that have been started for this client but have not been 811// closed (i.e. EndSession has not been called). 812func (c *Client) NumberSessionsInProgress() int { 813 return c.sessionPool.CheckedOut() 814} 815