1// mgo - MongoDB driver for Go
2//
3// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
4//
5// All rights reserved.
6//
7// Redistribution and use in source and binary forms, with or without
8// modification, are permitted provided that the following conditions are met:
9//
10// 1. Redistributions of source code must retain the above copyright notice, this
11//    list of conditions and the following disclaimer.
12// 2. Redistributions in binary form must reproduce the above copyright notice,
13//    this list of conditions and the following disclaimer in the documentation
14//    and/or other materials provided with the distribution.
15//
16// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
20// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27package mgo
28
29import (
30	"crypto/md5"
31	"encoding/hex"
32	"errors"
33	"fmt"
34	"math"
35	"net"
36	"net/url"
37	"reflect"
38	"sort"
39	"strconv"
40	"strings"
41	"sync"
42	"time"
43
44	"gopkg.in/mgo.v2/bson"
45)
46
47type Mode int
48
49const (
50	// Relevant documentation on read preference modes:
51	//
52	//     http://docs.mongodb.org/manual/reference/read-preference/
53	//
54	Primary            Mode = 2 // Default mode. All operations read from the current replica set primary.
55	PrimaryPreferred   Mode = 3 // Read from the primary if available. Read from the secondary otherwise.
56	Secondary          Mode = 4 // Read from one of the nearest secondary members of the replica set.
57	SecondaryPreferred Mode = 5 // Read from one of the nearest secondaries if available. Read from primary otherwise.
58	Nearest            Mode = 6 // Read from one of the nearest members, irrespective of it being primary or secondary.
59
60	// Read preference modes are specific to mgo:
61	Eventual  Mode = 0 // Same as Nearest, but may change servers between reads.
62	Monotonic Mode = 1 // Same as SecondaryPreferred before first write. Same as Primary after first write.
63	Strong    Mode = 2 // Same as Primary.
64)
65
66// mgo.v3: Drop Strong mode, suffix all modes with "Mode".
67
68// When changing the Session type, check if newSession and copySession
69// need to be updated too.
70
71// Session represents a communication session with the database.
72//
73// All Session methods are concurrency-safe and may be called from multiple
74// goroutines. In all session modes but Eventual, using the session from
75// multiple goroutines will cause them to share the same underlying socket.
76// See the documentation on Session.SetMode for more details.
77type Session struct {
78	m                sync.RWMutex
79	cluster_         *mongoCluster
80	slaveSocket      *mongoSocket
81	masterSocket     *mongoSocket
82	slaveOk          bool
83	consistency      Mode
84	queryConfig      query
85	safeOp           *queryOp
86	syncTimeout      time.Duration
87	sockTimeout      time.Duration
88	defaultdb        string
89	sourcedb         string
90	dialCred         *Credential
91	creds            []Credential
92	poolLimit        int
93	bypassValidation bool
94}
95
96type Database struct {
97	Session *Session
98	Name    string
99}
100
101type Collection struct {
102	Database *Database
103	Name     string // "collection"
104	FullName string // "db.collection"
105}
106
107type Query struct {
108	m       sync.Mutex
109	session *Session
110	query   // Enables default settings in session.
111}
112
113type query struct {
114	op       queryOp
115	prefetch float64
116	limit    int32
117}
118
119type getLastError struct {
120	CmdName  int         "getLastError,omitempty"
121	W        interface{} "w,omitempty"
122	WTimeout int         "wtimeout,omitempty"
123	FSync    bool        "fsync,omitempty"
124	J        bool        "j,omitempty"
125}
126
127type Iter struct {
128	m              sync.Mutex
129	gotReply       sync.Cond
130	session        *Session
131	server         *mongoServer
132	docData        queue
133	err            error
134	op             getMoreOp
135	prefetch       float64
136	limit          int32
137	docsToReceive  int
138	docsBeforeMore int
139	timeout        time.Duration
140	timedout       bool
141	findCmd        bool
142}
143
144var (
145	ErrNotFound = errors.New("not found")
146	ErrCursor   = errors.New("invalid cursor")
147)
148
149const (
150	defaultPrefetch  = 0.25
151	maxUpsertRetries = 5
152)
153
154// Dial establishes a new session to the cluster identified by the given seed
155// server(s). The session will enable communication with all of the servers in
156// the cluster, so the seed servers are used only to find out about the cluster
157// topology.
158//
159// Dial will timeout after 10 seconds if a server isn't reached. The returned
160// session will timeout operations after one minute by default if servers
161// aren't available. To customize the timeout, see DialWithTimeout,
162// SetSyncTimeout, and SetSocketTimeout.
163//
164// This method is generally called just once for a given cluster.  Further
165// sessions to the same cluster are then established using the New or Copy
166// methods on the obtained session. This will make them share the underlying
167// cluster, and manage the pool of connections appropriately.
168//
169// Once the session is not useful anymore, Close must be called to release the
170// resources appropriately.
171//
172// The seed servers must be provided in the following format:
173//
174//     [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
175//
176// For example, it may be as simple as:
177//
178//     localhost
179//
180// Or more involved like:
181//
182//     mongodb://myuser:mypass@localhost:40001,otherhost:40001/mydb
183//
184// If the port number is not provided for a server, it defaults to 27017.
185//
186// The username and password provided in the URL will be used to authenticate
187// into the database named after the slash at the end of the host names, or
188// into the "admin" database if none is provided.  The authentication information
189// will persist in sessions obtained through the New method as well.
190//
191// The following connection options are supported after the question mark:
192//
193//     connect=direct
194//
195//         Disables the automatic replica set server discovery logic, and
196//         forces the use of servers provided only (even if secondaries).
197//         Note that to talk to a secondary the consistency requirements
198//         must be relaxed to Monotonic or Eventual via SetMode.
199//
200//
201//     connect=replicaSet
202//
203//  	   Discover replica sets automatically. Default connection behavior.
204//
205//
206//     replicaSet=<setname>
207//
208//         If specified will prevent the obtained session from communicating
209//         with any server which is not part of a replica set with the given name.
210//         The default is to communicate with any server specified or discovered
211//         via the servers contacted.
212//
213//
214//     authSource=<db>
215//
216//         Informs the database used to establish credentials and privileges
217//         with a MongoDB server. Defaults to the database name provided via
218//         the URL path, and "admin" if that's unset.
219//
220//
221//     authMechanism=<mechanism>
222//
223//        Defines the protocol for credential negotiation. Defaults to "MONGODB-CR",
224//        which is the default username/password challenge-response mechanism.
225//
226//
227//     gssapiServiceName=<name>
228//
229//        Defines the service name to use when authenticating with the GSSAPI
230//        mechanism. Defaults to "mongodb".
231//
232//
233//     maxPoolSize=<limit>
234//
235//        Defines the per-server socket pool limit. Defaults to 4096.
236//        See Session.SetPoolLimit for details.
237//
238//
239// Relevant documentation:
240//
241//     http://docs.mongodb.org/manual/reference/connection-string/
242//
243func Dial(url string) (*Session, error) {
244	session, err := DialWithTimeout(url, 10*time.Second)
245	if err == nil {
246		session.SetSyncTimeout(1 * time.Minute)
247		session.SetSocketTimeout(1 * time.Minute)
248	}
249	return session, err
250}
251
252// DialWithTimeout works like Dial, but uses timeout as the amount of time to
253// wait for a server to respond when first connecting and also on follow up
254// operations in the session. If timeout is zero, the call may block
255// forever waiting for a connection to be made.
256//
257// See SetSyncTimeout for customizing the timeout for the session.
258func DialWithTimeout(url string, timeout time.Duration) (*Session, error) {
259	info, err := ParseURL(url)
260	if err != nil {
261		return nil, err
262	}
263	info.Timeout = timeout
264	return DialWithInfo(info)
265}
266
267// ParseURL parses a MongoDB URL as accepted by the Dial function and returns
268// a value suitable for providing into DialWithInfo.
269//
270// See Dial for more details on the format of url.
271func ParseURL(url string) (*DialInfo, error) {
272	uinfo, err := extractURL(url)
273	if err != nil {
274		return nil, err
275	}
276	direct := false
277	mechanism := ""
278	service := ""
279	source := ""
280	setName := ""
281	poolLimit := 0
282	for k, v := range uinfo.options {
283		switch k {
284		case "authSource":
285			source = v
286		case "authMechanism":
287			mechanism = v
288		case "gssapiServiceName":
289			service = v
290		case "replicaSet":
291			setName = v
292		case "maxPoolSize":
293			poolLimit, err = strconv.Atoi(v)
294			if err != nil {
295				return nil, errors.New("bad value for maxPoolSize: " + v)
296			}
297		case "connect":
298			if v == "direct" {
299				direct = true
300				break
301			}
302			if v == "replicaSet" {
303				break
304			}
305			fallthrough
306		default:
307			return nil, errors.New("unsupported connection URL option: " + k + "=" + v)
308		}
309	}
310	info := DialInfo{
311		Addrs:          uinfo.addrs,
312		Direct:         direct,
313		Database:       uinfo.db,
314		Username:       uinfo.user,
315		Password:       uinfo.pass,
316		Mechanism:      mechanism,
317		Service:        service,
318		Source:         source,
319		PoolLimit:      poolLimit,
320		ReplicaSetName: setName,
321	}
322	return &info, nil
323}
324
325// DialInfo holds options for establishing a session with a MongoDB cluster.
326// To use a URL, see the Dial function.
327type DialInfo struct {
328	// Addrs holds the addresses for the seed servers.
329	Addrs []string
330
331	// Direct informs whether to establish connections only with the
332	// specified seed servers, or to obtain information for the whole
333	// cluster and establish connections with further servers too.
334	Direct bool
335
336	// Timeout is the amount of time to wait for a server to respond when
337	// first connecting and on follow up operations in the session. If
338	// timeout is zero, the call may block forever waiting for a connection
339	// to be established. Timeout does not affect logic in DialServer.
340	Timeout time.Duration
341
342	// FailFast will cause connection and query attempts to fail faster when
343	// the server is unavailable, instead of retrying until the configured
344	// timeout period. Note that an unavailable server may silently drop
345	// packets instead of rejecting them, in which case it's impossible to
346	// distinguish it from a slow server, so the timeout stays relevant.
347	FailFast bool
348
349	// Database is the default database name used when the Session.DB method
350	// is called with an empty name, and is also used during the initial
351	// authentication if Source is unset.
352	Database string
353
354	// ReplicaSetName, if specified, will prevent the obtained session from
355	// communicating with any server which is not part of a replica set
356	// with the given name. The default is to communicate with any server
357	// specified or discovered via the servers contacted.
358	ReplicaSetName string
359
360	// Source is the database used to establish credentials and privileges
361	// with a MongoDB server. Defaults to the value of Database, if that is
362	// set, or "admin" otherwise.
363	Source string
364
365	// Service defines the service name to use when authenticating with the GSSAPI
366	// mechanism. Defaults to "mongodb".
367	Service string
368
369	// ServiceHost defines which hostname to use when authenticating
370	// with the GSSAPI mechanism. If not specified, defaults to the MongoDB
371	// server's address.
372	ServiceHost string
373
374	// Mechanism defines the protocol for credential negotiation.
375	// Defaults to "MONGODB-CR".
376	Mechanism string
377
378	// Username and Password inform the credentials for the initial authentication
379	// done on the database defined by the Source field. See Session.Login.
380	Username string
381	Password string
382
383	// PoolLimit defines the per-server socket pool limit. Defaults to 4096.
384	// See Session.SetPoolLimit for details.
385	PoolLimit int
386
387	// DialServer optionally specifies the dial function for establishing
388	// connections with the MongoDB servers.
389	DialServer func(addr *ServerAddr) (net.Conn, error)
390
391	// WARNING: This field is obsolete. See DialServer above.
392	Dial func(addr net.Addr) (net.Conn, error)
393}
394
395// mgo.v3: Drop DialInfo.Dial.
396
397// ServerAddr represents the address for establishing a connection to an
398// individual MongoDB server.
399type ServerAddr struct {
400	str string
401	tcp *net.TCPAddr
402}
403
404// String returns the address that was provided for the server before resolution.
405func (addr *ServerAddr) String() string {
406	return addr.str
407}
408
409// TCPAddr returns the resolved TCP address for the server.
410func (addr *ServerAddr) TCPAddr() *net.TCPAddr {
411	return addr.tcp
412}
413
414// DialWithInfo establishes a new session to the cluster identified by info.
415func DialWithInfo(info *DialInfo) (*Session, error) {
416	addrs := make([]string, len(info.Addrs))
417	for i, addr := range info.Addrs {
418		p := strings.LastIndexAny(addr, "]:")
419		if p == -1 || addr[p] != ':' {
420			// XXX This is untested. The test suite doesn't use the standard port.
421			addr += ":27017"
422		}
423		addrs[i] = addr
424	}
425	cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer}, info.ReplicaSetName)
426	session := newSession(Eventual, cluster, info.Timeout)
427	session.defaultdb = info.Database
428	if session.defaultdb == "" {
429		session.defaultdb = "test"
430	}
431	session.sourcedb = info.Source
432	if session.sourcedb == "" {
433		session.sourcedb = info.Database
434		if session.sourcedb == "" {
435			session.sourcedb = "admin"
436		}
437	}
438	if info.Username != "" {
439		source := session.sourcedb
440		if info.Source == "" &&
441			(info.Mechanism == "GSSAPI" || info.Mechanism == "PLAIN" || info.Mechanism == "MONGODB-X509") {
442			source = "$external"
443		}
444		session.dialCred = &Credential{
445			Username:    info.Username,
446			Password:    info.Password,
447			Mechanism:   info.Mechanism,
448			Service:     info.Service,
449			ServiceHost: info.ServiceHost,
450			Source:      source,
451		}
452		session.creds = []Credential{*session.dialCred}
453	}
454	if info.PoolLimit > 0 {
455		session.poolLimit = info.PoolLimit
456	}
457	cluster.Release()
458
459	// People get confused when we return a session that is not actually
460	// established to any servers yet (e.g. what if url was wrong). So,
461	// ping the server to ensure there's someone there, and abort if it
462	// fails.
463	if err := session.Ping(); err != nil {
464		session.Close()
465		return nil, err
466	}
467	session.SetMode(Strong, true)
468	return session, nil
469}
470
471func isOptSep(c rune) bool {
472	return c == ';' || c == '&'
473}
474
475type urlInfo struct {
476	addrs   []string
477	user    string
478	pass    string
479	db      string
480	options map[string]string
481}
482
483func extractURL(s string) (*urlInfo, error) {
484	if strings.HasPrefix(s, "mongodb://") {
485		s = s[10:]
486	}
487	info := &urlInfo{options: make(map[string]string)}
488	if c := strings.Index(s, "?"); c != -1 {
489		for _, pair := range strings.FieldsFunc(s[c+1:], isOptSep) {
490			l := strings.SplitN(pair, "=", 2)
491			if len(l) != 2 || l[0] == "" || l[1] == "" {
492				return nil, errors.New("connection option must be key=value: " + pair)
493			}
494			info.options[l[0]] = l[1]
495		}
496		s = s[:c]
497	}
498	if c := strings.Index(s, "@"); c != -1 {
499		pair := strings.SplitN(s[:c], ":", 2)
500		if len(pair) > 2 || pair[0] == "" {
501			return nil, errors.New("credentials must be provided as user:pass@host")
502		}
503		var err error
504		info.user, err = url.QueryUnescape(pair[0])
505		if err != nil {
506			return nil, fmt.Errorf("cannot unescape username in URL: %q", pair[0])
507		}
508		if len(pair) > 1 {
509			info.pass, err = url.QueryUnescape(pair[1])
510			if err != nil {
511				return nil, fmt.Errorf("cannot unescape password in URL")
512			}
513		}
514		s = s[c+1:]
515	}
516	if c := strings.Index(s, "/"); c != -1 {
517		info.db = s[c+1:]
518		s = s[:c]
519	}
520	info.addrs = strings.Split(s, ",")
521	return info, nil
522}
523
524func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
525	cluster.Acquire()
526	session = &Session{
527		cluster_:    cluster,
528		syncTimeout: timeout,
529		sockTimeout: timeout,
530		poolLimit:   4096,
531	}
532	debugf("New session %p on cluster %p", session, cluster)
533	session.SetMode(consistency, true)
534	session.SetSafe(&Safe{})
535	session.queryConfig.prefetch = defaultPrefetch
536	return session
537}
538
539func copySession(session *Session, keepCreds bool) (s *Session) {
540	cluster := session.cluster()
541	cluster.Acquire()
542	if session.masterSocket != nil {
543		session.masterSocket.Acquire()
544	}
545	if session.slaveSocket != nil {
546		session.slaveSocket.Acquire()
547	}
548	var creds []Credential
549	if keepCreds {
550		creds = make([]Credential, len(session.creds))
551		copy(creds, session.creds)
552	} else if session.dialCred != nil {
553		creds = []Credential{*session.dialCred}
554	}
555	scopy := *session
556	scopy.m = sync.RWMutex{}
557	scopy.creds = creds
558	s = &scopy
559	debugf("New session %p on cluster %p (copy from %p)", s, cluster, session)
560	return s
561}
562
563// LiveServers returns a list of server addresses which are
564// currently known to be alive.
565func (s *Session) LiveServers() (addrs []string) {
566	s.m.RLock()
567	addrs = s.cluster().LiveServers()
568	s.m.RUnlock()
569	return addrs
570}
571
572// DB returns a value representing the named database. If name
573// is empty, the database name provided in the dialed URL is
574// used instead. If that is also empty, "test" is used as a
575// fallback in a way equivalent to the mongo shell.
576//
577// Creating this value is a very lightweight operation, and
578// involves no network communication.
579func (s *Session) DB(name string) *Database {
580	if name == "" {
581		name = s.defaultdb
582	}
583	return &Database{s, name}
584}
585
586// C returns a value representing the named collection.
587//
588// Creating this value is a very lightweight operation, and
589// involves no network communication.
590func (db *Database) C(name string) *Collection {
591	return &Collection{db, name, db.Name + "." + name}
592}
593
594// With returns a copy of db that uses session s.
595func (db *Database) With(s *Session) *Database {
596	newdb := *db
597	newdb.Session = s
598	return &newdb
599}
600
601// With returns a copy of c that uses session s.
602func (c *Collection) With(s *Session) *Collection {
603	newdb := *c.Database
604	newdb.Session = s
605	newc := *c
606	newc.Database = &newdb
607	return &newc
608}
609
610// GridFS returns a GridFS value representing collections in db that
611// follow the standard GridFS specification.
612// The provided prefix (sometimes known as root) will determine which
613// collections to use, and is usually set to "fs" when there is a
614// single GridFS in the database.
615//
616// See the GridFS Create, Open, and OpenId methods for more details.
617//
618// Relevant documentation:
619//
620//     http://www.mongodb.org/display/DOCS/GridFS
621//     http://www.mongodb.org/display/DOCS/GridFS+Tools
622//     http://www.mongodb.org/display/DOCS/GridFS+Specification
623//
624func (db *Database) GridFS(prefix string) *GridFS {
625	return newGridFS(db, prefix)
626}
627
628// Run issues the provided command on the db database and unmarshals
629// its result in the respective argument. The cmd argument may be either
630// a string with the command name itself, in which case an empty document of
631// the form bson.M{cmd: 1} will be used, or it may be a full command document.
632//
633// Note that MongoDB considers the first marshalled key as the command
634// name, so when providing a command with options, it's important to
635// use an ordering-preserving document, such as a struct value or an
636// instance of bson.D.  For instance:
637//
638//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
639//
640// For privilleged commands typically run on the "admin" database, see
641// the Run method in the Session type.
642//
643// Relevant documentation:
644//
645//     http://www.mongodb.org/display/DOCS/Commands
646//     http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
647//
648func (db *Database) Run(cmd interface{}, result interface{}) error {
649	socket, err := db.Session.acquireSocket(true)
650	if err != nil {
651		return err
652	}
653	defer socket.Release()
654
655	// This is an optimized form of db.C("$cmd").Find(cmd).One(result).
656	return db.run(socket, cmd, result)
657}
658
659// Credential holds details to authenticate with a MongoDB server.
660type Credential struct {
661	// Username and Password hold the basic details for authentication.
662	// Password is optional with some authentication mechanisms.
663	Username string
664	Password string
665
666	// Source is the database used to establish credentials and privileges
667	// with a MongoDB server. Defaults to the default database provided
668	// during dial, or "admin" if that was unset.
669	Source string
670
671	// Service defines the service name to use when authenticating with the GSSAPI
672	// mechanism. Defaults to "mongodb".
673	Service string
674
675	// ServiceHost defines which hostname to use when authenticating
676	// with the GSSAPI mechanism. If not specified, defaults to the MongoDB
677	// server's address.
678	ServiceHost string
679
680	// Mechanism defines the protocol for credential negotiation.
681	// Defaults to "MONGODB-CR".
682	Mechanism string
683}
684
685// Login authenticates with MongoDB using the provided credential.  The
686// authentication is valid for the whole session and will stay valid until
687// Logout is explicitly called for the same database, or the session is
688// closed.
689func (db *Database) Login(user, pass string) error {
690	return db.Session.Login(&Credential{Username: user, Password: pass, Source: db.Name})
691}
692
693// Login authenticates with MongoDB using the provided credential.  The
694// authentication is valid for the whole session and will stay valid until
695// Logout is explicitly called for the same database, or the session is
696// closed.
697func (s *Session) Login(cred *Credential) error {
698	socket, err := s.acquireSocket(true)
699	if err != nil {
700		return err
701	}
702	defer socket.Release()
703
704	credCopy := *cred
705	if cred.Source == "" {
706		if cred.Mechanism == "GSSAPI" {
707			credCopy.Source = "$external"
708		} else {
709			credCopy.Source = s.sourcedb
710		}
711	}
712	err = socket.Login(credCopy)
713	if err != nil {
714		return err
715	}
716
717	s.m.Lock()
718	s.creds = append(s.creds, credCopy)
719	s.m.Unlock()
720	return nil
721}
722
723func (s *Session) socketLogin(socket *mongoSocket) error {
724	for _, cred := range s.creds {
725		if err := socket.Login(cred); err != nil {
726			return err
727		}
728	}
729	return nil
730}
731
732// Logout removes any established authentication credentials for the database.
733func (db *Database) Logout() {
734	session := db.Session
735	dbname := db.Name
736	session.m.Lock()
737	found := false
738	for i, cred := range session.creds {
739		if cred.Source == dbname {
740			copy(session.creds[i:], session.creds[i+1:])
741			session.creds = session.creds[:len(session.creds)-1]
742			found = true
743			break
744		}
745	}
746	if found {
747		if session.masterSocket != nil {
748			session.masterSocket.Logout(dbname)
749		}
750		if session.slaveSocket != nil {
751			session.slaveSocket.Logout(dbname)
752		}
753	}
754	session.m.Unlock()
755}
756
757// LogoutAll removes all established authentication credentials for the session.
758func (s *Session) LogoutAll() {
759	s.m.Lock()
760	for _, cred := range s.creds {
761		if s.masterSocket != nil {
762			s.masterSocket.Logout(cred.Source)
763		}
764		if s.slaveSocket != nil {
765			s.slaveSocket.Logout(cred.Source)
766		}
767	}
768	s.creds = s.creds[0:0]
769	s.m.Unlock()
770}
771
772// User represents a MongoDB user.
773//
774// Relevant documentation:
775//
776//     http://docs.mongodb.org/manual/reference/privilege-documents/
777//     http://docs.mongodb.org/manual/reference/user-privileges/
778//
779type User struct {
780	// Username is how the user identifies itself to the system.
781	Username string `bson:"user"`
782
783	// Password is the plaintext password for the user. If set,
784	// the UpsertUser method will hash it into PasswordHash and
785	// unset it before the user is added to the database.
786	Password string `bson:",omitempty"`
787
788	// PasswordHash is the MD5 hash of Username+":mongo:"+Password.
789	PasswordHash string `bson:"pwd,omitempty"`
790
791	// CustomData holds arbitrary data admins decide to associate
792	// with this user, such as the full name or employee id.
793	CustomData interface{} `bson:"customData,omitempty"`
794
795	// Roles indicates the set of roles the user will be provided.
796	// See the Role constants.
797	Roles []Role `bson:"roles"`
798
799	// OtherDBRoles allows assigning roles in other databases from
800	// user documents inserted in the admin database. This field
801	// only works in the admin database.
802	OtherDBRoles map[string][]Role `bson:"otherDBRoles,omitempty"`
803
804	// UserSource indicates where to look for this user's credentials.
805	// It may be set to a database name, or to "$external" for
806	// consulting an external resource such as Kerberos. UserSource
807	// must not be set if Password or PasswordHash are present.
808	//
809	// WARNING: This setting was only ever supported in MongoDB 2.4,
810	// and is now obsolete.
811	UserSource string `bson:"userSource,omitempty"`
812}
813
814type Role string
815
816const (
817	// Relevant documentation:
818	//
819	//     http://docs.mongodb.org/manual/reference/user-privileges/
820	//
821	RoleRoot         Role = "root"
822	RoleRead         Role = "read"
823	RoleReadAny      Role = "readAnyDatabase"
824	RoleReadWrite    Role = "readWrite"
825	RoleReadWriteAny Role = "readWriteAnyDatabase"
826	RoleDBAdmin      Role = "dbAdmin"
827	RoleDBAdminAny   Role = "dbAdminAnyDatabase"
828	RoleUserAdmin    Role = "userAdmin"
829	RoleUserAdminAny Role = "userAdminAnyDatabase"
830	RoleClusterAdmin Role = "clusterAdmin"
831)
832
833// UpsertUser updates the authentication credentials and the roles for
834// a MongoDB user within the db database. If the named user doesn't exist
835// it will be created.
836//
837// This method should only be used from MongoDB 2.4 and on. For older
838// MongoDB releases, use the obsolete AddUser method instead.
839//
840// Relevant documentation:
841//
842//     http://docs.mongodb.org/manual/reference/user-privileges/
843//     http://docs.mongodb.org/manual/reference/privilege-documents/
844//
845func (db *Database) UpsertUser(user *User) error {
846	if user.Username == "" {
847		return fmt.Errorf("user has no Username")
848	}
849	if (user.Password != "" || user.PasswordHash != "") && user.UserSource != "" {
850		return fmt.Errorf("user has both Password/PasswordHash and UserSource set")
851	}
852	if len(user.OtherDBRoles) > 0 && db.Name != "admin" && db.Name != "$external" {
853		return fmt.Errorf("user with OtherDBRoles is only supported in the admin or $external databases")
854	}
855
856	// Attempt to run this using 2.6+ commands.
857	rundb := db
858	if user.UserSource != "" {
859		// Compatibility logic for the userSource field of MongoDB <= 2.4.X
860		rundb = db.Session.DB(user.UserSource)
861	}
862	err := rundb.runUserCmd("updateUser", user)
863	// retry with createUser when isAuthError in order to enable the "localhost exception"
864	if isNotFound(err) || isAuthError(err) {
865		return rundb.runUserCmd("createUser", user)
866	}
867	if !isNoCmd(err) {
868		return err
869	}
870
871	// Command does not exist. Fallback to pre-2.6 behavior.
872	var set, unset bson.D
873	if user.Password != "" {
874		psum := md5.New()
875		psum.Write([]byte(user.Username + ":mongo:" + user.Password))
876		set = append(set, bson.DocElem{"pwd", hex.EncodeToString(psum.Sum(nil))})
877		unset = append(unset, bson.DocElem{"userSource", 1})
878	} else if user.PasswordHash != "" {
879		set = append(set, bson.DocElem{"pwd", user.PasswordHash})
880		unset = append(unset, bson.DocElem{"userSource", 1})
881	}
882	if user.UserSource != "" {
883		set = append(set, bson.DocElem{"userSource", user.UserSource})
884		unset = append(unset, bson.DocElem{"pwd", 1})
885	}
886	if user.Roles != nil || user.OtherDBRoles != nil {
887		set = append(set, bson.DocElem{"roles", user.Roles})
888		if len(user.OtherDBRoles) > 0 {
889			set = append(set, bson.DocElem{"otherDBRoles", user.OtherDBRoles})
890		} else {
891			unset = append(unset, bson.DocElem{"otherDBRoles", 1})
892		}
893	}
894	users := db.C("system.users")
895	err = users.Update(bson.D{{"user", user.Username}}, bson.D{{"$unset", unset}, {"$set", set}})
896	if err == ErrNotFound {
897		set = append(set, bson.DocElem{"user", user.Username})
898		if user.Roles == nil && user.OtherDBRoles == nil {
899			// Roles must be sent, as it's the way MongoDB distinguishes
900			// old-style documents from new-style documents in pre-2.6.
901			set = append(set, bson.DocElem{"roles", user.Roles})
902		}
903		err = users.Insert(set)
904	}
905	return err
906}
907
908func isNoCmd(err error) bool {
909	e, ok := err.(*QueryError)
910	return ok && (e.Code == 59 || e.Code == 13390 || strings.HasPrefix(e.Message, "no such cmd:"))
911}
912
913func isNotFound(err error) bool {
914	e, ok := err.(*QueryError)
915	return ok && e.Code == 11
916}
917
918func isAuthError(err error) bool {
919	e, ok := err.(*QueryError)
920	return ok && e.Code == 13
921}
922
923func (db *Database) runUserCmd(cmdName string, user *User) error {
924	cmd := make(bson.D, 0, 16)
925	cmd = append(cmd, bson.DocElem{cmdName, user.Username})
926	if user.Password != "" {
927		cmd = append(cmd, bson.DocElem{"pwd", user.Password})
928	}
929	var roles []interface{}
930	for _, role := range user.Roles {
931		roles = append(roles, role)
932	}
933	for db, dbroles := range user.OtherDBRoles {
934		for _, role := range dbroles {
935			roles = append(roles, bson.D{{"role", role}, {"db", db}})
936		}
937	}
938	if roles != nil || user.Roles != nil || cmdName == "createUser" {
939		cmd = append(cmd, bson.DocElem{"roles", roles})
940	}
941	err := db.Run(cmd, nil)
942	if !isNoCmd(err) && user.UserSource != "" && (user.UserSource != "$external" || db.Name != "$external") {
943		return fmt.Errorf("MongoDB 2.6+ does not support the UserSource setting")
944	}
945	return err
946}
947
948// AddUser creates or updates the authentication credentials of user within
949// the db database.
950//
951// WARNING: This method is obsolete and should only be used with MongoDB 2.2
952// or earlier. For MongoDB 2.4 and on, use UpsertUser instead.
953func (db *Database) AddUser(username, password string, readOnly bool) error {
954	// Try to emulate the old behavior on 2.6+
955	user := &User{Username: username, Password: password}
956	if db.Name == "admin" {
957		if readOnly {
958			user.Roles = []Role{RoleReadAny}
959		} else {
960			user.Roles = []Role{RoleReadWriteAny}
961		}
962	} else {
963		if readOnly {
964			user.Roles = []Role{RoleRead}
965		} else {
966			user.Roles = []Role{RoleReadWrite}
967		}
968	}
969	err := db.runUserCmd("updateUser", user)
970	if isNotFound(err) {
971		return db.runUserCmd("createUser", user)
972	}
973	if !isNoCmd(err) {
974		return err
975	}
976
977	// Command doesn't exist. Fallback to pre-2.6 behavior.
978	psum := md5.New()
979	psum.Write([]byte(username + ":mongo:" + password))
980	digest := hex.EncodeToString(psum.Sum(nil))
981	c := db.C("system.users")
982	_, err = c.Upsert(bson.M{"user": username}, bson.M{"$set": bson.M{"user": username, "pwd": digest, "readOnly": readOnly}})
983	return err
984}
985
986// RemoveUser removes the authentication credentials of user from the database.
987func (db *Database) RemoveUser(user string) error {
988	err := db.Run(bson.D{{"dropUser", user}}, nil)
989	if isNoCmd(err) {
990		users := db.C("system.users")
991		return users.Remove(bson.M{"user": user})
992	}
993	if isNotFound(err) {
994		return ErrNotFound
995	}
996	return err
997}
998
999type indexSpec struct {
1000	Name, NS         string
1001	Key              bson.D
1002	Unique           bool    ",omitempty"
1003	DropDups         bool    "dropDups,omitempty"
1004	Background       bool    ",omitempty"
1005	Sparse           bool    ",omitempty"
1006	Bits             int     ",omitempty"
1007	Min, Max         float64 ",omitempty"
1008	BucketSize       float64 "bucketSize,omitempty"
1009	ExpireAfter      int     "expireAfterSeconds,omitempty"
1010	Weights          bson.D  ",omitempty"
1011	DefaultLanguage  string  "default_language,omitempty"
1012	LanguageOverride string  "language_override,omitempty"
1013	TextIndexVersion int     "textIndexVersion,omitempty"
1014
1015	Collation *Collation "collation,omitempty"
1016}
1017
1018type Index struct {
1019	Key        []string // Index key fields; prefix name with dash (-) for descending order
1020	Unique     bool     // Prevent two documents from having the same index key
1021	DropDups   bool     // Drop documents with the same index key as a previously indexed one
1022	Background bool     // Build index in background and return immediately
1023	Sparse     bool     // Only index documents containing the Key fields
1024
1025	// If ExpireAfter is defined the server will periodically delete
1026	// documents with indexed time.Time older than the provided delta.
1027	ExpireAfter time.Duration
1028
1029	// Name holds the stored index name. On creation if this field is unset it is
1030	// computed by EnsureIndex based on the index key.
1031	Name string
1032
1033	// Properties for spatial indexes.
1034	//
1035	// Min and Max were improperly typed as int when they should have been
1036	// floats.  To preserve backwards compatibility they are still typed as
1037	// int and the following two fields enable reading and writing the same
1038	// fields as float numbers. In mgo.v3, these fields will be dropped and
1039	// Min/Max will become floats.
1040	Min, Max   int
1041	Minf, Maxf float64
1042	BucketSize float64
1043	Bits       int
1044
1045	// Properties for text indexes.
1046	DefaultLanguage  string
1047	LanguageOverride string
1048
1049	// Weights defines the significance of provided fields relative to other
1050	// fields in a text index. The score for a given word in a document is derived
1051	// from the weighted sum of the frequency for each of the indexed fields in
1052	// that document. The default field weight is 1.
1053	Weights map[string]int
1054
1055	// Collation defines the collation to use for the index.
1056	Collation *Collation
1057}
1058
1059type Collation struct {
1060
1061	// Locale defines the collation locale.
1062	Locale string `bson:"locale"`
1063
1064	// CaseLevel defines whether to turn case sensitivity on at strength 1 or 2.
1065	CaseLevel bool `bson:"caseLevel,omitempty"`
1066
1067	// CaseFirst may be set to "upper" or "lower" to define whether
1068	// to have uppercase or lowercase items first. Default is "off".
1069	CaseFirst string `bson:"caseFirst,omitempty"`
1070
1071	// Strength defines the priority of comparison properties, as follows:
1072	//
1073	//   1 (primary)    - Strongest level, denote difference between base characters
1074	//   2 (secondary)  - Accents in characters are considered secondary differences
1075	//   3 (tertiary)   - Upper and lower case differences in characters are
1076	//                    distinguished at the tertiary level
1077	//   4 (quaternary) - When punctuation is ignored at level 1-3, an additional
1078	//                    level can be used to distinguish words with and without
1079	//                    punctuation. Should only be used if ignoring punctuation
1080	//                    is required or when processing Japanese text.
1081	//   5 (identical)  - When all other levels are equal, the identical level is
1082	//                    used as a tiebreaker. The Unicode code point values of
1083	//                    the NFD form of each string are compared at this level,
1084	//                    just in case there is no difference at levels 1-4
1085	//
1086	// Strength defaults to 3.
1087	Strength int `bson:"strength,omitempty"`
1088
1089	// NumericOrdering defines whether to order numbers based on numerical
1090	// order and not collation order.
1091	NumericOrdering bool `bson:"numericOrdering,omitempty"`
1092
1093	// Alternate controls whether spaces and punctuation are considered base characters.
1094	// May be set to "non-ignorable" (spaces and punctuation considered base characters)
1095	// or "shifted" (spaces and punctuation not considered base characters, and only
1096	// distinguished at strength > 3). Defaults to "non-ignorable".
1097	Alternate string `bson:"alternate,omitempty"`
1098
1099	// Backwards defines whether to have secondary differences considered in reverse order,
1100	// as done in the French language.
1101	Backwards bool `bson:"backwards,omitempty"`
1102}
1103
1104// mgo.v3: Drop Minf and Maxf and transform Min and Max to floats.
1105// mgo.v3: Drop DropDups as it's unsupported past 2.8.
1106
1107type indexKeyInfo struct {
1108	name    string
1109	key     bson.D
1110	weights bson.D
1111}
1112
1113func parseIndexKey(key []string) (*indexKeyInfo, error) {
1114	var keyInfo indexKeyInfo
1115	isText := false
1116	var order interface{}
1117	for _, field := range key {
1118		raw := field
1119		if keyInfo.name != "" {
1120			keyInfo.name += "_"
1121		}
1122		var kind string
1123		if field != "" {
1124			if field[0] == '$' {
1125				if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
1126					kind = field[1:c]
1127					field = field[c+1:]
1128					keyInfo.name += field + "_" + kind
1129				} else {
1130					field = "\x00"
1131				}
1132			}
1133			switch field[0] {
1134			case 0:
1135				// Logic above failed. Reset and error.
1136				field = ""
1137			case '@':
1138				order = "2d"
1139				field = field[1:]
1140				// The shell used to render this field as key_ instead of key_2d,
1141				// and mgo followed suit. This has been fixed in recent server
1142				// releases, and mgo followed as well.
1143				keyInfo.name += field + "_2d"
1144			case '-':
1145				order = -1
1146				field = field[1:]
1147				keyInfo.name += field + "_-1"
1148			case '+':
1149				field = field[1:]
1150				fallthrough
1151			default:
1152				if kind == "" {
1153					order = 1
1154					keyInfo.name += field + "_1"
1155				} else {
1156					order = kind
1157				}
1158			}
1159		}
1160		if field == "" || kind != "" && order != kind {
1161			return nil, fmt.Errorf(`invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw)
1162		}
1163		if kind == "text" {
1164			if !isText {
1165				keyInfo.key = append(keyInfo.key, bson.DocElem{"_fts", "text"}, bson.DocElem{"_ftsx", 1})
1166				isText = true
1167			}
1168			keyInfo.weights = append(keyInfo.weights, bson.DocElem{field, 1})
1169		} else {
1170			keyInfo.key = append(keyInfo.key, bson.DocElem{field, order})
1171		}
1172	}
1173	if keyInfo.name == "" {
1174		return nil, errors.New("invalid index key: no fields provided")
1175	}
1176	return &keyInfo, nil
1177}
1178
1179// EnsureIndexKey ensures an index with the given key exists, creating it
1180// if necessary.
1181//
1182// This example:
1183//
1184//     err := collection.EnsureIndexKey("a", "b")
1185//
1186// Is equivalent to:
1187//
1188//     err := collection.EnsureIndex(mgo.Index{Key: []string{"a", "b"}})
1189//
1190// See the EnsureIndex method for more details.
1191func (c *Collection) EnsureIndexKey(key ...string) error {
1192	return c.EnsureIndex(Index{Key: key})
1193}
1194
1195// EnsureIndex ensures an index with the given key exists, creating it with
1196// the provided parameters if necessary. EnsureIndex does not modify a previously
1197// existent index with a matching key. The old index must be dropped first instead.
1198//
1199// Once EnsureIndex returns successfully, following requests for the same index
1200// will not contact the server unless Collection.DropIndex is used to drop the
1201// same index, or Session.ResetIndexCache is called.
1202//
1203// For example:
1204//
1205//     index := Index{
1206//         Key: []string{"lastname", "firstname"},
1207//         Unique: true,
1208//         DropDups: true,
1209//         Background: true, // See notes.
1210//         Sparse: true,
1211//     }
1212//     err := collection.EnsureIndex(index)
1213//
1214// The Key value determines which fields compose the index. The index ordering
1215// will be ascending by default.  To obtain an index with a descending order,
1216// the field name should be prefixed by a dash (e.g. []string{"-time"}). It can
1217// also be optionally prefixed by an index kind, as in "$text:summary" or
1218// "$2d:-point". The key string format is:
1219//
1220//     [$<kind>:][-]<field name>
1221//
1222// If the Unique field is true, the index must necessarily contain only a single
1223// document per Key.  With DropDups set to true, documents with the same key
1224// as a previously indexed one will be dropped rather than an error returned.
1225//
1226// If Background is true, other connections will be allowed to proceed using
1227// the collection without the index while it's being built. Note that the
1228// session executing EnsureIndex will be blocked for as long as it takes for
1229// the index to be built.
1230//
1231// If Sparse is true, only documents containing the provided Key fields will be
1232// included in the index.  When using a sparse index for sorting, only indexed
1233// documents will be returned.
1234//
1235// If ExpireAfter is non-zero, the server will periodically scan the collection
1236// and remove documents containing an indexed time.Time field with a value
1237// older than ExpireAfter. See the documentation for details:
1238//
1239//     http://docs.mongodb.org/manual/tutorial/expire-data
1240//
1241// Other kinds of indexes are also supported through that API. Here is an example:
1242//
1243//     index := Index{
1244//         Key: []string{"$2d:loc"},
1245//         Bits: 26,
1246//     }
1247//     err := collection.EnsureIndex(index)
1248//
1249// The example above requests the creation of a "2d" index for the "loc" field.
1250//
1251// The 2D index bounds may be changed using the Min and Max attributes of the
1252// Index value.  The default bound setting of (-180, 180) is suitable for
1253// latitude/longitude pairs.
1254//
1255// The Bits parameter sets the precision of the 2D geohash values.  If not
1256// provided, 26 bits are used, which is roughly equivalent to 1 foot of
1257// precision for the default (-180, 180) index bounds.
1258//
1259// Relevant documentation:
1260//
1261//     http://www.mongodb.org/display/DOCS/Indexes
1262//     http://www.mongodb.org/display/DOCS/Indexing+Advice+and+FAQ
1263//     http://www.mongodb.org/display/DOCS/Indexing+as+a+Background+Operation
1264//     http://www.mongodb.org/display/DOCS/Geospatial+Indexing
1265//     http://www.mongodb.org/display/DOCS/Multikeys
1266//
1267func (c *Collection) EnsureIndex(index Index) error {
1268	keyInfo, err := parseIndexKey(index.Key)
1269	if err != nil {
1270		return err
1271	}
1272
1273	session := c.Database.Session
1274	cacheKey := c.FullName + "\x00" + keyInfo.name
1275	if session.cluster().HasCachedIndex(cacheKey) {
1276		return nil
1277	}
1278
1279	spec := indexSpec{
1280		Name:             keyInfo.name,
1281		NS:               c.FullName,
1282		Key:              keyInfo.key,
1283		Unique:           index.Unique,
1284		DropDups:         index.DropDups,
1285		Background:       index.Background,
1286		Sparse:           index.Sparse,
1287		Bits:             index.Bits,
1288		Min:              index.Minf,
1289		Max:              index.Maxf,
1290		BucketSize:       index.BucketSize,
1291		ExpireAfter:      int(index.ExpireAfter / time.Second),
1292		Weights:          keyInfo.weights,
1293		DefaultLanguage:  index.DefaultLanguage,
1294		LanguageOverride: index.LanguageOverride,
1295		Collation:        index.Collation,
1296	}
1297
1298	if spec.Min == 0 && spec.Max == 0 {
1299		spec.Min = float64(index.Min)
1300		spec.Max = float64(index.Max)
1301	}
1302
1303	if index.Name != "" {
1304		spec.Name = index.Name
1305	}
1306
1307NextField:
1308	for name, weight := range index.Weights {
1309		for i, elem := range spec.Weights {
1310			if elem.Name == name {
1311				spec.Weights[i].Value = weight
1312				continue NextField
1313			}
1314		}
1315		panic("weight provided for field that is not part of index key: " + name)
1316	}
1317
1318	cloned := session.Clone()
1319	defer cloned.Close()
1320	cloned.SetMode(Strong, false)
1321	cloned.EnsureSafe(&Safe{})
1322	db := c.Database.With(cloned)
1323
1324	// Try with a command first.
1325	err = db.Run(bson.D{{"createIndexes", c.Name}, {"indexes", []indexSpec{spec}}}, nil)
1326	if isNoCmd(err) {
1327		// Command not yet supported. Insert into the indexes collection instead.
1328		err = db.C("system.indexes").Insert(&spec)
1329	}
1330	if err == nil {
1331		session.cluster().CacheIndex(cacheKey, true)
1332	}
1333	return err
1334}
1335
1336// DropIndex drops the index with the provided key from the c collection.
1337//
1338// See EnsureIndex for details on the accepted key variants.
1339//
1340// For example:
1341//
1342//     err1 := collection.DropIndex("firstField", "-secondField")
1343//     err2 := collection.DropIndex("customIndexName")
1344//
1345func (c *Collection) DropIndex(key ...string) error {
1346	keyInfo, err := parseIndexKey(key)
1347	if err != nil {
1348		return err
1349	}
1350
1351	session := c.Database.Session
1352	cacheKey := c.FullName + "\x00" + keyInfo.name
1353	session.cluster().CacheIndex(cacheKey, false)
1354
1355	session = session.Clone()
1356	defer session.Close()
1357	session.SetMode(Strong, false)
1358
1359	db := c.Database.With(session)
1360	result := struct {
1361		ErrMsg string
1362		Ok     bool
1363	}{}
1364	err = db.Run(bson.D{{"dropIndexes", c.Name}, {"index", keyInfo.name}}, &result)
1365	if err != nil {
1366		return err
1367	}
1368	if !result.Ok {
1369		return errors.New(result.ErrMsg)
1370	}
1371	return nil
1372}
1373
1374// DropIndexName removes the index with the provided index name.
1375//
1376// For example:
1377//
1378//     err := collection.DropIndex("customIndexName")
1379//
1380func (c *Collection) DropIndexName(name string) error {
1381	session := c.Database.Session
1382
1383	session = session.Clone()
1384	defer session.Close()
1385	session.SetMode(Strong, false)
1386
1387	c = c.With(session)
1388
1389	indexes, err := c.Indexes()
1390	if err != nil {
1391		return err
1392	}
1393
1394	var index Index
1395	for _, idx := range indexes {
1396		if idx.Name == name {
1397			index = idx
1398			break
1399		}
1400	}
1401
1402	if index.Name != "" {
1403		keyInfo, err := parseIndexKey(index.Key)
1404		if err != nil {
1405			return err
1406		}
1407
1408		cacheKey := c.FullName + "\x00" + keyInfo.name
1409		session.cluster().CacheIndex(cacheKey, false)
1410	}
1411
1412	result := struct {
1413		ErrMsg string
1414		Ok     bool
1415	}{}
1416	err = c.Database.Run(bson.D{{"dropIndexes", c.Name}, {"index", name}}, &result)
1417	if err != nil {
1418		return err
1419	}
1420	if !result.Ok {
1421		return errors.New(result.ErrMsg)
1422	}
1423	return nil
1424}
1425
1426// nonEventual returns a clone of session and ensures it is not Eventual.
1427// This guarantees that the server that is used for queries may be reused
1428// afterwards when a cursor is received.
1429func (session *Session) nonEventual() *Session {
1430	cloned := session.Clone()
1431	if cloned.consistency == Eventual {
1432		cloned.SetMode(Monotonic, false)
1433	}
1434	return cloned
1435}
1436
1437// Indexes returns a list of all indexes for the collection.
1438//
1439// For example, this snippet would drop all available indexes:
1440//
1441//   indexes, err := collection.Indexes()
1442//   if err != nil {
1443//       return err
1444//   }
1445//   for _, index := range indexes {
1446//       err = collection.DropIndex(index.Key...)
1447//       if err != nil {
1448//           return err
1449//       }
1450//   }
1451//
1452// See the EnsureIndex method for more details on indexes.
1453func (c *Collection) Indexes() (indexes []Index, err error) {
1454	cloned := c.Database.Session.nonEventual()
1455	defer cloned.Close()
1456
1457	batchSize := int(cloned.queryConfig.op.limit)
1458
1459	// Try with a command.
1460	var result struct {
1461		Indexes []bson.Raw
1462		Cursor  cursorData
1463	}
1464	var iter *Iter
1465	err = c.Database.With(cloned).Run(bson.D{{"listIndexes", c.Name}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result)
1466	if err == nil {
1467		firstBatch := result.Indexes
1468		if firstBatch == nil {
1469			firstBatch = result.Cursor.FirstBatch
1470		}
1471		ns := strings.SplitN(result.Cursor.NS, ".", 2)
1472		if len(ns) < 2 {
1473			iter = c.With(cloned).NewIter(nil, firstBatch, result.Cursor.Id, nil)
1474		} else {
1475			iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil)
1476		}
1477	} else if isNoCmd(err) {
1478		// Command not yet supported. Query the database instead.
1479		iter = c.Database.C("system.indexes").Find(bson.M{"ns": c.FullName}).Iter()
1480	} else {
1481		return nil, err
1482	}
1483
1484	var spec indexSpec
1485	for iter.Next(&spec) {
1486		indexes = append(indexes, indexFromSpec(spec))
1487	}
1488	if err = iter.Close(); err != nil {
1489		return nil, err
1490	}
1491	sort.Sort(indexSlice(indexes))
1492	return indexes, nil
1493}
1494
1495func indexFromSpec(spec indexSpec) Index {
1496	index := Index{
1497		Name:             spec.Name,
1498		Key:              simpleIndexKey(spec.Key),
1499		Unique:           spec.Unique,
1500		DropDups:         spec.DropDups,
1501		Background:       spec.Background,
1502		Sparse:           spec.Sparse,
1503		Minf:             spec.Min,
1504		Maxf:             spec.Max,
1505		Bits:             spec.Bits,
1506		BucketSize:       spec.BucketSize,
1507		DefaultLanguage:  spec.DefaultLanguage,
1508		LanguageOverride: spec.LanguageOverride,
1509		ExpireAfter:      time.Duration(spec.ExpireAfter) * time.Second,
1510		Collation:        spec.Collation,
1511	}
1512	if float64(int(spec.Min)) == spec.Min && float64(int(spec.Max)) == spec.Max {
1513		index.Min = int(spec.Min)
1514		index.Max = int(spec.Max)
1515	}
1516	if spec.TextIndexVersion > 0 {
1517		index.Key = make([]string, len(spec.Weights))
1518		index.Weights = make(map[string]int)
1519		for i, elem := range spec.Weights {
1520			index.Key[i] = "$text:" + elem.Name
1521			if w, ok := elem.Value.(int); ok {
1522				index.Weights[elem.Name] = w
1523			}
1524		}
1525	}
1526	return index
1527}
1528
1529type indexSlice []Index
1530
1531func (idxs indexSlice) Len() int           { return len(idxs) }
1532func (idxs indexSlice) Less(i, j int) bool { return idxs[i].Name < idxs[j].Name }
1533func (idxs indexSlice) Swap(i, j int)      { idxs[i], idxs[j] = idxs[j], idxs[i] }
1534
1535func simpleIndexKey(realKey bson.D) (key []string) {
1536	for i := range realKey {
1537		var vi int
1538		field := realKey[i].Name
1539
1540		switch realKey[i].Value.(type) {
1541		case int64:
1542			vf, _ := realKey[i].Value.(int64)
1543			vi = int(vf)
1544		case float64:
1545			vf, _ := realKey[i].Value.(float64)
1546			vi = int(vf)
1547		case string:
1548			if vs, ok := realKey[i].Value.(string); ok {
1549				key = append(key, "$"+vs+":"+field)
1550				continue
1551			}
1552		case int:
1553			vi = realKey[i].Value.(int)
1554		}
1555
1556		if vi == 1 {
1557			key = append(key, field)
1558			continue
1559		}
1560		if vi == -1 {
1561			key = append(key, "-"+field)
1562			continue
1563		}
1564		panic("Got unknown index key type for field " + field)
1565	}
1566	return
1567}
1568
1569// ResetIndexCache() clears the cache of previously ensured indexes.
1570// Following requests to EnsureIndex will contact the server.
1571func (s *Session) ResetIndexCache() {
1572	s.cluster().ResetIndexCache()
1573}
1574
1575// New creates a new session with the same parameters as the original
1576// session, including consistency, batch size, prefetching, safety mode,
1577// etc. The returned session will use sockets from the pool, so there's
1578// a chance that writes just performed in another session may not yet
1579// be visible.
1580//
1581// Login information from the original session will not be copied over
1582// into the new session unless it was provided through the initial URL
1583// for the Dial function.
1584//
1585// See the Copy and Clone methods.
1586//
1587func (s *Session) New() *Session {
1588	s.m.Lock()
1589	scopy := copySession(s, false)
1590	s.m.Unlock()
1591	scopy.Refresh()
1592	return scopy
1593}
1594
1595// Copy works just like New, but preserves the exact authentication
1596// information from the original session.
1597func (s *Session) Copy() *Session {
1598	s.m.Lock()
1599	scopy := copySession(s, true)
1600	s.m.Unlock()
1601	scopy.Refresh()
1602	return scopy
1603}
1604
1605// Clone works just like Copy, but also reuses the same socket as the original
1606// session, in case it had already reserved one due to its consistency
1607// guarantees.  This behavior ensures that writes performed in the old session
1608// are necessarily observed when using the new session, as long as it was a
1609// strong or monotonic session.  That said, it also means that long operations
1610// may cause other goroutines using the original session to wait.
1611func (s *Session) Clone() *Session {
1612	s.m.Lock()
1613	scopy := copySession(s, true)
1614	s.m.Unlock()
1615	return scopy
1616}
1617
1618// Close terminates the session.  It's a runtime error to use a session
1619// after it has been closed.
1620func (s *Session) Close() {
1621	s.m.Lock()
1622	if s.cluster_ != nil {
1623		debugf("Closing session %p", s)
1624		s.unsetSocket()
1625		s.cluster_.Release()
1626		s.cluster_ = nil
1627	}
1628	s.m.Unlock()
1629}
1630
1631func (s *Session) cluster() *mongoCluster {
1632	if s.cluster_ == nil {
1633		panic("Session already closed")
1634	}
1635	return s.cluster_
1636}
1637
1638// Refresh puts back any reserved sockets in use and restarts the consistency
1639// guarantees according to the current consistency setting for the session.
1640func (s *Session) Refresh() {
1641	s.m.Lock()
1642	s.slaveOk = s.consistency != Strong
1643	s.unsetSocket()
1644	s.m.Unlock()
1645}
1646
1647// SetMode changes the consistency mode for the session.
1648//
1649// The default mode is Strong.
1650//
1651// In the Strong consistency mode reads and writes will always be made to
1652// the primary server using a unique connection so that reads and writes are
1653// fully consistent, ordered, and observing the most up-to-date data.
1654// This offers the least benefits in terms of distributing load, but the
1655// most guarantees.  See also Monotonic and Eventual.
1656//
1657// In the Monotonic consistency mode reads may not be entirely up-to-date,
1658// but they will always see the history of changes moving forward, the data
1659// read will be consistent across sequential queries in the same session,
1660// and modifications made within the session will be observed in following
1661// queries (read-your-writes).
1662//
1663// In practice, the Monotonic mode is obtained by performing initial reads
1664// on a unique connection to an arbitrary secondary, if one is available,
1665// and once the first write happens, the session connection is switched over
1666// to the primary server.  This manages to distribute some of the reading
1667// load with secondaries, while maintaining some useful guarantees.
1668//
1669// In the Eventual consistency mode reads will be made to any secondary in the
1670// cluster, if one is available, and sequential reads will not necessarily
1671// be made with the same connection.  This means that data may be observed
1672// out of order.  Writes will of course be issued to the primary, but
1673// independent writes in the same Eventual session may also be made with
1674// independent connections, so there are also no guarantees in terms of
1675// write ordering (no read-your-writes guarantees either).
1676//
1677// The Eventual mode is the fastest and most resource-friendly, but is
1678// also the one offering the least guarantees about ordering of the data
1679// read and written.
1680//
1681// If refresh is true, in addition to ensuring the session is in the given
1682// consistency mode, the consistency guarantees will also be reset (e.g.
1683// a Monotonic session will be allowed to read from secondaries again).
1684// This is equivalent to calling the Refresh function.
1685//
1686// Shifting between Monotonic and Strong modes will keep a previously
1687// reserved connection for the session unless refresh is true or the
1688// connection is unsuitable (to a secondary server in a Strong session).
1689func (s *Session) SetMode(consistency Mode, refresh bool) {
1690	s.m.Lock()
1691	debugf("Session %p: setting mode %d with refresh=%v (master=%p, slave=%p)", s, consistency, refresh, s.masterSocket, s.slaveSocket)
1692	s.consistency = consistency
1693	if refresh {
1694		s.slaveOk = s.consistency != Strong
1695		s.unsetSocket()
1696	} else if s.consistency == Strong {
1697		s.slaveOk = false
1698	} else if s.masterSocket == nil {
1699		s.slaveOk = true
1700	}
1701	s.m.Unlock()
1702}
1703
1704// Mode returns the current consistency mode for the session.
1705func (s *Session) Mode() Mode {
1706	s.m.RLock()
1707	mode := s.consistency
1708	s.m.RUnlock()
1709	return mode
1710}
1711
1712// SetSyncTimeout sets the amount of time an operation with this session
1713// will wait before returning an error in case a connection to a usable
1714// server can't be established. Set it to zero to wait forever. The
1715// default value is 7 seconds.
1716func (s *Session) SetSyncTimeout(d time.Duration) {
1717	s.m.Lock()
1718	s.syncTimeout = d
1719	s.m.Unlock()
1720}
1721
1722// SetSocketTimeout sets the amount of time to wait for a non-responding
1723// socket to the database before it is forcefully closed.
1724//
1725// The default timeout is 1 minute.
1726func (s *Session) SetSocketTimeout(d time.Duration) {
1727	s.m.Lock()
1728	s.sockTimeout = d
1729	if s.masterSocket != nil {
1730		s.masterSocket.SetTimeout(d)
1731	}
1732	if s.slaveSocket != nil {
1733		s.slaveSocket.SetTimeout(d)
1734	}
1735	s.m.Unlock()
1736}
1737
1738// SetCursorTimeout changes the standard timeout period that the server
1739// enforces on created cursors. The only supported value right now is
1740// 0, which disables the timeout. The standard server timeout is 10 minutes.
1741func (s *Session) SetCursorTimeout(d time.Duration) {
1742	s.m.Lock()
1743	if d == 0 {
1744		s.queryConfig.op.flags |= flagNoCursorTimeout
1745	} else {
1746		panic("SetCursorTimeout: only 0 (disable timeout) supported for now")
1747	}
1748	s.m.Unlock()
1749}
1750
1751// SetPoolLimit sets the maximum number of sockets in use in a single server
1752// before this session will block waiting for a socket to be available.
1753// The default limit is 4096.
1754//
1755// This limit must be set to cover more than any expected workload of the
1756// application. It is a bad practice and an unsupported use case to use the
1757// database driver to define the concurrency limit of an application. Prevent
1758// such concurrency "at the door" instead, by properly restricting the amount
1759// of used resources and number of goroutines before they are created.
1760func (s *Session) SetPoolLimit(limit int) {
1761	s.m.Lock()
1762	s.poolLimit = limit
1763	s.m.Unlock()
1764}
1765
1766// SetBypassValidation sets whether the server should bypass the registered
1767// validation expressions executed when documents are inserted or modified,
1768// in the interest of preserving invariants in the collection being modified.
1769// The default is to not bypass, and thus to perform the validation
1770// expressions registered for modified collections.
1771//
1772// Document validation was introuced in MongoDB 3.2.
1773//
1774// Relevant documentation:
1775//
1776//   https://docs.mongodb.org/manual/release-notes/3.2/#bypass-validation
1777//
1778func (s *Session) SetBypassValidation(bypass bool) {
1779	s.m.Lock()
1780	s.bypassValidation = bypass
1781	s.m.Unlock()
1782}
1783
1784// SetBatch sets the default batch size used when fetching documents from the
1785// database. It's possible to change this setting on a per-query basis as
1786// well, using the Query.Batch method.
1787//
1788// The default batch size is defined by the database itself.  As of this
1789// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
1790// first batch, and 4MB on remaining ones.
1791func (s *Session) SetBatch(n int) {
1792	if n == 1 {
1793		// Server interprets 1 as -1 and closes the cursor (!?)
1794		n = 2
1795	}
1796	s.m.Lock()
1797	s.queryConfig.op.limit = int32(n)
1798	s.m.Unlock()
1799}
1800
1801// SetPrefetch sets the default point at which the next batch of results will be
1802// requested.  When there are p*batch_size remaining documents cached in an
1803// Iter, the next batch will be requested in background. For instance, when
1804// using this:
1805//
1806//     session.SetBatch(200)
1807//     session.SetPrefetch(0.25)
1808//
1809// and there are only 50 documents cached in the Iter to be processed, the
1810// next batch of 200 will be requested. It's possible to change this setting on
1811// a per-query basis as well, using the Prefetch method of Query.
1812//
1813// The default prefetch value is 0.25.
1814func (s *Session) SetPrefetch(p float64) {
1815	s.m.Lock()
1816	s.queryConfig.prefetch = p
1817	s.m.Unlock()
1818}
1819
1820// See SetSafe for details on the Safe type.
1821type Safe struct {
1822	W        int    // Min # of servers to ack before success
1823	WMode    string // Write mode for MongoDB 2.0+ (e.g. "majority")
1824	WTimeout int    // Milliseconds to wait for W before timing out
1825	FSync    bool   // Sync via the journal if present, or via data files sync otherwise
1826	J        bool   // Sync via the journal if present
1827}
1828
1829// Safe returns the current safety mode for the session.
1830func (s *Session) Safe() (safe *Safe) {
1831	s.m.Lock()
1832	defer s.m.Unlock()
1833	if s.safeOp != nil {
1834		cmd := s.safeOp.query.(*getLastError)
1835		safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J}
1836		switch w := cmd.W.(type) {
1837		case string:
1838			safe.WMode = w
1839		case int:
1840			safe.W = w
1841		}
1842	}
1843	return
1844}
1845
1846// SetSafe changes the session safety mode.
1847//
1848// If the safe parameter is nil, the session is put in unsafe mode, and writes
1849// become fire-and-forget, without error checking.  The unsafe mode is faster
1850// since operations won't hold on waiting for a confirmation.
1851//
1852// If the safe parameter is not nil, any changing query (insert, update, ...)
1853// will be followed by a getLastError command with the specified parameters,
1854// to ensure the request was correctly processed.
1855//
1856// The default is &Safe{}, meaning check for errors and use the default
1857// behavior for all fields.
1858//
1859// The safe.W parameter determines how many servers should confirm a write
1860// before the operation is considered successful.  If set to 0 or 1, the
1861// command will return as soon as the primary is done with the request.
1862// If safe.WTimeout is greater than zero, it determines how many milliseconds
1863// to wait for the safe.W servers to respond before returning an error.
1864//
1865// Starting with MongoDB 2.0.0 the safe.WMode parameter can be used instead
1866// of W to request for richer semantics. If set to "majority" the server will
1867// wait for a majority of members from the replica set to respond before
1868// returning. Custom modes may also be defined within the server to create
1869// very detailed placement schemas. See the data awareness documentation in
1870// the links below for more details (note that MongoDB internally reuses the
1871// "w" field name for WMode).
1872//
1873// If safe.J is true, servers will block until write operations have been
1874// committed to the journal. Cannot be used in combination with FSync. Prior
1875// to MongoDB 2.6 this option was ignored if the server was running without
1876// journaling. Starting with MongoDB 2.6 write operations will fail with an
1877// exception if this option is used when the server is running without
1878// journaling.
1879//
1880// If safe.FSync is true and the server is running without journaling, blocks
1881// until the server has synced all data files to disk. If the server is running
1882// with journaling, this acts the same as the J option, blocking until write
1883// operations have been committed to the journal. Cannot be used in
1884// combination with J.
1885//
1886// Since MongoDB 2.0.0, the safe.J option can also be used instead of FSync
1887// to force the server to wait for a group commit in case journaling is
1888// enabled. The option has no effect if the server has journaling disabled.
1889//
1890// For example, the following statement will make the session check for
1891// errors, without imposing further constraints:
1892//
1893//     session.SetSafe(&mgo.Safe{})
1894//
1895// The following statement will force the server to wait for a majority of
1896// members of a replica set to return (MongoDB 2.0+ only):
1897//
1898//     session.SetSafe(&mgo.Safe{WMode: "majority"})
1899//
1900// The following statement, on the other hand, ensures that at least two
1901// servers have flushed the change to disk before confirming the success
1902// of operations:
1903//
1904//     session.EnsureSafe(&mgo.Safe{W: 2, FSync: true})
1905//
1906// The following statement, on the other hand, disables the verification
1907// of errors entirely:
1908//
1909//     session.SetSafe(nil)
1910//
1911// See also the EnsureSafe method.
1912//
1913// Relevant documentation:
1914//
1915//     http://www.mongodb.org/display/DOCS/getLastError+Command
1916//     http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
1917//     http://www.mongodb.org/display/DOCS/Data+Center+Awareness
1918//
1919func (s *Session) SetSafe(safe *Safe) {
1920	s.m.Lock()
1921	s.safeOp = nil
1922	s.ensureSafe(safe)
1923	s.m.Unlock()
1924}
1925
1926// EnsureSafe compares the provided safety parameters with the ones
1927// currently in use by the session and picks the most conservative
1928// choice for each setting.
1929//
1930// That is:
1931//
1932//     - safe.WMode is always used if set.
1933//     - safe.W is used if larger than the current W and WMode is empty.
1934//     - safe.FSync is always used if true.
1935//     - safe.J is used if FSync is false.
1936//     - safe.WTimeout is used if set and smaller than the current WTimeout.
1937//
1938// For example, the following statement will ensure the session is
1939// at least checking for errors, without enforcing further constraints.
1940// If a more conservative SetSafe or EnsureSafe call was previously done,
1941// the following call will be ignored.
1942//
1943//     session.EnsureSafe(&mgo.Safe{})
1944//
1945// See also the SetSafe method for details on what each option means.
1946//
1947// Relevant documentation:
1948//
1949//     http://www.mongodb.org/display/DOCS/getLastError+Command
1950//     http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
1951//     http://www.mongodb.org/display/DOCS/Data+Center+Awareness
1952//
1953func (s *Session) EnsureSafe(safe *Safe) {
1954	s.m.Lock()
1955	s.ensureSafe(safe)
1956	s.m.Unlock()
1957}
1958
1959func (s *Session) ensureSafe(safe *Safe) {
1960	if safe == nil {
1961		return
1962	}
1963
1964	var w interface{}
1965	if safe.WMode != "" {
1966		w = safe.WMode
1967	} else if safe.W > 0 {
1968		w = safe.W
1969	}
1970
1971	var cmd getLastError
1972	if s.safeOp == nil {
1973		cmd = getLastError{1, w, safe.WTimeout, safe.FSync, safe.J}
1974	} else {
1975		// Copy.  We don't want to mutate the existing query.
1976		cmd = *(s.safeOp.query.(*getLastError))
1977		if cmd.W == nil {
1978			cmd.W = w
1979		} else if safe.WMode != "" {
1980			cmd.W = safe.WMode
1981		} else if i, ok := cmd.W.(int); ok && safe.W > i {
1982			cmd.W = safe.W
1983		}
1984		if safe.WTimeout > 0 && safe.WTimeout < cmd.WTimeout {
1985			cmd.WTimeout = safe.WTimeout
1986		}
1987		if safe.FSync {
1988			cmd.FSync = true
1989			cmd.J = false
1990		} else if safe.J && !cmd.FSync {
1991			cmd.J = true
1992		}
1993	}
1994	s.safeOp = &queryOp{
1995		query:      &cmd,
1996		collection: "admin.$cmd",
1997		limit:      -1,
1998	}
1999}
2000
2001// Run issues the provided command on the "admin" database and
2002// and unmarshals its result in the respective argument. The cmd
2003// argument may be either a string with the command name itself, in
2004// which case an empty document of the form bson.M{cmd: 1} will be used,
2005// or it may be a full command document.
2006//
2007// Note that MongoDB considers the first marshalled key as the command
2008// name, so when providing a command with options, it's important to
2009// use an ordering-preserving document, such as a struct value or an
2010// instance of bson.D.  For instance:
2011//
2012//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
2013//
2014// For commands on arbitrary databases, see the Run method in
2015// the Database type.
2016//
2017// Relevant documentation:
2018//
2019//     http://www.mongodb.org/display/DOCS/Commands
2020//     http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
2021//
2022func (s *Session) Run(cmd interface{}, result interface{}) error {
2023	return s.DB("admin").Run(cmd, result)
2024}
2025
2026// SelectServers restricts communication to servers configured with the
2027// given tags. For example, the following statement restricts servers
2028// used for reading operations to those with both tag "disk" set to
2029// "ssd" and tag "rack" set to 1:
2030//
2031//     session.SelectServers(bson.D{{"disk", "ssd"}, {"rack", 1}})
2032//
2033// Multiple sets of tags may be provided, in which case the used server
2034// must match all tags within any one set.
2035//
2036// If a connection was previously assigned to the session due to the
2037// current session mode (see Session.SetMode), the tag selection will
2038// only be enforced after the session is refreshed.
2039//
2040// Relevant documentation:
2041//
2042//     http://docs.mongodb.org/manual/tutorial/configure-replica-set-tag-sets
2043//
2044func (s *Session) SelectServers(tags ...bson.D) {
2045	s.m.Lock()
2046	s.queryConfig.op.serverTags = tags
2047	s.m.Unlock()
2048}
2049
2050// Ping runs a trivial ping command just to get in touch with the server.
2051func (s *Session) Ping() error {
2052	return s.Run("ping", nil)
2053}
2054
2055// Fsync flushes in-memory writes to disk on the server the session
2056// is established with. If async is true, the call returns immediately,
2057// otherwise it returns after the flush has been made.
2058func (s *Session) Fsync(async bool) error {
2059	return s.Run(bson.D{{"fsync", 1}, {"async", async}}, nil)
2060}
2061
2062// FsyncLock locks all writes in the specific server the session is
2063// established with and returns. Any writes attempted to the server
2064// after it is successfully locked will block until FsyncUnlock is
2065// called for the same server.
2066//
2067// This method works on secondaries as well, preventing the oplog from
2068// being flushed while the server is locked, but since only the server
2069// connected to is locked, for locking specific secondaries it may be
2070// necessary to establish a connection directly to the secondary (see
2071// Dial's connect=direct option).
2072//
2073// As an important caveat, note that once a write is attempted and
2074// blocks, follow up reads will block as well due to the way the
2075// lock is internally implemented in the server. More details at:
2076//
2077//     https://jira.mongodb.org/browse/SERVER-4243
2078//
2079// FsyncLock is often used for performing consistent backups of
2080// the database files on disk.
2081//
2082// Relevant documentation:
2083//
2084//     http://www.mongodb.org/display/DOCS/fsync+Command
2085//     http://www.mongodb.org/display/DOCS/Backups
2086//
2087func (s *Session) FsyncLock() error {
2088	return s.Run(bson.D{{"fsync", 1}, {"lock", true}}, nil)
2089}
2090
2091// FsyncUnlock releases the server for writes. See FsyncLock for details.
2092func (s *Session) FsyncUnlock() error {
2093	err := s.Run(bson.D{{"fsyncUnlock", 1}}, nil)
2094	if isNoCmd(err) {
2095		err = s.DB("admin").C("$cmd.sys.unlock").Find(nil).One(nil) // WTF?
2096	}
2097	return err
2098}
2099
2100// Find prepares a query using the provided document.  The document may be a
2101// map or a struct value capable of being marshalled with bson.  The map
2102// may be a generic one using interface{} for its key and/or values, such as
2103// bson.M, or it may be a properly typed map.  Providing nil as the document
2104// is equivalent to providing an empty document such as bson.M{}.
2105//
2106// Further details of the query may be tweaked using the resulting Query value,
2107// and then executed to retrieve results using methods such as One, For,
2108// Iter, or Tail.
2109//
2110// In case the resulting document includes a field named $err or errmsg, which
2111// are standard ways for MongoDB to return query errors, the returned err will
2112// be set to a *QueryError value including the Err message and the Code.  In
2113// those cases, the result argument is still unmarshalled into with the
2114// received document so that any other custom values may be obtained if
2115// desired.
2116//
2117// Relevant documentation:
2118//
2119//     http://www.mongodb.org/display/DOCS/Querying
2120//     http://www.mongodb.org/display/DOCS/Advanced+Queries
2121//
2122func (c *Collection) Find(query interface{}) *Query {
2123	session := c.Database.Session
2124	session.m.RLock()
2125	q := &Query{session: session, query: session.queryConfig}
2126	session.m.RUnlock()
2127	q.op.query = query
2128	q.op.collection = c.FullName
2129	return q
2130}
2131
2132type repairCmd struct {
2133	RepairCursor string           `bson:"repairCursor"`
2134	Cursor       *repairCmdCursor ",omitempty"
2135}
2136
2137type repairCmdCursor struct {
2138	BatchSize int `bson:"batchSize,omitempty"`
2139}
2140
2141// Repair returns an iterator that goes over all recovered documents in the
2142// collection, in a best-effort manner. This is most useful when there are
2143// damaged data files. Multiple copies of the same document may be returned
2144// by the iterator.
2145//
2146// Repair is supported in MongoDB 2.7.8 and later.
2147func (c *Collection) Repair() *Iter {
2148	// Clone session and set it to Monotonic mode so that the server
2149	// used for the query may be safely obtained afterwards, if
2150	// necessary for iteration when a cursor is received.
2151	session := c.Database.Session
2152	cloned := session.nonEventual()
2153	defer cloned.Close()
2154
2155	batchSize := int(cloned.queryConfig.op.limit)
2156
2157	var result struct{ Cursor cursorData }
2158
2159	cmd := repairCmd{
2160		RepairCursor: c.Name,
2161		Cursor:       &repairCmdCursor{batchSize},
2162	}
2163
2164	clonedc := c.With(cloned)
2165	err := clonedc.Database.Run(cmd, &result)
2166	return clonedc.NewIter(session, result.Cursor.FirstBatch, result.Cursor.Id, err)
2167}
2168
2169// FindId is a convenience helper equivalent to:
2170//
2171//     query := collection.Find(bson.M{"_id": id})
2172//
2173// See the Find method for more details.
2174func (c *Collection) FindId(id interface{}) *Query {
2175	return c.Find(bson.D{{"_id", id}})
2176}
2177
2178type Pipe struct {
2179	session    *Session
2180	collection *Collection
2181	pipeline   interface{}
2182	allowDisk  bool
2183	batchSize  int
2184}
2185
2186type pipeCmd struct {
2187	Aggregate string
2188	Pipeline  interface{}
2189	Cursor    *pipeCmdCursor ",omitempty"
2190	Explain   bool           ",omitempty"
2191	AllowDisk bool           "allowDiskUse,omitempty"
2192}
2193
2194type pipeCmdCursor struct {
2195	BatchSize int `bson:"batchSize,omitempty"`
2196}
2197
2198// Pipe prepares a pipeline to aggregate. The pipeline document
2199// must be a slice built in terms of the aggregation framework language.
2200//
2201// For example:
2202//
2203//     pipe := collection.Pipe([]bson.M{{"$match": bson.M{"name": "Otavio"}}})
2204//     iter := pipe.Iter()
2205//
2206// Relevant documentation:
2207//
2208//     http://docs.mongodb.org/manual/reference/aggregation
2209//     http://docs.mongodb.org/manual/applications/aggregation
2210//     http://docs.mongodb.org/manual/tutorial/aggregation-examples
2211//
2212func (c *Collection) Pipe(pipeline interface{}) *Pipe {
2213	session := c.Database.Session
2214	session.m.RLock()
2215	batchSize := int(session.queryConfig.op.limit)
2216	session.m.RUnlock()
2217	return &Pipe{
2218		session:    session,
2219		collection: c,
2220		pipeline:   pipeline,
2221		batchSize:  batchSize,
2222	}
2223}
2224
2225// Iter executes the pipeline and returns an iterator capable of going
2226// over all the generated results.
2227func (p *Pipe) Iter() *Iter {
2228	// Clone session and set it to Monotonic mode so that the server
2229	// used for the query may be safely obtained afterwards, if
2230	// necessary for iteration when a cursor is received.
2231	cloned := p.session.nonEventual()
2232	defer cloned.Close()
2233	c := p.collection.With(cloned)
2234
2235	var result struct {
2236		Result []bson.Raw // 2.4, no cursors.
2237		Cursor cursorData // 2.6+, with cursors.
2238	}
2239
2240	cmd := pipeCmd{
2241		Aggregate: c.Name,
2242		Pipeline:  p.pipeline,
2243		AllowDisk: p.allowDisk,
2244		Cursor:    &pipeCmdCursor{p.batchSize},
2245	}
2246	err := c.Database.Run(cmd, &result)
2247	if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` {
2248		cmd.Cursor = nil
2249		cmd.AllowDisk = false
2250		err = c.Database.Run(cmd, &result)
2251	}
2252	firstBatch := result.Result
2253	if firstBatch == nil {
2254		firstBatch = result.Cursor.FirstBatch
2255	}
2256	return c.NewIter(p.session, firstBatch, result.Cursor.Id, err)
2257}
2258
2259// NewIter returns a newly created iterator with the provided parameters.
2260// Using this method is not recommended unless the desired functionality
2261// is not yet exposed via a more convenient interface (Find, Pipe, etc).
2262//
2263// The optional session parameter associates the lifetime of the returned
2264// iterator to an arbitrary session. If nil, the iterator will be bound to
2265// c's session.
2266//
2267// Documents in firstBatch will be individually provided by the returned
2268// iterator before documents from cursorId are made available. If cursorId
2269// is zero, only the documents in firstBatch are provided.
2270//
2271// If err is not nil, the iterator's Err method will report it after
2272// exhausting documents in firstBatch.
2273//
2274// NewIter must be called right after the cursor id is obtained, and must not
2275// be called on a collection in Eventual mode, because the cursor id is
2276// associated with the specific server that returned it. The provided session
2277// parameter may be in any mode or state, though.
2278//
2279func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId int64, err error) *Iter {
2280	var server *mongoServer
2281	csession := c.Database.Session
2282	csession.m.RLock()
2283	socket := csession.masterSocket
2284	if socket == nil {
2285		socket = csession.slaveSocket
2286	}
2287	if socket != nil {
2288		server = socket.Server()
2289	}
2290	csession.m.RUnlock()
2291
2292	if server == nil {
2293		if csession.Mode() == Eventual {
2294			panic("Collection.NewIter called in Eventual mode")
2295		}
2296		if err == nil {
2297			err = errors.New("server not available")
2298		}
2299	}
2300
2301	if session == nil {
2302		session = csession
2303	}
2304
2305	iter := &Iter{
2306		session: session,
2307		server:  server,
2308		timeout: -1,
2309		err:     err,
2310	}
2311	iter.gotReply.L = &iter.m
2312	for _, doc := range firstBatch {
2313		iter.docData.Push(doc.Data)
2314	}
2315	if cursorId != 0 {
2316		iter.op.cursorId = cursorId
2317		iter.op.collection = c.FullName
2318		iter.op.replyFunc = iter.replyFunc()
2319	}
2320	return iter
2321}
2322
2323// All works like Iter.All.
2324func (p *Pipe) All(result interface{}) error {
2325	return p.Iter().All(result)
2326}
2327
2328// One executes the pipeline and unmarshals the first item from the
2329// result set into the result parameter.
2330// It returns ErrNotFound if no items are generated by the pipeline.
2331func (p *Pipe) One(result interface{}) error {
2332	iter := p.Iter()
2333	if iter.Next(result) {
2334		return nil
2335	}
2336	if err := iter.Err(); err != nil {
2337		return err
2338	}
2339	return ErrNotFound
2340}
2341
2342// Explain returns a number of details about how the MongoDB server would
2343// execute the requested pipeline, such as the number of objects examined,
2344// the number of times the read lock was yielded to allow writes to go in,
2345// and so on.
2346//
2347// For example:
2348//
2349//     var m bson.M
2350//     err := collection.Pipe(pipeline).Explain(&m)
2351//     if err == nil {
2352//         fmt.Printf("Explain: %#v\n", m)
2353//     }
2354//
2355func (p *Pipe) Explain(result interface{}) error {
2356	c := p.collection
2357	cmd := pipeCmd{
2358		Aggregate: c.Name,
2359		Pipeline:  p.pipeline,
2360		AllowDisk: p.allowDisk,
2361		Explain:   true,
2362	}
2363	return c.Database.Run(cmd, result)
2364}
2365
2366// AllowDiskUse enables writing to the "<dbpath>/_tmp" server directory so
2367// that aggregation pipelines do not have to be held entirely in memory.
2368func (p *Pipe) AllowDiskUse() *Pipe {
2369	p.allowDisk = true
2370	return p
2371}
2372
2373// Batch sets the batch size used when fetching documents from the database.
2374// It's possible to change this setting on a per-session basis as well, using
2375// the Batch method of Session.
2376//
2377// The default batch size is defined by the database server.
2378func (p *Pipe) Batch(n int) *Pipe {
2379	p.batchSize = n
2380	return p
2381}
2382
2383// mgo.v3: Use a single user-visible error type.
2384
2385type LastError struct {
2386	Err             string
2387	Code, N, Waited int
2388	FSyncFiles      int `bson:"fsyncFiles"`
2389	WTimeout        bool
2390	UpdatedExisting bool        `bson:"updatedExisting"`
2391	UpsertedId      interface{} `bson:"upserted"`
2392
2393	modified int
2394	ecases   []BulkErrorCase
2395}
2396
2397func (err *LastError) Error() string {
2398	return err.Err
2399}
2400
2401type queryError struct {
2402	Err           string "$err"
2403	ErrMsg        string
2404	Assertion     string
2405	Code          int
2406	AssertionCode int "assertionCode"
2407}
2408
2409type QueryError struct {
2410	Code      int
2411	Message   string
2412	Assertion bool
2413}
2414
2415func (err *QueryError) Error() string {
2416	return err.Message
2417}
2418
2419// IsDup returns whether err informs of a duplicate key error because
2420// a primary key index or a secondary unique index already has an entry
2421// with the given value.
2422func IsDup(err error) bool {
2423	// Besides being handy, helps with MongoDB bugs SERVER-7164 and SERVER-11493.
2424	// What follows makes me sad. Hopefully conventions will be more clear over time.
2425	switch e := err.(type) {
2426	case *LastError:
2427		return e.Code == 11000 || e.Code == 11001 || e.Code == 12582 || e.Code == 16460 && strings.Contains(e.Err, " E11000 ")
2428	case *QueryError:
2429		return e.Code == 11000 || e.Code == 11001 || e.Code == 12582
2430	case *BulkError:
2431		for _, ecase := range e.ecases {
2432			if !IsDup(ecase.Err) {
2433				return false
2434			}
2435		}
2436		return true
2437	}
2438	return false
2439}
2440
2441// Insert inserts one or more documents in the respective collection.  In
2442// case the session is in safe mode (see the SetSafe method) and an error
2443// happens while inserting the provided documents, the returned error will
2444// be of type *LastError.
2445func (c *Collection) Insert(docs ...interface{}) error {
2446	_, err := c.writeOp(&insertOp{c.FullName, docs, 0}, true)
2447	return err
2448}
2449
2450// Update finds a single document matching the provided selector document
2451// and modifies it according to the update document.
2452// If the session is in safe mode (see SetSafe) a ErrNotFound error is
2453// returned if a document isn't found, or a value of type *LastError
2454// when some other error is detected.
2455//
2456// Relevant documentation:
2457//
2458//     http://www.mongodb.org/display/DOCS/Updating
2459//     http://www.mongodb.org/display/DOCS/Atomic+Operations
2460//
2461func (c *Collection) Update(selector interface{}, update interface{}) error {
2462	if selector == nil {
2463		selector = bson.D{}
2464	}
2465	op := updateOp{
2466		Collection: c.FullName,
2467		Selector:   selector,
2468		Update:     update,
2469	}
2470	lerr, err := c.writeOp(&op, true)
2471	if err == nil && lerr != nil && !lerr.UpdatedExisting {
2472		return ErrNotFound
2473	}
2474	return err
2475}
2476
2477// UpdateId is a convenience helper equivalent to:
2478//
2479//     err := collection.Update(bson.M{"_id": id}, update)
2480//
2481// See the Update method for more details.
2482func (c *Collection) UpdateId(id interface{}, update interface{}) error {
2483	return c.Update(bson.D{{"_id", id}}, update)
2484}
2485
2486// ChangeInfo holds details about the outcome of an update operation.
2487type ChangeInfo struct {
2488	// Updated reports the number of existing documents modified.
2489	// Due to server limitations, this reports the same value as the Matched field when
2490	// talking to MongoDB <= 2.4 and on Upsert and Apply (findAndModify) operations.
2491	Updated    int
2492	Removed    int         // Number of documents removed
2493	Matched    int         // Number of documents matched but not necessarily changed
2494	UpsertedId interface{} // Upserted _id field, when not explicitly provided
2495}
2496
2497// UpdateAll finds all documents matching the provided selector document
2498// and modifies them according to the update document.
2499// If the session is in safe mode (see SetSafe) details of the executed
2500// operation are returned in info or an error of type *LastError when
2501// some problem is detected. It is not an error for the update to not be
2502// applied on any documents because the selector doesn't match.
2503//
2504// Relevant documentation:
2505//
2506//     http://www.mongodb.org/display/DOCS/Updating
2507//     http://www.mongodb.org/display/DOCS/Atomic+Operations
2508//
2509func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
2510	if selector == nil {
2511		selector = bson.D{}
2512	}
2513	op := updateOp{
2514		Collection: c.FullName,
2515		Selector:   selector,
2516		Update:     update,
2517		Flags:      2,
2518		Multi:      true,
2519	}
2520	lerr, err := c.writeOp(&op, true)
2521	if err == nil && lerr != nil {
2522		info = &ChangeInfo{Updated: lerr.modified, Matched: lerr.N}
2523	}
2524	return info, err
2525}
2526
2527// Upsert finds a single document matching the provided selector document
2528// and modifies it according to the update document.  If no document matching
2529// the selector is found, the update document is applied to the selector
2530// document and the result is inserted in the collection.
2531// If the session is in safe mode (see SetSafe) details of the executed
2532// operation are returned in info, or an error of type *LastError when
2533// some problem is detected.
2534//
2535// Relevant documentation:
2536//
2537//     http://www.mongodb.org/display/DOCS/Updating
2538//     http://www.mongodb.org/display/DOCS/Atomic+Operations
2539//
2540func (c *Collection) Upsert(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
2541	if selector == nil {
2542		selector = bson.D{}
2543	}
2544	op := updateOp{
2545		Collection: c.FullName,
2546		Selector:   selector,
2547		Update:     update,
2548		Flags:      1,
2549		Upsert:     true,
2550	}
2551	var lerr *LastError
2552	for i := 0; i < maxUpsertRetries; i++ {
2553		lerr, err = c.writeOp(&op, true)
2554		// Retry duplicate key errors on upserts.
2555		// https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes
2556		if !IsDup(err) {
2557			break
2558		}
2559	}
2560	if err == nil && lerr != nil {
2561		info = &ChangeInfo{}
2562		if lerr.UpdatedExisting {
2563			info.Matched = lerr.N
2564			info.Updated = lerr.modified
2565		} else {
2566			info.UpsertedId = lerr.UpsertedId
2567		}
2568	}
2569	return info, err
2570}
2571
2572// UpsertId is a convenience helper equivalent to:
2573//
2574//     info, err := collection.Upsert(bson.M{"_id": id}, update)
2575//
2576// See the Upsert method for more details.
2577func (c *Collection) UpsertId(id interface{}, update interface{}) (info *ChangeInfo, err error) {
2578	return c.Upsert(bson.D{{"_id", id}}, update)
2579}
2580
2581// Remove finds a single document matching the provided selector document
2582// and removes it from the database.
2583// If the session is in safe mode (see SetSafe) a ErrNotFound error is
2584// returned if a document isn't found, or a value of type *LastError
2585// when some other error is detected.
2586//
2587// Relevant documentation:
2588//
2589//     http://www.mongodb.org/display/DOCS/Removing
2590//
2591func (c *Collection) Remove(selector interface{}) error {
2592	if selector == nil {
2593		selector = bson.D{}
2594	}
2595	lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 1, 1}, true)
2596	if err == nil && lerr != nil && lerr.N == 0 {
2597		return ErrNotFound
2598	}
2599	return err
2600}
2601
2602// RemoveId is a convenience helper equivalent to:
2603//
2604//     err := collection.Remove(bson.M{"_id": id})
2605//
2606// See the Remove method for more details.
2607func (c *Collection) RemoveId(id interface{}) error {
2608	return c.Remove(bson.D{{"_id", id}})
2609}
2610
2611// RemoveAll finds all documents matching the provided selector document
2612// and removes them from the database.  In case the session is in safe mode
2613// (see the SetSafe method) and an error happens when attempting the change,
2614// the returned error will be of type *LastError.
2615//
2616// Relevant documentation:
2617//
2618//     http://www.mongodb.org/display/DOCS/Removing
2619//
2620func (c *Collection) RemoveAll(selector interface{}) (info *ChangeInfo, err error) {
2621	if selector == nil {
2622		selector = bson.D{}
2623	}
2624	lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 0, 0}, true)
2625	if err == nil && lerr != nil {
2626		info = &ChangeInfo{Removed: lerr.N, Matched: lerr.N}
2627	}
2628	return info, err
2629}
2630
2631// DropDatabase removes the entire database including all of its collections.
2632func (db *Database) DropDatabase() error {
2633	return db.Run(bson.D{{"dropDatabase", 1}}, nil)
2634}
2635
2636// DropCollection removes the entire collection including all of its documents.
2637func (c *Collection) DropCollection() error {
2638	return c.Database.Run(bson.D{{"drop", c.Name}}, nil)
2639}
2640
2641// The CollectionInfo type holds metadata about a collection.
2642//
2643// Relevant documentation:
2644//
2645//     http://www.mongodb.org/display/DOCS/createCollection+Command
2646//     http://www.mongodb.org/display/DOCS/Capped+Collections
2647//
2648type CollectionInfo struct {
2649	// DisableIdIndex prevents the automatic creation of the index
2650	// on the _id field for the collection.
2651	DisableIdIndex bool
2652
2653	// ForceIdIndex enforces the automatic creation of the index
2654	// on the _id field for the collection. Capped collections,
2655	// for example, do not have such an index by default.
2656	ForceIdIndex bool
2657
2658	// If Capped is true new documents will replace old ones when
2659	// the collection is full. MaxBytes must necessarily be set
2660	// to define the size when the collection wraps around.
2661	// MaxDocs optionally defines the number of documents when it
2662	// wraps, but MaxBytes still needs to be set.
2663	Capped   bool
2664	MaxBytes int
2665	MaxDocs  int
2666
2667	// Validator contains a validation expression that defines which
2668	// documents should be considered valid for this collection.
2669	Validator interface{}
2670
2671	// ValidationLevel may be set to "strict" (the default) to force
2672	// MongoDB to validate all documents on inserts and updates, to
2673	// "moderate" to apply the validation rules only to documents
2674	// that already fulfill the validation criteria, or to "off" for
2675	// disabling validation entirely.
2676	ValidationLevel string
2677
2678	// ValidationAction determines how MongoDB handles documents that
2679	// violate the validation rules. It may be set to "error" (the default)
2680	// to reject inserts or updates that violate the rules, or to "warn"
2681	// to log invalid operations but allow them to proceed.
2682	ValidationAction string
2683
2684	// StorageEngine allows specifying collection options for the
2685	// storage engine in use. The map keys must hold the storage engine
2686	// name for which options are being specified.
2687	StorageEngine interface{}
2688}
2689
2690// Create explicitly creates the c collection with details of info.
2691// MongoDB creates collections automatically on use, so this method
2692// is only necessary when creating collection with non-default
2693// characteristics, such as capped collections.
2694//
2695// Relevant documentation:
2696//
2697//     http://www.mongodb.org/display/DOCS/createCollection+Command
2698//     http://www.mongodb.org/display/DOCS/Capped+Collections
2699//
2700func (c *Collection) Create(info *CollectionInfo) error {
2701	cmd := make(bson.D, 0, 4)
2702	cmd = append(cmd, bson.DocElem{"create", c.Name})
2703	if info.Capped {
2704		if info.MaxBytes < 1 {
2705			return fmt.Errorf("Collection.Create: with Capped, MaxBytes must also be set")
2706		}
2707		cmd = append(cmd, bson.DocElem{"capped", true})
2708		cmd = append(cmd, bson.DocElem{"size", info.MaxBytes})
2709		if info.MaxDocs > 0 {
2710			cmd = append(cmd, bson.DocElem{"max", info.MaxDocs})
2711		}
2712	}
2713	if info.DisableIdIndex {
2714		cmd = append(cmd, bson.DocElem{"autoIndexId", false})
2715	}
2716	if info.ForceIdIndex {
2717		cmd = append(cmd, bson.DocElem{"autoIndexId", true})
2718	}
2719	if info.Validator != nil {
2720		cmd = append(cmd, bson.DocElem{"validator", info.Validator})
2721	}
2722	if info.ValidationLevel != "" {
2723		cmd = append(cmd, bson.DocElem{"validationLevel", info.ValidationLevel})
2724	}
2725	if info.ValidationAction != "" {
2726		cmd = append(cmd, bson.DocElem{"validationAction", info.ValidationAction})
2727	}
2728	if info.StorageEngine != nil {
2729		cmd = append(cmd, bson.DocElem{"storageEngine", info.StorageEngine})
2730	}
2731	return c.Database.Run(cmd, nil)
2732}
2733
2734// Batch sets the batch size used when fetching documents from the database.
2735// It's possible to change this setting on a per-session basis as well, using
2736// the Batch method of Session.
2737
2738// The default batch size is defined by the database itself.  As of this
2739// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
2740// first batch, and 4MB on remaining ones.
2741func (q *Query) Batch(n int) *Query {
2742	if n == 1 {
2743		// Server interprets 1 as -1 and closes the cursor (!?)
2744		n = 2
2745	}
2746	q.m.Lock()
2747	q.op.limit = int32(n)
2748	q.m.Unlock()
2749	return q
2750}
2751
2752// Prefetch sets the point at which the next batch of results will be requested.
2753// When there are p*batch_size remaining documents cached in an Iter, the next
2754// batch will be requested in background. For instance, when using this:
2755//
2756//     query.Batch(200).Prefetch(0.25)
2757//
2758// and there are only 50 documents cached in the Iter to be processed, the
2759// next batch of 200 will be requested. It's possible to change this setting on
2760// a per-session basis as well, using the SetPrefetch method of Session.
2761//
2762// The default prefetch value is 0.25.
2763func (q *Query) Prefetch(p float64) *Query {
2764	q.m.Lock()
2765	q.prefetch = p
2766	q.m.Unlock()
2767	return q
2768}
2769
2770// Skip skips over the n initial documents from the query results.  Note that
2771// this only makes sense with capped collections where documents are naturally
2772// ordered by insertion time, or with sorted results.
2773func (q *Query) Skip(n int) *Query {
2774	q.m.Lock()
2775	q.op.skip = int32(n)
2776	q.m.Unlock()
2777	return q
2778}
2779
2780// Limit restricts the maximum number of documents retrieved to n, and also
2781// changes the batch size to the same value.  Once n documents have been
2782// returned by Next, the following call will return ErrNotFound.
2783func (q *Query) Limit(n int) *Query {
2784	q.m.Lock()
2785	switch {
2786	case n == 1:
2787		q.limit = 1
2788		q.op.limit = -1
2789	case n == math.MinInt32: // -MinInt32 == -MinInt32
2790		q.limit = math.MaxInt32
2791		q.op.limit = math.MinInt32 + 1
2792	case n < 0:
2793		q.limit = int32(-n)
2794		q.op.limit = int32(n)
2795	default:
2796		q.limit = int32(n)
2797		q.op.limit = int32(n)
2798	}
2799	q.m.Unlock()
2800	return q
2801}
2802
2803// Select enables selecting which fields should be retrieved for the results
2804// found. For example, the following query would only retrieve the name field:
2805//
2806//     err := collection.Find(nil).Select(bson.M{"name": 1}).One(&result)
2807//
2808// Relevant documentation:
2809//
2810//     http://www.mongodb.org/display/DOCS/Retrieving+a+Subset+of+Fields
2811//
2812func (q *Query) Select(selector interface{}) *Query {
2813	q.m.Lock()
2814	q.op.selector = selector
2815	q.m.Unlock()
2816	return q
2817}
2818
2819// Sort asks the database to order returned documents according to the
2820// provided field names. A field name may be prefixed by - (minus) for
2821// it to be sorted in reverse order.
2822//
2823// For example:
2824//
2825//     query1 := collection.Find(nil).Sort("firstname", "lastname")
2826//     query2 := collection.Find(nil).Sort("-age")
2827//     query3 := collection.Find(nil).Sort("$natural")
2828//     query4 := collection.Find(nil).Select(bson.M{"score": bson.M{"$meta": "textScore"}}).Sort("$textScore:score")
2829//
2830// Relevant documentation:
2831//
2832//     http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
2833//
2834func (q *Query) Sort(fields ...string) *Query {
2835	q.m.Lock()
2836	var order bson.D
2837	for _, field := range fields {
2838		n := 1
2839		var kind string
2840		if field != "" {
2841			if field[0] == '$' {
2842				if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
2843					kind = field[1:c]
2844					field = field[c+1:]
2845				}
2846			}
2847			switch field[0] {
2848			case '+':
2849				field = field[1:]
2850			case '-':
2851				n = -1
2852				field = field[1:]
2853			}
2854		}
2855		if field == "" {
2856			panic("Sort: empty field name")
2857		}
2858		if kind == "textScore" {
2859			order = append(order, bson.DocElem{field, bson.M{"$meta": kind}})
2860		} else {
2861			order = append(order, bson.DocElem{field, n})
2862		}
2863	}
2864	q.op.options.OrderBy = order
2865	q.op.hasOptions = true
2866	q.m.Unlock()
2867	return q
2868}
2869
2870// Explain returns a number of details about how the MongoDB server would
2871// execute the requested query, such as the number of objects examined,
2872// the number of times the read lock was yielded to allow writes to go in,
2873// and so on.
2874//
2875// For example:
2876//
2877//     m := bson.M{}
2878//     err := collection.Find(bson.M{"filename": name}).Explain(m)
2879//     if err == nil {
2880//         fmt.Printf("Explain: %#v\n", m)
2881//     }
2882//
2883// Relevant documentation:
2884//
2885//     http://www.mongodb.org/display/DOCS/Optimization
2886//     http://www.mongodb.org/display/DOCS/Query+Optimizer
2887//
2888func (q *Query) Explain(result interface{}) error {
2889	q.m.Lock()
2890	clone := &Query{session: q.session, query: q.query}
2891	q.m.Unlock()
2892	clone.op.options.Explain = true
2893	clone.op.hasOptions = true
2894	if clone.op.limit > 0 {
2895		clone.op.limit = -q.op.limit
2896	}
2897	iter := clone.Iter()
2898	if iter.Next(result) {
2899		return nil
2900	}
2901	return iter.Close()
2902}
2903
2904// TODO: Add Collection.Explain. See https://goo.gl/1MDlvz.
2905
2906// Hint will include an explicit "hint" in the query to force the server
2907// to use a specified index, potentially improving performance in some
2908// situations.  The provided parameters are the fields that compose the
2909// key of the index to be used.  For details on how the indexKey may be
2910// built, see the EnsureIndex method.
2911//
2912// For example:
2913//
2914//     query := collection.Find(bson.M{"firstname": "Joe", "lastname": "Winter"})
2915//     query.Hint("lastname", "firstname")
2916//
2917// Relevant documentation:
2918//
2919//     http://www.mongodb.org/display/DOCS/Optimization
2920//     http://www.mongodb.org/display/DOCS/Query+Optimizer
2921//
2922func (q *Query) Hint(indexKey ...string) *Query {
2923	q.m.Lock()
2924	keyInfo, err := parseIndexKey(indexKey)
2925	q.op.options.Hint = keyInfo.key
2926	q.op.hasOptions = true
2927	q.m.Unlock()
2928	if err != nil {
2929		panic(err)
2930	}
2931	return q
2932}
2933
2934// SetMaxScan constrains the query to stop after scanning the specified
2935// number of documents.
2936//
2937// This modifier is generally used to prevent potentially long running
2938// queries from disrupting performance by scanning through too much data.
2939func (q *Query) SetMaxScan(n int) *Query {
2940	q.m.Lock()
2941	q.op.options.MaxScan = n
2942	q.op.hasOptions = true
2943	q.m.Unlock()
2944	return q
2945}
2946
2947// SetMaxTime constrains the query to stop after running for the specified time.
2948//
2949// When the time limit is reached MongoDB automatically cancels the query.
2950// This can be used to efficiently prevent and identify unexpectedly slow queries.
2951//
2952// A few important notes about the mechanism enforcing this limit:
2953//
2954//  - Requests can block behind locking operations on the server, and that blocking
2955//    time is not accounted for. In other words, the timer starts ticking only after
2956//    the actual start of the query when it initially acquires the appropriate lock;
2957//
2958//  - Operations are interrupted only at interrupt points where an operation can be
2959//    safely aborted – the total execution time may exceed the specified value;
2960//
2961//  - The limit can be applied to both CRUD operations and commands, but not all
2962//    commands are interruptible;
2963//
2964//  - While iterating over results, computing follow up batches is included in the
2965//    total time and the iteration continues until the alloted time is over, but
2966//    network roundtrips are not taken into account for the limit.
2967//
2968//  - This limit does not override the inactive cursor timeout for idle cursors
2969//    (default is 10 min).
2970//
2971// This mechanism was introduced in MongoDB 2.6.
2972//
2973// Relevant documentation:
2974//
2975//   http://blog.mongodb.org/post/83621787773/maxtimems-and-query-optimizer-introspection-in
2976//
2977func (q *Query) SetMaxTime(d time.Duration) *Query {
2978	q.m.Lock()
2979	q.op.options.MaxTimeMS = int(d / time.Millisecond)
2980	q.op.hasOptions = true
2981	q.m.Unlock()
2982	return q
2983}
2984
2985// Snapshot will force the performed query to make use of an available
2986// index on the _id field to prevent the same document from being returned
2987// more than once in a single iteration. This might happen without this
2988// setting in situations when the document changes in size and thus has to
2989// be moved while the iteration is running.
2990//
2991// Because snapshot mode traverses the _id index, it may not be used with
2992// sorting or explicit hints. It also cannot use any other index for the
2993// query.
2994//
2995// Even with snapshot mode, items inserted or deleted during the query may
2996// or may not be returned; that is, this mode is not a true point-in-time
2997// snapshot.
2998//
2999// The same effect of Snapshot may be obtained by using any unique index on
3000// field(s) that will not be modified (best to use Hint explicitly too).
3001// A non-unique index (such as creation time) may be made unique by
3002// appending _id to the index when creating it.
3003//
3004// Relevant documentation:
3005//
3006//     http://www.mongodb.org/display/DOCS/How+to+do+Snapshotted+Queries+in+the+Mongo+Database
3007//
3008func (q *Query) Snapshot() *Query {
3009	q.m.Lock()
3010	q.op.options.Snapshot = true
3011	q.op.hasOptions = true
3012	q.m.Unlock()
3013	return q
3014}
3015
3016// Comment adds a comment to the query to identify it in the database profiler output.
3017//
3018// Relevant documentation:
3019//
3020//     http://docs.mongodb.org/manual/reference/operator/meta/comment
3021//     http://docs.mongodb.org/manual/reference/command/profile
3022//     http://docs.mongodb.org/manual/administration/analyzing-mongodb-performance/#database-profiling
3023//
3024func (q *Query) Comment(comment string) *Query {
3025	q.m.Lock()
3026	q.op.options.Comment = comment
3027	q.op.hasOptions = true
3028	q.m.Unlock()
3029	return q
3030}
3031
3032// LogReplay enables an option that optimizes queries that are typically
3033// made on the MongoDB oplog for replaying it. This is an internal
3034// implementation aspect and most likely uninteresting for other uses.
3035// It has seen at least one use case, though, so it's exposed via the API.
3036func (q *Query) LogReplay() *Query {
3037	q.m.Lock()
3038	q.op.flags |= flagLogReplay
3039	q.m.Unlock()
3040	return q
3041}
3042
3043func checkQueryError(fullname string, d []byte) error {
3044	l := len(d)
3045	if l < 16 {
3046		return nil
3047	}
3048	if d[5] == '$' && d[6] == 'e' && d[7] == 'r' && d[8] == 'r' && d[9] == '\x00' && d[4] == '\x02' {
3049		goto Error
3050	}
3051	if len(fullname) < 5 || fullname[len(fullname)-5:] != ".$cmd" {
3052		return nil
3053	}
3054	for i := 0; i+8 < l; i++ {
3055		if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
3056			goto Error
3057		}
3058	}
3059	return nil
3060
3061Error:
3062	result := &queryError{}
3063	bson.Unmarshal(d, result)
3064	if result.Err == "" && result.ErrMsg == "" {
3065		return nil
3066	}
3067	if result.AssertionCode != 0 && result.Assertion != "" {
3068		return &QueryError{Code: result.AssertionCode, Message: result.Assertion, Assertion: true}
3069	}
3070	if result.Err != "" {
3071		return &QueryError{Code: result.Code, Message: result.Err}
3072	}
3073	return &QueryError{Code: result.Code, Message: result.ErrMsg}
3074}
3075
3076// One executes the query and unmarshals the first obtained document into the
3077// result argument.  The result must be a struct or map value capable of being
3078// unmarshalled into by gobson.  This function blocks until either a result
3079// is available or an error happens.  For example:
3080//
3081//     err := collection.Find(bson.M{"a": 1}).One(&result)
3082//
3083// In case the resulting document includes a field named $err or errmsg, which
3084// are standard ways for MongoDB to return query errors, the returned err will
3085// be set to a *QueryError value including the Err message and the Code.  In
3086// those cases, the result argument is still unmarshalled into with the
3087// received document so that any other custom values may be obtained if
3088// desired.
3089//
3090func (q *Query) One(result interface{}) (err error) {
3091	q.m.Lock()
3092	session := q.session
3093	op := q.op // Copy.
3094	q.m.Unlock()
3095
3096	socket, err := session.acquireSocket(true)
3097	if err != nil {
3098		return err
3099	}
3100	defer socket.Release()
3101
3102	op.limit = -1
3103
3104	session.prepareQuery(&op)
3105
3106	expectFindReply := prepareFindOp(socket, &op, 1)
3107
3108	data, err := socket.SimpleQuery(&op)
3109	if err != nil {
3110		return err
3111	}
3112	if data == nil {
3113		return ErrNotFound
3114	}
3115	if expectFindReply {
3116		var findReply struct {
3117			Ok     bool
3118			Code   int
3119			Errmsg string
3120			Cursor cursorData
3121		}
3122		err = bson.Unmarshal(data, &findReply)
3123		if err != nil {
3124			return err
3125		}
3126		if !findReply.Ok && findReply.Errmsg != "" {
3127			return &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
3128		}
3129		if len(findReply.Cursor.FirstBatch) == 0 {
3130			return ErrNotFound
3131		}
3132		data = findReply.Cursor.FirstBatch[0].Data
3133	}
3134	if result != nil {
3135		err = bson.Unmarshal(data, result)
3136		if err == nil {
3137			debugf("Query %p document unmarshaled: %#v", q, result)
3138		} else {
3139			debugf("Query %p document unmarshaling failed: %#v", q, err)
3140			return err
3141		}
3142	}
3143	return checkQueryError(op.collection, data)
3144}
3145
3146// prepareFindOp translates op from being an old-style wire protocol query into
3147// a new-style find command if that's supported by the MongoDB server (3.2+).
3148// It returns whether to expect a find command result or not. Note op may be
3149// translated into an explain command, in which case the function returns false.
3150func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool {
3151	if socket.ServerInfo().MaxWireVersion < 4 || op.collection == "admin.$cmd" {
3152		return false
3153	}
3154
3155	nameDot := strings.Index(op.collection, ".")
3156	if nameDot < 0 {
3157		panic("invalid query collection name: " + op.collection)
3158	}
3159
3160	find := findCmd{
3161		Collection:  op.collection[nameDot+1:],
3162		Filter:      op.query,
3163		Projection:  op.selector,
3164		Sort:        op.options.OrderBy,
3165		Skip:        op.skip,
3166		Limit:       limit,
3167		MaxTimeMS:   op.options.MaxTimeMS,
3168		MaxScan:     op.options.MaxScan,
3169		Hint:        op.options.Hint,
3170		Comment:     op.options.Comment,
3171		Snapshot:    op.options.Snapshot,
3172		OplogReplay: op.flags&flagLogReplay != 0,
3173	}
3174	if op.limit < 0 {
3175		find.BatchSize = -op.limit
3176		find.SingleBatch = true
3177	} else {
3178		find.BatchSize = op.limit
3179	}
3180
3181	explain := op.options.Explain
3182
3183	op.collection = op.collection[:nameDot] + ".$cmd"
3184	op.query = &find
3185	op.skip = 0
3186	op.limit = -1
3187	op.options = queryWrapper{}
3188	op.hasOptions = false
3189
3190	if explain {
3191		op.query = bson.D{{"explain", op.query}}
3192		return false
3193	}
3194	return true
3195}
3196
3197type cursorData struct {
3198	FirstBatch []bson.Raw "firstBatch"
3199	NextBatch  []bson.Raw "nextBatch"
3200	NS         string
3201	Id         int64
3202}
3203
3204// findCmd holds the command used for performing queries on MongoDB 3.2+.
3205//
3206// Relevant documentation:
3207//
3208//     https://docs.mongodb.org/master/reference/command/find/#dbcmd.find
3209//
3210type findCmd struct {
3211	Collection          string      `bson:"find"`
3212	Filter              interface{} `bson:"filter,omitempty"`
3213	Sort                interface{} `bson:"sort,omitempty"`
3214	Projection          interface{} `bson:"projection,omitempty"`
3215	Hint                interface{} `bson:"hint,omitempty"`
3216	Skip                interface{} `bson:"skip,omitempty"`
3217	Limit               int32       `bson:"limit,omitempty"`
3218	BatchSize           int32       `bson:"batchSize,omitempty"`
3219	SingleBatch         bool        `bson:"singleBatch,omitempty"`
3220	Comment             string      `bson:"comment,omitempty"`
3221	MaxScan             int         `bson:"maxScan,omitempty"`
3222	MaxTimeMS           int         `bson:"maxTimeMS,omitempty"`
3223	ReadConcern         interface{} `bson:"readConcern,omitempty"`
3224	Max                 interface{} `bson:"max,omitempty"`
3225	Min                 interface{} `bson:"min,omitempty"`
3226	ReturnKey           bool        `bson:"returnKey,omitempty"`
3227	ShowRecordId        bool        `bson:"showRecordId,omitempty"`
3228	Snapshot            bool        `bson:"snapshot,omitempty"`
3229	Tailable            bool        `bson:"tailable,omitempty"`
3230	AwaitData           bool        `bson:"awaitData,omitempty"`
3231	OplogReplay         bool        `bson:"oplogReplay,omitempty"`
3232	NoCursorTimeout     bool        `bson:"noCursorTimeout,omitempty"`
3233	AllowPartialResults bool        `bson:"allowPartialResults,omitempty"`
3234}
3235
3236// getMoreCmd holds the command used for requesting more query results on MongoDB 3.2+.
3237//
3238// Relevant documentation:
3239//
3240//     https://docs.mongodb.org/master/reference/command/getMore/#dbcmd.getMore
3241//
3242type getMoreCmd struct {
3243	CursorId   int64  `bson:"getMore"`
3244	Collection string `bson:"collection"`
3245	BatchSize  int32  `bson:"batchSize,omitempty"`
3246	MaxTimeMS  int64  `bson:"maxTimeMS,omitempty"`
3247}
3248
3249// run duplicates the behavior of collection.Find(query).One(&result)
3250// as performed by Database.Run, specializing the logic for running
3251// database commands on a given socket.
3252func (db *Database) run(socket *mongoSocket, cmd, result interface{}) (err error) {
3253	// Database.Run:
3254	if name, ok := cmd.(string); ok {
3255		cmd = bson.D{{name, 1}}
3256	}
3257
3258	// Collection.Find:
3259	session := db.Session
3260	session.m.RLock()
3261	op := session.queryConfig.op // Copy.
3262	session.m.RUnlock()
3263	op.query = cmd
3264	op.collection = db.Name + ".$cmd"
3265
3266	// Query.One:
3267	session.prepareQuery(&op)
3268	op.limit = -1
3269
3270	data, err := socket.SimpleQuery(&op)
3271	if err != nil {
3272		return err
3273	}
3274	if data == nil {
3275		return ErrNotFound
3276	}
3277	if result != nil {
3278		err = bson.Unmarshal(data, result)
3279		if err != nil {
3280			debugf("Run command unmarshaling failed: %#v", op, err)
3281			return err
3282		}
3283		if globalDebug && globalLogger != nil {
3284			var res bson.M
3285			bson.Unmarshal(data, &res)
3286			debugf("Run command unmarshaled: %#v, result: %#v", op, res)
3287		}
3288	}
3289	return checkQueryError(op.collection, data)
3290}
3291
3292// The DBRef type implements support for the database reference MongoDB
3293// convention as supported by multiple drivers.  This convention enables
3294// cross-referencing documents between collections and databases using
3295// a structure which includes a collection name, a document id, and
3296// optionally a database name.
3297//
3298// See the FindRef methods on Session and on Database.
3299//
3300// Relevant documentation:
3301//
3302//     http://www.mongodb.org/display/DOCS/Database+References
3303//
3304type DBRef struct {
3305	Collection string      `bson:"$ref"`
3306	Id         interface{} `bson:"$id"`
3307	Database   string      `bson:"$db,omitempty"`
3308}
3309
3310// NOTE: Order of fields for DBRef above does matter, per documentation.
3311
3312// FindRef returns a query that looks for the document in the provided
3313// reference. If the reference includes the DB field, the document will
3314// be retrieved from the respective database.
3315//
3316// See also the DBRef type and the FindRef method on Session.
3317//
3318// Relevant documentation:
3319//
3320//     http://www.mongodb.org/display/DOCS/Database+References
3321//
3322func (db *Database) FindRef(ref *DBRef) *Query {
3323	var c *Collection
3324	if ref.Database == "" {
3325		c = db.C(ref.Collection)
3326	} else {
3327		c = db.Session.DB(ref.Database).C(ref.Collection)
3328	}
3329	return c.FindId(ref.Id)
3330}
3331
3332// FindRef returns a query that looks for the document in the provided
3333// reference. For a DBRef to be resolved correctly at the session level
3334// it must necessarily have the optional DB field defined.
3335//
3336// See also the DBRef type and the FindRef method on Database.
3337//
3338// Relevant documentation:
3339//
3340//     http://www.mongodb.org/display/DOCS/Database+References
3341//
3342func (s *Session) FindRef(ref *DBRef) *Query {
3343	if ref.Database == "" {
3344		panic(errors.New(fmt.Sprintf("Can't resolve database for %#v", ref)))
3345	}
3346	c := s.DB(ref.Database).C(ref.Collection)
3347	return c.FindId(ref.Id)
3348}
3349
3350// CollectionNames returns the collection names present in the db database.
3351func (db *Database) CollectionNames() (names []string, err error) {
3352	// Clone session and set it to Monotonic mode so that the server
3353	// used for the query may be safely obtained afterwards, if
3354	// necessary for iteration when a cursor is received.
3355	cloned := db.Session.nonEventual()
3356	defer cloned.Close()
3357
3358	batchSize := int(cloned.queryConfig.op.limit)
3359
3360	// Try with a command.
3361	var result struct {
3362		Collections []bson.Raw
3363		Cursor      cursorData
3364	}
3365	err = db.With(cloned).Run(bson.D{{"listCollections", 1}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result)
3366	if err == nil {
3367		firstBatch := result.Collections
3368		if firstBatch == nil {
3369			firstBatch = result.Cursor.FirstBatch
3370		}
3371		var iter *Iter
3372		ns := strings.SplitN(result.Cursor.NS, ".", 2)
3373		if len(ns) < 2 {
3374			iter = db.With(cloned).C("").NewIter(nil, firstBatch, result.Cursor.Id, nil)
3375		} else {
3376			iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil)
3377		}
3378		var coll struct{ Name string }
3379		for iter.Next(&coll) {
3380			names = append(names, coll.Name)
3381		}
3382		if err := iter.Close(); err != nil {
3383			return nil, err
3384		}
3385		sort.Strings(names)
3386		return names, err
3387	}
3388	if err != nil && !isNoCmd(err) {
3389		return nil, err
3390	}
3391
3392	// Command not yet supported. Query the database instead.
3393	nameIndex := len(db.Name) + 1
3394	iter := db.C("system.namespaces").Find(nil).Iter()
3395	var coll struct{ Name string }
3396	for iter.Next(&coll) {
3397		if strings.Index(coll.Name, "$") < 0 || strings.Index(coll.Name, ".oplog.$") >= 0 {
3398			names = append(names, coll.Name[nameIndex:])
3399		}
3400	}
3401	if err := iter.Close(); err != nil {
3402		return nil, err
3403	}
3404	sort.Strings(names)
3405	return names, nil
3406}
3407
3408type dbNames struct {
3409	Databases []struct {
3410		Name  string
3411		Empty bool
3412	}
3413}
3414
3415// DatabaseNames returns the names of non-empty databases present in the cluster.
3416func (s *Session) DatabaseNames() (names []string, err error) {
3417	var result dbNames
3418	err = s.Run("listDatabases", &result)
3419	if err != nil {
3420		return nil, err
3421	}
3422	for _, db := range result.Databases {
3423		if !db.Empty {
3424			names = append(names, db.Name)
3425		}
3426	}
3427	sort.Strings(names)
3428	return names, nil
3429}
3430
3431// Iter executes the query and returns an iterator capable of going over all
3432// the results. Results will be returned in batches of configurable
3433// size (see the Batch method) and more documents will be requested when a
3434// configurable number of documents is iterated over (see the Prefetch method).
3435func (q *Query) Iter() *Iter {
3436	q.m.Lock()
3437	session := q.session
3438	op := q.op
3439	prefetch := q.prefetch
3440	limit := q.limit
3441	q.m.Unlock()
3442
3443	iter := &Iter{
3444		session:  session,
3445		prefetch: prefetch,
3446		limit:    limit,
3447		timeout:  -1,
3448	}
3449	iter.gotReply.L = &iter.m
3450	iter.op.collection = op.collection
3451	iter.op.limit = op.limit
3452	iter.op.replyFunc = iter.replyFunc()
3453	iter.docsToReceive++
3454
3455	socket, err := session.acquireSocket(true)
3456	if err != nil {
3457		iter.err = err
3458		return iter
3459	}
3460	defer socket.Release()
3461
3462	session.prepareQuery(&op)
3463	op.replyFunc = iter.op.replyFunc
3464
3465	if prepareFindOp(socket, &op, limit) {
3466		iter.findCmd = true
3467	}
3468
3469	iter.server = socket.Server()
3470	err = socket.Query(&op)
3471	if err != nil {
3472		// Must lock as the query is already out and it may call replyFunc.
3473		iter.m.Lock()
3474		iter.err = err
3475		iter.m.Unlock()
3476	}
3477
3478	return iter
3479}
3480
3481// Tail returns a tailable iterator. Unlike a normal iterator, a
3482// tailable iterator may wait for new values to be inserted in the
3483// collection once the end of the current result set is reached,
3484// A tailable iterator may only be used with capped collections.
3485//
3486// The timeout parameter indicates how long Next will block waiting
3487// for a result before timing out.  If set to -1, Next will not
3488// timeout, and will continue waiting for a result for as long as
3489// the cursor is valid and the session is not closed. If set to 0,
3490// Next times out as soon as it reaches the end of the result set.
3491// Otherwise, Next will wait for at least the given number of
3492// seconds for a new document to be available before timing out.
3493//
3494// On timeouts, Next will unblock and return false, and the Timeout
3495// method will return true if called. In these cases, Next may still
3496// be called again on the same iterator to check if a new value is
3497// available at the current cursor position, and again it will block
3498// according to the specified timeoutSecs. If the cursor becomes
3499// invalid, though, both Next and Timeout will return false and
3500// the query must be restarted.
3501//
3502// The following example demonstrates timeout handling and query
3503// restarting:
3504//
3505//    iter := collection.Find(nil).Sort("$natural").Tail(5 * time.Second)
3506//    for {
3507//         for iter.Next(&result) {
3508//             fmt.Println(result.Id)
3509//             lastId = result.Id
3510//         }
3511//         if iter.Err() != nil {
3512//             return iter.Close()
3513//         }
3514//         if iter.Timeout() {
3515//             continue
3516//         }
3517//         query := collection.Find(bson.M{"_id": bson.M{"$gt": lastId}})
3518//         iter = query.Sort("$natural").Tail(5 * time.Second)
3519//    }
3520//    iter.Close()
3521//
3522// Relevant documentation:
3523//
3524//     http://www.mongodb.org/display/DOCS/Tailable+Cursors
3525//     http://www.mongodb.org/display/DOCS/Capped+Collections
3526//     http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
3527//
3528func (q *Query) Tail(timeout time.Duration) *Iter {
3529	q.m.Lock()
3530	session := q.session
3531	op := q.op
3532	prefetch := q.prefetch
3533	q.m.Unlock()
3534
3535	iter := &Iter{session: session, prefetch: prefetch}
3536	iter.gotReply.L = &iter.m
3537	iter.timeout = timeout
3538	iter.op.collection = op.collection
3539	iter.op.limit = op.limit
3540	iter.op.replyFunc = iter.replyFunc()
3541	iter.docsToReceive++
3542	session.prepareQuery(&op)
3543	op.replyFunc = iter.op.replyFunc
3544	op.flags |= flagTailable | flagAwaitData
3545
3546	socket, err := session.acquireSocket(true)
3547	if err != nil {
3548		iter.err = err
3549	} else {
3550		iter.server = socket.Server()
3551		err = socket.Query(&op)
3552		if err != nil {
3553			// Must lock as the query is already out and it may call replyFunc.
3554			iter.m.Lock()
3555			iter.err = err
3556			iter.m.Unlock()
3557		}
3558		socket.Release()
3559	}
3560	return iter
3561}
3562
3563func (s *Session) prepareQuery(op *queryOp) {
3564	s.m.RLock()
3565	op.mode = s.consistency
3566	if s.slaveOk {
3567		op.flags |= flagSlaveOk
3568	}
3569	s.m.RUnlock()
3570	return
3571}
3572
3573// Err returns nil if no errors happened during iteration, or the actual
3574// error otherwise.
3575//
3576// In case a resulting document included a field named $err or errmsg, which are
3577// standard ways for MongoDB to report an improper query, the returned value has
3578// a *QueryError type, and includes the Err message and the Code.
3579func (iter *Iter) Err() error {
3580	iter.m.Lock()
3581	err := iter.err
3582	iter.m.Unlock()
3583	if err == ErrNotFound {
3584		return nil
3585	}
3586	return err
3587}
3588
3589// Close kills the server cursor used by the iterator, if any, and returns
3590// nil if no errors happened during iteration, or the actual error otherwise.
3591//
3592// Server cursors are automatically closed at the end of an iteration, which
3593// means close will do nothing unless the iteration was interrupted before
3594// the server finished sending results to the driver. If Close is not called
3595// in such a situation, the cursor will remain available at the server until
3596// the default cursor timeout period is reached. No further problems arise.
3597//
3598// Close is idempotent. That means it can be called repeatedly and will
3599// return the same result every time.
3600//
3601// In case a resulting document included a field named $err or errmsg, which are
3602// standard ways for MongoDB to report an improper query, the returned value has
3603// a *QueryError type.
3604func (iter *Iter) Close() error {
3605	iter.m.Lock()
3606	cursorId := iter.op.cursorId
3607	iter.op.cursorId = 0
3608	err := iter.err
3609	iter.m.Unlock()
3610	if cursorId == 0 {
3611		if err == ErrNotFound {
3612			return nil
3613		}
3614		return err
3615	}
3616	socket, err := iter.acquireSocket()
3617	if err == nil {
3618		// TODO Batch kills.
3619		err = socket.Query(&killCursorsOp{[]int64{cursorId}})
3620		socket.Release()
3621	}
3622
3623	iter.m.Lock()
3624	if err != nil && (iter.err == nil || iter.err == ErrNotFound) {
3625		iter.err = err
3626	} else if iter.err != ErrNotFound {
3627		err = iter.err
3628	}
3629	iter.m.Unlock()
3630	return err
3631}
3632
3633// Done returns true only if a follow up Next call is guaranteed
3634// to return false.
3635//
3636// For an iterator created with Tail, Done may return false for
3637// an iterator that has no more data. Otherwise it's guaranteed
3638// to return false only if there is data or an error happened.
3639//
3640// Done may block waiting for a pending query to verify whether
3641// more data is actually available or not.
3642func (iter *Iter) Done() bool {
3643	iter.m.Lock()
3644	defer iter.m.Unlock()
3645
3646	for {
3647		if iter.docData.Len() > 0 {
3648			return false
3649		}
3650		if iter.docsToReceive > 1 {
3651			return true
3652		}
3653		if iter.docsToReceive > 0 {
3654			iter.gotReply.Wait()
3655			continue
3656		}
3657		return iter.op.cursorId == 0
3658	}
3659}
3660
3661// Timeout returns true if Next returned false due to a timeout of
3662// a tailable cursor. In those cases, Next may be called again to continue
3663// the iteration at the previous cursor position.
3664func (iter *Iter) Timeout() bool {
3665	iter.m.Lock()
3666	result := iter.timedout
3667	iter.m.Unlock()
3668	return result
3669}
3670
3671// Next retrieves the next document from the result set, blocking if necessary.
3672// This method will also automatically retrieve another batch of documents from
3673// the server when the current one is exhausted, or before that in background
3674// if pre-fetching is enabled (see the Query.Prefetch and Session.SetPrefetch
3675// methods).
3676//
3677// Next returns true if a document was successfully unmarshalled onto result,
3678// and false at the end of the result set or if an error happened.
3679// When Next returns false, the Err method should be called to verify if
3680// there was an error during iteration.
3681//
3682// For example:
3683//
3684//    iter := collection.Find(nil).Iter()
3685//    for iter.Next(&result) {
3686//        fmt.Printf("Result: %v\n", result.Id)
3687//    }
3688//    if err := iter.Close(); err != nil {
3689//        return err
3690//    }
3691//
3692func (iter *Iter) Next(result interface{}) bool {
3693	iter.m.Lock()
3694	iter.timedout = false
3695	timeout := time.Time{}
3696	for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) {
3697		if iter.docsToReceive == 0 {
3698			if iter.timeout >= 0 {
3699				if timeout.IsZero() {
3700					timeout = time.Now().Add(iter.timeout)
3701				}
3702				if time.Now().After(timeout) {
3703					iter.timedout = true
3704					iter.m.Unlock()
3705					return false
3706				}
3707			}
3708			iter.getMore()
3709			if iter.err != nil {
3710				break
3711			}
3712		}
3713		iter.gotReply.Wait()
3714	}
3715
3716	// Exhaust available data before reporting any errors.
3717	if docData, ok := iter.docData.Pop().([]byte); ok {
3718		close := false
3719		if iter.limit > 0 {
3720			iter.limit--
3721			if iter.limit == 0 {
3722				if iter.docData.Len() > 0 {
3723					iter.m.Unlock()
3724					panic(fmt.Errorf("data remains after limit exhausted: %d", iter.docData.Len()))
3725				}
3726				iter.err = ErrNotFound
3727				close = true
3728			}
3729		}
3730		if iter.op.cursorId != 0 && iter.err == nil {
3731			iter.docsBeforeMore--
3732			if iter.docsBeforeMore == -1 {
3733				iter.getMore()
3734			}
3735		}
3736		iter.m.Unlock()
3737
3738		if close {
3739			iter.Close()
3740		}
3741		err := bson.Unmarshal(docData, result)
3742		if err != nil {
3743			debugf("Iter %p document unmarshaling failed: %#v", iter, err)
3744			iter.m.Lock()
3745			if iter.err == nil {
3746				iter.err = err
3747			}
3748			iter.m.Unlock()
3749			return false
3750		}
3751		debugf("Iter %p document unmarshaled: %#v", iter, result)
3752		// XXX Only have to check first document for a query error?
3753		err = checkQueryError(iter.op.collection, docData)
3754		if err != nil {
3755			iter.m.Lock()
3756			if iter.err == nil {
3757				iter.err = err
3758			}
3759			iter.m.Unlock()
3760			return false
3761		}
3762		return true
3763	} else if iter.err != nil {
3764		debugf("Iter %p returning false: %s", iter, iter.err)
3765		iter.m.Unlock()
3766		return false
3767	} else if iter.op.cursorId == 0 {
3768		iter.err = ErrNotFound
3769		debugf("Iter %p exhausted with cursor=0", iter)
3770		iter.m.Unlock()
3771		return false
3772	}
3773
3774	panic("unreachable")
3775}
3776
3777// All retrieves all documents from the result set into the provided slice
3778// and closes the iterator.
3779//
3780// The result argument must necessarily be the address for a slice. The slice
3781// may be nil or previously allocated.
3782//
3783// WARNING: Obviously, All must not be used with result sets that may be
3784// potentially large, since it may consume all memory until the system
3785// crashes. Consider building the query with a Limit clause to ensure the
3786// result size is bounded.
3787//
3788// For instance:
3789//
3790//    var result []struct{ Value int }
3791//    iter := collection.Find(nil).Limit(100).Iter()
3792//    err := iter.All(&result)
3793//    if err != nil {
3794//        return err
3795//    }
3796//
3797func (iter *Iter) All(result interface{}) error {
3798	resultv := reflect.ValueOf(result)
3799	if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice {
3800		panic("result argument must be a slice address")
3801	}
3802	slicev := resultv.Elem()
3803	slicev = slicev.Slice(0, slicev.Cap())
3804	elemt := slicev.Type().Elem()
3805	i := 0
3806	for {
3807		if slicev.Len() == i {
3808			elemp := reflect.New(elemt)
3809			if !iter.Next(elemp.Interface()) {
3810				break
3811			}
3812			slicev = reflect.Append(slicev, elemp.Elem())
3813			slicev = slicev.Slice(0, slicev.Cap())
3814		} else {
3815			if !iter.Next(slicev.Index(i).Addr().Interface()) {
3816				break
3817			}
3818		}
3819		i++
3820	}
3821	resultv.Elem().Set(slicev.Slice(0, i))
3822	return iter.Close()
3823}
3824
3825// All works like Iter.All.
3826func (q *Query) All(result interface{}) error {
3827	return q.Iter().All(result)
3828}
3829
3830// The For method is obsolete and will be removed in a future release.
3831// See Iter as an elegant replacement.
3832func (q *Query) For(result interface{}, f func() error) error {
3833	return q.Iter().For(result, f)
3834}
3835
3836// The For method is obsolete and will be removed in a future release.
3837// See Iter as an elegant replacement.
3838func (iter *Iter) For(result interface{}, f func() error) (err error) {
3839	valid := false
3840	v := reflect.ValueOf(result)
3841	if v.Kind() == reflect.Ptr {
3842		v = v.Elem()
3843		switch v.Kind() {
3844		case reflect.Map, reflect.Ptr, reflect.Interface, reflect.Slice:
3845			valid = v.IsNil()
3846		}
3847	}
3848	if !valid {
3849		panic("For needs a pointer to nil reference value.  See the documentation.")
3850	}
3851	zero := reflect.Zero(v.Type())
3852	for {
3853		v.Set(zero)
3854		if !iter.Next(result) {
3855			break
3856		}
3857		err = f()
3858		if err != nil {
3859			return err
3860		}
3861	}
3862	return iter.Err()
3863}
3864
3865// acquireSocket acquires a socket from the same server that the iterator
3866// cursor was obtained from.
3867//
3868// WARNING: This method must not be called with iter.m locked. Acquiring the
3869// socket depends on the cluster sync loop, and the cluster sync loop might
3870// attempt actions which cause replyFunc to be called, inducing a deadlock.
3871func (iter *Iter) acquireSocket() (*mongoSocket, error) {
3872	socket, err := iter.session.acquireSocket(true)
3873	if err != nil {
3874		return nil, err
3875	}
3876	if socket.Server() != iter.server {
3877		// Socket server changed during iteration. This may happen
3878		// with Eventual sessions, if a Refresh is done, or if a
3879		// monotonic session gets a write and shifts from secondary
3880		// to primary. Our cursor is in a specific server, though.
3881		iter.session.m.Lock()
3882		sockTimeout := iter.session.sockTimeout
3883		iter.session.m.Unlock()
3884		socket.Release()
3885		socket, _, err = iter.server.AcquireSocket(0, sockTimeout)
3886		if err != nil {
3887			return nil, err
3888		}
3889		err := iter.session.socketLogin(socket)
3890		if err != nil {
3891			socket.Release()
3892			return nil, err
3893		}
3894	}
3895	return socket, nil
3896}
3897
3898func (iter *Iter) getMore() {
3899	// Increment now so that unlocking the iterator won't cause a
3900	// different goroutine to get here as well.
3901	iter.docsToReceive++
3902	iter.m.Unlock()
3903	socket, err := iter.acquireSocket()
3904	iter.m.Lock()
3905	if err != nil {
3906		iter.err = err
3907		return
3908	}
3909	defer socket.Release()
3910
3911	debugf("Iter %p requesting more documents", iter)
3912	if iter.limit > 0 {
3913		// The -1 below accounts for the fact docsToReceive was incremented above.
3914		limit := iter.limit - int32(iter.docsToReceive-1) - int32(iter.docData.Len())
3915		if limit < iter.op.limit {
3916			iter.op.limit = limit
3917		}
3918	}
3919	var op interface{}
3920	if iter.findCmd {
3921		op = iter.getMoreCmd()
3922	} else {
3923		op = &iter.op
3924	}
3925	if err := socket.Query(op); err != nil {
3926		iter.docsToReceive--
3927		iter.err = err
3928	}
3929}
3930
3931func (iter *Iter) getMoreCmd() *queryOp {
3932	// TODO: Define the query statically in the Iter type, next to getMoreOp.
3933	nameDot := strings.Index(iter.op.collection, ".")
3934	if nameDot < 0 {
3935		panic("invalid query collection name: " + iter.op.collection)
3936	}
3937
3938	getMore := getMoreCmd{
3939		CursorId:   iter.op.cursorId,
3940		Collection: iter.op.collection[nameDot+1:],
3941		BatchSize:  iter.op.limit,
3942	}
3943
3944	var op queryOp
3945	op.collection = iter.op.collection[:nameDot] + ".$cmd"
3946	op.query = &getMore
3947	op.limit = -1
3948	op.replyFunc = iter.op.replyFunc
3949	return &op
3950}
3951
3952type countCmd struct {
3953	Count string
3954	Query interface{}
3955	Limit int32 ",omitempty"
3956	Skip  int32 ",omitempty"
3957}
3958
3959// Count returns the total number of documents in the result set.
3960func (q *Query) Count() (n int, err error) {
3961	q.m.Lock()
3962	session := q.session
3963	op := q.op
3964	limit := q.limit
3965	q.m.Unlock()
3966
3967	c := strings.Index(op.collection, ".")
3968	if c < 0 {
3969		return 0, errors.New("Bad collection name: " + op.collection)
3970	}
3971
3972	dbname := op.collection[:c]
3973	cname := op.collection[c+1:]
3974	query := op.query
3975	if query == nil {
3976		query = bson.D{}
3977	}
3978	result := struct{ N int }{}
3979	err = session.DB(dbname).Run(countCmd{cname, query, limit, op.skip}, &result)
3980	return result.N, err
3981}
3982
3983// Count returns the total number of documents in the collection.
3984func (c *Collection) Count() (n int, err error) {
3985	return c.Find(nil).Count()
3986}
3987
3988type distinctCmd struct {
3989	Collection string "distinct"
3990	Key        string
3991	Query      interface{} ",omitempty"
3992}
3993
3994// Distinct unmarshals into result the list of distinct values for the given key.
3995//
3996// For example:
3997//
3998//     var result []int
3999//     err := collection.Find(bson.M{"gender": "F"}).Distinct("age", &result)
4000//
4001// Relevant documentation:
4002//
4003//     http://www.mongodb.org/display/DOCS/Aggregation
4004//
4005func (q *Query) Distinct(key string, result interface{}) error {
4006	q.m.Lock()
4007	session := q.session
4008	op := q.op // Copy.
4009	q.m.Unlock()
4010
4011	c := strings.Index(op.collection, ".")
4012	if c < 0 {
4013		return errors.New("Bad collection name: " + op.collection)
4014	}
4015
4016	dbname := op.collection[:c]
4017	cname := op.collection[c+1:]
4018
4019	var doc struct{ Values bson.Raw }
4020	err := session.DB(dbname).Run(distinctCmd{cname, key, op.query}, &doc)
4021	if err != nil {
4022		return err
4023	}
4024	return doc.Values.Unmarshal(result)
4025}
4026
4027type mapReduceCmd struct {
4028	Collection string "mapreduce"
4029	Map        string ",omitempty"
4030	Reduce     string ",omitempty"
4031	Finalize   string ",omitempty"
4032	Limit      int32  ",omitempty"
4033	Out        interface{}
4034	Query      interface{} ",omitempty"
4035	Sort       interface{} ",omitempty"
4036	Scope      interface{} ",omitempty"
4037	Verbose    bool        ",omitempty"
4038}
4039
4040type mapReduceResult struct {
4041	Results    bson.Raw
4042	Result     bson.Raw
4043	TimeMillis int64 "timeMillis"
4044	Counts     struct{ Input, Emit, Output int }
4045	Ok         bool
4046	Err        string
4047	Timing     *MapReduceTime
4048}
4049
4050type MapReduce struct {
4051	Map      string      // Map Javascript function code (required)
4052	Reduce   string      // Reduce Javascript function code (required)
4053	Finalize string      // Finalize Javascript function code (optional)
4054	Out      interface{} // Output collection name or document. If nil, results are inlined into the result parameter.
4055	Scope    interface{} // Optional global scope for Javascript functions
4056	Verbose  bool
4057}
4058
4059type MapReduceInfo struct {
4060	InputCount  int            // Number of documents mapped
4061	EmitCount   int            // Number of times reduce called emit
4062	OutputCount int            // Number of documents in resulting collection
4063	Database    string         // Output database, if results are not inlined
4064	Collection  string         // Output collection, if results are not inlined
4065	Time        int64          // Time to run the job, in nanoseconds
4066	VerboseTime *MapReduceTime // Only defined if Verbose was true
4067}
4068
4069type MapReduceTime struct {
4070	Total    int64 // Total time, in nanoseconds
4071	Map      int64 "mapTime"  // Time within map function, in nanoseconds
4072	EmitLoop int64 "emitLoop" // Time within the emit/map loop, in nanoseconds
4073}
4074
4075// MapReduce executes a map/reduce job for documents covered by the query.
4076// That kind of job is suitable for very flexible bulk aggregation of data
4077// performed at the server side via Javascript functions.
4078//
4079// Results from the job may be returned as a result of the query itself
4080// through the result parameter in case they'll certainly fit in memory
4081// and in a single document.  If there's the possibility that the amount
4082// of data might be too large, results must be stored back in an alternative
4083// collection or even a separate database, by setting the Out field of the
4084// provided MapReduce job.  In that case, provide nil as the result parameter.
4085//
4086// These are some of the ways to set Out:
4087//
4088//     nil
4089//         Inline results into the result parameter.
4090//
4091//     bson.M{"replace": "mycollection"}
4092//         The output will be inserted into a collection which replaces any
4093//         existing collection with the same name.
4094//
4095//     bson.M{"merge": "mycollection"}
4096//         This option will merge new data into the old output collection. In
4097//         other words, if the same key exists in both the result set and the
4098//         old collection, the new key will overwrite the old one.
4099//
4100//     bson.M{"reduce": "mycollection"}
4101//         If documents exist for a given key in the result set and in the old
4102//         collection, then a reduce operation (using the specified reduce
4103//         function) will be performed on the two values and the result will be
4104//         written to the output collection. If a finalize function was
4105//         provided, this will be run after the reduce as well.
4106//
4107//     bson.M{...., "db": "mydb"}
4108//         Any of the above options can have the "db" key included for doing
4109//         the respective action in a separate database.
4110//
4111// The following is a trivial example which will count the number of
4112// occurrences of a field named n on each document in a collection, and
4113// will return results inline:
4114//
4115//     job := &mgo.MapReduce{
4116//             Map:      "function() { emit(this.n, 1) }",
4117//             Reduce:   "function(key, values) { return Array.sum(values) }",
4118//     }
4119//     var result []struct { Id int "_id"; Value int }
4120//     _, err := collection.Find(nil).MapReduce(job, &result)
4121//     if err != nil {
4122//         return err
4123//     }
4124//     for _, item := range result {
4125//         fmt.Println(item.Value)
4126//     }
4127//
4128// This function is compatible with MongoDB 1.7.4+.
4129//
4130// Relevant documentation:
4131//
4132//     http://www.mongodb.org/display/DOCS/MapReduce
4133//
4134func (q *Query) MapReduce(job *MapReduce, result interface{}) (info *MapReduceInfo, err error) {
4135	q.m.Lock()
4136	session := q.session
4137	op := q.op // Copy.
4138	limit := q.limit
4139	q.m.Unlock()
4140
4141	c := strings.Index(op.collection, ".")
4142	if c < 0 {
4143		return nil, errors.New("Bad collection name: " + op.collection)
4144	}
4145
4146	dbname := op.collection[:c]
4147	cname := op.collection[c+1:]
4148
4149	cmd := mapReduceCmd{
4150		Collection: cname,
4151		Map:        job.Map,
4152		Reduce:     job.Reduce,
4153		Finalize:   job.Finalize,
4154		Out:        fixMROut(job.Out),
4155		Scope:      job.Scope,
4156		Verbose:    job.Verbose,
4157		Query:      op.query,
4158		Sort:       op.options.OrderBy,
4159		Limit:      limit,
4160	}
4161
4162	if cmd.Out == nil {
4163		cmd.Out = bson.D{{"inline", 1}}
4164	}
4165
4166	var doc mapReduceResult
4167	err = session.DB(dbname).Run(&cmd, &doc)
4168	if err != nil {
4169		return nil, err
4170	}
4171	if doc.Err != "" {
4172		return nil, errors.New(doc.Err)
4173	}
4174
4175	info = &MapReduceInfo{
4176		InputCount:  doc.Counts.Input,
4177		EmitCount:   doc.Counts.Emit,
4178		OutputCount: doc.Counts.Output,
4179		Time:        doc.TimeMillis * 1e6,
4180	}
4181
4182	if doc.Result.Kind == 0x02 {
4183		err = doc.Result.Unmarshal(&info.Collection)
4184		info.Database = dbname
4185	} else if doc.Result.Kind == 0x03 {
4186		var v struct{ Collection, Db string }
4187		err = doc.Result.Unmarshal(&v)
4188		info.Collection = v.Collection
4189		info.Database = v.Db
4190	}
4191
4192	if doc.Timing != nil {
4193		info.VerboseTime = doc.Timing
4194		info.VerboseTime.Total *= 1e6
4195		info.VerboseTime.Map *= 1e6
4196		info.VerboseTime.EmitLoop *= 1e6
4197	}
4198
4199	if err != nil {
4200		return nil, err
4201	}
4202	if result != nil {
4203		return info, doc.Results.Unmarshal(result)
4204	}
4205	return info, nil
4206}
4207
4208// The "out" option in the MapReduce command must be ordered. This was
4209// found after the implementation was accepting maps for a long time,
4210// so rather than breaking the API, we'll fix the order if necessary.
4211// Details about the order requirement may be seen in MongoDB's code:
4212//
4213//     http://goo.gl/L8jwJX
4214//
4215func fixMROut(out interface{}) interface{} {
4216	outv := reflect.ValueOf(out)
4217	if outv.Kind() != reflect.Map || outv.Type().Key() != reflect.TypeOf("") {
4218		return out
4219	}
4220	outs := make(bson.D, outv.Len())
4221
4222	outTypeIndex := -1
4223	for i, k := range outv.MapKeys() {
4224		ks := k.String()
4225		outs[i].Name = ks
4226		outs[i].Value = outv.MapIndex(k).Interface()
4227		switch ks {
4228		case "normal", "replace", "merge", "reduce", "inline":
4229			outTypeIndex = i
4230		}
4231	}
4232	if outTypeIndex > 0 {
4233		outs[0], outs[outTypeIndex] = outs[outTypeIndex], outs[0]
4234	}
4235	return outs
4236}
4237
4238// Change holds fields for running a findAndModify MongoDB command via
4239// the Query.Apply method.
4240type Change struct {
4241	Update    interface{} // The update document
4242	Upsert    bool        // Whether to insert in case the document isn't found
4243	Remove    bool        // Whether to remove the document found rather than updating
4244	ReturnNew bool        // Should the modified document be returned rather than the old one
4245}
4246
4247type findModifyCmd struct {
4248	Collection                  string      "findAndModify"
4249	Query, Update, Sort, Fields interface{} ",omitempty"
4250	Upsert, Remove, New         bool        ",omitempty"
4251}
4252
4253type valueResult struct {
4254	Value     bson.Raw
4255	LastError LastError "lastErrorObject"
4256}
4257
4258// Apply runs the findAndModify MongoDB command, which allows updating, upserting
4259// or removing a document matching a query and atomically returning either the old
4260// version (the default) or the new version of the document (when ReturnNew is true).
4261// If no objects are found Apply returns ErrNotFound.
4262//
4263// The Sort and Select query methods affect the result of Apply.  In case
4264// multiple documents match the query, Sort enables selecting which document to
4265// act upon by ordering it first.  Select enables retrieving only a selection
4266// of fields of the new or old document.
4267//
4268// This simple example increments a counter and prints its new value:
4269//
4270//     change := mgo.Change{
4271//             Update: bson.M{"$inc": bson.M{"n": 1}},
4272//             ReturnNew: true,
4273//     }
4274//     info, err = col.Find(M{"_id": id}).Apply(change, &doc)
4275//     fmt.Println(doc.N)
4276//
4277// This method depends on MongoDB >= 2.0 to work properly.
4278//
4279// Relevant documentation:
4280//
4281//     http://www.mongodb.org/display/DOCS/findAndModify+Command
4282//     http://www.mongodb.org/display/DOCS/Updating
4283//     http://www.mongodb.org/display/DOCS/Atomic+Operations
4284//
4285func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err error) {
4286	q.m.Lock()
4287	session := q.session
4288	op := q.op // Copy.
4289	q.m.Unlock()
4290
4291	c := strings.Index(op.collection, ".")
4292	if c < 0 {
4293		return nil, errors.New("bad collection name: " + op.collection)
4294	}
4295
4296	dbname := op.collection[:c]
4297	cname := op.collection[c+1:]
4298
4299	cmd := findModifyCmd{
4300		Collection: cname,
4301		Update:     change.Update,
4302		Upsert:     change.Upsert,
4303		Remove:     change.Remove,
4304		New:        change.ReturnNew,
4305		Query:      op.query,
4306		Sort:       op.options.OrderBy,
4307		Fields:     op.selector,
4308	}
4309
4310	session = session.Clone()
4311	defer session.Close()
4312	session.SetMode(Strong, false)
4313
4314	var doc valueResult
4315	for i := 0; i < maxUpsertRetries; i++ {
4316		err = session.DB(dbname).Run(&cmd, &doc)
4317		if err == nil {
4318			break
4319		}
4320		if change.Upsert && IsDup(err) && i+1 < maxUpsertRetries {
4321			// Retry duplicate key errors on upserts.
4322			// https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes
4323			continue
4324		}
4325		if qerr, ok := err.(*QueryError); ok && qerr.Message == "No matching object found" {
4326			return nil, ErrNotFound
4327		}
4328		return nil, err
4329	}
4330	if doc.LastError.N == 0 {
4331		return nil, ErrNotFound
4332	}
4333	if doc.Value.Kind != 0x0A && result != nil {
4334		err = doc.Value.Unmarshal(result)
4335		if err != nil {
4336			return nil, err
4337		}
4338	}
4339	info = &ChangeInfo{}
4340	lerr := &doc.LastError
4341	if lerr.UpdatedExisting {
4342		info.Updated = lerr.N
4343		info.Matched = lerr.N
4344	} else if change.Remove {
4345		info.Removed = lerr.N
4346		info.Matched = lerr.N
4347	} else if change.Upsert {
4348		info.UpsertedId = lerr.UpsertedId
4349	}
4350	return info, nil
4351}
4352
4353// The BuildInfo type encapsulates details about the running MongoDB server.
4354//
4355// Note that the VersionArray field was introduced in MongoDB 2.0+, but it is
4356// internally assembled from the Version information for previous versions.
4357// In both cases, VersionArray is guaranteed to have at least 4 entries.
4358type BuildInfo struct {
4359	Version        string
4360	VersionArray   []int  `bson:"versionArray"` // On MongoDB 2.0+; assembled from Version otherwise
4361	GitVersion     string `bson:"gitVersion"`
4362	OpenSSLVersion string `bson:"OpenSSLVersion"`
4363	SysInfo        string `bson:"sysInfo"` // Deprecated and empty on MongoDB 3.2+.
4364	Bits           int
4365	Debug          bool
4366	MaxObjectSize  int `bson:"maxBsonObjectSize"`
4367}
4368
4369// VersionAtLeast returns whether the BuildInfo version is greater than or
4370// equal to the provided version number. If more than one number is
4371// provided, numbers will be considered as major, minor, and so on.
4372func (bi *BuildInfo) VersionAtLeast(version ...int) bool {
4373	for i, vi := range version {
4374		if i == len(bi.VersionArray) {
4375			return false
4376		}
4377		if bivi := bi.VersionArray[i]; bivi != vi {
4378			return bivi >= vi
4379		}
4380	}
4381	return true
4382}
4383
4384// BuildInfo retrieves the version and other details about the
4385// running MongoDB server.
4386func (s *Session) BuildInfo() (info BuildInfo, err error) {
4387	err = s.Run(bson.D{{"buildInfo", "1"}}, &info)
4388	if len(info.VersionArray) == 0 {
4389		for _, a := range strings.Split(info.Version, ".") {
4390			i, err := strconv.Atoi(a)
4391			if err != nil {
4392				break
4393			}
4394			info.VersionArray = append(info.VersionArray, i)
4395		}
4396	}
4397	for len(info.VersionArray) < 4 {
4398		info.VersionArray = append(info.VersionArray, 0)
4399	}
4400	if i := strings.IndexByte(info.GitVersion, ' '); i >= 0 {
4401		// Strip off the " modules: enterprise" suffix. This is a _git version_.
4402		// That information may be moved to another field if people need it.
4403		info.GitVersion = info.GitVersion[:i]
4404	}
4405	if info.SysInfo == "deprecated" {
4406		info.SysInfo = ""
4407	}
4408	return
4409}
4410
4411// ---------------------------------------------------------------------------
4412// Internal session handling helpers.
4413
4414func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
4415
4416	// Read-only lock to check for previously reserved socket.
4417	s.m.RLock()
4418	// If there is a slave socket reserved and its use is acceptable, take it as long
4419	// as there isn't a master socket which would be preferred by the read preference mode.
4420	if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
4421		socket := s.slaveSocket
4422		socket.Acquire()
4423		s.m.RUnlock()
4424		return socket, nil
4425	}
4426	if s.masterSocket != nil {
4427		socket := s.masterSocket
4428		socket.Acquire()
4429		s.m.RUnlock()
4430		return socket, nil
4431	}
4432	s.m.RUnlock()
4433
4434	// No go.  We may have to request a new socket and change the session,
4435	// so try again but with an exclusive lock now.
4436	s.m.Lock()
4437	defer s.m.Unlock()
4438
4439	if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
4440		s.slaveSocket.Acquire()
4441		return s.slaveSocket, nil
4442	}
4443	if s.masterSocket != nil {
4444		s.masterSocket.Acquire()
4445		return s.masterSocket, nil
4446	}
4447
4448	// Still not good.  We need a new socket.
4449	sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
4450	if err != nil {
4451		return nil, err
4452	}
4453
4454	// Authenticate the new socket.
4455	if err = s.socketLogin(sock); err != nil {
4456		sock.Release()
4457		return nil, err
4458	}
4459
4460	// Keep track of the new socket, if necessary.
4461	// Note that, as a special case, if the Eventual session was
4462	// not refreshed (s.slaveSocket != nil), it means the developer
4463	// asked to preserve an existing reserved socket, so we'll
4464	// keep a master one around too before a Refresh happens.
4465	if s.consistency != Eventual || s.slaveSocket != nil {
4466		s.setSocket(sock)
4467	}
4468
4469	// Switch over a Monotonic session to the master.
4470	if !slaveOk && s.consistency == Monotonic {
4471		s.slaveOk = false
4472	}
4473
4474	return sock, nil
4475}
4476
4477// setSocket binds socket to this section.
4478func (s *Session) setSocket(socket *mongoSocket) {
4479	info := socket.Acquire()
4480	if info.Master {
4481		if s.masterSocket != nil {
4482			panic("setSocket(master) with existing master socket reserved")
4483		}
4484		s.masterSocket = socket
4485	} else {
4486		if s.slaveSocket != nil {
4487			panic("setSocket(slave) with existing slave socket reserved")
4488		}
4489		s.slaveSocket = socket
4490	}
4491}
4492
4493// unsetSocket releases any slave and/or master sockets reserved.
4494func (s *Session) unsetSocket() {
4495	if s.masterSocket != nil {
4496		s.masterSocket.Release()
4497	}
4498	if s.slaveSocket != nil {
4499		s.slaveSocket.Release()
4500	}
4501	s.masterSocket = nil
4502	s.slaveSocket = nil
4503}
4504
4505func (iter *Iter) replyFunc() replyFunc {
4506	return func(err error, op *replyOp, docNum int, docData []byte) {
4507		iter.m.Lock()
4508		iter.docsToReceive--
4509		if err != nil {
4510			iter.err = err
4511			debugf("Iter %p received an error: %s", iter, err.Error())
4512		} else if docNum == -1 {
4513			debugf("Iter %p received no documents (cursor=%d).", iter, op.cursorId)
4514			if op != nil && op.cursorId != 0 {
4515				// It's a tailable cursor.
4516				iter.op.cursorId = op.cursorId
4517			} else if op != nil && op.cursorId == 0 && op.flags&1 == 1 {
4518				// Cursor likely timed out.
4519				iter.err = ErrCursor
4520			} else {
4521				iter.err = ErrNotFound
4522			}
4523		} else if iter.findCmd {
4524			debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, int(op.replyDocs), op.cursorId)
4525			var findReply struct {
4526				Ok     bool
4527				Code   int
4528				Errmsg string
4529				Cursor cursorData
4530			}
4531			if err := bson.Unmarshal(docData, &findReply); err != nil {
4532				iter.err = err
4533			} else if !findReply.Ok && findReply.Errmsg != "" {
4534				iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
4535			} else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 {
4536				iter.err = ErrNotFound
4537			} else {
4538				batch := findReply.Cursor.FirstBatch
4539				if len(batch) == 0 {
4540					batch = findReply.Cursor.NextBatch
4541				}
4542				rdocs := len(batch)
4543				for _, raw := range batch {
4544					iter.docData.Push(raw.Data)
4545				}
4546				iter.docsToReceive = 0
4547				docsToProcess := iter.docData.Len()
4548				if iter.limit == 0 || int32(docsToProcess) < iter.limit {
4549					iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
4550				} else {
4551					iter.docsBeforeMore = -1
4552				}
4553				iter.op.cursorId = findReply.Cursor.Id
4554			}
4555		} else {
4556			rdocs := int(op.replyDocs)
4557			if docNum == 0 {
4558				iter.docsToReceive += rdocs - 1
4559				docsToProcess := iter.docData.Len() + rdocs
4560				if iter.limit == 0 || int32(docsToProcess) < iter.limit {
4561					iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
4562				} else {
4563					iter.docsBeforeMore = -1
4564				}
4565				iter.op.cursorId = op.cursorId
4566			}
4567			debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, rdocs, op.cursorId)
4568			iter.docData.Push(docData)
4569		}
4570		iter.gotReply.Broadcast()
4571		iter.m.Unlock()
4572	}
4573}
4574
4575type writeCmdResult struct {
4576	Ok        bool
4577	N         int
4578	NModified int `bson:"nModified"`
4579	Upserted  []struct {
4580		Index int
4581		Id    interface{} `_id`
4582	}
4583	ConcernError writeConcernError `bson:"writeConcernError"`
4584	Errors       []writeCmdError   `bson:"writeErrors"`
4585}
4586
4587type writeConcernError struct {
4588	Code   int
4589	ErrMsg string
4590}
4591
4592type writeCmdError struct {
4593	Index  int
4594	Code   int
4595	ErrMsg string
4596}
4597
4598func (r *writeCmdResult) BulkErrorCases() []BulkErrorCase {
4599	ecases := make([]BulkErrorCase, len(r.Errors))
4600	for i, err := range r.Errors {
4601		ecases[i] = BulkErrorCase{err.Index, &QueryError{Code: err.Code, Message: err.ErrMsg}}
4602	}
4603	return ecases
4604}
4605
4606// writeOp runs the given modifying operation, potentially followed up
4607// by a getLastError command in case the session is in safe mode.  The
4608// LastError result is made available in lerr, and if lerr.Err is set it
4609// will also be returned as err.
4610func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err error) {
4611	s := c.Database.Session
4612	socket, err := s.acquireSocket(c.Database.Name == "local")
4613	if err != nil {
4614		return nil, err
4615	}
4616	defer socket.Release()
4617
4618	s.m.RLock()
4619	safeOp := s.safeOp
4620	bypassValidation := s.bypassValidation
4621	s.m.RUnlock()
4622
4623	if socket.ServerInfo().MaxWireVersion >= 2 {
4624		// Servers with a more recent write protocol benefit from write commands.
4625		if op, ok := op.(*insertOp); ok && len(op.documents) > 1000 {
4626			var lerr LastError
4627
4628			// Maximum batch size is 1000. Must split out in separate operations for compatibility.
4629			all := op.documents
4630			for i := 0; i < len(all); i += 1000 {
4631				l := i + 1000
4632				if l > len(all) {
4633					l = len(all)
4634				}
4635				op.documents = all[i:l]
4636				oplerr, err := c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
4637				lerr.N += oplerr.N
4638				lerr.modified += oplerr.modified
4639				if err != nil {
4640					for ei := range oplerr.ecases {
4641						oplerr.ecases[ei].Index += i
4642					}
4643					lerr.ecases = append(lerr.ecases, oplerr.ecases...)
4644					if op.flags&1 == 0 {
4645						return &lerr, err
4646					}
4647				}
4648			}
4649			if len(lerr.ecases) != 0 {
4650				return &lerr, lerr.ecases[0].Err
4651			}
4652			return &lerr, nil
4653		}
4654		return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
4655	} else if updateOps, ok := op.(bulkUpdateOp); ok {
4656		var lerr LastError
4657		for i, updateOp := range updateOps {
4658			oplerr, err := c.writeOpQuery(socket, safeOp, updateOp, ordered)
4659			lerr.N += oplerr.N
4660			lerr.modified += oplerr.modified
4661			if err != nil {
4662				lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err})
4663				if ordered {
4664					break
4665				}
4666			}
4667		}
4668		if len(lerr.ecases) != 0 {
4669			return &lerr, lerr.ecases[0].Err
4670		}
4671		return &lerr, nil
4672	} else if deleteOps, ok := op.(bulkDeleteOp); ok {
4673		var lerr LastError
4674		for i, deleteOp := range deleteOps {
4675			oplerr, err := c.writeOpQuery(socket, safeOp, deleteOp, ordered)
4676			lerr.N += oplerr.N
4677			lerr.modified += oplerr.modified
4678			if err != nil {
4679				lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err})
4680				if ordered {
4681					break
4682				}
4683			}
4684		}
4685		if len(lerr.ecases) != 0 {
4686			return &lerr, lerr.ecases[0].Err
4687		}
4688		return &lerr, nil
4689	}
4690	return c.writeOpQuery(socket, safeOp, op, ordered)
4691}
4692
4693func (c *Collection) writeOpQuery(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered bool) (lerr *LastError, err error) {
4694	if safeOp == nil {
4695		return nil, socket.Query(op)
4696	}
4697
4698	var mutex sync.Mutex
4699	var replyData []byte
4700	var replyErr error
4701	mutex.Lock()
4702	query := *safeOp // Copy the data.
4703	query.collection = c.Database.Name + ".$cmd"
4704	query.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
4705		replyData = docData
4706		replyErr = err
4707		mutex.Unlock()
4708	}
4709	err = socket.Query(op, &query)
4710	if err != nil {
4711		return nil, err
4712	}
4713	mutex.Lock() // Wait.
4714	if replyErr != nil {
4715		return nil, replyErr // XXX TESTME
4716	}
4717	if hasErrMsg(replyData) {
4718		// Looks like getLastError itself failed.
4719		err = checkQueryError(query.collection, replyData)
4720		if err != nil {
4721			return nil, err
4722		}
4723	}
4724	result := &LastError{}
4725	bson.Unmarshal(replyData, &result)
4726	debugf("Result from writing query: %#v", result)
4727	if result.Err != "" {
4728		result.ecases = []BulkErrorCase{{Index: 0, Err: result}}
4729		if insert, ok := op.(*insertOp); ok && len(insert.documents) > 1 {
4730			result.ecases[0].Index = -1
4731		}
4732		return result, result
4733	}
4734	// With MongoDB <2.6 we don't know how many actually changed, so make it the same as matched.
4735	result.modified = result.N
4736	return result, nil
4737}
4738
4739func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered, bypassValidation bool) (lerr *LastError, err error) {
4740	var writeConcern interface{}
4741	if safeOp == nil {
4742		writeConcern = bson.D{{"w", 0}}
4743	} else {
4744		writeConcern = safeOp.query.(*getLastError)
4745	}
4746
4747	var cmd bson.D
4748	switch op := op.(type) {
4749	case *insertOp:
4750		// http://docs.mongodb.org/manual/reference/command/insert
4751		cmd = bson.D{
4752			{"insert", c.Name},
4753			{"documents", op.documents},
4754			{"writeConcern", writeConcern},
4755			{"ordered", op.flags&1 == 0},
4756		}
4757	case *updateOp:
4758		// http://docs.mongodb.org/manual/reference/command/update
4759		cmd = bson.D{
4760			{"update", c.Name},
4761			{"updates", []interface{}{op}},
4762			{"writeConcern", writeConcern},
4763			{"ordered", ordered},
4764		}
4765	case bulkUpdateOp:
4766		// http://docs.mongodb.org/manual/reference/command/update
4767		cmd = bson.D{
4768			{"update", c.Name},
4769			{"updates", op},
4770			{"writeConcern", writeConcern},
4771			{"ordered", ordered},
4772		}
4773	case *deleteOp:
4774		// http://docs.mongodb.org/manual/reference/command/delete
4775		cmd = bson.D{
4776			{"delete", c.Name},
4777			{"deletes", []interface{}{op}},
4778			{"writeConcern", writeConcern},
4779			{"ordered", ordered},
4780		}
4781	case bulkDeleteOp:
4782		// http://docs.mongodb.org/manual/reference/command/delete
4783		cmd = bson.D{
4784			{"delete", c.Name},
4785			{"deletes", op},
4786			{"writeConcern", writeConcern},
4787			{"ordered", ordered},
4788		}
4789	}
4790	if bypassValidation {
4791		cmd = append(cmd, bson.DocElem{"bypassDocumentValidation", true})
4792	}
4793
4794	var result writeCmdResult
4795	err = c.Database.run(socket, cmd, &result)
4796	debugf("Write command result: %#v (err=%v)", result, err)
4797	ecases := result.BulkErrorCases()
4798	lerr = &LastError{
4799		UpdatedExisting: result.N > 0 && len(result.Upserted) == 0,
4800		N:               result.N,
4801
4802		modified: result.NModified,
4803		ecases:   ecases,
4804	}
4805	if len(result.Upserted) > 0 {
4806		lerr.UpsertedId = result.Upserted[0].Id
4807	}
4808	if len(result.Errors) > 0 {
4809		e := result.Errors[0]
4810		lerr.Code = e.Code
4811		lerr.Err = e.ErrMsg
4812		err = lerr
4813	} else if result.ConcernError.Code != 0 {
4814		e := result.ConcernError
4815		lerr.Code = e.Code
4816		lerr.Err = e.ErrMsg
4817		err = lerr
4818	}
4819
4820	if err == nil && safeOp == nil {
4821		return nil, nil
4822	}
4823	return lerr, err
4824}
4825
4826func hasErrMsg(d []byte) bool {
4827	l := len(d)
4828	for i := 0; i+8 < l; i++ {
4829		if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
4830			return true
4831		}
4832	}
4833	return false
4834}
4835