1// mgo - MongoDB driver for Go 2// 3// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net> 4// 5// All rights reserved. 6// 7// Redistribution and use in source and binary forms, with or without 8// modification, are permitted provided that the following conditions are met: 9// 10// 1. Redistributions of source code must retain the above copyright notice, this 11// list of conditions and the following disclaimer. 12// 2. Redistributions in binary form must reproduce the above copyright notice, 13// this list of conditions and the following disclaimer in the documentation 14// and/or other materials provided with the distribution. 15// 16// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 17// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 18// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 19// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 20// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 21// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 22// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 23// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 27package mgo 28 29import ( 30 "crypto/md5" 31 "encoding/hex" 32 "errors" 33 "fmt" 34 "math" 35 "net" 36 "net/url" 37 "reflect" 38 "sort" 39 "strconv" 40 "strings" 41 "sync" 42 "time" 43 44 "gopkg.in/mgo.v2/bson" 45) 46 47type Mode int 48 49const ( 50 // Relevant documentation on read preference modes: 51 // 52 // http://docs.mongodb.org/manual/reference/read-preference/ 53 // 54 Primary Mode = 2 // Default mode. All operations read from the current replica set primary. 
55 PrimaryPreferred Mode = 3 // Read from the primary if available. Read from the secondary otherwise. 56 Secondary Mode = 4 // Read from one of the nearest secondary members of the replica set. 57 SecondaryPreferred Mode = 5 // Read from one of the nearest secondaries if available. Read from primary otherwise. 58 Nearest Mode = 6 // Read from one of the nearest members, irrespective of it being primary or secondary. 59 60 // Read preference modes are specific to mgo: 61 Eventual Mode = 0 // Same as Nearest, but may change servers between reads. 62 Monotonic Mode = 1 // Same as SecondaryPreferred before first write. Same as Primary after first write. 63 Strong Mode = 2 // Same as Primary. 64) 65 66// mgo.v3: Drop Strong mode, suffix all modes with "Mode". 67 68// When changing the Session type, check if newSession and copySession 69// need to be updated too. 70 71// Session represents a communication session with the database. 72// 73// All Session methods are concurrency-safe and may be called from multiple 74// goroutines. In all session modes but Eventual, using the session from 75// multiple goroutines will cause them to share the same underlying socket. 76// See the documentation on Session.SetMode for more details. 77type Session struct { 78 m sync.RWMutex 79 cluster_ *mongoCluster 80 slaveSocket *mongoSocket 81 masterSocket *mongoSocket 82 slaveOk bool 83 consistency Mode 84 queryConfig query 85 safeOp *queryOp 86 syncTimeout time.Duration 87 sockTimeout time.Duration 88 defaultdb string 89 sourcedb string 90 dialCred *Credential 91 creds []Credential 92 poolLimit int 93 bypassValidation bool 94} 95 96type Database struct { 97 Session *Session 98 Name string 99} 100 101type Collection struct { 102 Database *Database 103 Name string // "collection" 104 FullName string // "db.collection" 105} 106 107type Query struct { 108 m sync.Mutex 109 session *Session 110 query // Enables default settings in session. 
111} 112 113type query struct { 114 op queryOp 115 prefetch float64 116 limit int32 117} 118 119type getLastError struct { 120 CmdName int "getLastError,omitempty" 121 W interface{} "w,omitempty" 122 WTimeout int "wtimeout,omitempty" 123 FSync bool "fsync,omitempty" 124 J bool "j,omitempty" 125} 126 127type Iter struct { 128 m sync.Mutex 129 gotReply sync.Cond 130 session *Session 131 server *mongoServer 132 docData queue 133 err error 134 op getMoreOp 135 prefetch float64 136 limit int32 137 docsToReceive int 138 docsBeforeMore int 139 timeout time.Duration 140 timedout bool 141 findCmd bool 142} 143 144var ( 145 ErrNotFound = errors.New("not found") 146 ErrCursor = errors.New("invalid cursor") 147) 148 149const ( 150 defaultPrefetch = 0.25 151 maxUpsertRetries = 5 152) 153 154// Dial establishes a new session to the cluster identified by the given seed 155// server(s). The session will enable communication with all of the servers in 156// the cluster, so the seed servers are used only to find out about the cluster 157// topology. 158// 159// Dial will timeout after 10 seconds if a server isn't reached. The returned 160// session will timeout operations after one minute by default if servers 161// aren't available. To customize the timeout, see DialWithTimeout, 162// SetSyncTimeout, and SetSocketTimeout. 163// 164// This method is generally called just once for a given cluster. Further 165// sessions to the same cluster are then established using the New or Copy 166// methods on the obtained session. This will make them share the underlying 167// cluster, and manage the pool of connections appropriately. 168// 169// Once the session is not useful anymore, Close must be called to release the 170// resources appropriately. 
171// 172// The seed servers must be provided in the following format: 173// 174// [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options] 175// 176// For example, it may be as simple as: 177// 178// localhost 179// 180// Or more involved like: 181// 182// mongodb://myuser:mypass@localhost:40001,otherhost:40001/mydb 183// 184// If the port number is not provided for a server, it defaults to 27017. 185// 186// The username and password provided in the URL will be used to authenticate 187// into the database named after the slash at the end of the host names, or 188// into the "admin" database if none is provided. The authentication information 189// will persist in sessions obtained through the New method as well. 190// 191// The following connection options are supported after the question mark: 192// 193// connect=direct 194// 195// Disables the automatic replica set server discovery logic, and 196// forces the use of servers provided only (even if secondaries). 197// Note that to talk to a secondary the consistency requirements 198// must be relaxed to Monotonic or Eventual via SetMode. 199// 200// 201// connect=replicaSet 202// 203// Discover replica sets automatically. Default connection behavior. 204// 205// 206// replicaSet=<setname> 207// 208// If specified will prevent the obtained session from communicating 209// with any server which is not part of a replica set with the given name. 210// The default is to communicate with any server specified or discovered 211// via the servers contacted. 212// 213// 214// authSource=<db> 215// 216// Informs the database used to establish credentials and privileges 217// with a MongoDB server. Defaults to the database name provided via 218// the URL path, and "admin" if that's unset. 219// 220// 221// authMechanism=<mechanism> 222// 223// Defines the protocol for credential negotiation. Defaults to "MONGODB-CR", 224// which is the default username/password challenge-response mechanism. 
225// 226// 227// gssapiServiceName=<name> 228// 229// Defines the service name to use when authenticating with the GSSAPI 230// mechanism. Defaults to "mongodb". 231// 232// 233// maxPoolSize=<limit> 234// 235// Defines the per-server socket pool limit. Defaults to 4096. 236// See Session.SetPoolLimit for details. 237// 238// 239// Relevant documentation: 240// 241// http://docs.mongodb.org/manual/reference/connection-string/ 242// 243func Dial(url string) (*Session, error) { 244 session, err := DialWithTimeout(url, 10*time.Second) 245 if err == nil { 246 session.SetSyncTimeout(1 * time.Minute) 247 session.SetSocketTimeout(1 * time.Minute) 248 } 249 return session, err 250} 251 252// DialWithTimeout works like Dial, but uses timeout as the amount of time to 253// wait for a server to respond when first connecting and also on follow up 254// operations in the session. If timeout is zero, the call may block 255// forever waiting for a connection to be made. 256// 257// See SetSyncTimeout for customizing the timeout for the session. 258func DialWithTimeout(url string, timeout time.Duration) (*Session, error) { 259 info, err := ParseURL(url) 260 if err != nil { 261 return nil, err 262 } 263 info.Timeout = timeout 264 return DialWithInfo(info) 265} 266 267// ParseURL parses a MongoDB URL as accepted by the Dial function and returns 268// a value suitable for providing into DialWithInfo. 269// 270// See Dial for more details on the format of url. 
271func ParseURL(url string) (*DialInfo, error) { 272 uinfo, err := extractURL(url) 273 if err != nil { 274 return nil, err 275 } 276 direct := false 277 mechanism := "" 278 service := "" 279 source := "" 280 setName := "" 281 poolLimit := 0 282 for k, v := range uinfo.options { 283 switch k { 284 case "authSource": 285 source = v 286 case "authMechanism": 287 mechanism = v 288 case "gssapiServiceName": 289 service = v 290 case "replicaSet": 291 setName = v 292 case "maxPoolSize": 293 poolLimit, err = strconv.Atoi(v) 294 if err != nil { 295 return nil, errors.New("bad value for maxPoolSize: " + v) 296 } 297 case "connect": 298 if v == "direct" { 299 direct = true 300 break 301 } 302 if v == "replicaSet" { 303 break 304 } 305 fallthrough 306 default: 307 return nil, errors.New("unsupported connection URL option: " + k + "=" + v) 308 } 309 } 310 info := DialInfo{ 311 Addrs: uinfo.addrs, 312 Direct: direct, 313 Database: uinfo.db, 314 Username: uinfo.user, 315 Password: uinfo.pass, 316 Mechanism: mechanism, 317 Service: service, 318 Source: source, 319 PoolLimit: poolLimit, 320 ReplicaSetName: setName, 321 } 322 return &info, nil 323} 324 325// DialInfo holds options for establishing a session with a MongoDB cluster. 326// To use a URL, see the Dial function. 327type DialInfo struct { 328 // Addrs holds the addresses for the seed servers. 329 Addrs []string 330 331 // Direct informs whether to establish connections only with the 332 // specified seed servers, or to obtain information for the whole 333 // cluster and establish connections with further servers too. 334 Direct bool 335 336 // Timeout is the amount of time to wait for a server to respond when 337 // first connecting and on follow up operations in the session. If 338 // timeout is zero, the call may block forever waiting for a connection 339 // to be established. Timeout does not affect logic in DialServer. 
340 Timeout time.Duration 341 342 // FailFast will cause connection and query attempts to fail faster when 343 // the server is unavailable, instead of retrying until the configured 344 // timeout period. Note that an unavailable server may silently drop 345 // packets instead of rejecting them, in which case it's impossible to 346 // distinguish it from a slow server, so the timeout stays relevant. 347 FailFast bool 348 349 // Database is the default database name used when the Session.DB method 350 // is called with an empty name, and is also used during the initial 351 // authentication if Source is unset. 352 Database string 353 354 // ReplicaSetName, if specified, will prevent the obtained session from 355 // communicating with any server which is not part of a replica set 356 // with the given name. The default is to communicate with any server 357 // specified or discovered via the servers contacted. 358 ReplicaSetName string 359 360 // Source is the database used to establish credentials and privileges 361 // with a MongoDB server. Defaults to the value of Database, if that is 362 // set, or "admin" otherwise. 363 Source string 364 365 // Service defines the service name to use when authenticating with the GSSAPI 366 // mechanism. Defaults to "mongodb". 367 Service string 368 369 // ServiceHost defines which hostname to use when authenticating 370 // with the GSSAPI mechanism. If not specified, defaults to the MongoDB 371 // server's address. 372 ServiceHost string 373 374 // Mechanism defines the protocol for credential negotiation. 375 // Defaults to "MONGODB-CR". 376 Mechanism string 377 378 // Username and Password inform the credentials for the initial authentication 379 // done on the database defined by the Source field. See Session.Login. 380 Username string 381 Password string 382 383 // PoolLimit defines the per-server socket pool limit. Defaults to 4096. 384 // See Session.SetPoolLimit for details. 
385 PoolLimit int 386 387 // DialServer optionally specifies the dial function for establishing 388 // connections with the MongoDB servers. 389 DialServer func(addr *ServerAddr) (net.Conn, error) 390 391 // WARNING: This field is obsolete. See DialServer above. 392 Dial func(addr net.Addr) (net.Conn, error) 393} 394 395// mgo.v3: Drop DialInfo.Dial. 396 397// ServerAddr represents the address for establishing a connection to an 398// individual MongoDB server. 399type ServerAddr struct { 400 str string 401 tcp *net.TCPAddr 402} 403 404// String returns the address that was provided for the server before resolution. 405func (addr *ServerAddr) String() string { 406 return addr.str 407} 408 409// TCPAddr returns the resolved TCP address for the server. 410func (addr *ServerAddr) TCPAddr() *net.TCPAddr { 411 return addr.tcp 412} 413 414// DialWithInfo establishes a new session to the cluster identified by info. 415func DialWithInfo(info *DialInfo) (*Session, error) { 416 addrs := make([]string, len(info.Addrs)) 417 for i, addr := range info.Addrs { 418 p := strings.LastIndexAny(addr, "]:") 419 if p == -1 || addr[p] != ':' { 420 // XXX This is untested. The test suite doesn't use the standard port. 
421 addr += ":27017" 422 } 423 addrs[i] = addr 424 } 425 cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer}, info.ReplicaSetName) 426 session := newSession(Eventual, cluster, info.Timeout) 427 session.defaultdb = info.Database 428 if session.defaultdb == "" { 429 session.defaultdb = "test" 430 } 431 session.sourcedb = info.Source 432 if session.sourcedb == "" { 433 session.sourcedb = info.Database 434 if session.sourcedb == "" { 435 session.sourcedb = "admin" 436 } 437 } 438 if info.Username != "" { 439 source := session.sourcedb 440 if info.Source == "" && 441 (info.Mechanism == "GSSAPI" || info.Mechanism == "PLAIN" || info.Mechanism == "MONGODB-X509") { 442 source = "$external" 443 } 444 session.dialCred = &Credential{ 445 Username: info.Username, 446 Password: info.Password, 447 Mechanism: info.Mechanism, 448 Service: info.Service, 449 ServiceHost: info.ServiceHost, 450 Source: source, 451 } 452 session.creds = []Credential{*session.dialCred} 453 } 454 if info.PoolLimit > 0 { 455 session.poolLimit = info.PoolLimit 456 } 457 cluster.Release() 458 459 // People get confused when we return a session that is not actually 460 // established to any servers yet (e.g. what if url was wrong). So, 461 // ping the server to ensure there's someone there, and abort if it 462 // fails. 
463 if err := session.Ping(); err != nil { 464 session.Close() 465 return nil, err 466 } 467 session.SetMode(Strong, true) 468 return session, nil 469} 470 471func isOptSep(c rune) bool { 472 return c == ';' || c == '&' 473} 474 475type urlInfo struct { 476 addrs []string 477 user string 478 pass string 479 db string 480 options map[string]string 481} 482 483func extractURL(s string) (*urlInfo, error) { 484 if strings.HasPrefix(s, "mongodb://") { 485 s = s[10:] 486 } 487 info := &urlInfo{options: make(map[string]string)} 488 if c := strings.Index(s, "?"); c != -1 { 489 for _, pair := range strings.FieldsFunc(s[c+1:], isOptSep) { 490 l := strings.SplitN(pair, "=", 2) 491 if len(l) != 2 || l[0] == "" || l[1] == "" { 492 return nil, errors.New("connection option must be key=value: " + pair) 493 } 494 info.options[l[0]] = l[1] 495 } 496 s = s[:c] 497 } 498 if c := strings.Index(s, "@"); c != -1 { 499 pair := strings.SplitN(s[:c], ":", 2) 500 if len(pair) > 2 || pair[0] == "" { 501 return nil, errors.New("credentials must be provided as user:pass@host") 502 } 503 var err error 504 info.user, err = url.QueryUnescape(pair[0]) 505 if err != nil { 506 return nil, fmt.Errorf("cannot unescape username in URL: %q", pair[0]) 507 } 508 if len(pair) > 1 { 509 info.pass, err = url.QueryUnescape(pair[1]) 510 if err != nil { 511 return nil, fmt.Errorf("cannot unescape password in URL") 512 } 513 } 514 s = s[c+1:] 515 } 516 if c := strings.Index(s, "/"); c != -1 { 517 info.db = s[c+1:] 518 s = s[:c] 519 } 520 info.addrs = strings.Split(s, ",") 521 return info, nil 522} 523 524func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) { 525 cluster.Acquire() 526 session = &Session{ 527 cluster_: cluster, 528 syncTimeout: timeout, 529 sockTimeout: timeout, 530 poolLimit: 4096, 531 } 532 debugf("New session %p on cluster %p", session, cluster) 533 session.SetMode(consistency, true) 534 session.SetSafe(&Safe{}) 535 session.queryConfig.prefetch = 
defaultPrefetch 536 return session 537} 538 539func copySession(session *Session, keepCreds bool) (s *Session) { 540 cluster := session.cluster() 541 cluster.Acquire() 542 if session.masterSocket != nil { 543 session.masterSocket.Acquire() 544 } 545 if session.slaveSocket != nil { 546 session.slaveSocket.Acquire() 547 } 548 var creds []Credential 549 if keepCreds { 550 creds = make([]Credential, len(session.creds)) 551 copy(creds, session.creds) 552 } else if session.dialCred != nil { 553 creds = []Credential{*session.dialCred} 554 } 555 scopy := *session 556 scopy.m = sync.RWMutex{} 557 scopy.creds = creds 558 s = &scopy 559 debugf("New session %p on cluster %p (copy from %p)", s, cluster, session) 560 return s 561} 562 563// LiveServers returns a list of server addresses which are 564// currently known to be alive. 565func (s *Session) LiveServers() (addrs []string) { 566 s.m.RLock() 567 addrs = s.cluster().LiveServers() 568 s.m.RUnlock() 569 return addrs 570} 571 572// DB returns a value representing the named database. If name 573// is empty, the database name provided in the dialed URL is 574// used instead. If that is also empty, "test" is used as a 575// fallback in a way equivalent to the mongo shell. 576// 577// Creating this value is a very lightweight operation, and 578// involves no network communication. 579func (s *Session) DB(name string) *Database { 580 if name == "" { 581 name = s.defaultdb 582 } 583 return &Database{s, name} 584} 585 586// C returns a value representing the named collection. 587// 588// Creating this value is a very lightweight operation, and 589// involves no network communication. 590func (db *Database) C(name string) *Collection { 591 return &Collection{db, name, db.Name + "." + name} 592} 593 594// With returns a copy of db that uses session s. 595func (db *Database) With(s *Session) *Database { 596 newdb := *db 597 newdb.Session = s 598 return &newdb 599} 600 601// With returns a copy of c that uses session s. 
602func (c *Collection) With(s *Session) *Collection { 603 newdb := *c.Database 604 newdb.Session = s 605 newc := *c 606 newc.Database = &newdb 607 return &newc 608} 609 610// GridFS returns a GridFS value representing collections in db that 611// follow the standard GridFS specification. 612// The provided prefix (sometimes known as root) will determine which 613// collections to use, and is usually set to "fs" when there is a 614// single GridFS in the database. 615// 616// See the GridFS Create, Open, and OpenId methods for more details. 617// 618// Relevant documentation: 619// 620// http://www.mongodb.org/display/DOCS/GridFS 621// http://www.mongodb.org/display/DOCS/GridFS+Tools 622// http://www.mongodb.org/display/DOCS/GridFS+Specification 623// 624func (db *Database) GridFS(prefix string) *GridFS { 625 return newGridFS(db, prefix) 626} 627 628// Run issues the provided command on the db database and unmarshals 629// its result in the respective argument. The cmd argument may be either 630// a string with the command name itself, in which case an empty document of 631// the form bson.M{cmd: 1} will be used, or it may be a full command document. 632// 633// Note that MongoDB considers the first marshalled key as the command 634// name, so when providing a command with options, it's important to 635// use an ordering-preserving document, such as a struct value or an 636// instance of bson.D. For instance: 637// 638// db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}}) 639// 640// For privilleged commands typically run on the "admin" database, see 641// the Run method in the Session type. 
642// 643// Relevant documentation: 644// 645// http://www.mongodb.org/display/DOCS/Commands 646// http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips 647// 648func (db *Database) Run(cmd interface{}, result interface{}) error { 649 socket, err := db.Session.acquireSocket(true) 650 if err != nil { 651 return err 652 } 653 defer socket.Release() 654 655 // This is an optimized form of db.C("$cmd").Find(cmd).One(result). 656 return db.run(socket, cmd, result) 657} 658 659// Credential holds details to authenticate with a MongoDB server. 660type Credential struct { 661 // Username and Password hold the basic details for authentication. 662 // Password is optional with some authentication mechanisms. 663 Username string 664 Password string 665 666 // Source is the database used to establish credentials and privileges 667 // with a MongoDB server. Defaults to the default database provided 668 // during dial, or "admin" if that was unset. 669 Source string 670 671 // Service defines the service name to use when authenticating with the GSSAPI 672 // mechanism. Defaults to "mongodb". 673 Service string 674 675 // ServiceHost defines which hostname to use when authenticating 676 // with the GSSAPI mechanism. If not specified, defaults to the MongoDB 677 // server's address. 678 ServiceHost string 679 680 // Mechanism defines the protocol for credential negotiation. 681 // Defaults to "MONGODB-CR". 682 Mechanism string 683} 684 685// Login authenticates with MongoDB using the provided credential. The 686// authentication is valid for the whole session and will stay valid until 687// Logout is explicitly called for the same database, or the session is 688// closed. 689func (db *Database) Login(user, pass string) error { 690 return db.Session.Login(&Credential{Username: user, Password: pass, Source: db.Name}) 691} 692 693// Login authenticates with MongoDB using the provided credential. 
The 694// authentication is valid for the whole session and will stay valid until 695// Logout is explicitly called for the same database, or the session is 696// closed. 697func (s *Session) Login(cred *Credential) error { 698 socket, err := s.acquireSocket(true) 699 if err != nil { 700 return err 701 } 702 defer socket.Release() 703 704 credCopy := *cred 705 if cred.Source == "" { 706 if cred.Mechanism == "GSSAPI" { 707 credCopy.Source = "$external" 708 } else { 709 credCopy.Source = s.sourcedb 710 } 711 } 712 err = socket.Login(credCopy) 713 if err != nil { 714 return err 715 } 716 717 s.m.Lock() 718 s.creds = append(s.creds, credCopy) 719 s.m.Unlock() 720 return nil 721} 722 723func (s *Session) socketLogin(socket *mongoSocket) error { 724 for _, cred := range s.creds { 725 if err := socket.Login(cred); err != nil { 726 return err 727 } 728 } 729 return nil 730} 731 732// Logout removes any established authentication credentials for the database. 733func (db *Database) Logout() { 734 session := db.Session 735 dbname := db.Name 736 session.m.Lock() 737 found := false 738 for i, cred := range session.creds { 739 if cred.Source == dbname { 740 copy(session.creds[i:], session.creds[i+1:]) 741 session.creds = session.creds[:len(session.creds)-1] 742 found = true 743 break 744 } 745 } 746 if found { 747 if session.masterSocket != nil { 748 session.masterSocket.Logout(dbname) 749 } 750 if session.slaveSocket != nil { 751 session.slaveSocket.Logout(dbname) 752 } 753 } 754 session.m.Unlock() 755} 756 757// LogoutAll removes all established authentication credentials for the session. 758func (s *Session) LogoutAll() { 759 s.m.Lock() 760 for _, cred := range s.creds { 761 if s.masterSocket != nil { 762 s.masterSocket.Logout(cred.Source) 763 } 764 if s.slaveSocket != nil { 765 s.slaveSocket.Logout(cred.Source) 766 } 767 } 768 s.creds = s.creds[0:0] 769 s.m.Unlock() 770} 771 772// User represents a MongoDB user. 
773// 774// Relevant documentation: 775// 776// http://docs.mongodb.org/manual/reference/privilege-documents/ 777// http://docs.mongodb.org/manual/reference/user-privileges/ 778// 779type User struct { 780 // Username is how the user identifies itself to the system. 781 Username string `bson:"user"` 782 783 // Password is the plaintext password for the user. If set, 784 // the UpsertUser method will hash it into PasswordHash and 785 // unset it before the user is added to the database. 786 Password string `bson:",omitempty"` 787 788 // PasswordHash is the MD5 hash of Username+":mongo:"+Password. 789 PasswordHash string `bson:"pwd,omitempty"` 790 791 // CustomData holds arbitrary data admins decide to associate 792 // with this user, such as the full name or employee id. 793 CustomData interface{} `bson:"customData,omitempty"` 794 795 // Roles indicates the set of roles the user will be provided. 796 // See the Role constants. 797 Roles []Role `bson:"roles"` 798 799 // OtherDBRoles allows assigning roles in other databases from 800 // user documents inserted in the admin database. This field 801 // only works in the admin database. 802 OtherDBRoles map[string][]Role `bson:"otherDBRoles,omitempty"` 803 804 // UserSource indicates where to look for this user's credentials. 805 // It may be set to a database name, or to "$external" for 806 // consulting an external resource such as Kerberos. UserSource 807 // must not be set if Password or PasswordHash are present. 808 // 809 // WARNING: This setting was only ever supported in MongoDB 2.4, 810 // and is now obsolete. 
811 UserSource string `bson:"userSource,omitempty"` 812} 813 814type Role string 815 816const ( 817 // Relevant documentation: 818 // 819 // http://docs.mongodb.org/manual/reference/user-privileges/ 820 // 821 RoleRoot Role = "root" 822 RoleRead Role = "read" 823 RoleReadAny Role = "readAnyDatabase" 824 RoleReadWrite Role = "readWrite" 825 RoleReadWriteAny Role = "readWriteAnyDatabase" 826 RoleDBAdmin Role = "dbAdmin" 827 RoleDBAdminAny Role = "dbAdminAnyDatabase" 828 RoleUserAdmin Role = "userAdmin" 829 RoleUserAdminAny Role = "userAdminAnyDatabase" 830 RoleClusterAdmin Role = "clusterAdmin" 831) 832 833// UpsertUser updates the authentication credentials and the roles for 834// a MongoDB user within the db database. If the named user doesn't exist 835// it will be created. 836// 837// This method should only be used from MongoDB 2.4 and on. For older 838// MongoDB releases, use the obsolete AddUser method instead. 839// 840// Relevant documentation: 841// 842// http://docs.mongodb.org/manual/reference/user-privileges/ 843// http://docs.mongodb.org/manual/reference/privilege-documents/ 844// 845func (db *Database) UpsertUser(user *User) error { 846 if user.Username == "" { 847 return fmt.Errorf("user has no Username") 848 } 849 if (user.Password != "" || user.PasswordHash != "") && user.UserSource != "" { 850 return fmt.Errorf("user has both Password/PasswordHash and UserSource set") 851 } 852 if len(user.OtherDBRoles) > 0 && db.Name != "admin" && db.Name != "$external" { 853 return fmt.Errorf("user with OtherDBRoles is only supported in the admin or $external databases") 854 } 855 856 // Attempt to run this using 2.6+ commands. 
857 rundb := db 858 if user.UserSource != "" { 859 // Compatibility logic for the userSource field of MongoDB <= 2.4.X 860 rundb = db.Session.DB(user.UserSource) 861 } 862 err := rundb.runUserCmd("updateUser", user) 863 // retry with createUser when isAuthError in order to enable the "localhost exception" 864 if isNotFound(err) || isAuthError(err) { 865 return rundb.runUserCmd("createUser", user) 866 } 867 if !isNoCmd(err) { 868 return err 869 } 870 871 // Command does not exist. Fallback to pre-2.6 behavior. 872 var set, unset bson.D 873 if user.Password != "" { 874 psum := md5.New() 875 psum.Write([]byte(user.Username + ":mongo:" + user.Password)) 876 set = append(set, bson.DocElem{"pwd", hex.EncodeToString(psum.Sum(nil))}) 877 unset = append(unset, bson.DocElem{"userSource", 1}) 878 } else if user.PasswordHash != "" { 879 set = append(set, bson.DocElem{"pwd", user.PasswordHash}) 880 unset = append(unset, bson.DocElem{"userSource", 1}) 881 } 882 if user.UserSource != "" { 883 set = append(set, bson.DocElem{"userSource", user.UserSource}) 884 unset = append(unset, bson.DocElem{"pwd", 1}) 885 } 886 if user.Roles != nil || user.OtherDBRoles != nil { 887 set = append(set, bson.DocElem{"roles", user.Roles}) 888 if len(user.OtherDBRoles) > 0 { 889 set = append(set, bson.DocElem{"otherDBRoles", user.OtherDBRoles}) 890 } else { 891 unset = append(unset, bson.DocElem{"otherDBRoles", 1}) 892 } 893 } 894 users := db.C("system.users") 895 err = users.Update(bson.D{{"user", user.Username}}, bson.D{{"$unset", unset}, {"$set", set}}) 896 if err == ErrNotFound { 897 set = append(set, bson.DocElem{"user", user.Username}) 898 if user.Roles == nil && user.OtherDBRoles == nil { 899 // Roles must be sent, as it's the way MongoDB distinguishes 900 // old-style documents from new-style documents in pre-2.6. 
901 set = append(set, bson.DocElem{"roles", user.Roles}) 902 } 903 err = users.Insert(set) 904 } 905 return err 906} 907 908func isNoCmd(err error) bool { 909 e, ok := err.(*QueryError) 910 return ok && (e.Code == 59 || e.Code == 13390 || strings.HasPrefix(e.Message, "no such cmd:")) 911} 912 913func isNotFound(err error) bool { 914 e, ok := err.(*QueryError) 915 return ok && e.Code == 11 916} 917 918func isAuthError(err error) bool { 919 e, ok := err.(*QueryError) 920 return ok && e.Code == 13 921} 922 923func (db *Database) runUserCmd(cmdName string, user *User) error { 924 cmd := make(bson.D, 0, 16) 925 cmd = append(cmd, bson.DocElem{cmdName, user.Username}) 926 if user.Password != "" { 927 cmd = append(cmd, bson.DocElem{"pwd", user.Password}) 928 } 929 var roles []interface{} 930 for _, role := range user.Roles { 931 roles = append(roles, role) 932 } 933 for db, dbroles := range user.OtherDBRoles { 934 for _, role := range dbroles { 935 roles = append(roles, bson.D{{"role", role}, {"db", db}}) 936 } 937 } 938 if roles != nil || user.Roles != nil || cmdName == "createUser" { 939 cmd = append(cmd, bson.DocElem{"roles", roles}) 940 } 941 err := db.Run(cmd, nil) 942 if !isNoCmd(err) && user.UserSource != "" && (user.UserSource != "$external" || db.Name != "$external") { 943 return fmt.Errorf("MongoDB 2.6+ does not support the UserSource setting") 944 } 945 return err 946} 947 948// AddUser creates or updates the authentication credentials of user within 949// the db database. 950// 951// WARNING: This method is obsolete and should only be used with MongoDB 2.2 952// or earlier. For MongoDB 2.4 and on, use UpsertUser instead. 
953func (db *Database) AddUser(username, password string, readOnly bool) error { 954 // Try to emulate the old behavior on 2.6+ 955 user := &User{Username: username, Password: password} 956 if db.Name == "admin" { 957 if readOnly { 958 user.Roles = []Role{RoleReadAny} 959 } else { 960 user.Roles = []Role{RoleReadWriteAny} 961 } 962 } else { 963 if readOnly { 964 user.Roles = []Role{RoleRead} 965 } else { 966 user.Roles = []Role{RoleReadWrite} 967 } 968 } 969 err := db.runUserCmd("updateUser", user) 970 if isNotFound(err) { 971 return db.runUserCmd("createUser", user) 972 } 973 if !isNoCmd(err) { 974 return err 975 } 976 977 // Command doesn't exist. Fallback to pre-2.6 behavior. 978 psum := md5.New() 979 psum.Write([]byte(username + ":mongo:" + password)) 980 digest := hex.EncodeToString(psum.Sum(nil)) 981 c := db.C("system.users") 982 _, err = c.Upsert(bson.M{"user": username}, bson.M{"$set": bson.M{"user": username, "pwd": digest, "readOnly": readOnly}}) 983 return err 984} 985 986// RemoveUser removes the authentication credentials of user from the database. 
987func (db *Database) RemoveUser(user string) error { 988 err := db.Run(bson.D{{"dropUser", user}}, nil) 989 if isNoCmd(err) { 990 users := db.C("system.users") 991 return users.Remove(bson.M{"user": user}) 992 } 993 if isNotFound(err) { 994 return ErrNotFound 995 } 996 return err 997} 998 999type indexSpec struct { 1000 Name, NS string 1001 Key bson.D 1002 Unique bool ",omitempty" 1003 DropDups bool "dropDups,omitempty" 1004 Background bool ",omitempty" 1005 Sparse bool ",omitempty" 1006 Bits int ",omitempty" 1007 Min, Max float64 ",omitempty" 1008 BucketSize float64 "bucketSize,omitempty" 1009 ExpireAfter int "expireAfterSeconds,omitempty" 1010 Weights bson.D ",omitempty" 1011 DefaultLanguage string "default_language,omitempty" 1012 LanguageOverride string "language_override,omitempty" 1013 TextIndexVersion int "textIndexVersion,omitempty" 1014 1015 Collation *Collation "collation,omitempty" 1016} 1017 1018type Index struct { 1019 Key []string // Index key fields; prefix name with dash (-) for descending order 1020 Unique bool // Prevent two documents from having the same index key 1021 DropDups bool // Drop documents with the same index key as a previously indexed one 1022 Background bool // Build index in background and return immediately 1023 Sparse bool // Only index documents containing the Key fields 1024 1025 // If ExpireAfter is defined the server will periodically delete 1026 // documents with indexed time.Time older than the provided delta. 1027 ExpireAfter time.Duration 1028 1029 // Name holds the stored index name. On creation if this field is unset it is 1030 // computed by EnsureIndex based on the index key. 1031 Name string 1032 1033 // Properties for spatial indexes. 1034 // 1035 // Min and Max were improperly typed as int when they should have been 1036 // floats. To preserve backwards compatibility they are still typed as 1037 // int and the following two fields enable reading and writing the same 1038 // fields as float numbers. 
In mgo.v3, these fields will be dropped and 1039 // Min/Max will become floats. 1040 Min, Max int 1041 Minf, Maxf float64 1042 BucketSize float64 1043 Bits int 1044 1045 // Properties for text indexes. 1046 DefaultLanguage string 1047 LanguageOverride string 1048 1049 // Weights defines the significance of provided fields relative to other 1050 // fields in a text index. The score for a given word in a document is derived 1051 // from the weighted sum of the frequency for each of the indexed fields in 1052 // that document. The default field weight is 1. 1053 Weights map[string]int 1054 1055 // Collation defines the collation to use for the index. 1056 Collation *Collation 1057} 1058 1059type Collation struct { 1060 1061 // Locale defines the collation locale. 1062 Locale string `bson:"locale"` 1063 1064 // CaseLevel defines whether to turn case sensitivity on at strength 1 or 2. 1065 CaseLevel bool `bson:"caseLevel,omitempty"` 1066 1067 // CaseFirst may be set to "upper" or "lower" to define whether 1068 // to have uppercase or lowercase items first. Default is "off". 1069 CaseFirst string `bson:"caseFirst,omitempty"` 1070 1071 // Strength defines the priority of comparison properties, as follows: 1072 // 1073 // 1 (primary) - Strongest level, denote difference between base characters 1074 // 2 (secondary) - Accents in characters are considered secondary differences 1075 // 3 (tertiary) - Upper and lower case differences in characters are 1076 // distinguished at the tertiary level 1077 // 4 (quaternary) - When punctuation is ignored at level 1-3, an additional 1078 // level can be used to distinguish words with and without 1079 // punctuation. Should only be used if ignoring punctuation 1080 // is required or when processing Japanese text. 1081 // 5 (identical) - When all other levels are equal, the identical level is 1082 // used as a tiebreaker. 
The Unicode code point values of 1083 // the NFD form of each string are compared at this level, 1084 // just in case there is no difference at levels 1-4 1085 // 1086 // Strength defaults to 3. 1087 Strength int `bson:"strength,omitempty"` 1088 1089 // NumericOrdering defines whether to order numbers based on numerical 1090 // order and not collation order. 1091 NumericOrdering bool `bson:"numericOrdering,omitempty"` 1092 1093 // Alternate controls whether spaces and punctuation are considered base characters. 1094 // May be set to "non-ignorable" (spaces and punctuation considered base characters) 1095 // or "shifted" (spaces and punctuation not considered base characters, and only 1096 // distinguished at strength > 3). Defaults to "non-ignorable". 1097 Alternate string `bson:"alternate,omitempty"` 1098 1099 // Backwards defines whether to have secondary differences considered in reverse order, 1100 // as done in the French language. 1101 Backwards bool `bson:"backwards,omitempty"` 1102} 1103 1104// mgo.v3: Drop Minf and Maxf and transform Min and Max to floats. 1105// mgo.v3: Drop DropDups as it's unsupported past 2.8. 1106 1107type indexKeyInfo struct { 1108 name string 1109 key bson.D 1110 weights bson.D 1111} 1112 1113func parseIndexKey(key []string) (*indexKeyInfo, error) { 1114 var keyInfo indexKeyInfo 1115 isText := false 1116 var order interface{} 1117 for _, field := range key { 1118 raw := field 1119 if keyInfo.name != "" { 1120 keyInfo.name += "_" 1121 } 1122 var kind string 1123 if field != "" { 1124 if field[0] == '$' { 1125 if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 { 1126 kind = field[1:c] 1127 field = field[c+1:] 1128 keyInfo.name += field + "_" + kind 1129 } else { 1130 field = "\x00" 1131 } 1132 } 1133 switch field[0] { 1134 case 0: 1135 // Logic above failed. Reset and error. 
1136 field = "" 1137 case '@': 1138 order = "2d" 1139 field = field[1:] 1140 // The shell used to render this field as key_ instead of key_2d, 1141 // and mgo followed suit. This has been fixed in recent server 1142 // releases, and mgo followed as well. 1143 keyInfo.name += field + "_2d" 1144 case '-': 1145 order = -1 1146 field = field[1:] 1147 keyInfo.name += field + "_-1" 1148 case '+': 1149 field = field[1:] 1150 fallthrough 1151 default: 1152 if kind == "" { 1153 order = 1 1154 keyInfo.name += field + "_1" 1155 } else { 1156 order = kind 1157 } 1158 } 1159 } 1160 if field == "" || kind != "" && order != kind { 1161 return nil, fmt.Errorf(`invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw) 1162 } 1163 if kind == "text" { 1164 if !isText { 1165 keyInfo.key = append(keyInfo.key, bson.DocElem{"_fts", "text"}, bson.DocElem{"_ftsx", 1}) 1166 isText = true 1167 } 1168 keyInfo.weights = append(keyInfo.weights, bson.DocElem{field, 1}) 1169 } else { 1170 keyInfo.key = append(keyInfo.key, bson.DocElem{field, order}) 1171 } 1172 } 1173 if keyInfo.name == "" { 1174 return nil, errors.New("invalid index key: no fields provided") 1175 } 1176 return &keyInfo, nil 1177} 1178 1179// EnsureIndexKey ensures an index with the given key exists, creating it 1180// if necessary. 1181// 1182// This example: 1183// 1184// err := collection.EnsureIndexKey("a", "b") 1185// 1186// Is equivalent to: 1187// 1188// err := collection.EnsureIndex(mgo.Index{Key: []string{"a", "b"}}) 1189// 1190// See the EnsureIndex method for more details. 1191func (c *Collection) EnsureIndexKey(key ...string) error { 1192 return c.EnsureIndex(Index{Key: key}) 1193} 1194 1195// EnsureIndex ensures an index with the given key exists, creating it with 1196// the provided parameters if necessary. EnsureIndex does not modify a previously 1197// existent index with a matching key. The old index must be dropped first instead. 
1198// 1199// Once EnsureIndex returns successfully, following requests for the same index 1200// will not contact the server unless Collection.DropIndex is used to drop the 1201// same index, or Session.ResetIndexCache is called. 1202// 1203// For example: 1204// 1205// index := Index{ 1206// Key: []string{"lastname", "firstname"}, 1207// Unique: true, 1208// DropDups: true, 1209// Background: true, // See notes. 1210// Sparse: true, 1211// } 1212// err := collection.EnsureIndex(index) 1213// 1214// The Key value determines which fields compose the index. The index ordering 1215// will be ascending by default. To obtain an index with a descending order, 1216// the field name should be prefixed by a dash (e.g. []string{"-time"}). It can 1217// also be optionally prefixed by an index kind, as in "$text:summary" or 1218// "$2d:-point". The key string format is: 1219// 1220// [$<kind>:][-]<field name> 1221// 1222// If the Unique field is true, the index must necessarily contain only a single 1223// document per Key. With DropDups set to true, documents with the same key 1224// as a previously indexed one will be dropped rather than an error returned. 1225// 1226// If Background is true, other connections will be allowed to proceed using 1227// the collection without the index while it's being built. Note that the 1228// session executing EnsureIndex will be blocked for as long as it takes for 1229// the index to be built. 1230// 1231// If Sparse is true, only documents containing the provided Key fields will be 1232// included in the index. When using a sparse index for sorting, only indexed 1233// documents will be returned. 1234// 1235// If ExpireAfter is non-zero, the server will periodically scan the collection 1236// and remove documents containing an indexed time.Time field with a value 1237// older than ExpireAfter. 
See the documentation for details: 1238// 1239// http://docs.mongodb.org/manual/tutorial/expire-data 1240// 1241// Other kinds of indexes are also supported through that API. Here is an example: 1242// 1243// index := Index{ 1244// Key: []string{"$2d:loc"}, 1245// Bits: 26, 1246// } 1247// err := collection.EnsureIndex(index) 1248// 1249// The example above requests the creation of a "2d" index for the "loc" field. 1250// 1251// The 2D index bounds may be changed using the Min and Max attributes of the 1252// Index value. The default bound setting of (-180, 180) is suitable for 1253// latitude/longitude pairs. 1254// 1255// The Bits parameter sets the precision of the 2D geohash values. If not 1256// provided, 26 bits are used, which is roughly equivalent to 1 foot of 1257// precision for the default (-180, 180) index bounds. 1258// 1259// Relevant documentation: 1260// 1261// http://www.mongodb.org/display/DOCS/Indexes 1262// http://www.mongodb.org/display/DOCS/Indexing+Advice+and+FAQ 1263// http://www.mongodb.org/display/DOCS/Indexing+as+a+Background+Operation 1264// http://www.mongodb.org/display/DOCS/Geospatial+Indexing 1265// http://www.mongodb.org/display/DOCS/Multikeys 1266// 1267func (c *Collection) EnsureIndex(index Index) error { 1268 keyInfo, err := parseIndexKey(index.Key) 1269 if err != nil { 1270 return err 1271 } 1272 1273 session := c.Database.Session 1274 cacheKey := c.FullName + "\x00" + keyInfo.name 1275 if session.cluster().HasCachedIndex(cacheKey) { 1276 return nil 1277 } 1278 1279 spec := indexSpec{ 1280 Name: keyInfo.name, 1281 NS: c.FullName, 1282 Key: keyInfo.key, 1283 Unique: index.Unique, 1284 DropDups: index.DropDups, 1285 Background: index.Background, 1286 Sparse: index.Sparse, 1287 Bits: index.Bits, 1288 Min: index.Minf, 1289 Max: index.Maxf, 1290 BucketSize: index.BucketSize, 1291 ExpireAfter: int(index.ExpireAfter / time.Second), 1292 Weights: keyInfo.weights, 1293 DefaultLanguage: index.DefaultLanguage, 1294 LanguageOverride: 
index.LanguageOverride, 1295 Collation: index.Collation, 1296 } 1297 1298 if spec.Min == 0 && spec.Max == 0 { 1299 spec.Min = float64(index.Min) 1300 spec.Max = float64(index.Max) 1301 } 1302 1303 if index.Name != "" { 1304 spec.Name = index.Name 1305 } 1306 1307NextField: 1308 for name, weight := range index.Weights { 1309 for i, elem := range spec.Weights { 1310 if elem.Name == name { 1311 spec.Weights[i].Value = weight 1312 continue NextField 1313 } 1314 } 1315 panic("weight provided for field that is not part of index key: " + name) 1316 } 1317 1318 cloned := session.Clone() 1319 defer cloned.Close() 1320 cloned.SetMode(Strong, false) 1321 cloned.EnsureSafe(&Safe{}) 1322 db := c.Database.With(cloned) 1323 1324 // Try with a command first. 1325 err = db.Run(bson.D{{"createIndexes", c.Name}, {"indexes", []indexSpec{spec}}}, nil) 1326 if isNoCmd(err) { 1327 // Command not yet supported. Insert into the indexes collection instead. 1328 err = db.C("system.indexes").Insert(&spec) 1329 } 1330 if err == nil { 1331 session.cluster().CacheIndex(cacheKey, true) 1332 } 1333 return err 1334} 1335 1336// DropIndex drops the index with the provided key from the c collection. 1337// 1338// See EnsureIndex for details on the accepted key variants. 
1339// 1340// For example: 1341// 1342// err1 := collection.DropIndex("firstField", "-secondField") 1343// err2 := collection.DropIndex("customIndexName") 1344// 1345func (c *Collection) DropIndex(key ...string) error { 1346 keyInfo, err := parseIndexKey(key) 1347 if err != nil { 1348 return err 1349 } 1350 1351 session := c.Database.Session 1352 cacheKey := c.FullName + "\x00" + keyInfo.name 1353 session.cluster().CacheIndex(cacheKey, false) 1354 1355 session = session.Clone() 1356 defer session.Close() 1357 session.SetMode(Strong, false) 1358 1359 db := c.Database.With(session) 1360 result := struct { 1361 ErrMsg string 1362 Ok bool 1363 }{} 1364 err = db.Run(bson.D{{"dropIndexes", c.Name}, {"index", keyInfo.name}}, &result) 1365 if err != nil { 1366 return err 1367 } 1368 if !result.Ok { 1369 return errors.New(result.ErrMsg) 1370 } 1371 return nil 1372} 1373 1374// DropIndexName removes the index with the provided index name. 1375// 1376// For example: 1377// 1378// err := collection.DropIndex("customIndexName") 1379// 1380func (c *Collection) DropIndexName(name string) error { 1381 session := c.Database.Session 1382 1383 session = session.Clone() 1384 defer session.Close() 1385 session.SetMode(Strong, false) 1386 1387 c = c.With(session) 1388 1389 indexes, err := c.Indexes() 1390 if err != nil { 1391 return err 1392 } 1393 1394 var index Index 1395 for _, idx := range indexes { 1396 if idx.Name == name { 1397 index = idx 1398 break 1399 } 1400 } 1401 1402 if index.Name != "" { 1403 keyInfo, err := parseIndexKey(index.Key) 1404 if err != nil { 1405 return err 1406 } 1407 1408 cacheKey := c.FullName + "\x00" + keyInfo.name 1409 session.cluster().CacheIndex(cacheKey, false) 1410 } 1411 1412 result := struct { 1413 ErrMsg string 1414 Ok bool 1415 }{} 1416 err = c.Database.Run(bson.D{{"dropIndexes", c.Name}, {"index", name}}, &result) 1417 if err != nil { 1418 return err 1419 } 1420 if !result.Ok { 1421 return errors.New(result.ErrMsg) 1422 } 1423 return nil 1424} 
1425 1426// nonEventual returns a clone of session and ensures it is not Eventual. 1427// This guarantees that the server that is used for queries may be reused 1428// afterwards when a cursor is received. 1429func (session *Session) nonEventual() *Session { 1430 cloned := session.Clone() 1431 if cloned.consistency == Eventual { 1432 cloned.SetMode(Monotonic, false) 1433 } 1434 return cloned 1435} 1436 1437// Indexes returns a list of all indexes for the collection. 1438// 1439// For example, this snippet would drop all available indexes: 1440// 1441// indexes, err := collection.Indexes() 1442// if err != nil { 1443// return err 1444// } 1445// for _, index := range indexes { 1446// err = collection.DropIndex(index.Key...) 1447// if err != nil { 1448// return err 1449// } 1450// } 1451// 1452// See the EnsureIndex method for more details on indexes. 1453func (c *Collection) Indexes() (indexes []Index, err error) { 1454 cloned := c.Database.Session.nonEventual() 1455 defer cloned.Close() 1456 1457 batchSize := int(cloned.queryConfig.op.limit) 1458 1459 // Try with a command. 1460 var result struct { 1461 Indexes []bson.Raw 1462 Cursor cursorData 1463 } 1464 var iter *Iter 1465 err = c.Database.With(cloned).Run(bson.D{{"listIndexes", c.Name}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result) 1466 if err == nil { 1467 firstBatch := result.Indexes 1468 if firstBatch == nil { 1469 firstBatch = result.Cursor.FirstBatch 1470 } 1471 ns := strings.SplitN(result.Cursor.NS, ".", 2) 1472 if len(ns) < 2 { 1473 iter = c.With(cloned).NewIter(nil, firstBatch, result.Cursor.Id, nil) 1474 } else { 1475 iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil) 1476 } 1477 } else if isNoCmd(err) { 1478 // Command not yet supported. Query the database instead. 
1479 iter = c.Database.C("system.indexes").Find(bson.M{"ns": c.FullName}).Iter() 1480 } else { 1481 return nil, err 1482 } 1483 1484 var spec indexSpec 1485 for iter.Next(&spec) { 1486 indexes = append(indexes, indexFromSpec(spec)) 1487 } 1488 if err = iter.Close(); err != nil { 1489 return nil, err 1490 } 1491 sort.Sort(indexSlice(indexes)) 1492 return indexes, nil 1493} 1494 1495func indexFromSpec(spec indexSpec) Index { 1496 index := Index{ 1497 Name: spec.Name, 1498 Key: simpleIndexKey(spec.Key), 1499 Unique: spec.Unique, 1500 DropDups: spec.DropDups, 1501 Background: spec.Background, 1502 Sparse: spec.Sparse, 1503 Minf: spec.Min, 1504 Maxf: spec.Max, 1505 Bits: spec.Bits, 1506 BucketSize: spec.BucketSize, 1507 DefaultLanguage: spec.DefaultLanguage, 1508 LanguageOverride: spec.LanguageOverride, 1509 ExpireAfter: time.Duration(spec.ExpireAfter) * time.Second, 1510 Collation: spec.Collation, 1511 } 1512 if float64(int(spec.Min)) == spec.Min && float64(int(spec.Max)) == spec.Max { 1513 index.Min = int(spec.Min) 1514 index.Max = int(spec.Max) 1515 } 1516 if spec.TextIndexVersion > 0 { 1517 index.Key = make([]string, len(spec.Weights)) 1518 index.Weights = make(map[string]int) 1519 for i, elem := range spec.Weights { 1520 index.Key[i] = "$text:" + elem.Name 1521 if w, ok := elem.Value.(int); ok { 1522 index.Weights[elem.Name] = w 1523 } 1524 } 1525 } 1526 return index 1527} 1528 1529type indexSlice []Index 1530 1531func (idxs indexSlice) Len() int { return len(idxs) } 1532func (idxs indexSlice) Less(i, j int) bool { return idxs[i].Name < idxs[j].Name } 1533func (idxs indexSlice) Swap(i, j int) { idxs[i], idxs[j] = idxs[j], idxs[i] } 1534 1535func simpleIndexKey(realKey bson.D) (key []string) { 1536 for i := range realKey { 1537 var vi int 1538 field := realKey[i].Name 1539 1540 switch realKey[i].Value.(type) { 1541 case int64: 1542 vf, _ := realKey[i].Value.(int64) 1543 vi = int(vf) 1544 case float64: 1545 vf, _ := realKey[i].Value.(float64) 1546 vi = int(vf) 1547 
case string: 1548 if vs, ok := realKey[i].Value.(string); ok { 1549 key = append(key, "$"+vs+":"+field) 1550 continue 1551 } 1552 case int: 1553 vi = realKey[i].Value.(int) 1554 } 1555 1556 if vi == 1 { 1557 key = append(key, field) 1558 continue 1559 } 1560 if vi == -1 { 1561 key = append(key, "-"+field) 1562 continue 1563 } 1564 panic("Got unknown index key type for field " + field) 1565 } 1566 return 1567} 1568 1569// ResetIndexCache() clears the cache of previously ensured indexes. 1570// Following requests to EnsureIndex will contact the server. 1571func (s *Session) ResetIndexCache() { 1572 s.cluster().ResetIndexCache() 1573} 1574 1575// New creates a new session with the same parameters as the original 1576// session, including consistency, batch size, prefetching, safety mode, 1577// etc. The returned session will use sockets from the pool, so there's 1578// a chance that writes just performed in another session may not yet 1579// be visible. 1580// 1581// Login information from the original session will not be copied over 1582// into the new session unless it was provided through the initial URL 1583// for the Dial function. 1584// 1585// See the Copy and Clone methods. 1586// 1587func (s *Session) New() *Session { 1588 s.m.Lock() 1589 scopy := copySession(s, false) 1590 s.m.Unlock() 1591 scopy.Refresh() 1592 return scopy 1593} 1594 1595// Copy works just like New, but preserves the exact authentication 1596// information from the original session. 1597func (s *Session) Copy() *Session { 1598 s.m.Lock() 1599 scopy := copySession(s, true) 1600 s.m.Unlock() 1601 scopy.Refresh() 1602 return scopy 1603} 1604 1605// Clone works just like Copy, but also reuses the same socket as the original 1606// session, in case it had already reserved one due to its consistency 1607// guarantees. This behavior ensures that writes performed in the old session 1608// are necessarily observed when using the new session, as long as it was a 1609// strong or monotonic session. 
That said, it also means that long operations 1610// may cause other goroutines using the original session to wait. 1611func (s *Session) Clone() *Session { 1612 s.m.Lock() 1613 scopy := copySession(s, true) 1614 s.m.Unlock() 1615 return scopy 1616} 1617 1618// Close terminates the session. It's a runtime error to use a session 1619// after it has been closed. 1620func (s *Session) Close() { 1621 s.m.Lock() 1622 if s.cluster_ != nil { 1623 debugf("Closing session %p", s) 1624 s.unsetSocket() 1625 s.cluster_.Release() 1626 s.cluster_ = nil 1627 } 1628 s.m.Unlock() 1629} 1630 1631func (s *Session) cluster() *mongoCluster { 1632 if s.cluster_ == nil { 1633 panic("Session already closed") 1634 } 1635 return s.cluster_ 1636} 1637 1638// Refresh puts back any reserved sockets in use and restarts the consistency 1639// guarantees according to the current consistency setting for the session. 1640func (s *Session) Refresh() { 1641 s.m.Lock() 1642 s.slaveOk = s.consistency != Strong 1643 s.unsetSocket() 1644 s.m.Unlock() 1645} 1646 1647// SetMode changes the consistency mode for the session. 1648// 1649// The default mode is Strong. 1650// 1651// In the Strong consistency mode reads and writes will always be made to 1652// the primary server using a unique connection so that reads and writes are 1653// fully consistent, ordered, and observing the most up-to-date data. 1654// This offers the least benefits in terms of distributing load, but the 1655// most guarantees. See also Monotonic and Eventual. 1656// 1657// In the Monotonic consistency mode reads may not be entirely up-to-date, 1658// but they will always see the history of changes moving forward, the data 1659// read will be consistent across sequential queries in the same session, 1660// and modifications made within the session will be observed in following 1661// queries (read-your-writes). 
1662// 1663// In practice, the Monotonic mode is obtained by performing initial reads 1664// on a unique connection to an arbitrary secondary, if one is available, 1665// and once the first write happens, the session connection is switched over 1666// to the primary server. This manages to distribute some of the reading 1667// load with secondaries, while maintaining some useful guarantees. 1668// 1669// In the Eventual consistency mode reads will be made to any secondary in the 1670// cluster, if one is available, and sequential reads will not necessarily 1671// be made with the same connection. This means that data may be observed 1672// out of order. Writes will of course be issued to the primary, but 1673// independent writes in the same Eventual session may also be made with 1674// independent connections, so there are also no guarantees in terms of 1675// write ordering (no read-your-writes guarantees either). 1676// 1677// The Eventual mode is the fastest and most resource-friendly, but is 1678// also the one offering the least guarantees about ordering of the data 1679// read and written. 1680// 1681// If refresh is true, in addition to ensuring the session is in the given 1682// consistency mode, the consistency guarantees will also be reset (e.g. 1683// a Monotonic session will be allowed to read from secondaries again). 1684// This is equivalent to calling the Refresh function. 1685// 1686// Shifting between Monotonic and Strong modes will keep a previously 1687// reserved connection for the session unless refresh is true or the 1688// connection is unsuitable (to a secondary server in a Strong session). 
1689func (s *Session) SetMode(consistency Mode, refresh bool) { 1690 s.m.Lock() 1691 debugf("Session %p: setting mode %d with refresh=%v (master=%p, slave=%p)", s, consistency, refresh, s.masterSocket, s.slaveSocket) 1692 s.consistency = consistency 1693 if refresh { 1694 s.slaveOk = s.consistency != Strong 1695 s.unsetSocket() 1696 } else if s.consistency == Strong { 1697 s.slaveOk = false 1698 } else if s.masterSocket == nil { 1699 s.slaveOk = true 1700 } 1701 s.m.Unlock() 1702} 1703 1704// Mode returns the current consistency mode for the session. 1705func (s *Session) Mode() Mode { 1706 s.m.RLock() 1707 mode := s.consistency 1708 s.m.RUnlock() 1709 return mode 1710} 1711 1712// SetSyncTimeout sets the amount of time an operation with this session 1713// will wait before returning an error in case a connection to a usable 1714// server can't be established. Set it to zero to wait forever. The 1715// default value is 7 seconds. 1716func (s *Session) SetSyncTimeout(d time.Duration) { 1717 s.m.Lock() 1718 s.syncTimeout = d 1719 s.m.Unlock() 1720} 1721 1722// SetSocketTimeout sets the amount of time to wait for a non-responding 1723// socket to the database before it is forcefully closed. 1724// 1725// The default timeout is 1 minute. 1726func (s *Session) SetSocketTimeout(d time.Duration) { 1727 s.m.Lock() 1728 s.sockTimeout = d 1729 if s.masterSocket != nil { 1730 s.masterSocket.SetTimeout(d) 1731 } 1732 if s.slaveSocket != nil { 1733 s.slaveSocket.SetTimeout(d) 1734 } 1735 s.m.Unlock() 1736} 1737 1738// SetCursorTimeout changes the standard timeout period that the server 1739// enforces on created cursors. The only supported value right now is 1740// 0, which disables the timeout. The standard server timeout is 10 minutes. 
1741func (s *Session) SetCursorTimeout(d time.Duration) { 1742 s.m.Lock() 1743 if d == 0 { 1744 s.queryConfig.op.flags |= flagNoCursorTimeout 1745 } else { 1746 panic("SetCursorTimeout: only 0 (disable timeout) supported for now") 1747 } 1748 s.m.Unlock() 1749} 1750 1751// SetPoolLimit sets the maximum number of sockets in use in a single server 1752// before this session will block waiting for a socket to be available. 1753// The default limit is 4096. 1754// 1755// This limit must be set to cover more than any expected workload of the 1756// application. It is a bad practice and an unsupported use case to use the 1757// database driver to define the concurrency limit of an application. Prevent 1758// such concurrency "at the door" instead, by properly restricting the amount 1759// of used resources and number of goroutines before they are created. 1760func (s *Session) SetPoolLimit(limit int) { 1761 s.m.Lock() 1762 s.poolLimit = limit 1763 s.m.Unlock() 1764} 1765 1766// SetBypassValidation sets whether the server should bypass the registered 1767// validation expressions executed when documents are inserted or modified, 1768// in the interest of preserving invariants in the collection being modified. 1769// The default is to not bypass, and thus to perform the validation 1770// expressions registered for modified collections. 1771// 1772// Document validation was introuced in MongoDB 3.2. 1773// 1774// Relevant documentation: 1775// 1776// https://docs.mongodb.org/manual/release-notes/3.2/#bypass-validation 1777// 1778func (s *Session) SetBypassValidation(bypass bool) { 1779 s.m.Lock() 1780 s.bypassValidation = bypass 1781 s.m.Unlock() 1782} 1783 1784// SetBatch sets the default batch size used when fetching documents from the 1785// database. It's possible to change this setting on a per-query basis as 1786// well, using the Query.Batch method. 1787// 1788// The default batch size is defined by the database itself. 
As of this 1789// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the 1790// first batch, and 4MB on remaining ones. 1791func (s *Session) SetBatch(n int) { 1792 if n == 1 { 1793 // Server interprets 1 as -1 and closes the cursor (!?) 1794 n = 2 1795 } 1796 s.m.Lock() 1797 s.queryConfig.op.limit = int32(n) 1798 s.m.Unlock() 1799} 1800 1801// SetPrefetch sets the default point at which the next batch of results will be 1802// requested. When there are p*batch_size remaining documents cached in an 1803// Iter, the next batch will be requested in background. For instance, when 1804// using this: 1805// 1806// session.SetBatch(200) 1807// session.SetPrefetch(0.25) 1808// 1809// and there are only 50 documents cached in the Iter to be processed, the 1810// next batch of 200 will be requested. It's possible to change this setting on 1811// a per-query basis as well, using the Prefetch method of Query. 1812// 1813// The default prefetch value is 0.25. 1814func (s *Session) SetPrefetch(p float64) { 1815 s.m.Lock() 1816 s.queryConfig.prefetch = p 1817 s.m.Unlock() 1818} 1819 1820// See SetSafe for details on the Safe type. 1821type Safe struct { 1822 W int // Min # of servers to ack before success 1823 WMode string // Write mode for MongoDB 2.0+ (e.g. "majority") 1824 WTimeout int // Milliseconds to wait for W before timing out 1825 FSync bool // Sync via the journal if present, or via data files sync otherwise 1826 J bool // Sync via the journal if present 1827} 1828 1829// Safe returns the current safety mode for the session. 1830func (s *Session) Safe() (safe *Safe) { 1831 s.m.Lock() 1832 defer s.m.Unlock() 1833 if s.safeOp != nil { 1834 cmd := s.safeOp.query.(*getLastError) 1835 safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J} 1836 switch w := cmd.W.(type) { 1837 case string: 1838 safe.WMode = w 1839 case int: 1840 safe.W = w 1841 } 1842 } 1843 return 1844} 1845 1846// SetSafe changes the session safety mode. 
1847// 1848// If the safe parameter is nil, the session is put in unsafe mode, and writes 1849// become fire-and-forget, without error checking. The unsafe mode is faster 1850// since operations won't hold on waiting for a confirmation. 1851// 1852// If the safe parameter is not nil, any changing query (insert, update, ...) 1853// will be followed by a getLastError command with the specified parameters, 1854// to ensure the request was correctly processed. 1855// 1856// The default is &Safe{}, meaning check for errors and use the default 1857// behavior for all fields. 1858// 1859// The safe.W parameter determines how many servers should confirm a write 1860// before the operation is considered successful. If set to 0 or 1, the 1861// command will return as soon as the primary is done with the request. 1862// If safe.WTimeout is greater than zero, it determines how many milliseconds 1863// to wait for the safe.W servers to respond before returning an error. 1864// 1865// Starting with MongoDB 2.0.0 the safe.WMode parameter can be used instead 1866// of W to request for richer semantics. If set to "majority" the server will 1867// wait for a majority of members from the replica set to respond before 1868// returning. Custom modes may also be defined within the server to create 1869// very detailed placement schemas. See the data awareness documentation in 1870// the links below for more details (note that MongoDB internally reuses the 1871// "w" field name for WMode). 1872// 1873// If safe.J is true, servers will block until write operations have been 1874// committed to the journal. Cannot be used in combination with FSync. Prior 1875// to MongoDB 2.6 this option was ignored if the server was running without 1876// journaling. Starting with MongoDB 2.6 write operations will fail with an 1877// exception if this option is used when the server is running without 1878// journaling. 
1879// 1880// If safe.FSync is true and the server is running without journaling, blocks 1881// until the server has synced all data files to disk. If the server is running 1882// with journaling, this acts the same as the J option, blocking until write 1883// operations have been committed to the journal. Cannot be used in 1884// combination with J. 1885// 1886// Since MongoDB 2.0.0, the safe.J option can also be used instead of FSync 1887// to force the server to wait for a group commit in case journaling is 1888// enabled. The option has no effect if the server has journaling disabled. 1889// 1890// For example, the following statement will make the session check for 1891// errors, without imposing further constraints: 1892// 1893// session.SetSafe(&mgo.Safe{}) 1894// 1895// The following statement will force the server to wait for a majority of 1896// members of a replica set to return (MongoDB 2.0+ only): 1897// 1898// session.SetSafe(&mgo.Safe{WMode: "majority"}) 1899// 1900// The following statement, on the other hand, ensures that at least two 1901// servers have flushed the change to disk before confirming the success 1902// of operations: 1903// 1904// session.EnsureSafe(&mgo.Safe{W: 2, FSync: true}) 1905// 1906// The following statement, on the other hand, disables the verification 1907// of errors entirely: 1908// 1909// session.SetSafe(nil) 1910// 1911// See also the EnsureSafe method. 
1912// 1913// Relevant documentation: 1914// 1915// http://www.mongodb.org/display/DOCS/getLastError+Command 1916// http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError 1917// http://www.mongodb.org/display/DOCS/Data+Center+Awareness 1918// 1919func (s *Session) SetSafe(safe *Safe) { 1920 s.m.Lock() 1921 s.safeOp = nil 1922 s.ensureSafe(safe) 1923 s.m.Unlock() 1924} 1925 1926// EnsureSafe compares the provided safety parameters with the ones 1927// currently in use by the session and picks the most conservative 1928// choice for each setting. 1929// 1930// That is: 1931// 1932// - safe.WMode is always used if set. 1933// - safe.W is used if larger than the current W and WMode is empty. 1934// - safe.FSync is always used if true. 1935// - safe.J is used if FSync is false. 1936// - safe.WTimeout is used if set and smaller than the current WTimeout. 1937// 1938// For example, the following statement will ensure the session is 1939// at least checking for errors, without enforcing further constraints. 1940// If a more conservative SetSafe or EnsureSafe call was previously done, 1941// the following call will be ignored. 1942// 1943// session.EnsureSafe(&mgo.Safe{}) 1944// 1945// See also the SetSafe method for details on what each option means. 
1946// 1947// Relevant documentation: 1948// 1949// http://www.mongodb.org/display/DOCS/getLastError+Command 1950// http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError 1951// http://www.mongodb.org/display/DOCS/Data+Center+Awareness 1952// 1953func (s *Session) EnsureSafe(safe *Safe) { 1954 s.m.Lock() 1955 s.ensureSafe(safe) 1956 s.m.Unlock() 1957} 1958 1959func (s *Session) ensureSafe(safe *Safe) { 1960 if safe == nil { 1961 return 1962 } 1963 1964 var w interface{} 1965 if safe.WMode != "" { 1966 w = safe.WMode 1967 } else if safe.W > 0 { 1968 w = safe.W 1969 } 1970 1971 var cmd getLastError 1972 if s.safeOp == nil { 1973 cmd = getLastError{1, w, safe.WTimeout, safe.FSync, safe.J} 1974 } else { 1975 // Copy. We don't want to mutate the existing query. 1976 cmd = *(s.safeOp.query.(*getLastError)) 1977 if cmd.W == nil { 1978 cmd.W = w 1979 } else if safe.WMode != "" { 1980 cmd.W = safe.WMode 1981 } else if i, ok := cmd.W.(int); ok && safe.W > i { 1982 cmd.W = safe.W 1983 } 1984 if safe.WTimeout > 0 && safe.WTimeout < cmd.WTimeout { 1985 cmd.WTimeout = safe.WTimeout 1986 } 1987 if safe.FSync { 1988 cmd.FSync = true 1989 cmd.J = false 1990 } else if safe.J && !cmd.FSync { 1991 cmd.J = true 1992 } 1993 } 1994 s.safeOp = &queryOp{ 1995 query: &cmd, 1996 collection: "admin.$cmd", 1997 limit: -1, 1998 } 1999} 2000 2001// Run issues the provided command on the "admin" database and 2002// and unmarshals its result in the respective argument. The cmd 2003// argument may be either a string with the command name itself, in 2004// which case an empty document of the form bson.M{cmd: 1} will be used, 2005// or it may be a full command document. 2006// 2007// Note that MongoDB considers the first marshalled key as the command 2008// name, so when providing a command with options, it's important to 2009// use an ordering-preserving document, such as a struct value or an 2010// instance of bson.D. 
For instance: 2011// 2012// db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}}) 2013// 2014// For commands on arbitrary databases, see the Run method in 2015// the Database type. 2016// 2017// Relevant documentation: 2018// 2019// http://www.mongodb.org/display/DOCS/Commands 2020// http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips 2021// 2022func (s *Session) Run(cmd interface{}, result interface{}) error { 2023 return s.DB("admin").Run(cmd, result) 2024} 2025 2026// SelectServers restricts communication to servers configured with the 2027// given tags. For example, the following statement restricts servers 2028// used for reading operations to those with both tag "disk" set to 2029// "ssd" and tag "rack" set to 1: 2030// 2031// session.SelectServers(bson.D{{"disk", "ssd"}, {"rack", 1}}) 2032// 2033// Multiple sets of tags may be provided, in which case the used server 2034// must match all tags within any one set. 2035// 2036// If a connection was previously assigned to the session due to the 2037// current session mode (see Session.SetMode), the tag selection will 2038// only be enforced after the session is refreshed. 2039// 2040// Relevant documentation: 2041// 2042// http://docs.mongodb.org/manual/tutorial/configure-replica-set-tag-sets 2043// 2044func (s *Session) SelectServers(tags ...bson.D) { 2045 s.m.Lock() 2046 s.queryConfig.op.serverTags = tags 2047 s.m.Unlock() 2048} 2049 2050// Ping runs a trivial ping command just to get in touch with the server. 2051func (s *Session) Ping() error { 2052 return s.Run("ping", nil) 2053} 2054 2055// Fsync flushes in-memory writes to disk on the server the session 2056// is established with. If async is true, the call returns immediately, 2057// otherwise it returns after the flush has been made. 
2058func (s *Session) Fsync(async bool) error { 2059 return s.Run(bson.D{{"fsync", 1}, {"async", async}}, nil) 2060} 2061 2062// FsyncLock locks all writes in the specific server the session is 2063// established with and returns. Any writes attempted to the server 2064// after it is successfully locked will block until FsyncUnlock is 2065// called for the same server. 2066// 2067// This method works on secondaries as well, preventing the oplog from 2068// being flushed while the server is locked, but since only the server 2069// connected to is locked, for locking specific secondaries it may be 2070// necessary to establish a connection directly to the secondary (see 2071// Dial's connect=direct option). 2072// 2073// As an important caveat, note that once a write is attempted and 2074// blocks, follow up reads will block as well due to the way the 2075// lock is internally implemented in the server. More details at: 2076// 2077// https://jira.mongodb.org/browse/SERVER-4243 2078// 2079// FsyncLock is often used for performing consistent backups of 2080// the database files on disk. 2081// 2082// Relevant documentation: 2083// 2084// http://www.mongodb.org/display/DOCS/fsync+Command 2085// http://www.mongodb.org/display/DOCS/Backups 2086// 2087func (s *Session) FsyncLock() error { 2088 return s.Run(bson.D{{"fsync", 1}, {"lock", true}}, nil) 2089} 2090 2091// FsyncUnlock releases the server for writes. See FsyncLock for details. 2092func (s *Session) FsyncUnlock() error { 2093 err := s.Run(bson.D{{"fsyncUnlock", 1}}, nil) 2094 if isNoCmd(err) { 2095 err = s.DB("admin").C("$cmd.sys.unlock").Find(nil).One(nil) // WTF? 2096 } 2097 return err 2098} 2099 2100// Find prepares a query using the provided document. The document may be a 2101// map or a struct value capable of being marshalled with bson. The map 2102// may be a generic one using interface{} for its key and/or values, such as 2103// bson.M, or it may be a properly typed map. 
Providing nil as the document 2104// is equivalent to providing an empty document such as bson.M{}. 2105// 2106// Further details of the query may be tweaked using the resulting Query value, 2107// and then executed to retrieve results using methods such as One, For, 2108// Iter, or Tail. 2109// 2110// In case the resulting document includes a field named $err or errmsg, which 2111// are standard ways for MongoDB to return query errors, the returned err will 2112// be set to a *QueryError value including the Err message and the Code. In 2113// those cases, the result argument is still unmarshalled into with the 2114// received document so that any other custom values may be obtained if 2115// desired. 2116// 2117// Relevant documentation: 2118// 2119// http://www.mongodb.org/display/DOCS/Querying 2120// http://www.mongodb.org/display/DOCS/Advanced+Queries 2121// 2122func (c *Collection) Find(query interface{}) *Query { 2123 session := c.Database.Session 2124 session.m.RLock() 2125 q := &Query{session: session, query: session.queryConfig} 2126 session.m.RUnlock() 2127 q.op.query = query 2128 q.op.collection = c.FullName 2129 return q 2130} 2131 2132type repairCmd struct { 2133 RepairCursor string `bson:"repairCursor"` 2134 Cursor *repairCmdCursor ",omitempty" 2135} 2136 2137type repairCmdCursor struct { 2138 BatchSize int `bson:"batchSize,omitempty"` 2139} 2140 2141// Repair returns an iterator that goes over all recovered documents in the 2142// collection, in a best-effort manner. This is most useful when there are 2143// damaged data files. Multiple copies of the same document may be returned 2144// by the iterator. 2145// 2146// Repair is supported in MongoDB 2.7.8 and later. 2147func (c *Collection) Repair() *Iter { 2148 // Clone session and set it to Monotonic mode so that the server 2149 // used for the query may be safely obtained afterwards, if 2150 // necessary for iteration when a cursor is received. 
2151 session := c.Database.Session 2152 cloned := session.nonEventual() 2153 defer cloned.Close() 2154 2155 batchSize := int(cloned.queryConfig.op.limit) 2156 2157 var result struct{ Cursor cursorData } 2158 2159 cmd := repairCmd{ 2160 RepairCursor: c.Name, 2161 Cursor: &repairCmdCursor{batchSize}, 2162 } 2163 2164 clonedc := c.With(cloned) 2165 err := clonedc.Database.Run(cmd, &result) 2166 return clonedc.NewIter(session, result.Cursor.FirstBatch, result.Cursor.Id, err) 2167} 2168 2169// FindId is a convenience helper equivalent to: 2170// 2171// query := collection.Find(bson.M{"_id": id}) 2172// 2173// See the Find method for more details. 2174func (c *Collection) FindId(id interface{}) *Query { 2175 return c.Find(bson.D{{"_id", id}}) 2176} 2177 2178type Pipe struct { 2179 session *Session 2180 collection *Collection 2181 pipeline interface{} 2182 allowDisk bool 2183 batchSize int 2184} 2185 2186type pipeCmd struct { 2187 Aggregate string 2188 Pipeline interface{} 2189 Cursor *pipeCmdCursor ",omitempty" 2190 Explain bool ",omitempty" 2191 AllowDisk bool "allowDiskUse,omitempty" 2192} 2193 2194type pipeCmdCursor struct { 2195 BatchSize int `bson:"batchSize,omitempty"` 2196} 2197 2198// Pipe prepares a pipeline to aggregate. The pipeline document 2199// must be a slice built in terms of the aggregation framework language. 
2200// 2201// For example: 2202// 2203// pipe := collection.Pipe([]bson.M{{"$match": bson.M{"name": "Otavio"}}}) 2204// iter := pipe.Iter() 2205// 2206// Relevant documentation: 2207// 2208// http://docs.mongodb.org/manual/reference/aggregation 2209// http://docs.mongodb.org/manual/applications/aggregation 2210// http://docs.mongodb.org/manual/tutorial/aggregation-examples 2211// 2212func (c *Collection) Pipe(pipeline interface{}) *Pipe { 2213 session := c.Database.Session 2214 session.m.RLock() 2215 batchSize := int(session.queryConfig.op.limit) 2216 session.m.RUnlock() 2217 return &Pipe{ 2218 session: session, 2219 collection: c, 2220 pipeline: pipeline, 2221 batchSize: batchSize, 2222 } 2223} 2224 2225// Iter executes the pipeline and returns an iterator capable of going 2226// over all the generated results. 2227func (p *Pipe) Iter() *Iter { 2228 // Clone session and set it to Monotonic mode so that the server 2229 // used for the query may be safely obtained afterwards, if 2230 // necessary for iteration when a cursor is received. 2231 cloned := p.session.nonEventual() 2232 defer cloned.Close() 2233 c := p.collection.With(cloned) 2234 2235 var result struct { 2236 Result []bson.Raw // 2.4, no cursors. 2237 Cursor cursorData // 2.6+, with cursors. 2238 } 2239 2240 cmd := pipeCmd{ 2241 Aggregate: c.Name, 2242 Pipeline: p.pipeline, 2243 AllowDisk: p.allowDisk, 2244 Cursor: &pipeCmdCursor{p.batchSize}, 2245 } 2246 err := c.Database.Run(cmd, &result) 2247 if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` { 2248 cmd.Cursor = nil 2249 cmd.AllowDisk = false 2250 err = c.Database.Run(cmd, &result) 2251 } 2252 firstBatch := result.Result 2253 if firstBatch == nil { 2254 firstBatch = result.Cursor.FirstBatch 2255 } 2256 return c.NewIter(p.session, firstBatch, result.Cursor.Id, err) 2257} 2258 2259// NewIter returns a newly created iterator with the provided parameters. 
2260// Using this method is not recommended unless the desired functionality 2261// is not yet exposed via a more convenient interface (Find, Pipe, etc). 2262// 2263// The optional session parameter associates the lifetime of the returned 2264// iterator to an arbitrary session. If nil, the iterator will be bound to 2265// c's session. 2266// 2267// Documents in firstBatch will be individually provided by the returned 2268// iterator before documents from cursorId are made available. If cursorId 2269// is zero, only the documents in firstBatch are provided. 2270// 2271// If err is not nil, the iterator's Err method will report it after 2272// exhausting documents in firstBatch. 2273// 2274// NewIter must be called right after the cursor id is obtained, and must not 2275// be called on a collection in Eventual mode, because the cursor id is 2276// associated with the specific server that returned it. The provided session 2277// parameter may be in any mode or state, though. 2278// 2279func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId int64, err error) *Iter { 2280 var server *mongoServer 2281 csession := c.Database.Session 2282 csession.m.RLock() 2283 socket := csession.masterSocket 2284 if socket == nil { 2285 socket = csession.slaveSocket 2286 } 2287 if socket != nil { 2288 server = socket.Server() 2289 } 2290 csession.m.RUnlock() 2291 2292 if server == nil { 2293 if csession.Mode() == Eventual { 2294 panic("Collection.NewIter called in Eventual mode") 2295 } 2296 if err == nil { 2297 err = errors.New("server not available") 2298 } 2299 } 2300 2301 if session == nil { 2302 session = csession 2303 } 2304 2305 iter := &Iter{ 2306 session: session, 2307 server: server, 2308 timeout: -1, 2309 err: err, 2310 } 2311 iter.gotReply.L = &iter.m 2312 for _, doc := range firstBatch { 2313 iter.docData.Push(doc.Data) 2314 } 2315 if cursorId != 0 { 2316 iter.op.cursorId = cursorId 2317 iter.op.collection = c.FullName 2318 iter.op.replyFunc = 
iter.replyFunc() 2319 } 2320 return iter 2321} 2322 2323// All works like Iter.All. 2324func (p *Pipe) All(result interface{}) error { 2325 return p.Iter().All(result) 2326} 2327 2328// One executes the pipeline and unmarshals the first item from the 2329// result set into the result parameter. 2330// It returns ErrNotFound if no items are generated by the pipeline. 2331func (p *Pipe) One(result interface{}) error { 2332 iter := p.Iter() 2333 if iter.Next(result) { 2334 return nil 2335 } 2336 if err := iter.Err(); err != nil { 2337 return err 2338 } 2339 return ErrNotFound 2340} 2341 2342// Explain returns a number of details about how the MongoDB server would 2343// execute the requested pipeline, such as the number of objects examined, 2344// the number of times the read lock was yielded to allow writes to go in, 2345// and so on. 2346// 2347// For example: 2348// 2349// var m bson.M 2350// err := collection.Pipe(pipeline).Explain(&m) 2351// if err == nil { 2352// fmt.Printf("Explain: %#v\n", m) 2353// } 2354// 2355func (p *Pipe) Explain(result interface{}) error { 2356 c := p.collection 2357 cmd := pipeCmd{ 2358 Aggregate: c.Name, 2359 Pipeline: p.pipeline, 2360 AllowDisk: p.allowDisk, 2361 Explain: true, 2362 } 2363 return c.Database.Run(cmd, result) 2364} 2365 2366// AllowDiskUse enables writing to the "<dbpath>/_tmp" server directory so 2367// that aggregation pipelines do not have to be held entirely in memory. 2368func (p *Pipe) AllowDiskUse() *Pipe { 2369 p.allowDisk = true 2370 return p 2371} 2372 2373// Batch sets the batch size used when fetching documents from the database. 2374// It's possible to change this setting on a per-session basis as well, using 2375// the Batch method of Session. 2376// 2377// The default batch size is defined by the database server. 2378func (p *Pipe) Batch(n int) *Pipe { 2379 p.batchSize = n 2380 return p 2381} 2382 2383// mgo.v3: Use a single user-visible error type. 
2384 2385type LastError struct { 2386 Err string 2387 Code, N, Waited int 2388 FSyncFiles int `bson:"fsyncFiles"` 2389 WTimeout bool 2390 UpdatedExisting bool `bson:"updatedExisting"` 2391 UpsertedId interface{} `bson:"upserted"` 2392 2393 modified int 2394 ecases []BulkErrorCase 2395} 2396 2397func (err *LastError) Error() string { 2398 return err.Err 2399} 2400 2401type queryError struct { 2402 Err string "$err" 2403 ErrMsg string 2404 Assertion string 2405 Code int 2406 AssertionCode int "assertionCode" 2407} 2408 2409type QueryError struct { 2410 Code int 2411 Message string 2412 Assertion bool 2413} 2414 2415func (err *QueryError) Error() string { 2416 return err.Message 2417} 2418 2419// IsDup returns whether err informs of a duplicate key error because 2420// a primary key index or a secondary unique index already has an entry 2421// with the given value. 2422func IsDup(err error) bool { 2423 // Besides being handy, helps with MongoDB bugs SERVER-7164 and SERVER-11493. 2424 // What follows makes me sad. Hopefully conventions will be more clear over time. 2425 switch e := err.(type) { 2426 case *LastError: 2427 return e.Code == 11000 || e.Code == 11001 || e.Code == 12582 || e.Code == 16460 && strings.Contains(e.Err, " E11000 ") 2428 case *QueryError: 2429 return e.Code == 11000 || e.Code == 11001 || e.Code == 12582 2430 case *BulkError: 2431 for _, ecase := range e.ecases { 2432 if !IsDup(ecase.Err) { 2433 return false 2434 } 2435 } 2436 return true 2437 } 2438 return false 2439} 2440 2441// Insert inserts one or more documents in the respective collection. In 2442// case the session is in safe mode (see the SetSafe method) and an error 2443// happens while inserting the provided documents, the returned error will 2444// be of type *LastError. 
2445func (c *Collection) Insert(docs ...interface{}) error { 2446 _, err := c.writeOp(&insertOp{c.FullName, docs, 0}, true) 2447 return err 2448} 2449 2450// Update finds a single document matching the provided selector document 2451// and modifies it according to the update document. 2452// If the session is in safe mode (see SetSafe) a ErrNotFound error is 2453// returned if a document isn't found, or a value of type *LastError 2454// when some other error is detected. 2455// 2456// Relevant documentation: 2457// 2458// http://www.mongodb.org/display/DOCS/Updating 2459// http://www.mongodb.org/display/DOCS/Atomic+Operations 2460// 2461func (c *Collection) Update(selector interface{}, update interface{}) error { 2462 if selector == nil { 2463 selector = bson.D{} 2464 } 2465 op := updateOp{ 2466 Collection: c.FullName, 2467 Selector: selector, 2468 Update: update, 2469 } 2470 lerr, err := c.writeOp(&op, true) 2471 if err == nil && lerr != nil && !lerr.UpdatedExisting { 2472 return ErrNotFound 2473 } 2474 return err 2475} 2476 2477// UpdateId is a convenience helper equivalent to: 2478// 2479// err := collection.Update(bson.M{"_id": id}, update) 2480// 2481// See the Update method for more details. 2482func (c *Collection) UpdateId(id interface{}, update interface{}) error { 2483 return c.Update(bson.D{{"_id", id}}, update) 2484} 2485 2486// ChangeInfo holds details about the outcome of an update operation. 2487type ChangeInfo struct { 2488 // Updated reports the number of existing documents modified. 2489 // Due to server limitations, this reports the same value as the Matched field when 2490 // talking to MongoDB <= 2.4 and on Upsert and Apply (findAndModify) operations. 
2491 Updated int 2492 Removed int // Number of documents removed 2493 Matched int // Number of documents matched but not necessarily changed 2494 UpsertedId interface{} // Upserted _id field, when not explicitly provided 2495} 2496 2497// UpdateAll finds all documents matching the provided selector document 2498// and modifies them according to the update document. 2499// If the session is in safe mode (see SetSafe) details of the executed 2500// operation are returned in info or an error of type *LastError when 2501// some problem is detected. It is not an error for the update to not be 2502// applied on any documents because the selector doesn't match. 2503// 2504// Relevant documentation: 2505// 2506// http://www.mongodb.org/display/DOCS/Updating 2507// http://www.mongodb.org/display/DOCS/Atomic+Operations 2508// 2509func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *ChangeInfo, err error) { 2510 if selector == nil { 2511 selector = bson.D{} 2512 } 2513 op := updateOp{ 2514 Collection: c.FullName, 2515 Selector: selector, 2516 Update: update, 2517 Flags: 2, 2518 Multi: true, 2519 } 2520 lerr, err := c.writeOp(&op, true) 2521 if err == nil && lerr != nil { 2522 info = &ChangeInfo{Updated: lerr.modified, Matched: lerr.N} 2523 } 2524 return info, err 2525} 2526 2527// Upsert finds a single document matching the provided selector document 2528// and modifies it according to the update document. If no document matching 2529// the selector is found, the update document is applied to the selector 2530// document and the result is inserted in the collection. 2531// If the session is in safe mode (see SetSafe) details of the executed 2532// operation are returned in info, or an error of type *LastError when 2533// some problem is detected. 
2534// 2535// Relevant documentation: 2536// 2537// http://www.mongodb.org/display/DOCS/Updating 2538// http://www.mongodb.org/display/DOCS/Atomic+Operations 2539// 2540func (c *Collection) Upsert(selector interface{}, update interface{}) (info *ChangeInfo, err error) { 2541 if selector == nil { 2542 selector = bson.D{} 2543 } 2544 op := updateOp{ 2545 Collection: c.FullName, 2546 Selector: selector, 2547 Update: update, 2548 Flags: 1, 2549 Upsert: true, 2550 } 2551 var lerr *LastError 2552 for i := 0; i < maxUpsertRetries; i++ { 2553 lerr, err = c.writeOp(&op, true) 2554 // Retry duplicate key errors on upserts. 2555 // https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes 2556 if !IsDup(err) { 2557 break 2558 } 2559 } 2560 if err == nil && lerr != nil { 2561 info = &ChangeInfo{} 2562 if lerr.UpdatedExisting { 2563 info.Matched = lerr.N 2564 info.Updated = lerr.modified 2565 } else { 2566 info.UpsertedId = lerr.UpsertedId 2567 } 2568 } 2569 return info, err 2570} 2571 2572// UpsertId is a convenience helper equivalent to: 2573// 2574// info, err := collection.Upsert(bson.M{"_id": id}, update) 2575// 2576// See the Upsert method for more details. 2577func (c *Collection) UpsertId(id interface{}, update interface{}) (info *ChangeInfo, err error) { 2578 return c.Upsert(bson.D{{"_id", id}}, update) 2579} 2580 2581// Remove finds a single document matching the provided selector document 2582// and removes it from the database. 2583// If the session is in safe mode (see SetSafe) a ErrNotFound error is 2584// returned if a document isn't found, or a value of type *LastError 2585// when some other error is detected. 
2586// 2587// Relevant documentation: 2588// 2589// http://www.mongodb.org/display/DOCS/Removing 2590// 2591func (c *Collection) Remove(selector interface{}) error { 2592 if selector == nil { 2593 selector = bson.D{} 2594 } 2595 lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 1, 1}, true) 2596 if err == nil && lerr != nil && lerr.N == 0 { 2597 return ErrNotFound 2598 } 2599 return err 2600} 2601 2602// RemoveId is a convenience helper equivalent to: 2603// 2604// err := collection.Remove(bson.M{"_id": id}) 2605// 2606// See the Remove method for more details. 2607func (c *Collection) RemoveId(id interface{}) error { 2608 return c.Remove(bson.D{{"_id", id}}) 2609} 2610 2611// RemoveAll finds all documents matching the provided selector document 2612// and removes them from the database. In case the session is in safe mode 2613// (see the SetSafe method) and an error happens when attempting the change, 2614// the returned error will be of type *LastError. 2615// 2616// Relevant documentation: 2617// 2618// http://www.mongodb.org/display/DOCS/Removing 2619// 2620func (c *Collection) RemoveAll(selector interface{}) (info *ChangeInfo, err error) { 2621 if selector == nil { 2622 selector = bson.D{} 2623 } 2624 lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 0, 0}, true) 2625 if err == nil && lerr != nil { 2626 info = &ChangeInfo{Removed: lerr.N, Matched: lerr.N} 2627 } 2628 return info, err 2629} 2630 2631// DropDatabase removes the entire database including all of its collections. 2632func (db *Database) DropDatabase() error { 2633 return db.Run(bson.D{{"dropDatabase", 1}}, nil) 2634} 2635 2636// DropCollection removes the entire collection including all of its documents. 2637func (c *Collection) DropCollection() error { 2638 return c.Database.Run(bson.D{{"drop", c.Name}}, nil) 2639} 2640 2641// The CollectionInfo type holds metadata about a collection. 
2642// 2643// Relevant documentation: 2644// 2645// http://www.mongodb.org/display/DOCS/createCollection+Command 2646// http://www.mongodb.org/display/DOCS/Capped+Collections 2647// 2648type CollectionInfo struct { 2649 // DisableIdIndex prevents the automatic creation of the index 2650 // on the _id field for the collection. 2651 DisableIdIndex bool 2652 2653 // ForceIdIndex enforces the automatic creation of the index 2654 // on the _id field for the collection. Capped collections, 2655 // for example, do not have such an index by default. 2656 ForceIdIndex bool 2657 2658 // If Capped is true new documents will replace old ones when 2659 // the collection is full. MaxBytes must necessarily be set 2660 // to define the size when the collection wraps around. 2661 // MaxDocs optionally defines the number of documents when it 2662 // wraps, but MaxBytes still needs to be set. 2663 Capped bool 2664 MaxBytes int 2665 MaxDocs int 2666 2667 // Validator contains a validation expression that defines which 2668 // documents should be considered valid for this collection. 2669 Validator interface{} 2670 2671 // ValidationLevel may be set to "strict" (the default) to force 2672 // MongoDB to validate all documents on inserts and updates, to 2673 // "moderate" to apply the validation rules only to documents 2674 // that already fulfill the validation criteria, or to "off" for 2675 // disabling validation entirely. 2676 ValidationLevel string 2677 2678 // ValidationAction determines how MongoDB handles documents that 2679 // violate the validation rules. It may be set to "error" (the default) 2680 // to reject inserts or updates that violate the rules, or to "warn" 2681 // to log invalid operations but allow them to proceed. 2682 ValidationAction string 2683 2684 // StorageEngine allows specifying collection options for the 2685 // storage engine in use. The map keys must hold the storage engine 2686 // name for which options are being specified. 
2687 StorageEngine interface{} 2688} 2689 2690// Create explicitly creates the c collection with details of info. 2691// MongoDB creates collections automatically on use, so this method 2692// is only necessary when creating collection with non-default 2693// characteristics, such as capped collections. 2694// 2695// Relevant documentation: 2696// 2697// http://www.mongodb.org/display/DOCS/createCollection+Command 2698// http://www.mongodb.org/display/DOCS/Capped+Collections 2699// 2700func (c *Collection) Create(info *CollectionInfo) error { 2701 cmd := make(bson.D, 0, 4) 2702 cmd = append(cmd, bson.DocElem{"create", c.Name}) 2703 if info.Capped { 2704 if info.MaxBytes < 1 { 2705 return fmt.Errorf("Collection.Create: with Capped, MaxBytes must also be set") 2706 } 2707 cmd = append(cmd, bson.DocElem{"capped", true}) 2708 cmd = append(cmd, bson.DocElem{"size", info.MaxBytes}) 2709 if info.MaxDocs > 0 { 2710 cmd = append(cmd, bson.DocElem{"max", info.MaxDocs}) 2711 } 2712 } 2713 if info.DisableIdIndex { 2714 cmd = append(cmd, bson.DocElem{"autoIndexId", false}) 2715 } 2716 if info.ForceIdIndex { 2717 cmd = append(cmd, bson.DocElem{"autoIndexId", true}) 2718 } 2719 if info.Validator != nil { 2720 cmd = append(cmd, bson.DocElem{"validator", info.Validator}) 2721 } 2722 if info.ValidationLevel != "" { 2723 cmd = append(cmd, bson.DocElem{"validationLevel", info.ValidationLevel}) 2724 } 2725 if info.ValidationAction != "" { 2726 cmd = append(cmd, bson.DocElem{"validationAction", info.ValidationAction}) 2727 } 2728 if info.StorageEngine != nil { 2729 cmd = append(cmd, bson.DocElem{"storageEngine", info.StorageEngine}) 2730 } 2731 return c.Database.Run(cmd, nil) 2732} 2733 2734// Batch sets the batch size used when fetching documents from the database. 2735// It's possible to change this setting on a per-session basis as well, using 2736// the Batch method of Session. 2737 2738// The default batch size is defined by the database itself. 
As of this 2739// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the 2740// first batch, and 4MB on remaining ones. 2741func (q *Query) Batch(n int) *Query { 2742 if n == 1 { 2743 // Server interprets 1 as -1 and closes the cursor (!?) 2744 n = 2 2745 } 2746 q.m.Lock() 2747 q.op.limit = int32(n) 2748 q.m.Unlock() 2749 return q 2750} 2751 2752// Prefetch sets the point at which the next batch of results will be requested. 2753// When there are p*batch_size remaining documents cached in an Iter, the next 2754// batch will be requested in background. For instance, when using this: 2755// 2756// query.Batch(200).Prefetch(0.25) 2757// 2758// and there are only 50 documents cached in the Iter to be processed, the 2759// next batch of 200 will be requested. It's possible to change this setting on 2760// a per-session basis as well, using the SetPrefetch method of Session. 2761// 2762// The default prefetch value is 0.25. 2763func (q *Query) Prefetch(p float64) *Query { 2764 q.m.Lock() 2765 q.prefetch = p 2766 q.m.Unlock() 2767 return q 2768} 2769 2770// Skip skips over the n initial documents from the query results. Note that 2771// this only makes sense with capped collections where documents are naturally 2772// ordered by insertion time, or with sorted results. 2773func (q *Query) Skip(n int) *Query { 2774 q.m.Lock() 2775 q.op.skip = int32(n) 2776 q.m.Unlock() 2777 return q 2778} 2779 2780// Limit restricts the maximum number of documents retrieved to n, and also 2781// changes the batch size to the same value. Once n documents have been 2782// returned by Next, the following call will return ErrNotFound. 
2783func (q *Query) Limit(n int) *Query { 2784 q.m.Lock() 2785 switch { 2786 case n == 1: 2787 q.limit = 1 2788 q.op.limit = -1 2789 case n == math.MinInt32: // -MinInt32 == -MinInt32 2790 q.limit = math.MaxInt32 2791 q.op.limit = math.MinInt32 + 1 2792 case n < 0: 2793 q.limit = int32(-n) 2794 q.op.limit = int32(n) 2795 default: 2796 q.limit = int32(n) 2797 q.op.limit = int32(n) 2798 } 2799 q.m.Unlock() 2800 return q 2801} 2802 2803// Select enables selecting which fields should be retrieved for the results 2804// found. For example, the following query would only retrieve the name field: 2805// 2806// err := collection.Find(nil).Select(bson.M{"name": 1}).One(&result) 2807// 2808// Relevant documentation: 2809// 2810// http://www.mongodb.org/display/DOCS/Retrieving+a+Subset+of+Fields 2811// 2812func (q *Query) Select(selector interface{}) *Query { 2813 q.m.Lock() 2814 q.op.selector = selector 2815 q.m.Unlock() 2816 return q 2817} 2818 2819// Sort asks the database to order returned documents according to the 2820// provided field names. A field name may be prefixed by - (minus) for 2821// it to be sorted in reverse order. 
2822// 2823// For example: 2824// 2825// query1 := collection.Find(nil).Sort("firstname", "lastname") 2826// query2 := collection.Find(nil).Sort("-age") 2827// query3 := collection.Find(nil).Sort("$natural") 2828// query4 := collection.Find(nil).Select(bson.M{"score": bson.M{"$meta": "textScore"}}).Sort("$textScore:score") 2829// 2830// Relevant documentation: 2831// 2832// http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order 2833// 2834func (q *Query) Sort(fields ...string) *Query { 2835 q.m.Lock() 2836 var order bson.D 2837 for _, field := range fields { 2838 n := 1 2839 var kind string 2840 if field != "" { 2841 if field[0] == '$' { 2842 if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 { 2843 kind = field[1:c] 2844 field = field[c+1:] 2845 } 2846 } 2847 switch field[0] { 2848 case '+': 2849 field = field[1:] 2850 case '-': 2851 n = -1 2852 field = field[1:] 2853 } 2854 } 2855 if field == "" { 2856 panic("Sort: empty field name") 2857 } 2858 if kind == "textScore" { 2859 order = append(order, bson.DocElem{field, bson.M{"$meta": kind}}) 2860 } else { 2861 order = append(order, bson.DocElem{field, n}) 2862 } 2863 } 2864 q.op.options.OrderBy = order 2865 q.op.hasOptions = true 2866 q.m.Unlock() 2867 return q 2868} 2869 2870// Explain returns a number of details about how the MongoDB server would 2871// execute the requested query, such as the number of objects examined, 2872// the number of times the read lock was yielded to allow writes to go in, 2873// and so on. 
2874// 2875// For example: 2876// 2877// m := bson.M{} 2878// err := collection.Find(bson.M{"filename": name}).Explain(m) 2879// if err == nil { 2880// fmt.Printf("Explain: %#v\n", m) 2881// } 2882// 2883// Relevant documentation: 2884// 2885// http://www.mongodb.org/display/DOCS/Optimization 2886// http://www.mongodb.org/display/DOCS/Query+Optimizer 2887// 2888func (q *Query) Explain(result interface{}) error { 2889 q.m.Lock() 2890 clone := &Query{session: q.session, query: q.query} 2891 q.m.Unlock() 2892 clone.op.options.Explain = true 2893 clone.op.hasOptions = true 2894 if clone.op.limit > 0 { 2895 clone.op.limit = -q.op.limit 2896 } 2897 iter := clone.Iter() 2898 if iter.Next(result) { 2899 return nil 2900 } 2901 return iter.Close() 2902} 2903 2904// TODO: Add Collection.Explain. See https://goo.gl/1MDlvz. 2905 2906// Hint will include an explicit "hint" in the query to force the server 2907// to use a specified index, potentially improving performance in some 2908// situations. The provided parameters are the fields that compose the 2909// key of the index to be used. For details on how the indexKey may be 2910// built, see the EnsureIndex method. 2911// 2912// For example: 2913// 2914// query := collection.Find(bson.M{"firstname": "Joe", "lastname": "Winter"}) 2915// query.Hint("lastname", "firstname") 2916// 2917// Relevant documentation: 2918// 2919// http://www.mongodb.org/display/DOCS/Optimization 2920// http://www.mongodb.org/display/DOCS/Query+Optimizer 2921// 2922func (q *Query) Hint(indexKey ...string) *Query { 2923 q.m.Lock() 2924 keyInfo, err := parseIndexKey(indexKey) 2925 q.op.options.Hint = keyInfo.key 2926 q.op.hasOptions = true 2927 q.m.Unlock() 2928 if err != nil { 2929 panic(err) 2930 } 2931 return q 2932} 2933 2934// SetMaxScan constrains the query to stop after scanning the specified 2935// number of documents. 
2936// 2937// This modifier is generally used to prevent potentially long running 2938// queries from disrupting performance by scanning through too much data. 2939func (q *Query) SetMaxScan(n int) *Query { 2940 q.m.Lock() 2941 q.op.options.MaxScan = n 2942 q.op.hasOptions = true 2943 q.m.Unlock() 2944 return q 2945} 2946 2947// SetMaxTime constrains the query to stop after running for the specified time. 2948// 2949// When the time limit is reached MongoDB automatically cancels the query. 2950// This can be used to efficiently prevent and identify unexpectedly slow queries. 2951// 2952// A few important notes about the mechanism enforcing this limit: 2953// 2954// - Requests can block behind locking operations on the server, and that blocking 2955// time is not accounted for. In other words, the timer starts ticking only after 2956// the actual start of the query when it initially acquires the appropriate lock; 2957// 2958// - Operations are interrupted only at interrupt points where an operation can be 2959// safely aborted – the total execution time may exceed the specified value; 2960// 2961// - The limit can be applied to both CRUD operations and commands, but not all 2962// commands are interruptible; 2963// 2964// - While iterating over results, computing follow up batches is included in the 2965// total time and the iteration continues until the alloted time is over, but 2966// network roundtrips are not taken into account for the limit. 2967// 2968// - This limit does not override the inactive cursor timeout for idle cursors 2969// (default is 10 min). 2970// 2971// This mechanism was introduced in MongoDB 2.6. 
2972// 2973// Relevant documentation: 2974// 2975// http://blog.mongodb.org/post/83621787773/maxtimems-and-query-optimizer-introspection-in 2976// 2977func (q *Query) SetMaxTime(d time.Duration) *Query { 2978 q.m.Lock() 2979 q.op.options.MaxTimeMS = int(d / time.Millisecond) 2980 q.op.hasOptions = true 2981 q.m.Unlock() 2982 return q 2983} 2984 2985// Snapshot will force the performed query to make use of an available 2986// index on the _id field to prevent the same document from being returned 2987// more than once in a single iteration. This might happen without this 2988// setting in situations when the document changes in size and thus has to 2989// be moved while the iteration is running. 2990// 2991// Because snapshot mode traverses the _id index, it may not be used with 2992// sorting or explicit hints. It also cannot use any other index for the 2993// query. 2994// 2995// Even with snapshot mode, items inserted or deleted during the query may 2996// or may not be returned; that is, this mode is not a true point-in-time 2997// snapshot. 2998// 2999// The same effect of Snapshot may be obtained by using any unique index on 3000// field(s) that will not be modified (best to use Hint explicitly too). 3001// A non-unique index (such as creation time) may be made unique by 3002// appending _id to the index when creating it. 3003// 3004// Relevant documentation: 3005// 3006// http://www.mongodb.org/display/DOCS/How+to+do+Snapshotted+Queries+in+the+Mongo+Database 3007// 3008func (q *Query) Snapshot() *Query { 3009 q.m.Lock() 3010 q.op.options.Snapshot = true 3011 q.op.hasOptions = true 3012 q.m.Unlock() 3013 return q 3014} 3015 3016// Comment adds a comment to the query to identify it in the database profiler output. 
3017// 3018// Relevant documentation: 3019// 3020// http://docs.mongodb.org/manual/reference/operator/meta/comment 3021// http://docs.mongodb.org/manual/reference/command/profile 3022// http://docs.mongodb.org/manual/administration/analyzing-mongodb-performance/#database-profiling 3023// 3024func (q *Query) Comment(comment string) *Query { 3025 q.m.Lock() 3026 q.op.options.Comment = comment 3027 q.op.hasOptions = true 3028 q.m.Unlock() 3029 return q 3030} 3031 3032// LogReplay enables an option that optimizes queries that are typically 3033// made on the MongoDB oplog for replaying it. This is an internal 3034// implementation aspect and most likely uninteresting for other uses. 3035// It has seen at least one use case, though, so it's exposed via the API. 3036func (q *Query) LogReplay() *Query { 3037 q.m.Lock() 3038 q.op.flags |= flagLogReplay 3039 q.m.Unlock() 3040 return q 3041} 3042 3043func checkQueryError(fullname string, d []byte) error { 3044 l := len(d) 3045 if l < 16 { 3046 return nil 3047 } 3048 if d[5] == '$' && d[6] == 'e' && d[7] == 'r' && d[8] == 'r' && d[9] == '\x00' && d[4] == '\x02' { 3049 goto Error 3050 } 3051 if len(fullname) < 5 || fullname[len(fullname)-5:] != ".$cmd" { 3052 return nil 3053 } 3054 for i := 0; i+8 < l; i++ { 3055 if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' { 3056 goto Error 3057 } 3058 } 3059 return nil 3060 3061Error: 3062 result := &queryError{} 3063 bson.Unmarshal(d, result) 3064 if result.Err == "" && result.ErrMsg == "" { 3065 return nil 3066 } 3067 if result.AssertionCode != 0 && result.Assertion != "" { 3068 return &QueryError{Code: result.AssertionCode, Message: result.Assertion, Assertion: true} 3069 } 3070 if result.Err != "" { 3071 return &QueryError{Code: result.Code, Message: result.Err} 3072 } 3073 return &QueryError{Code: result.Code, Message: result.ErrMsg} 3074} 3075 3076// One executes the query and unmarshals the 
// first obtained document into the
// result argument. The result must be a struct or map value capable of being
// unmarshalled into by gobson. This function blocks until either a result
// is available or an error happens. For example:
//
//     err := collection.Find(bson.M{"a": 1}).One(&result)
//
// In case the resulting document includes a field named $err or errmsg, which
// are standard ways for MongoDB to return query errors, the returned err will
// be set to a *QueryError value including the Err message and the Code. In
// those cases, the result argument is still unmarshalled into with the
// received document so that any other custom values may be obtained if
// desired.
//
// ErrNotFound is returned when the query yields no document. If result is
// nil the document is not unmarshalled, but it is still checked for errors.
//
func (q *Query) One(result interface{}) (err error) {
	// Take a private copy of the operation under the lock so that later
	// changes to q do not affect this execution.
	q.m.Lock()
	session := q.session
	op := q.op // Copy.
	q.m.Unlock()

	socket, err := session.acquireSocket(true)
	if err != nil {
		return err
	}
	defer socket.Release()

	// A single document is wanted regardless of the limit set on the query.
	op.limit = -1

	session.prepareQuery(&op)

	// prepareFindOp may rewrite op into a find command, in which case the
	// reply has to be unwrapped below.
	expectFindReply := prepareFindOp(socket, &op, 1)

	data, err := socket.SimpleQuery(&op)
	if err != nil {
		return err
	}
	if data == nil {
		return ErrNotFound
	}
	if expectFindReply {
		// The find command replies with a cursor document; the wanted
		// document is the first entry of its first batch.
		var findReply struct {
			Ok     bool
			Code   int
			Errmsg string
			Cursor cursorData
		}
		err = bson.Unmarshal(data, &findReply)
		if err != nil {
			return err
		}
		if !findReply.Ok && findReply.Errmsg != "" {
			return &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
		}
		if len(findReply.Cursor.FirstBatch) == 0 {
			return ErrNotFound
		}
		data = findReply.Cursor.FirstBatch[0].Data
	}
	if result != nil {
		err = bson.Unmarshal(data, result)
		if err == nil {
			debugf("Query %p document unmarshaled: %#v", q, result)
		} else {
			debugf("Query %p document unmarshaling failed: %#v", q, err)
			return err
		}
	}
	// Checked last so that result is filled in even for error documents.
	return checkQueryError(op.collection, data)
}

// prepareFindOp translates op from being an old-style wire protocol query into
// a new-style find command if that's supported by the MongoDB server (3.2+).
// It returns whether to expect a find command result or not. Note op may be
// translated into an explain command, in which case the function returns false.
//
// op is left untouched, and false is returned, when the server reports a
// MaxWireVersion below 4 or when the query targets "admin.$cmd". The limit
// argument becomes the find command's overall limit, while op.limit provides
// the batch size. The function panics if op.collection has no dot in it.
func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool {
	if socket.ServerInfo().MaxWireVersion < 4 || op.collection == "admin.$cmd" {
		return false
	}

	nameDot := strings.Index(op.collection, ".")
	if nameDot < 0 {
		panic("invalid query collection name: " + op.collection)
	}

	find := findCmd{
		Collection:  op.collection[nameDot+1:],
		Filter:      op.query,
		Projection:  op.selector,
		Sort:        op.options.OrderBy,
		Skip:        op.skip,
		Limit:       limit,
		MaxTimeMS:   op.options.MaxTimeMS,
		MaxScan:     op.options.MaxScan,
		Hint:        op.options.Hint,
		Comment:     op.options.Comment,
		Snapshot:    op.options.Snapshot,
		OplogReplay: op.flags&flagLogReplay != 0,
	}
	// A negative op.limit asks for that many documents in a single batch.
	if op.limit < 0 {
		find.BatchSize = -op.limit
		find.SingleBatch = true
	} else {
		find.BatchSize = op.limit
	}

	// Must be read before op.options is reset below.
	explain := op.options.Explain

	// Turn op into a command on the database's $cmd collection; everything
	// the old-style query carried now lives inside the find document.
	op.collection = op.collection[:nameDot] + ".$cmd"
	op.query = &find
	op.skip = 0
	op.limit = -1
	op.options = queryWrapper{}
	op.hasOptions = false

	if explain {
		op.query = bson.D{{"explain", op.query}}
		return false
	}
	return true
}

// cursorData is the cursor section of a command reply. The One method reads
// FirstBatch from find replies, and CollectionNames reads FirstBatch, NS and
// Id from listCollections replies.
type cursorData struct {
	FirstBatch []bson.Raw "firstBatch"
	NextBatch  []bson.Raw "nextBatch"
	NS         string
	Id         int64
}

// findCmd holds the command used for performing queries on MongoDB 3.2+.
//
// prepareFindOp fills in Collection (the collection name without its
// database prefix), Filter, Projection, Sort, Skip, Limit, BatchSize,
// SingleBatch, MaxTimeMS, MaxScan, Hint, Comment, Snapshot and OplogReplay.
// It does not set the remaining fields.
//
// NOTE(review): the field tagged "find" is declared first; presumably the
// command name must be the first key of the marshalled document - confirm
// before reordering fields.
//
// Relevant documentation:
//
//     https://docs.mongodb.org/master/reference/command/find/#dbcmd.find
//
type findCmd struct {
	Collection          string      `bson:"find"`
	Filter              interface{} `bson:"filter,omitempty"`
	Sort                interface{} `bson:"sort,omitempty"`
	Projection          interface{} `bson:"projection,omitempty"`
	Hint                interface{} `bson:"hint,omitempty"`
	Skip                interface{} `bson:"skip,omitempty"`
	Limit               int32       `bson:"limit,omitempty"`
	BatchSize           int32       `bson:"batchSize,omitempty"`
	SingleBatch         bool        `bson:"singleBatch,omitempty"`
	Comment             string      `bson:"comment,omitempty"`
	MaxScan             int         `bson:"maxScan,omitempty"`
	MaxTimeMS           int         `bson:"maxTimeMS,omitempty"`
	ReadConcern         interface{} `bson:"readConcern,omitempty"`
	Max                 interface{} `bson:"max,omitempty"`
	Min                 interface{} `bson:"min,omitempty"`
	ReturnKey           bool        `bson:"returnKey,omitempty"`
	ShowRecordId        bool        `bson:"showRecordId,omitempty"`
	Snapshot            bool        `bson:"snapshot,omitempty"`
	Tailable            bool        `bson:"tailable,omitempty"`
	AwaitData           bool        `bson:"awaitData,omitempty"`
	OplogReplay         bool        `bson:"oplogReplay,omitempty"`
	NoCursorTimeout     bool        `bson:"noCursorTimeout,omitempty"`
	AllowPartialResults bool        `bson:"allowPartialResults,omitempty"`
}

// getMoreCmd holds the command used for requesting more query results on MongoDB 3.2+.
//
// Iter.getMoreCmd builds it from the iterator's cursor id, the collection
// name without its database prefix, and the iterator's current op limit as
// the batch size. It does not set MaxTimeMS.
//
// Relevant documentation:
//
//     https://docs.mongodb.org/master/reference/command/getMore/#dbcmd.getMore
//
type getMoreCmd struct {
	CursorId   int64  `bson:"getMore"`
	Collection string `bson:"collection"`
	BatchSize  int32  `bson:"batchSize,omitempty"`
	MaxTimeMS  int64  `bson:"maxTimeMS,omitempty"`
}

// run duplicates the behavior of collection.Find(query).One(&result)
// as performed by Database.Run, specializing the logic for running
// database commands on a given socket.
3252func (db *Database) run(socket *mongoSocket, cmd, result interface{}) (err error) { 3253 // Database.Run: 3254 if name, ok := cmd.(string); ok { 3255 cmd = bson.D{{name, 1}} 3256 } 3257 3258 // Collection.Find: 3259 session := db.Session 3260 session.m.RLock() 3261 op := session.queryConfig.op // Copy. 3262 session.m.RUnlock() 3263 op.query = cmd 3264 op.collection = db.Name + ".$cmd" 3265 3266 // Query.One: 3267 session.prepareQuery(&op) 3268 op.limit = -1 3269 3270 data, err := socket.SimpleQuery(&op) 3271 if err != nil { 3272 return err 3273 } 3274 if data == nil { 3275 return ErrNotFound 3276 } 3277 if result != nil { 3278 err = bson.Unmarshal(data, result) 3279 if err != nil { 3280 debugf("Run command unmarshaling failed: %#v", op, err) 3281 return err 3282 } 3283 if globalDebug && globalLogger != nil { 3284 var res bson.M 3285 bson.Unmarshal(data, &res) 3286 debugf("Run command unmarshaled: %#v, result: %#v", op, res) 3287 } 3288 } 3289 return checkQueryError(op.collection, data) 3290} 3291 3292// The DBRef type implements support for the database reference MongoDB 3293// convention as supported by multiple drivers. This convention enables 3294// cross-referencing documents between collections and databases using 3295// a structure which includes a collection name, a document id, and 3296// optionally a database name. 3297// 3298// See the FindRef methods on Session and on Database. 3299// 3300// Relevant documentation: 3301// 3302// http://www.mongodb.org/display/DOCS/Database+References 3303// 3304type DBRef struct { 3305 Collection string `bson:"$ref"` 3306 Id interface{} `bson:"$id"` 3307 Database string `bson:"$db,omitempty"` 3308} 3309 3310// NOTE: Order of fields for DBRef above does matter, per documentation. 3311 3312// FindRef returns a query that looks for the document in the provided 3313// reference. If the reference includes the DB field, the document will 3314// be retrieved from the respective database. 
3315// 3316// See also the DBRef type and the FindRef method on Session. 3317// 3318// Relevant documentation: 3319// 3320// http://www.mongodb.org/display/DOCS/Database+References 3321// 3322func (db *Database) FindRef(ref *DBRef) *Query { 3323 var c *Collection 3324 if ref.Database == "" { 3325 c = db.C(ref.Collection) 3326 } else { 3327 c = db.Session.DB(ref.Database).C(ref.Collection) 3328 } 3329 return c.FindId(ref.Id) 3330} 3331 3332// FindRef returns a query that looks for the document in the provided 3333// reference. For a DBRef to be resolved correctly at the session level 3334// it must necessarily have the optional DB field defined. 3335// 3336// See also the DBRef type and the FindRef method on Database. 3337// 3338// Relevant documentation: 3339// 3340// http://www.mongodb.org/display/DOCS/Database+References 3341// 3342func (s *Session) FindRef(ref *DBRef) *Query { 3343 if ref.Database == "" { 3344 panic(errors.New(fmt.Sprintf("Can't resolve database for %#v", ref))) 3345 } 3346 c := s.DB(ref.Database).C(ref.Collection) 3347 return c.FindId(ref.Id) 3348} 3349 3350// CollectionNames returns the collection names present in the db database. 3351func (db *Database) CollectionNames() (names []string, err error) { 3352 // Clone session and set it to Monotonic mode so that the server 3353 // used for the query may be safely obtained afterwards, if 3354 // necessary for iteration when a cursor is received. 3355 cloned := db.Session.nonEventual() 3356 defer cloned.Close() 3357 3358 batchSize := int(cloned.queryConfig.op.limit) 3359 3360 // Try with a command. 
3361 var result struct { 3362 Collections []bson.Raw 3363 Cursor cursorData 3364 } 3365 err = db.With(cloned).Run(bson.D{{"listCollections", 1}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result) 3366 if err == nil { 3367 firstBatch := result.Collections 3368 if firstBatch == nil { 3369 firstBatch = result.Cursor.FirstBatch 3370 } 3371 var iter *Iter 3372 ns := strings.SplitN(result.Cursor.NS, ".", 2) 3373 if len(ns) < 2 { 3374 iter = db.With(cloned).C("").NewIter(nil, firstBatch, result.Cursor.Id, nil) 3375 } else { 3376 iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil) 3377 } 3378 var coll struct{ Name string } 3379 for iter.Next(&coll) { 3380 names = append(names, coll.Name) 3381 } 3382 if err := iter.Close(); err != nil { 3383 return nil, err 3384 } 3385 sort.Strings(names) 3386 return names, err 3387 } 3388 if err != nil && !isNoCmd(err) { 3389 return nil, err 3390 } 3391 3392 // Command not yet supported. Query the database instead. 3393 nameIndex := len(db.Name) + 1 3394 iter := db.C("system.namespaces").Find(nil).Iter() 3395 var coll struct{ Name string } 3396 for iter.Next(&coll) { 3397 if strings.Index(coll.Name, "$") < 0 || strings.Index(coll.Name, ".oplog.$") >= 0 { 3398 names = append(names, coll.Name[nameIndex:]) 3399 } 3400 } 3401 if err := iter.Close(); err != nil { 3402 return nil, err 3403 } 3404 sort.Strings(names) 3405 return names, nil 3406} 3407 3408type dbNames struct { 3409 Databases []struct { 3410 Name string 3411 Empty bool 3412 } 3413} 3414 3415// DatabaseNames returns the names of non-empty databases present in the cluster. 
func (s *Session) DatabaseNames() (names []string, err error) {
	var result dbNames
	err = s.Run("listDatabases", &result)
	if err != nil {
		return nil, err
	}
	// Databases flagged as empty by the server are left out.
	for _, db := range result.Databases {
		if !db.Empty {
			names = append(names, db.Name)
		}
	}
	sort.Strings(names)
	return names, nil
}

// Iter executes the query and returns an iterator capable of going over all
// the results. Results will be returned in batches of configurable
// size (see the Batch method) and more documents will be requested when a
// configurable number of documents is iterated over (see the Prefetch method).
//
// Iter always returns a non-nil iterator. A failure to acquire a socket or
// to send the query is recorded in the iterator and reported by its Err and
// Close methods.
func (q *Query) Iter() *Iter {
	// Take a private copy of the query settings under the lock.
	q.m.Lock()
	session := q.session
	op := q.op
	prefetch := q.prefetch
	limit := q.limit
	q.m.Unlock()

	iter := &Iter{
		session:  session,
		prefetch: prefetch,
		limit:    limit,
		timeout:  -1,
	}
	iter.gotReply.L = &iter.m
	iter.op.collection = op.collection
	iter.op.limit = op.limit
	iter.op.replyFunc = iter.replyFunc()
	// Account for the reply to the initial query sent below.
	iter.docsToReceive++

	socket, err := session.acquireSocket(true)
	if err != nil {
		iter.err = err
		return iter
	}
	defer socket.Release()

	session.prepareQuery(&op)
	op.replyFunc = iter.op.replyFunc

	// Remember whether a find command was sent, so that getMore requests
	// follow-up batches in the matching form.
	if prepareFindOp(socket, &op, limit) {
		iter.findCmd = true
	}

	// Follow-up requests for this cursor go to the same server.
	iter.server = socket.Server()
	err = socket.Query(&op)
	if err != nil {
		// Must lock as the query is already out and it may call replyFunc.
		iter.m.Lock()
		iter.err = err
		iter.m.Unlock()
	}

	return iter
}

// Tail returns a tailable iterator. Unlike a normal iterator, a
// tailable iterator may wait for new values to be inserted in the
// collection once the end of the current result set is reached.
// A tailable iterator may only be used with capped collections.
3485// 3486// The timeout parameter indicates how long Next will block waiting 3487// for a result before timing out. If set to -1, Next will not 3488// timeout, and will continue waiting for a result for as long as 3489// the cursor is valid and the session is not closed. If set to 0, 3490// Next times out as soon as it reaches the end of the result set. 3491// Otherwise, Next will wait for at least the given number of 3492// seconds for a new document to be available before timing out. 3493// 3494// On timeouts, Next will unblock and return false, and the Timeout 3495// method will return true if called. In these cases, Next may still 3496// be called again on the same iterator to check if a new value is 3497// available at the current cursor position, and again it will block 3498// according to the specified timeoutSecs. If the cursor becomes 3499// invalid, though, both Next and Timeout will return false and 3500// the query must be restarted. 3501// 3502// The following example demonstrates timeout handling and query 3503// restarting: 3504// 3505// iter := collection.Find(nil).Sort("$natural").Tail(5 * time.Second) 3506// for { 3507// for iter.Next(&result) { 3508// fmt.Println(result.Id) 3509// lastId = result.Id 3510// } 3511// if iter.Err() != nil { 3512// return iter.Close() 3513// } 3514// if iter.Timeout() { 3515// continue 3516// } 3517// query := collection.Find(bson.M{"_id": bson.M{"$gt": lastId}}) 3518// iter = query.Sort("$natural").Tail(5 * time.Second) 3519// } 3520// iter.Close() 3521// 3522// Relevant documentation: 3523// 3524// http://www.mongodb.org/display/DOCS/Tailable+Cursors 3525// http://www.mongodb.org/display/DOCS/Capped+Collections 3526// http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order 3527// 3528func (q *Query) Tail(timeout time.Duration) *Iter { 3529 q.m.Lock() 3530 session := q.session 3531 op := q.op 3532 prefetch := q.prefetch 3533 q.m.Unlock() 3534 3535 iter := &Iter{session: session, prefetch: prefetch} 3536 
iter.gotReply.L = &iter.m 3537 iter.timeout = timeout 3538 iter.op.collection = op.collection 3539 iter.op.limit = op.limit 3540 iter.op.replyFunc = iter.replyFunc() 3541 iter.docsToReceive++ 3542 session.prepareQuery(&op) 3543 op.replyFunc = iter.op.replyFunc 3544 op.flags |= flagTailable | flagAwaitData 3545 3546 socket, err := session.acquireSocket(true) 3547 if err != nil { 3548 iter.err = err 3549 } else { 3550 iter.server = socket.Server() 3551 err = socket.Query(&op) 3552 if err != nil { 3553 // Must lock as the query is already out and it may call replyFunc. 3554 iter.m.Lock() 3555 iter.err = err 3556 iter.m.Unlock() 3557 } 3558 socket.Release() 3559 } 3560 return iter 3561} 3562 3563func (s *Session) prepareQuery(op *queryOp) { 3564 s.m.RLock() 3565 op.mode = s.consistency 3566 if s.slaveOk { 3567 op.flags |= flagSlaveOk 3568 } 3569 s.m.RUnlock() 3570 return 3571} 3572 3573// Err returns nil if no errors happened during iteration, or the actual 3574// error otherwise. 3575// 3576// In case a resulting document included a field named $err or errmsg, which are 3577// standard ways for MongoDB to report an improper query, the returned value has 3578// a *QueryError type, and includes the Err message and the Code. 3579func (iter *Iter) Err() error { 3580 iter.m.Lock() 3581 err := iter.err 3582 iter.m.Unlock() 3583 if err == ErrNotFound { 3584 return nil 3585 } 3586 return err 3587} 3588 3589// Close kills the server cursor used by the iterator, if any, and returns 3590// nil if no errors happened during iteration, or the actual error otherwise. 3591// 3592// Server cursors are automatically closed at the end of an iteration, which 3593// means close will do nothing unless the iteration was interrupted before 3594// the server finished sending results to the driver. If Close is not called 3595// in such a situation, the cursor will remain available at the server until 3596// the default cursor timeout period is reached. No further problems arise. 
//
// Close is idempotent. That means it can be called repeatedly and will
// return the same result every time.
//
// In case a resulting document included a field named $err or errmsg, which are
// standard ways for MongoDB to report an improper query, the returned value has
// a *QueryError type.
func (iter *Iter) Close() error {
	// Take ownership of the cursor id under the lock, so that concurrent
	// or repeated calls do not kill the same cursor twice.
	iter.m.Lock()
	cursorId := iter.op.cursorId
	iter.op.cursorId = 0
	err := iter.err
	iter.m.Unlock()
	if cursorId == 0 {
		// No server cursor to kill. ErrNotFound only marks the end of
		// the results and is not reported as a failure.
		if err == ErrNotFound {
			return nil
		}
		return err
	}
	// acquireSocket must not be called with iter.m held.
	socket, err := iter.acquireSocket()
	if err == nil {
		// TODO Batch kills.
		err = socket.Query(&killCursorsOp{[]int64{cursorId}})
		socket.Release()
	}

	iter.m.Lock()
	if err != nil && (iter.err == nil || iter.err == ErrNotFound) {
		// Killing the cursor failed and there was no earlier real error:
		// remember this one so later calls report it too.
		iter.err = err
	} else if iter.err != ErrNotFound {
		// Otherwise an earlier iteration error, if any, takes precedence.
		err = iter.err
	}
	iter.m.Unlock()
	return err
}

// Done returns true only if a follow up Next call is guaranteed
// to return false.
//
// For an iterator created with Tail, Done may return false for
// an iterator that has no more data. Otherwise it's guaranteed
// to return false only if there is data or an error happened.
//
// Done may block waiting for a pending query to verify whether
// more data is actually available or not.
func (iter *Iter) Done() bool {
	iter.m.Lock()
	defer iter.m.Unlock()

	for {
		// Buffered documents mean Next still has something to return.
		if iter.docData.Len() > 0 {
			return false
		}
		if iter.docsToReceive > 1 {
			return true
		}
		if iter.docsToReceive > 0 {
			// A reply is still pending; wait for it and check again.
			iter.gotReply.Wait()
			continue
		}
		// Nothing buffered and nothing pending: more data can only come
		// from a cursor that is still open.
		return iter.op.cursorId == 0
	}
}

// Timeout returns true if Next returned false due to a timeout of
// a tailable cursor. In those cases, Next may be called again to continue
// the iteration at the previous cursor position.
func (iter *Iter) Timeout() bool {
	iter.m.Lock()
	result := iter.timedout
	iter.m.Unlock()
	return result
}

// Next retrieves the next document from the result set, blocking if necessary.
// This method will also automatically retrieve another batch of documents from
// the server when the current one is exhausted, or before that in background
// if pre-fetching is enabled (see the Query.Prefetch and Session.SetPrefetch
// methods).
//
// Next returns true if a document was successfully unmarshalled onto result,
// and false at the end of the result set or if an error happened.
// When Next returns false, the Err method should be called to verify if
// there was an error during iteration.
//
// For example:
//
//    iter := collection.Find(nil).Iter()
//    for iter.Next(&result) {
//        fmt.Printf("Result: %v\n", result.Id)
//    }
//    if err := iter.Close(); err != nil {
//        return err
//    }
//
func (iter *Iter) Next(result interface{}) bool {
	iter.m.Lock()
	iter.timedout = false
	timeout := time.Time{}
	// Wait while there is no error, nothing is buffered, and more data may
	// still come (a reply is pending or the server cursor is open).
	for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) {
		if iter.docsToReceive == 0 {
			// No request in flight. A negative iter.timeout disables the
			// deadline; otherwise it is set on the first pass and checked
			// before every new request.
			if iter.timeout >= 0 {
				if timeout.IsZero() {
					timeout = time.Now().Add(iter.timeout)
				}
				if time.Now().After(timeout) {
					iter.timedout = true
					iter.m.Unlock()
					return false
				}
			}
			iter.getMore()
			if iter.err != nil {
				break
			}
		}
		iter.gotReply.Wait()
	}

	// Exhaust available data before reporting any errors.
	if docData, ok := iter.docData.Pop().([]byte); ok {
		close := false
		if iter.limit > 0 {
			iter.limit--
			if iter.limit == 0 {
				if iter.docData.Len() > 0 {
					iter.m.Unlock()
					panic(fmt.Errorf("data remains after limit exhausted: %d", iter.docData.Len()))
				}
				// Limit reached: mark the iteration as finished and
				// close the cursor once the lock is released.
				iter.err = ErrNotFound
				close = true
			}
		}
		if iter.op.cursorId != 0 && iter.err == nil {
			// Request the next batch ahead of time once the prefetch
			// countdown runs out.
			iter.docsBeforeMore--
			if iter.docsBeforeMore == -1 {
				iter.getMore()
			}
		}
		iter.m.Unlock()

		// Close takes iter.m itself, so it runs after the unlock.
		if close {
			iter.Close()
		}
		err := bson.Unmarshal(docData, result)
		if err != nil {
			debugf("Iter %p document unmarshaling failed: %#v", iter, err)
			iter.m.Lock()
			if iter.err == nil {
				iter.err = err
			}
			iter.m.Unlock()
			return false
		}
		debugf("Iter %p document unmarshaled: %#v", iter, result)
		// XXX Only have to check first document for a query error?
		err = checkQueryError(iter.op.collection, docData)
		if err != nil {
			iter.m.Lock()
			if iter.err == nil {
				iter.err = err
			}
			iter.m.Unlock()
			return false
		}
		return true
	} else if iter.err != nil {
		debugf("Iter %p returning false: %s", iter, iter.err)
		iter.m.Unlock()
		return false
	} else if iter.op.cursorId == 0 {
		// Nothing buffered and no server cursor: the result set is over.
		iter.err = ErrNotFound
		debugf("Iter %p exhausted with cursor=0", iter)
		iter.m.Unlock()
		return false
	}

	panic("unreachable")
}

// All retrieves all documents from the result set into the provided slice
// and closes the iterator.
//
// The result argument must necessarily be the address for a slice. The slice
// may be nil or previously allocated.
//
// WARNING: Obviously, All must not be used with result sets that may be
// potentially large, since it may consume all memory until the system
// crashes.
// Consider building the query with a Limit clause to ensure the
// result size is bounded.
//
// All panics if result is not the address of a slice.
//
// For instance:
//
//    var result []struct{ Value int }
//    iter := collection.Find(nil).Limit(100).Iter()
//    err := iter.All(&result)
//    if err != nil {
//        return err
//    }
//
func (iter *Iter) All(result interface{}) error {
	resultv := reflect.ValueOf(result)
	if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice {
		panic("result argument must be a slice address")
	}
	slicev := resultv.Elem()
	// Expose the full capacity so existing elements are reused as
	// unmarshalling targets before anything is appended.
	slicev = slicev.Slice(0, slicev.Cap())
	elemt := slicev.Type().Elem()
	i := 0
	for {
		if slicev.Len() == i {
			// Out of preallocated room: decode into a fresh element and
			// append it only if a document was actually read.
			elemp := reflect.New(elemt)
			if !iter.Next(elemp.Interface()) {
				break
			}
			slicev = reflect.Append(slicev, elemp.Elem())
			slicev = slicev.Slice(0, slicev.Cap())
		} else {
			if !iter.Next(slicev.Index(i).Addr().Interface()) {
				break
			}
		}
		i++
	}
	// Trim to the number of documents read.
	resultv.Elem().Set(slicev.Slice(0, i))
	return iter.Close()
}

// All works like Iter.All.
func (q *Query) All(result interface{}) error {
	return q.Iter().All(result)
}

// The For method is obsolete and will be removed in a future release.
// See Iter as an elegant replacement.
//
// Deprecated: use Iter instead.
func (q *Query) For(result interface{}, f func() error) error {
	return q.Iter().For(result, f)
}

// The For method is obsolete and will be removed in a future release.
// See Iter as an elegant replacement.
//
// The result argument must point to a nil map, pointer, interface or slice;
// For panics otherwise. The pointed-to value is reset before each document
// is read, and f is called after every successful read. The first error
// returned by f stops the loop and is returned.
//
// Deprecated: use Next in a loop instead.
func (iter *Iter) For(result interface{}, f func() error) (err error) {
	valid := false
	v := reflect.ValueOf(result)
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
		switch v.Kind() {
		case reflect.Map, reflect.Ptr, reflect.Interface, reflect.Slice:
			valid = v.IsNil()
		}
	}
	if !valid {
		panic("For needs a pointer to nil reference value. See the documentation.")
	}
	zero := reflect.Zero(v.Type())
	for {
		// Reset so that values from the previous document do not leak
		// into the next one.
		v.Set(zero)
		if !iter.Next(result) {
			break
		}
		err = f()
		if err != nil {
			return err
		}
	}
	return iter.Err()
}

// acquireSocket acquires a socket from the same server that the iterator
// cursor was obtained from.
//
// WARNING: This method must not be called with iter.m locked. Acquiring the
// socket depends on the cluster sync loop, and the cluster sync loop might
// attempt actions which cause replyFunc to be called, inducing a deadlock.
func (iter *Iter) acquireSocket() (*mongoSocket, error) {
	socket, err := iter.session.acquireSocket(true)
	if err != nil {
		return nil, err
	}
	if socket.Server() != iter.server {
		// Socket server changed during iteration. This may happen
		// with Eventual sessions, if a Refresh is done, or if a
		// monotonic session gets a write and shifts from secondary
		// to primary. Our cursor is in a specific server, though.
		iter.session.m.Lock()
		sockTimeout := iter.session.sockTimeout
		iter.session.m.Unlock()
		socket.Release()
		socket, _, err = iter.server.AcquireSocket(0, sockTimeout)
		if err != nil {
			return nil, err
		}
		// This socket came straight from the server rather than through
		// the session, so the session login is applied to it here.
		err := iter.session.socketLogin(socket)
		if err != nil {
			socket.Release()
			return nil, err
		}
	}
	return socket, nil
}

// getMore requests the next batch of documents for the iterator's cursor.
//
// It must be called with iter.m locked. The lock is released while a socket
// is acquired and is held again when the function returns. A failure is
// stored in iter.err.
func (iter *Iter) getMore() {
	// Increment now so that unlocking the iterator won't cause a
	// different goroutine to get here as well.
	iter.docsToReceive++
	iter.m.Unlock()
	socket, err := iter.acquireSocket()
	iter.m.Lock()
	if err != nil {
		iter.err = err
		return
	}
	defer socket.Release()

	debugf("Iter %p requesting more documents", iter)
	if iter.limit > 0 {
		// The -1 below accounts for the fact docsToReceive was incremented above.
		limit := iter.limit - int32(iter.docsToReceive-1) - int32(iter.docData.Len())
		if limit < iter.op.limit {
			iter.op.limit = limit
		}
	}
	// The follow-up request takes the same form as the initial query:
	// a getMore command if a find command was sent, iter.op otherwise.
	var op interface{}
	if iter.findCmd {
		op = iter.getMoreCmd()
	} else {
		op = &iter.op
	}
	if err := socket.Query(op); err != nil {
		// Nothing was sent, so undo the increment made above.
		iter.docsToReceive--
		iter.err = err
	}
}

// getMoreCmd builds the query operation that carries a getMore command for
// the iterator's cursor, addressed to the $cmd collection of the iterator's
// database. It panics if the iterator's collection name has no dot in it.
func (iter *Iter) getMoreCmd() *queryOp {
	// TODO: Define the query statically in the Iter type, next to getMoreOp.
	nameDot := strings.Index(iter.op.collection, ".")
	if nameDot < 0 {
		panic("invalid query collection name: " + iter.op.collection)
	}

	getMore := getMoreCmd{
		CursorId:   iter.op.cursorId,
		Collection: iter.op.collection[nameDot+1:],
		BatchSize:  iter.op.limit,
	}

	var op queryOp
	op.collection = iter.op.collection[:nameDot] + ".$cmd"
	op.query = &getMore
	op.limit = -1
	op.replyFunc = iter.op.replyFunc
	return &op
}

// countCmd is the command document sent by Query.Count.
type countCmd struct {
	Count string
	Query interface{}
	Limit int32 ",omitempty"
	Skip  int32 ",omitempty"
}

// Count returns the total number of documents in the result set.
//
// The query's filter, limit and skip are sent with the count command; a nil
// filter is sent as an empty document.
func (q *Query) Count() (n int, err error) {
	q.m.Lock()
	session := q.session
	op := q.op
	limit := q.limit
	q.m.Unlock()

	// op.collection has the form "db.collection"; split it at the first dot.
	c := strings.Index(op.collection, ".")
	if c < 0 {
		return 0, errors.New("Bad collection name: " + op.collection)
	}

	dbname := op.collection[:c]
	cname := op.collection[c+1:]
	query := op.query
	if query == nil {
		query = bson.D{}
	}
	result := struct{ N int }{}
	err = session.DB(dbname).Run(countCmd{cname, query, limit, op.skip}, &result)
	return result.N, err
}

// Count returns the total number of documents in the collection.
3984func (c *Collection) Count() (n int, err error) { 3985 return c.Find(nil).Count() 3986} 3987 3988type distinctCmd struct { 3989 Collection string "distinct" 3990 Key string 3991 Query interface{} ",omitempty" 3992} 3993 3994// Distinct unmarshals into result the list of distinct values for the given key. 3995// 3996// For example: 3997// 3998// var result []int 3999// err := collection.Find(bson.M{"gender": "F"}).Distinct("age", &result) 4000// 4001// Relevant documentation: 4002// 4003// http://www.mongodb.org/display/DOCS/Aggregation 4004// 4005func (q *Query) Distinct(key string, result interface{}) error { 4006 q.m.Lock() 4007 session := q.session 4008 op := q.op // Copy. 4009 q.m.Unlock() 4010 4011 c := strings.Index(op.collection, ".") 4012 if c < 0 { 4013 return errors.New("Bad collection name: " + op.collection) 4014 } 4015 4016 dbname := op.collection[:c] 4017 cname := op.collection[c+1:] 4018 4019 var doc struct{ Values bson.Raw } 4020 err := session.DB(dbname).Run(distinctCmd{cname, key, op.query}, &doc) 4021 if err != nil { 4022 return err 4023 } 4024 return doc.Values.Unmarshal(result) 4025} 4026 4027type mapReduceCmd struct { 4028 Collection string "mapreduce" 4029 Map string ",omitempty" 4030 Reduce string ",omitempty" 4031 Finalize string ",omitempty" 4032 Limit int32 ",omitempty" 4033 Out interface{} 4034 Query interface{} ",omitempty" 4035 Sort interface{} ",omitempty" 4036 Scope interface{} ",omitempty" 4037 Verbose bool ",omitempty" 4038} 4039 4040type mapReduceResult struct { 4041 Results bson.Raw 4042 Result bson.Raw 4043 TimeMillis int64 "timeMillis" 4044 Counts struct{ Input, Emit, Output int } 4045 Ok bool 4046 Err string 4047 Timing *MapReduceTime 4048} 4049 4050type MapReduce struct { 4051 Map string // Map Javascript function code (required) 4052 Reduce string // Reduce Javascript function code (required) 4053 Finalize string // Finalize Javascript function code (optional) 4054 Out interface{} // Output collection name or document. 
If nil, results are inlined into the result parameter. 4055 Scope interface{} // Optional global scope for Javascript functions 4056 Verbose bool 4057} 4058 4059type MapReduceInfo struct { 4060 InputCount int // Number of documents mapped 4061 EmitCount int // Number of times reduce called emit 4062 OutputCount int // Number of documents in resulting collection 4063 Database string // Output database, if results are not inlined 4064 Collection string // Output collection, if results are not inlined 4065 Time int64 // Time to run the job, in nanoseconds 4066 VerboseTime *MapReduceTime // Only defined if Verbose was true 4067} 4068 4069type MapReduceTime struct { 4070 Total int64 // Total time, in nanoseconds 4071 Map int64 "mapTime" // Time within map function, in nanoseconds 4072 EmitLoop int64 "emitLoop" // Time within the emit/map loop, in nanoseconds 4073} 4074 4075// MapReduce executes a map/reduce job for documents covered by the query. 4076// That kind of job is suitable for very flexible bulk aggregation of data 4077// performed at the server side via Javascript functions. 4078// 4079// Results from the job may be returned as a result of the query itself 4080// through the result parameter in case they'll certainly fit in memory 4081// and in a single document. If there's the possibility that the amount 4082// of data might be too large, results must be stored back in an alternative 4083// collection or even a separate database, by setting the Out field of the 4084// provided MapReduce job. In that case, provide nil as the result parameter. 4085// 4086// These are some of the ways to set Out: 4087// 4088// nil 4089// Inline results into the result parameter. 4090// 4091// bson.M{"replace": "mycollection"} 4092// The output will be inserted into a collection which replaces any 4093// existing collection with the same name. 4094// 4095// bson.M{"merge": "mycollection"} 4096// This option will merge new data into the old output collection. 
In 4097// other words, if the same key exists in both the result set and the 4098// old collection, the new key will overwrite the old one. 4099// 4100// bson.M{"reduce": "mycollection"} 4101// If documents exist for a given key in the result set and in the old 4102// collection, then a reduce operation (using the specified reduce 4103// function) will be performed on the two values and the result will be 4104// written to the output collection. If a finalize function was 4105// provided, this will be run after the reduce as well. 4106// 4107// bson.M{...., "db": "mydb"} 4108// Any of the above options can have the "db" key included for doing 4109// the respective action in a separate database. 4110// 4111// The following is a trivial example which will count the number of 4112// occurrences of a field named n on each document in a collection, and 4113// will return results inline: 4114// 4115// job := &mgo.MapReduce{ 4116// Map: "function() { emit(this.n, 1) }", 4117// Reduce: "function(key, values) { return Array.sum(values) }", 4118// } 4119// var result []struct { Id int "_id"; Value int } 4120// _, err := collection.Find(nil).MapReduce(job, &result) 4121// if err != nil { 4122// return err 4123// } 4124// for _, item := range result { 4125// fmt.Println(item.Value) 4126// } 4127// 4128// This function is compatible with MongoDB 1.7.4+. 4129// 4130// Relevant documentation: 4131// 4132// http://www.mongodb.org/display/DOCS/MapReduce 4133// 4134func (q *Query) MapReduce(job *MapReduce, result interface{}) (info *MapReduceInfo, err error) { 4135 q.m.Lock() 4136 session := q.session 4137 op := q.op // Copy. 
4138 limit := q.limit 4139 q.m.Unlock() 4140 4141 c := strings.Index(op.collection, ".") 4142 if c < 0 { 4143 return nil, errors.New("Bad collection name: " + op.collection) 4144 } 4145 4146 dbname := op.collection[:c] 4147 cname := op.collection[c+1:] 4148 4149 cmd := mapReduceCmd{ 4150 Collection: cname, 4151 Map: job.Map, 4152 Reduce: job.Reduce, 4153 Finalize: job.Finalize, 4154 Out: fixMROut(job.Out), 4155 Scope: job.Scope, 4156 Verbose: job.Verbose, 4157 Query: op.query, 4158 Sort: op.options.OrderBy, 4159 Limit: limit, 4160 } 4161 4162 if cmd.Out == nil { 4163 cmd.Out = bson.D{{"inline", 1}} 4164 } 4165 4166 var doc mapReduceResult 4167 err = session.DB(dbname).Run(&cmd, &doc) 4168 if err != nil { 4169 return nil, err 4170 } 4171 if doc.Err != "" { 4172 return nil, errors.New(doc.Err) 4173 } 4174 4175 info = &MapReduceInfo{ 4176 InputCount: doc.Counts.Input, 4177 EmitCount: doc.Counts.Emit, 4178 OutputCount: doc.Counts.Output, 4179 Time: doc.TimeMillis * 1e6, 4180 } 4181 4182 if doc.Result.Kind == 0x02 { 4183 err = doc.Result.Unmarshal(&info.Collection) 4184 info.Database = dbname 4185 } else if doc.Result.Kind == 0x03 { 4186 var v struct{ Collection, Db string } 4187 err = doc.Result.Unmarshal(&v) 4188 info.Collection = v.Collection 4189 info.Database = v.Db 4190 } 4191 4192 if doc.Timing != nil { 4193 info.VerboseTime = doc.Timing 4194 info.VerboseTime.Total *= 1e6 4195 info.VerboseTime.Map *= 1e6 4196 info.VerboseTime.EmitLoop *= 1e6 4197 } 4198 4199 if err != nil { 4200 return nil, err 4201 } 4202 if result != nil { 4203 return info, doc.Results.Unmarshal(result) 4204 } 4205 return info, nil 4206} 4207 4208// The "out" option in the MapReduce command must be ordered. This was 4209// found after the implementation was accepting maps for a long time, 4210// so rather than breaking the API, we'll fix the order if necessary. 
4211// Details about the order requirement may be seen in MongoDB's code: 4212// 4213// http://goo.gl/L8jwJX 4214// 4215func fixMROut(out interface{}) interface{} { 4216 outv := reflect.ValueOf(out) 4217 if outv.Kind() != reflect.Map || outv.Type().Key() != reflect.TypeOf("") { 4218 return out 4219 } 4220 outs := make(bson.D, outv.Len()) 4221 4222 outTypeIndex := -1 4223 for i, k := range outv.MapKeys() { 4224 ks := k.String() 4225 outs[i].Name = ks 4226 outs[i].Value = outv.MapIndex(k).Interface() 4227 switch ks { 4228 case "normal", "replace", "merge", "reduce", "inline": 4229 outTypeIndex = i 4230 } 4231 } 4232 if outTypeIndex > 0 { 4233 outs[0], outs[outTypeIndex] = outs[outTypeIndex], outs[0] 4234 } 4235 return outs 4236} 4237 4238// Change holds fields for running a findAndModify MongoDB command via 4239// the Query.Apply method. 4240type Change struct { 4241 Update interface{} // The update document 4242 Upsert bool // Whether to insert in case the document isn't found 4243 Remove bool // Whether to remove the document found rather than updating 4244 ReturnNew bool // Should the modified document be returned rather than the old one 4245} 4246 4247type findModifyCmd struct { 4248 Collection string "findAndModify" 4249 Query, Update, Sort, Fields interface{} ",omitempty" 4250 Upsert, Remove, New bool ",omitempty" 4251} 4252 4253type valueResult struct { 4254 Value bson.Raw 4255 LastError LastError "lastErrorObject" 4256} 4257 4258// Apply runs the findAndModify MongoDB command, which allows updating, upserting 4259// or removing a document matching a query and atomically returning either the old 4260// version (the default) or the new version of the document (when ReturnNew is true). 4261// If no objects are found Apply returns ErrNotFound. 4262// 4263// The Sort and Select query methods affect the result of Apply. In case 4264// multiple documents match the query, Sort enables selecting which document to 4265// act upon by ordering it first. 
Select enables retrieving only a selection 4266// of fields of the new or old document. 4267// 4268// This simple example increments a counter and prints its new value: 4269// 4270// change := mgo.Change{ 4271// Update: bson.M{"$inc": bson.M{"n": 1}}, 4272// ReturnNew: true, 4273// } 4274// info, err = col.Find(M{"_id": id}).Apply(change, &doc) 4275// fmt.Println(doc.N) 4276// 4277// This method depends on MongoDB >= 2.0 to work properly. 4278// 4279// Relevant documentation: 4280// 4281// http://www.mongodb.org/display/DOCS/findAndModify+Command 4282// http://www.mongodb.org/display/DOCS/Updating 4283// http://www.mongodb.org/display/DOCS/Atomic+Operations 4284// 4285func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err error) { 4286 q.m.Lock() 4287 session := q.session 4288 op := q.op // Copy. 4289 q.m.Unlock() 4290 4291 c := strings.Index(op.collection, ".") 4292 if c < 0 { 4293 return nil, errors.New("bad collection name: " + op.collection) 4294 } 4295 4296 dbname := op.collection[:c] 4297 cname := op.collection[c+1:] 4298 4299 cmd := findModifyCmd{ 4300 Collection: cname, 4301 Update: change.Update, 4302 Upsert: change.Upsert, 4303 Remove: change.Remove, 4304 New: change.ReturnNew, 4305 Query: op.query, 4306 Sort: op.options.OrderBy, 4307 Fields: op.selector, 4308 } 4309 4310 session = session.Clone() 4311 defer session.Close() 4312 session.SetMode(Strong, false) 4313 4314 var doc valueResult 4315 for i := 0; i < maxUpsertRetries; i++ { 4316 err = session.DB(dbname).Run(&cmd, &doc) 4317 if err == nil { 4318 break 4319 } 4320 if change.Upsert && IsDup(err) && i+1 < maxUpsertRetries { 4321 // Retry duplicate key errors on upserts. 
4322 // https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes 4323 continue 4324 } 4325 if qerr, ok := err.(*QueryError); ok && qerr.Message == "No matching object found" { 4326 return nil, ErrNotFound 4327 } 4328 return nil, err 4329 } 4330 if doc.LastError.N == 0 { 4331 return nil, ErrNotFound 4332 } 4333 if doc.Value.Kind != 0x0A && result != nil { 4334 err = doc.Value.Unmarshal(result) 4335 if err != nil { 4336 return nil, err 4337 } 4338 } 4339 info = &ChangeInfo{} 4340 lerr := &doc.LastError 4341 if lerr.UpdatedExisting { 4342 info.Updated = lerr.N 4343 info.Matched = lerr.N 4344 } else if change.Remove { 4345 info.Removed = lerr.N 4346 info.Matched = lerr.N 4347 } else if change.Upsert { 4348 info.UpsertedId = lerr.UpsertedId 4349 } 4350 return info, nil 4351} 4352 4353// The BuildInfo type encapsulates details about the running MongoDB server. 4354// 4355// Note that the VersionArray field was introduced in MongoDB 2.0+, but it is 4356// internally assembled from the Version information for previous versions. 4357// In both cases, VersionArray is guaranteed to have at least 4 entries. 4358type BuildInfo struct { 4359 Version string 4360 VersionArray []int `bson:"versionArray"` // On MongoDB 2.0+; assembled from Version otherwise 4361 GitVersion string `bson:"gitVersion"` 4362 OpenSSLVersion string `bson:"OpenSSLVersion"` 4363 SysInfo string `bson:"sysInfo"` // Deprecated and empty on MongoDB 3.2+. 4364 Bits int 4365 Debug bool 4366 MaxObjectSize int `bson:"maxBsonObjectSize"` 4367} 4368 4369// VersionAtLeast returns whether the BuildInfo version is greater than or 4370// equal to the provided version number. If more than one number is 4371// provided, numbers will be considered as major, minor, and so on. 
4372func (bi *BuildInfo) VersionAtLeast(version ...int) bool { 4373 for i, vi := range version { 4374 if i == len(bi.VersionArray) { 4375 return false 4376 } 4377 if bivi := bi.VersionArray[i]; bivi != vi { 4378 return bivi >= vi 4379 } 4380 } 4381 return true 4382} 4383 4384// BuildInfo retrieves the version and other details about the 4385// running MongoDB server. 4386func (s *Session) BuildInfo() (info BuildInfo, err error) { 4387 err = s.Run(bson.D{{"buildInfo", "1"}}, &info) 4388 if len(info.VersionArray) == 0 { 4389 for _, a := range strings.Split(info.Version, ".") { 4390 i, err := strconv.Atoi(a) 4391 if err != nil { 4392 break 4393 } 4394 info.VersionArray = append(info.VersionArray, i) 4395 } 4396 } 4397 for len(info.VersionArray) < 4 { 4398 info.VersionArray = append(info.VersionArray, 0) 4399 } 4400 if i := strings.IndexByte(info.GitVersion, ' '); i >= 0 { 4401 // Strip off the " modules: enterprise" suffix. This is a _git version_. 4402 // That information may be moved to another field if people need it. 4403 info.GitVersion = info.GitVersion[:i] 4404 } 4405 if info.SysInfo == "deprecated" { 4406 info.SysInfo = "" 4407 } 4408 return 4409} 4410 4411// --------------------------------------------------------------------------- 4412// Internal session handling helpers. 4413 4414func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) { 4415 4416 // Read-only lock to check for previously reserved socket. 4417 s.m.RLock() 4418 // If there is a slave socket reserved and its use is acceptable, take it as long 4419 // as there isn't a master socket which would be preferred by the read preference mode. 
4420 if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) { 4421 socket := s.slaveSocket 4422 socket.Acquire() 4423 s.m.RUnlock() 4424 return socket, nil 4425 } 4426 if s.masterSocket != nil { 4427 socket := s.masterSocket 4428 socket.Acquire() 4429 s.m.RUnlock() 4430 return socket, nil 4431 } 4432 s.m.RUnlock() 4433 4434 // No go. We may have to request a new socket and change the session, 4435 // so try again but with an exclusive lock now. 4436 s.m.Lock() 4437 defer s.m.Unlock() 4438 4439 if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) { 4440 s.slaveSocket.Acquire() 4441 return s.slaveSocket, nil 4442 } 4443 if s.masterSocket != nil { 4444 s.masterSocket.Acquire() 4445 return s.masterSocket, nil 4446 } 4447 4448 // Still not good. We need a new socket. 4449 sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit) 4450 if err != nil { 4451 return nil, err 4452 } 4453 4454 // Authenticate the new socket. 4455 if err = s.socketLogin(sock); err != nil { 4456 sock.Release() 4457 return nil, err 4458 } 4459 4460 // Keep track of the new socket, if necessary. 4461 // Note that, as a special case, if the Eventual session was 4462 // not refreshed (s.slaveSocket != nil), it means the developer 4463 // asked to preserve an existing reserved socket, so we'll 4464 // keep a master one around too before a Refresh happens. 4465 if s.consistency != Eventual || s.slaveSocket != nil { 4466 s.setSocket(sock) 4467 } 4468 4469 // Switch over a Monotonic session to the master. 4470 if !slaveOk && s.consistency == Monotonic { 4471 s.slaveOk = false 4472 } 4473 4474 return sock, nil 4475} 4476 4477// setSocket binds socket to this section. 
4478func (s *Session) setSocket(socket *mongoSocket) { 4479 info := socket.Acquire() 4480 if info.Master { 4481 if s.masterSocket != nil { 4482 panic("setSocket(master) with existing master socket reserved") 4483 } 4484 s.masterSocket = socket 4485 } else { 4486 if s.slaveSocket != nil { 4487 panic("setSocket(slave) with existing slave socket reserved") 4488 } 4489 s.slaveSocket = socket 4490 } 4491} 4492 4493// unsetSocket releases any slave and/or master sockets reserved. 4494func (s *Session) unsetSocket() { 4495 if s.masterSocket != nil { 4496 s.masterSocket.Release() 4497 } 4498 if s.slaveSocket != nil { 4499 s.slaveSocket.Release() 4500 } 4501 s.masterSocket = nil 4502 s.slaveSocket = nil 4503} 4504 4505func (iter *Iter) replyFunc() replyFunc { 4506 return func(err error, op *replyOp, docNum int, docData []byte) { 4507 iter.m.Lock() 4508 iter.docsToReceive-- 4509 if err != nil { 4510 iter.err = err 4511 debugf("Iter %p received an error: %s", iter, err.Error()) 4512 } else if docNum == -1 { 4513 debugf("Iter %p received no documents (cursor=%d).", iter, op.cursorId) 4514 if op != nil && op.cursorId != 0 { 4515 // It's a tailable cursor. 4516 iter.op.cursorId = op.cursorId 4517 } else if op != nil && op.cursorId == 0 && op.flags&1 == 1 { 4518 // Cursor likely timed out. 
4519 iter.err = ErrCursor 4520 } else { 4521 iter.err = ErrNotFound 4522 } 4523 } else if iter.findCmd { 4524 debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, int(op.replyDocs), op.cursorId) 4525 var findReply struct { 4526 Ok bool 4527 Code int 4528 Errmsg string 4529 Cursor cursorData 4530 } 4531 if err := bson.Unmarshal(docData, &findReply); err != nil { 4532 iter.err = err 4533 } else if !findReply.Ok && findReply.Errmsg != "" { 4534 iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg} 4535 } else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 { 4536 iter.err = ErrNotFound 4537 } else { 4538 batch := findReply.Cursor.FirstBatch 4539 if len(batch) == 0 { 4540 batch = findReply.Cursor.NextBatch 4541 } 4542 rdocs := len(batch) 4543 for _, raw := range batch { 4544 iter.docData.Push(raw.Data) 4545 } 4546 iter.docsToReceive = 0 4547 docsToProcess := iter.docData.Len() 4548 if iter.limit == 0 || int32(docsToProcess) < iter.limit { 4549 iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs)) 4550 } else { 4551 iter.docsBeforeMore = -1 4552 } 4553 iter.op.cursorId = findReply.Cursor.Id 4554 } 4555 } else { 4556 rdocs := int(op.replyDocs) 4557 if docNum == 0 { 4558 iter.docsToReceive += rdocs - 1 4559 docsToProcess := iter.docData.Len() + rdocs 4560 if iter.limit == 0 || int32(docsToProcess) < iter.limit { 4561 iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs)) 4562 } else { 4563 iter.docsBeforeMore = -1 4564 } 4565 iter.op.cursorId = op.cursorId 4566 } 4567 debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, rdocs, op.cursorId) 4568 iter.docData.Push(docData) 4569 } 4570 iter.gotReply.Broadcast() 4571 iter.m.Unlock() 4572 } 4573} 4574 4575type writeCmdResult struct { 4576 Ok bool 4577 N int 4578 NModified int `bson:"nModified"` 4579 Upserted []struct { 4580 Index int 4581 Id interface{} `_id` 4582 } 4583 ConcernError 
writeConcernError `bson:"writeConcernError"` 4584 Errors []writeCmdError `bson:"writeErrors"` 4585} 4586 4587type writeConcernError struct { 4588 Code int 4589 ErrMsg string 4590} 4591 4592type writeCmdError struct { 4593 Index int 4594 Code int 4595 ErrMsg string 4596} 4597 4598func (r *writeCmdResult) BulkErrorCases() []BulkErrorCase { 4599 ecases := make([]BulkErrorCase, len(r.Errors)) 4600 for i, err := range r.Errors { 4601 ecases[i] = BulkErrorCase{err.Index, &QueryError{Code: err.Code, Message: err.ErrMsg}} 4602 } 4603 return ecases 4604} 4605 4606// writeOp runs the given modifying operation, potentially followed up 4607// by a getLastError command in case the session is in safe mode. The 4608// LastError result is made available in lerr, and if lerr.Err is set it 4609// will also be returned as err. 4610func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err error) { 4611 s := c.Database.Session 4612 socket, err := s.acquireSocket(c.Database.Name == "local") 4613 if err != nil { 4614 return nil, err 4615 } 4616 defer socket.Release() 4617 4618 s.m.RLock() 4619 safeOp := s.safeOp 4620 bypassValidation := s.bypassValidation 4621 s.m.RUnlock() 4622 4623 if socket.ServerInfo().MaxWireVersion >= 2 { 4624 // Servers with a more recent write protocol benefit from write commands. 4625 if op, ok := op.(*insertOp); ok && len(op.documents) > 1000 { 4626 var lerr LastError 4627 4628 // Maximum batch size is 1000. Must split out in separate operations for compatibility. 4629 all := op.documents 4630 for i := 0; i < len(all); i += 1000 { 4631 l := i + 1000 4632 if l > len(all) { 4633 l = len(all) 4634 } 4635 op.documents = all[i:l] 4636 oplerr, err := c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation) 4637 lerr.N += oplerr.N 4638 lerr.modified += oplerr.modified 4639 if err != nil { 4640 for ei := range oplerr.ecases { 4641 oplerr.ecases[ei].Index += i 4642 } 4643 lerr.ecases = append(lerr.ecases, oplerr.ecases...) 
4644 if op.flags&1 == 0 { 4645 return &lerr, err 4646 } 4647 } 4648 } 4649 if len(lerr.ecases) != 0 { 4650 return &lerr, lerr.ecases[0].Err 4651 } 4652 return &lerr, nil 4653 } 4654 return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation) 4655 } else if updateOps, ok := op.(bulkUpdateOp); ok { 4656 var lerr LastError 4657 for i, updateOp := range updateOps { 4658 oplerr, err := c.writeOpQuery(socket, safeOp, updateOp, ordered) 4659 lerr.N += oplerr.N 4660 lerr.modified += oplerr.modified 4661 if err != nil { 4662 lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err}) 4663 if ordered { 4664 break 4665 } 4666 } 4667 } 4668 if len(lerr.ecases) != 0 { 4669 return &lerr, lerr.ecases[0].Err 4670 } 4671 return &lerr, nil 4672 } else if deleteOps, ok := op.(bulkDeleteOp); ok { 4673 var lerr LastError 4674 for i, deleteOp := range deleteOps { 4675 oplerr, err := c.writeOpQuery(socket, safeOp, deleteOp, ordered) 4676 lerr.N += oplerr.N 4677 lerr.modified += oplerr.modified 4678 if err != nil { 4679 lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err}) 4680 if ordered { 4681 break 4682 } 4683 } 4684 } 4685 if len(lerr.ecases) != 0 { 4686 return &lerr, lerr.ecases[0].Err 4687 } 4688 return &lerr, nil 4689 } 4690 return c.writeOpQuery(socket, safeOp, op, ordered) 4691} 4692 4693func (c *Collection) writeOpQuery(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered bool) (lerr *LastError, err error) { 4694 if safeOp == nil { 4695 return nil, socket.Query(op) 4696 } 4697 4698 var mutex sync.Mutex 4699 var replyData []byte 4700 var replyErr error 4701 mutex.Lock() 4702 query := *safeOp // Copy the data. 4703 query.collection = c.Database.Name + ".$cmd" 4704 query.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) { 4705 replyData = docData 4706 replyErr = err 4707 mutex.Unlock() 4708 } 4709 err = socket.Query(op, &query) 4710 if err != nil { 4711 return nil, err 4712 } 4713 mutex.Lock() // Wait. 
4714 if replyErr != nil { 4715 return nil, replyErr // XXX TESTME 4716 } 4717 if hasErrMsg(replyData) { 4718 // Looks like getLastError itself failed. 4719 err = checkQueryError(query.collection, replyData) 4720 if err != nil { 4721 return nil, err 4722 } 4723 } 4724 result := &LastError{} 4725 bson.Unmarshal(replyData, &result) 4726 debugf("Result from writing query: %#v", result) 4727 if result.Err != "" { 4728 result.ecases = []BulkErrorCase{{Index: 0, Err: result}} 4729 if insert, ok := op.(*insertOp); ok && len(insert.documents) > 1 { 4730 result.ecases[0].Index = -1 4731 } 4732 return result, result 4733 } 4734 // With MongoDB <2.6 we don't know how many actually changed, so make it the same as matched. 4735 result.modified = result.N 4736 return result, nil 4737} 4738 4739func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered, bypassValidation bool) (lerr *LastError, err error) { 4740 var writeConcern interface{} 4741 if safeOp == nil { 4742 writeConcern = bson.D{{"w", 0}} 4743 } else { 4744 writeConcern = safeOp.query.(*getLastError) 4745 } 4746 4747 var cmd bson.D 4748 switch op := op.(type) { 4749 case *insertOp: 4750 // http://docs.mongodb.org/manual/reference/command/insert 4751 cmd = bson.D{ 4752 {"insert", c.Name}, 4753 {"documents", op.documents}, 4754 {"writeConcern", writeConcern}, 4755 {"ordered", op.flags&1 == 0}, 4756 } 4757 case *updateOp: 4758 // http://docs.mongodb.org/manual/reference/command/update 4759 cmd = bson.D{ 4760 {"update", c.Name}, 4761 {"updates", []interface{}{op}}, 4762 {"writeConcern", writeConcern}, 4763 {"ordered", ordered}, 4764 } 4765 case bulkUpdateOp: 4766 // http://docs.mongodb.org/manual/reference/command/update 4767 cmd = bson.D{ 4768 {"update", c.Name}, 4769 {"updates", op}, 4770 {"writeConcern", writeConcern}, 4771 {"ordered", ordered}, 4772 } 4773 case *deleteOp: 4774 // http://docs.mongodb.org/manual/reference/command/delete 4775 cmd = bson.D{ 4776 {"delete", c.Name}, 
4777 {"deletes", []interface{}{op}}, 4778 {"writeConcern", writeConcern}, 4779 {"ordered", ordered}, 4780 } 4781 case bulkDeleteOp: 4782 // http://docs.mongodb.org/manual/reference/command/delete 4783 cmd = bson.D{ 4784 {"delete", c.Name}, 4785 {"deletes", op}, 4786 {"writeConcern", writeConcern}, 4787 {"ordered", ordered}, 4788 } 4789 } 4790 if bypassValidation { 4791 cmd = append(cmd, bson.DocElem{"bypassDocumentValidation", true}) 4792 } 4793 4794 var result writeCmdResult 4795 err = c.Database.run(socket, cmd, &result) 4796 debugf("Write command result: %#v (err=%v)", result, err) 4797 ecases := result.BulkErrorCases() 4798 lerr = &LastError{ 4799 UpdatedExisting: result.N > 0 && len(result.Upserted) == 0, 4800 N: result.N, 4801 4802 modified: result.NModified, 4803 ecases: ecases, 4804 } 4805 if len(result.Upserted) > 0 { 4806 lerr.UpsertedId = result.Upserted[0].Id 4807 } 4808 if len(result.Errors) > 0 { 4809 e := result.Errors[0] 4810 lerr.Code = e.Code 4811 lerr.Err = e.ErrMsg 4812 err = lerr 4813 } else if result.ConcernError.Code != 0 { 4814 e := result.ConcernError 4815 lerr.Code = e.Code 4816 lerr.Err = e.ErrMsg 4817 err = lerr 4818 } 4819 4820 if err == nil && safeOp == nil { 4821 return nil, nil 4822 } 4823 return lerr, err 4824} 4825 4826func hasErrMsg(d []byte) bool { 4827 l := len(d) 4828 for i := 0; i+8 < l; i++ { 4829 if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' { 4830 return true 4831 } 4832 } 4833 return false 4834} 4835