1// mgo - MongoDB driver for Go
2//
3// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
4//
5// All rights reserved.
6//
7// Redistribution and use in source and binary forms, with or without
8// modification, are permitted provided that the following conditions are met:
9//
10// 1. Redistributions of source code must retain the above copyright notice, this
11//    list of conditions and the following disclaimer.
12// 2. Redistributions in binary form must reproduce the above copyright notice,
13//    this list of conditions and the following disclaimer in the documentation
14//    and/or other materials provided with the distribution.
15//
16// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
20// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27package mgo
28
29import (
30	"crypto/md5"
31	"encoding/hex"
32	"errors"
33	"fmt"
34	"math"
35	"net"
36	"net/url"
37	"reflect"
38	"sort"
39	"strconv"
40	"strings"
41	"sync"
42	"time"
43
44	"gopkg.in/mgo.v2/bson"
45)
46
47type Mode int
48
49const (
50	// Relevant documentation on read preference modes:
51	//
52	//     http://docs.mongodb.org/manual/reference/read-preference/
53	//
54	Primary            Mode = 2 // Default mode. All operations read from the current replica set primary.
55	PrimaryPreferred   Mode = 3 // Read from the primary if available. Read from the secondary otherwise.
56	Secondary          Mode = 4 // Read from one of the nearest secondary members of the replica set.
57	SecondaryPreferred Mode = 5 // Read from one of the nearest secondaries if available. Read from primary otherwise.
58	Nearest            Mode = 6 // Read from one of the nearest members, irrespective of it being primary or secondary.
59
60	// Read preference modes are specific to mgo:
61	Eventual  Mode = 0 // Same as Nearest, but may change servers between reads.
62	Monotonic Mode = 1 // Same as SecondaryPreferred before first write. Same as Primary after first write.
63	Strong    Mode = 2 // Same as Primary.
64)
65
66// mgo.v3: Drop Strong mode, suffix all modes with "Mode".
67
68// When changing the Session type, check if newSession and copySession
69// need to be updated too.
70
71// Session represents a communication session with the database.
72//
73// All Session methods are concurrency-safe and may be called from multiple
74// goroutines. In all session modes but Eventual, using the session from
75// multiple goroutines will cause them to share the same underlying socket.
76// See the documentation on Session.SetMode for more details.
77type Session struct {
78	m                sync.RWMutex
79	cluster_         *mongoCluster
80	slaveSocket      *mongoSocket
81	masterSocket     *mongoSocket
82	slaveOk          bool
83	consistency      Mode
84	queryConfig      query
85	safeOp           *queryOp
86	syncTimeout      time.Duration
87	sockTimeout      time.Duration
88	defaultdb        string
89	sourcedb         string
90	dialCred         *Credential
91	creds            []Credential
92	poolLimit        int
93	bypassValidation bool
94}
95
96type Database struct {
97	Session *Session
98	Name    string
99}
100
101type Collection struct {
102	Database *Database
103	Name     string // "collection"
104	FullName string // "db.collection"
105}
106
107type Query struct {
108	m       sync.Mutex
109	session *Session
110	query   // Enables default settings in session.
111}
112
113type query struct {
114	op       queryOp
115	prefetch float64
116	limit    int32
117}
118
119type getLastError struct {
120	CmdName  int         "getLastError,omitempty"
121	W        interface{} "w,omitempty"
122	WTimeout int         "wtimeout,omitempty"
123	FSync    bool        "fsync,omitempty"
124	J        bool        "j,omitempty"
125}
126
127type Iter struct {
128	m              sync.Mutex
129	gotReply       sync.Cond
130	session        *Session
131	server         *mongoServer
132	docData        queue
133	err            error
134	op             getMoreOp
135	prefetch       float64
136	limit          int32
137	docsToReceive  int
138	docsBeforeMore int
139	timeout        time.Duration
140	timedout       bool
141	findCmd        bool
142}
143
144var (
145	ErrNotFound = errors.New("not found")
146	ErrCursor   = errors.New("invalid cursor")
147)
148
149const (
150	defaultPrefetch  = 0.25
151	maxUpsertRetries = 5
152)
153
154// Dial establishes a new session to the cluster identified by the given seed
155// server(s). The session will enable communication with all of the servers in
156// the cluster, so the seed servers are used only to find out about the cluster
157// topology.
158//
159// Dial will timeout after 10 seconds if a server isn't reached. The returned
160// session will timeout operations after one minute by default if servers
161// aren't available. To customize the timeout, see DialWithTimeout,
162// SetSyncTimeout, and SetSocketTimeout.
163//
164// This method is generally called just once for a given cluster.  Further
165// sessions to the same cluster are then established using the New or Copy
166// methods on the obtained session. This will make them share the underlying
167// cluster, and manage the pool of connections appropriately.
168//
169// Once the session is not useful anymore, Close must be called to release the
170// resources appropriately.
171//
172// The seed servers must be provided in the following format:
173//
174//     [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
175//
176// For example, it may be as simple as:
177//
178//     localhost
179//
180// Or more involved like:
181//
182//     mongodb://myuser:mypass@localhost:40001,otherhost:40001/mydb
183//
184// If the port number is not provided for a server, it defaults to 27017.
185//
186// The username and password provided in the URL will be used to authenticate
187// into the database named after the slash at the end of the host names, or
188// into the "admin" database if none is provided.  The authentication information
189// will persist in sessions obtained through the New method as well.
190//
191// The following connection options are supported after the question mark:
192//
193//     connect=direct
194//
195//         Disables the automatic replica set server discovery logic, and
196//         forces the use of servers provided only (even if secondaries).
197//         Note that to talk to a secondary the consistency requirements
198//         must be relaxed to Monotonic or Eventual via SetMode.
199//
200//
201//     connect=replicaSet
202//
203//  	   Discover replica sets automatically. Default connection behavior.
204//
205//
206//     replicaSet=<setname>
207//
208//         If specified will prevent the obtained session from communicating
209//         with any server which is not part of a replica set with the given name.
210//         The default is to communicate with any server specified or discovered
211//         via the servers contacted.
212//
213//
214//     authSource=<db>
215//
216//         Informs the database used to establish credentials and privileges
217//         with a MongoDB server. Defaults to the database name provided via
218//         the URL path, and "admin" if that's unset.
219//
220//
221//     authMechanism=<mechanism>
222//
223//        Defines the protocol for credential negotiation. Defaults to "MONGODB-CR",
224//        which is the default username/password challenge-response mechanism.
225//
226//
227//     gssapiServiceName=<name>
228//
229//        Defines the service name to use when authenticating with the GSSAPI
230//        mechanism. Defaults to "mongodb".
231//
232//
233//     maxPoolSize=<limit>
234//
235//        Defines the per-server socket pool limit. Defaults to 4096.
236//        See Session.SetPoolLimit for details.
237//
238//
239// Relevant documentation:
240//
241//     http://docs.mongodb.org/manual/reference/connection-string/
242//
243func Dial(url string) (*Session, error) {
244	session, err := DialWithTimeout(url, 10*time.Second)
245	if err == nil {
246		session.SetSyncTimeout(1 * time.Minute)
247		session.SetSocketTimeout(1 * time.Minute)
248	}
249	return session, err
250}
251
252// DialWithTimeout works like Dial, but uses timeout as the amount of time to
253// wait for a server to respond when first connecting and also on follow up
254// operations in the session. If timeout is zero, the call may block
255// forever waiting for a connection to be made.
256//
257// See SetSyncTimeout for customizing the timeout for the session.
258func DialWithTimeout(url string, timeout time.Duration) (*Session, error) {
259	info, err := ParseURL(url)
260	if err != nil {
261		return nil, err
262	}
263	info.Timeout = timeout
264	return DialWithInfo(info)
265}
266
267// ParseURL parses a MongoDB URL as accepted by the Dial function and returns
268// a value suitable for providing into DialWithInfo.
269//
270// See Dial for more details on the format of url.
271func ParseURL(url string) (*DialInfo, error) {
272	uinfo, err := extractURL(url)
273	if err != nil {
274		return nil, err
275	}
276	direct := false
277	mechanism := ""
278	service := ""
279	source := ""
280	setName := ""
281	poolLimit := 0
282	for k, v := range uinfo.options {
283		switch k {
284		case "authSource":
285			source = v
286		case "authMechanism":
287			mechanism = v
288		case "gssapiServiceName":
289			service = v
290		case "replicaSet":
291			setName = v
292		case "maxPoolSize":
293			poolLimit, err = strconv.Atoi(v)
294			if err != nil {
295				return nil, errors.New("bad value for maxPoolSize: " + v)
296			}
297		case "connect":
298			if v == "direct" {
299				direct = true
300				break
301			}
302			if v == "replicaSet" {
303				break
304			}
305			fallthrough
306		default:
307			return nil, errors.New("unsupported connection URL option: " + k + "=" + v)
308		}
309	}
310	info := DialInfo{
311		Addrs:          uinfo.addrs,
312		Direct:         direct,
313		Database:       uinfo.db,
314		Username:       uinfo.user,
315		Password:       uinfo.pass,
316		Mechanism:      mechanism,
317		Service:        service,
318		Source:         source,
319		PoolLimit:      poolLimit,
320		ReplicaSetName: setName,
321	}
322	return &info, nil
323}
324
325// DialInfo holds options for establishing a session with a MongoDB cluster.
326// To use a URL, see the Dial function.
327type DialInfo struct {
328	// Addrs holds the addresses for the seed servers.
329	Addrs []string
330
331	// Direct informs whether to establish connections only with the
332	// specified seed servers, or to obtain information for the whole
333	// cluster and establish connections with further servers too.
334	Direct bool
335
336	// Timeout is the amount of time to wait for a server to respond when
337	// first connecting and on follow up operations in the session. If
338	// timeout is zero, the call may block forever waiting for a connection
339	// to be established. Timeout does not affect logic in DialServer.
340	Timeout time.Duration
341
342	// FailFast will cause connection and query attempts to fail faster when
343	// the server is unavailable, instead of retrying until the configured
344	// timeout period. Note that an unavailable server may silently drop
345	// packets instead of rejecting them, in which case it's impossible to
346	// distinguish it from a slow server, so the timeout stays relevant.
347	FailFast bool
348
349	// Database is the default database name used when the Session.DB method
350	// is called with an empty name, and is also used during the initial
351	// authentication if Source is unset.
352	Database string
353
354	// ReplicaSetName, if specified, will prevent the obtained session from
355	// communicating with any server which is not part of a replica set
356	// with the given name. The default is to communicate with any server
357	// specified or discovered via the servers contacted.
358	ReplicaSetName string
359
360	// Source is the database used to establish credentials and privileges
361	// with a MongoDB server. Defaults to the value of Database, if that is
362	// set, or "admin" otherwise.
363	Source string
364
365	// Service defines the service name to use when authenticating with the GSSAPI
366	// mechanism. Defaults to "mongodb".
367	Service string
368
369	// ServiceHost defines which hostname to use when authenticating
370	// with the GSSAPI mechanism. If not specified, defaults to the MongoDB
371	// server's address.
372	ServiceHost string
373
374	// Mechanism defines the protocol for credential negotiation.
375	// Defaults to "MONGODB-CR".
376	Mechanism string
377
378	// Username and Password inform the credentials for the initial authentication
379	// done on the database defined by the Source field. See Session.Login.
380	Username string
381	Password string
382
383	// PoolLimit defines the per-server socket pool limit. Defaults to 4096.
384	// See Session.SetPoolLimit for details.
385	PoolLimit int
386
387	// DialServer optionally specifies the dial function for establishing
388	// connections with the MongoDB servers.
389	DialServer func(addr *ServerAddr) (net.Conn, error)
390
391	// WARNING: This field is obsolete. See DialServer above.
392	Dial func(addr net.Addr) (net.Conn, error)
393}
394
395// mgo.v3: Drop DialInfo.Dial.
396
397// ServerAddr represents the address for establishing a connection to an
398// individual MongoDB server.
399type ServerAddr struct {
400	str string
401	tcp *net.TCPAddr
402}
403
404// String returns the address that was provided for the server before resolution.
405func (addr *ServerAddr) String() string {
406	return addr.str
407}
408
409// TCPAddr returns the resolved TCP address for the server.
410func (addr *ServerAddr) TCPAddr() *net.TCPAddr {
411	return addr.tcp
412}
413
414// DialWithInfo establishes a new session to the cluster identified by info.
415func DialWithInfo(info *DialInfo) (*Session, error) {
416	addrs := make([]string, len(info.Addrs))
417	for i, addr := range info.Addrs {
418		p := strings.LastIndexAny(addr, "]:")
419		if p == -1 || addr[p] != ':' {
420			// XXX This is untested. The test suite doesn't use the standard port.
421			addr += ":27017"
422		}
423		addrs[i] = addr
424	}
425	cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer}, info.ReplicaSetName)
426	session := newSession(Eventual, cluster, info.Timeout)
427	session.defaultdb = info.Database
428	if session.defaultdb == "" {
429		session.defaultdb = "test"
430	}
431	session.sourcedb = info.Source
432	if session.sourcedb == "" {
433		session.sourcedb = info.Database
434		if session.sourcedb == "" {
435			session.sourcedb = "admin"
436		}
437	}
438	if info.Username != "" {
439		source := session.sourcedb
440		if info.Source == "" &&
441			(info.Mechanism == "GSSAPI" || info.Mechanism == "PLAIN" || info.Mechanism == "MONGODB-X509") {
442			source = "$external"
443		}
444		session.dialCred = &Credential{
445			Username:    info.Username,
446			Password:    info.Password,
447			Mechanism:   info.Mechanism,
448			Service:     info.Service,
449			ServiceHost: info.ServiceHost,
450			Source:      source,
451		}
452		session.creds = []Credential{*session.dialCred}
453	}
454	if info.PoolLimit > 0 {
455		session.poolLimit = info.PoolLimit
456	}
457	cluster.Release()
458
459	// People get confused when we return a session that is not actually
460	// established to any servers yet (e.g. what if url was wrong). So,
461	// ping the server to ensure there's someone there, and abort if it
462	// fails.
463	if err := session.Ping(); err != nil {
464		session.Close()
465		return nil, err
466	}
467	session.SetMode(Strong, true)
468	return session, nil
469}
470
471func isOptSep(c rune) bool {
472	return c == ';' || c == '&'
473}
474
475type urlInfo struct {
476	addrs   []string
477	user    string
478	pass    string
479	db      string
480	options map[string]string
481}
482
483func extractURL(s string) (*urlInfo, error) {
484	if strings.HasPrefix(s, "mongodb://") {
485		s = s[10:]
486	}
487	info := &urlInfo{options: make(map[string]string)}
488	if c := strings.Index(s, "?"); c != -1 {
489		for _, pair := range strings.FieldsFunc(s[c+1:], isOptSep) {
490			l := strings.SplitN(pair, "=", 2)
491			if len(l) != 2 || l[0] == "" || l[1] == "" {
492				return nil, errors.New("connection option must be key=value: " + pair)
493			}
494			info.options[l[0]] = l[1]
495		}
496		s = s[:c]
497	}
498	if c := strings.Index(s, "@"); c != -1 {
499		pair := strings.SplitN(s[:c], ":", 2)
500		if len(pair) > 2 || pair[0] == "" {
501			return nil, errors.New("credentials must be provided as user:pass@host")
502		}
503		var err error
504		info.user, err = url.QueryUnescape(pair[0])
505		if err != nil {
506			return nil, fmt.Errorf("cannot unescape username in URL: %q", pair[0])
507		}
508		if len(pair) > 1 {
509			info.pass, err = url.QueryUnescape(pair[1])
510			if err != nil {
511				return nil, fmt.Errorf("cannot unescape password in URL")
512			}
513		}
514		s = s[c+1:]
515	}
516	if c := strings.Index(s, "/"); c != -1 {
517		info.db = s[c+1:]
518		s = s[:c]
519	}
520	info.addrs = strings.Split(s, ",")
521	return info, nil
522}
523
524func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
525	cluster.Acquire()
526	session = &Session{
527		cluster_:    cluster,
528		syncTimeout: timeout,
529		sockTimeout: timeout,
530		poolLimit:   4096,
531	}
532	debugf("New session %p on cluster %p", session, cluster)
533	session.SetMode(consistency, true)
534	session.SetSafe(&Safe{})
535	session.queryConfig.prefetch = defaultPrefetch
536	return session
537}
538
539func copySession(session *Session, keepCreds bool) (s *Session) {
540	cluster := session.cluster()
541	cluster.Acquire()
542	if session.masterSocket != nil {
543		session.masterSocket.Acquire()
544	}
545	if session.slaveSocket != nil {
546		session.slaveSocket.Acquire()
547	}
548	var creds []Credential
549	if keepCreds {
550		creds = make([]Credential, len(session.creds))
551		copy(creds, session.creds)
552	} else if session.dialCred != nil {
553		creds = []Credential{*session.dialCred}
554	}
555	scopy := *session
556	scopy.m = sync.RWMutex{}
557	scopy.creds = creds
558	s = &scopy
559	debugf("New session %p on cluster %p (copy from %p)", s, cluster, session)
560	return s
561}
562
563// LiveServers returns a list of server addresses which are
564// currently known to be alive.
565func (s *Session) LiveServers() (addrs []string) {
566	s.m.RLock()
567	addrs = s.cluster().LiveServers()
568	s.m.RUnlock()
569	return addrs
570}
571
572// DB returns a value representing the named database. If name
573// is empty, the database name provided in the dialed URL is
574// used instead. If that is also empty, "test" is used as a
575// fallback in a way equivalent to the mongo shell.
576//
577// Creating this value is a very lightweight operation, and
578// involves no network communication.
579func (s *Session) DB(name string) *Database {
580	if name == "" {
581		name = s.defaultdb
582	}
583	return &Database{s, name}
584}
585
586// C returns a value representing the named collection.
587//
588// Creating this value is a very lightweight operation, and
589// involves no network communication.
590func (db *Database) C(name string) *Collection {
591	return &Collection{db, name, db.Name + "." + name}
592}
593
594// With returns a copy of db that uses session s.
595func (db *Database) With(s *Session) *Database {
596	newdb := *db
597	newdb.Session = s
598	return &newdb
599}
600
601// With returns a copy of c that uses session s.
602func (c *Collection) With(s *Session) *Collection {
603	newdb := *c.Database
604	newdb.Session = s
605	newc := *c
606	newc.Database = &newdb
607	return &newc
608}
609
610// GridFS returns a GridFS value representing collections in db that
611// follow the standard GridFS specification.
612// The provided prefix (sometimes known as root) will determine which
613// collections to use, and is usually set to "fs" when there is a
614// single GridFS in the database.
615//
616// See the GridFS Create, Open, and OpenId methods for more details.
617//
618// Relevant documentation:
619//
620//     http://www.mongodb.org/display/DOCS/GridFS
621//     http://www.mongodb.org/display/DOCS/GridFS+Tools
622//     http://www.mongodb.org/display/DOCS/GridFS+Specification
623//
624func (db *Database) GridFS(prefix string) *GridFS {
625	return newGridFS(db, prefix)
626}
627
628// Run issues the provided command on the db database and unmarshals
629// its result in the respective argument. The cmd argument may be either
630// a string with the command name itself, in which case an empty document of
631// the form bson.M{cmd: 1} will be used, or it may be a full command document.
632//
633// Note that MongoDB considers the first marshalled key as the command
634// name, so when providing a command with options, it's important to
635// use an ordering-preserving document, such as a struct value or an
636// instance of bson.D.  For instance:
637//
638//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
639//
640// For privilleged commands typically run on the "admin" database, see
641// the Run method in the Session type.
642//
643// Relevant documentation:
644//
645//     http://www.mongodb.org/display/DOCS/Commands
646//     http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
647//
648func (db *Database) Run(cmd interface{}, result interface{}) error {
649	socket, err := db.Session.acquireSocket(true)
650	if err != nil {
651		return err
652	}
653	defer socket.Release()
654
655	// This is an optimized form of db.C("$cmd").Find(cmd).One(result).
656	return db.run(socket, cmd, result)
657}
658
659// Credential holds details to authenticate with a MongoDB server.
660type Credential struct {
661	// Username and Password hold the basic details for authentication.
662	// Password is optional with some authentication mechanisms.
663	Username string
664	Password string
665
666	// Source is the database used to establish credentials and privileges
667	// with a MongoDB server. Defaults to the default database provided
668	// during dial, or "admin" if that was unset.
669	Source string
670
671	// Service defines the service name to use when authenticating with the GSSAPI
672	// mechanism. Defaults to "mongodb".
673	Service string
674
675	// ServiceHost defines which hostname to use when authenticating
676	// with the GSSAPI mechanism. If not specified, defaults to the MongoDB
677	// server's address.
678	ServiceHost string
679
680	// Mechanism defines the protocol for credential negotiation.
681	// Defaults to "MONGODB-CR".
682	Mechanism string
683}
684
685// Login authenticates with MongoDB using the provided credential.  The
686// authentication is valid for the whole session and will stay valid until
687// Logout is explicitly called for the same database, or the session is
688// closed.
689func (db *Database) Login(user, pass string) error {
690	return db.Session.Login(&Credential{Username: user, Password: pass, Source: db.Name})
691}
692
693// Login authenticates with MongoDB using the provided credential.  The
694// authentication is valid for the whole session and will stay valid until
695// Logout is explicitly called for the same database, or the session is
696// closed.
697func (s *Session) Login(cred *Credential) error {
698	socket, err := s.acquireSocket(true)
699	if err != nil {
700		return err
701	}
702	defer socket.Release()
703
704	credCopy := *cred
705	if cred.Source == "" {
706		if cred.Mechanism == "GSSAPI" {
707			credCopy.Source = "$external"
708		} else {
709			credCopy.Source = s.sourcedb
710		}
711	}
712	err = socket.Login(credCopy)
713	if err != nil {
714		return err
715	}
716
717	s.m.Lock()
718	s.creds = append(s.creds, credCopy)
719	s.m.Unlock()
720	return nil
721}
722
723func (s *Session) socketLogin(socket *mongoSocket) error {
724	for _, cred := range s.creds {
725		if err := socket.Login(cred); err != nil {
726			return err
727		}
728	}
729	return nil
730}
731
732// Logout removes any established authentication credentials for the database.
733func (db *Database) Logout() {
734	session := db.Session
735	dbname := db.Name
736	session.m.Lock()
737	found := false
738	for i, cred := range session.creds {
739		if cred.Source == dbname {
740			copy(session.creds[i:], session.creds[i+1:])
741			session.creds = session.creds[:len(session.creds)-1]
742			found = true
743			break
744		}
745	}
746	if found {
747		if session.masterSocket != nil {
748			session.masterSocket.Logout(dbname)
749		}
750		if session.slaveSocket != nil {
751			session.slaveSocket.Logout(dbname)
752		}
753	}
754	session.m.Unlock()
755}
756
757// LogoutAll removes all established authentication credentials for the session.
758func (s *Session) LogoutAll() {
759	s.m.Lock()
760	for _, cred := range s.creds {
761		if s.masterSocket != nil {
762			s.masterSocket.Logout(cred.Source)
763		}
764		if s.slaveSocket != nil {
765			s.slaveSocket.Logout(cred.Source)
766		}
767	}
768	s.creds = s.creds[0:0]
769	s.m.Unlock()
770}
771
772// User represents a MongoDB user.
773//
774// Relevant documentation:
775//
776//     http://docs.mongodb.org/manual/reference/privilege-documents/
777//     http://docs.mongodb.org/manual/reference/user-privileges/
778//
779type User struct {
780	// Username is how the user identifies itself to the system.
781	Username string `bson:"user"`
782
783	// Password is the plaintext password for the user. If set,
784	// the UpsertUser method will hash it into PasswordHash and
785	// unset it before the user is added to the database.
786	Password string `bson:",omitempty"`
787
788	// PasswordHash is the MD5 hash of Username+":mongo:"+Password.
789	PasswordHash string `bson:"pwd,omitempty"`
790
791	// CustomData holds arbitrary data admins decide to associate
792	// with this user, such as the full name or employee id.
793	CustomData interface{} `bson:"customData,omitempty"`
794
795	// Roles indicates the set of roles the user will be provided.
796	// See the Role constants.
797	Roles []Role `bson:"roles"`
798
799	// OtherDBRoles allows assigning roles in other databases from
800	// user documents inserted in the admin database. This field
801	// only works in the admin database.
802	OtherDBRoles map[string][]Role `bson:"otherDBRoles,omitempty"`
803
804	// UserSource indicates where to look for this user's credentials.
805	// It may be set to a database name, or to "$external" for
806	// consulting an external resource such as Kerberos. UserSource
807	// must not be set if Password or PasswordHash are present.
808	//
809	// WARNING: This setting was only ever supported in MongoDB 2.4,
810	// and is now obsolete.
811	UserSource string `bson:"userSource,omitempty"`
812}
813
814type Role string
815
816const (
817	// Relevant documentation:
818	//
819	//     http://docs.mongodb.org/manual/reference/user-privileges/
820	//
821	RoleRoot         Role = "root"
822	RoleRead         Role = "read"
823	RoleReadAny      Role = "readAnyDatabase"
824	RoleReadWrite    Role = "readWrite"
825	RoleReadWriteAny Role = "readWriteAnyDatabase"
826	RoleDBAdmin      Role = "dbAdmin"
827	RoleDBAdminAny   Role = "dbAdminAnyDatabase"
828	RoleUserAdmin    Role = "userAdmin"
829	RoleUserAdminAny Role = "userAdminAnyDatabase"
830	RoleClusterAdmin Role = "clusterAdmin"
831)
832
833// UpsertUser updates the authentication credentials and the roles for
834// a MongoDB user within the db database. If the named user doesn't exist
835// it will be created.
836//
837// This method should only be used from MongoDB 2.4 and on. For older
838// MongoDB releases, use the obsolete AddUser method instead.
839//
840// Relevant documentation:
841//
842//     http://docs.mongodb.org/manual/reference/user-privileges/
843//     http://docs.mongodb.org/manual/reference/privilege-documents/
844//
845func (db *Database) UpsertUser(user *User) error {
846	if user.Username == "" {
847		return fmt.Errorf("user has no Username")
848	}
849	if (user.Password != "" || user.PasswordHash != "") && user.UserSource != "" {
850		return fmt.Errorf("user has both Password/PasswordHash and UserSource set")
851	}
852	if len(user.OtherDBRoles) > 0 && db.Name != "admin" && db.Name != "$external" {
853		return fmt.Errorf("user with OtherDBRoles is only supported in the admin or $external databases")
854	}
855
856	// Attempt to run this using 2.6+ commands.
857	rundb := db
858	if user.UserSource != "" {
859		// Compatibility logic for the userSource field of MongoDB <= 2.4.X
860		rundb = db.Session.DB(user.UserSource)
861	}
862	err := rundb.runUserCmd("updateUser", user)
863	// retry with createUser when isAuthError in order to enable the "localhost exception"
864	if isNotFound(err) || isAuthError(err) {
865		return rundb.runUserCmd("createUser", user)
866	}
867	if !isNoCmd(err) {
868		return err
869	}
870
871	// Command does not exist. Fallback to pre-2.6 behavior.
872	var set, unset bson.D
873	if user.Password != "" {
874		psum := md5.New()
875		psum.Write([]byte(user.Username + ":mongo:" + user.Password))
876		set = append(set, bson.DocElem{"pwd", hex.EncodeToString(psum.Sum(nil))})
877		unset = append(unset, bson.DocElem{"userSource", 1})
878	} else if user.PasswordHash != "" {
879		set = append(set, bson.DocElem{"pwd", user.PasswordHash})
880		unset = append(unset, bson.DocElem{"userSource", 1})
881	}
882	if user.UserSource != "" {
883		set = append(set, bson.DocElem{"userSource", user.UserSource})
884		unset = append(unset, bson.DocElem{"pwd", 1})
885	}
886	if user.Roles != nil || user.OtherDBRoles != nil {
887		set = append(set, bson.DocElem{"roles", user.Roles})
888		if len(user.OtherDBRoles) > 0 {
889			set = append(set, bson.DocElem{"otherDBRoles", user.OtherDBRoles})
890		} else {
891			unset = append(unset, bson.DocElem{"otherDBRoles", 1})
892		}
893	}
894	users := db.C("system.users")
895	err = users.Update(bson.D{{"user", user.Username}}, bson.D{{"$unset", unset}, {"$set", set}})
896	if err == ErrNotFound {
897		set = append(set, bson.DocElem{"user", user.Username})
898		if user.Roles == nil && user.OtherDBRoles == nil {
899			// Roles must be sent, as it's the way MongoDB distinguishes
900			// old-style documents from new-style documents in pre-2.6.
901			set = append(set, bson.DocElem{"roles", user.Roles})
902		}
903		err = users.Insert(set)
904	}
905	return err
906}
907
908func isNoCmd(err error) bool {
909	e, ok := err.(*QueryError)
910	return ok && (e.Code == 59 || e.Code == 13390 || strings.HasPrefix(e.Message, "no such cmd:"))
911}
912
913func isNotFound(err error) bool {
914	e, ok := err.(*QueryError)
915	return ok && e.Code == 11
916}
917
918func isAuthError(err error) bool {
919	e, ok := err.(*QueryError)
920	return ok && e.Code == 13
921}
922
923func (db *Database) runUserCmd(cmdName string, user *User) error {
924	cmd := make(bson.D, 0, 16)
925	cmd = append(cmd, bson.DocElem{cmdName, user.Username})
926	if user.Password != "" {
927		cmd = append(cmd, bson.DocElem{"pwd", user.Password})
928	}
929	var roles []interface{}
930	for _, role := range user.Roles {
931		roles = append(roles, role)
932	}
933	for db, dbroles := range user.OtherDBRoles {
934		for _, role := range dbroles {
935			roles = append(roles, bson.D{{"role", role}, {"db", db}})
936		}
937	}
938	if roles != nil || user.Roles != nil || cmdName == "createUser" {
939		cmd = append(cmd, bson.DocElem{"roles", roles})
940	}
941	err := db.Run(cmd, nil)
942	if !isNoCmd(err) && user.UserSource != "" && (user.UserSource != "$external" || db.Name != "$external") {
943		return fmt.Errorf("MongoDB 2.6+ does not support the UserSource setting")
944	}
945	return err
946}
947
948// AddUser creates or updates the authentication credentials of user within
949// the db database.
950//
951// WARNING: This method is obsolete and should only be used with MongoDB 2.2
952// or earlier. For MongoDB 2.4 and on, use UpsertUser instead.
953func (db *Database) AddUser(username, password string, readOnly bool) error {
954	// Try to emulate the old behavior on 2.6+
955	user := &User{Username: username, Password: password}
956	if db.Name == "admin" {
957		if readOnly {
958			user.Roles = []Role{RoleReadAny}
959		} else {
960			user.Roles = []Role{RoleReadWriteAny}
961		}
962	} else {
963		if readOnly {
964			user.Roles = []Role{RoleRead}
965		} else {
966			user.Roles = []Role{RoleReadWrite}
967		}
968	}
969	err := db.runUserCmd("updateUser", user)
970	if isNotFound(err) {
971		return db.runUserCmd("createUser", user)
972	}
973	if !isNoCmd(err) {
974		return err
975	}
976
977	// Command doesn't exist. Fallback to pre-2.6 behavior.
978	psum := md5.New()
979	psum.Write([]byte(username + ":mongo:" + password))
980	digest := hex.EncodeToString(psum.Sum(nil))
981	c := db.C("system.users")
982	_, err = c.Upsert(bson.M{"user": username}, bson.M{"$set": bson.M{"user": username, "pwd": digest, "readOnly": readOnly}})
983	return err
984}
985
986// RemoveUser removes the authentication credentials of user from the database.
987func (db *Database) RemoveUser(user string) error {
988	err := db.Run(bson.D{{"dropUser", user}}, nil)
989	if isNoCmd(err) {
990		users := db.C("system.users")
991		return users.Remove(bson.M{"user": user})
992	}
993	if isNotFound(err) {
994		return ErrNotFound
995	}
996	return err
997}
998
999type indexSpec struct {
1000	Name, NS         string
1001	Key              bson.D
1002	Unique           bool    ",omitempty"
1003	DropDups         bool    "dropDups,omitempty"
1004	Background       bool    ",omitempty"
1005	Sparse           bool    ",omitempty"
1006	Bits             int     ",omitempty"
1007	Min, Max         float64 ",omitempty"
1008	BucketSize       float64 "bucketSize,omitempty"
1009	ExpireAfter      int     "expireAfterSeconds,omitempty"
1010	Weights          bson.D  ",omitempty"
1011	DefaultLanguage  string  "default_language,omitempty"
1012	LanguageOverride string  "language_override,omitempty"
1013	TextIndexVersion int     "textIndexVersion,omitempty"
1014
1015	Collation *Collation "collation,omitempty"
1016}
1017
1018type Index struct {
1019	Key        []string // Index key fields; prefix name with dash (-) for descending order
1020	Unique     bool     // Prevent two documents from having the same index key
1021	DropDups   bool     // Drop documents with the same index key as a previously indexed one
1022	Background bool     // Build index in background and return immediately
1023	Sparse     bool     // Only index documents containing the Key fields
1024
1025	// If ExpireAfter is defined the server will periodically delete
1026	// documents with indexed time.Time older than the provided delta.
1027	ExpireAfter time.Duration
1028
1029	// Name holds the stored index name. On creation if this field is unset it is
1030	// computed by EnsureIndex based on the index key.
1031	Name string
1032
1033	// Properties for spatial indexes.
1034	//
1035	// Min and Max were improperly typed as int when they should have been
1036	// floats.  To preserve backwards compatibility they are still typed as
1037	// int and the following two fields enable reading and writing the same
1038	// fields as float numbers. In mgo.v3, these fields will be dropped and
1039	// Min/Max will become floats.
1040	Min, Max   int
1041	Minf, Maxf float64
1042	BucketSize float64
1043	Bits       int
1044
1045	// Properties for text indexes.
1046	DefaultLanguage  string
1047	LanguageOverride string
1048
1049	// Weights defines the significance of provided fields relative to other
1050	// fields in a text index. The score for a given word in a document is derived
1051	// from the weighted sum of the frequency for each of the indexed fields in
1052	// that document. The default field weight is 1.
1053	Weights map[string]int
1054
1055	// Collation defines the collation to use for the index.
1056	Collation *Collation
1057}
1058
1059type Collation struct {
1060
1061	// Locale defines the collation locale.
1062	Locale string `bson:"locale"`
1063
1064	// CaseLevel defines whether to turn case sensitivity on at strength 1 or 2.
1065	CaseLevel bool `bson:"caseLevel,omitempty"`
1066
1067	// CaseFirst may be set to "upper" or "lower" to define whether
1068	// to have uppercase or lowercase items first. Default is "off".
1069	CaseFirst string `bson:"caseFirst,omitempty"`
1070
1071	// Strength defines the priority of comparison properties, as follows:
1072	//
1073	//   1 (primary)    - Strongest level, denote difference between base characters
1074	//   2 (secondary)  - Accents in characters are considered secondary differences
1075	//   3 (tertiary)   - Upper and lower case differences in characters are
1076	//                    distinguished at the tertiary level
1077	//   4 (quaternary) - When punctuation is ignored at level 1-3, an additional
1078	//                    level can be used to distinguish words with and without
1079	//                    punctuation. Should only be used if ignoring punctuation
1080	//                    is required or when processing Japanese text.
1081	//   5 (identical)  - When all other levels are equal, the identical level is
1082	//                    used as a tiebreaker. The Unicode code point values of
1083	//                    the NFD form of each string are compared at this level,
1084	//                    just in case there is no difference at levels 1-4
1085	//
1086	// Strength defaults to 3.
1087	Strength int `bson:"strength,omitempty"`
1088
1089	// NumericOrdering defines whether to order numbers based on numerical
1090	// order and not collation order.
1091	NumericOrdering bool `bson:"numericOrdering,omitempty"`
1092
1093	// Alternate controls whether spaces and punctuation are considered base characters.
1094	// May be set to "non-ignorable" (spaces and punctuation considered base characters)
1095	// or "shifted" (spaces and punctuation not considered base characters, and only
1096	// distinguished at strength > 3). Defaults to "non-ignorable".
1097	Alternate string `bson:"alternate,omitempty"`
1098
1099	// Backwards defines whether to have secondary differences considered in reverse order,
1100	// as done in the French language.
1101	Backwards bool `bson:"backwards,omitempty"`
1102}
1103
1104// mgo.v3: Drop Minf and Maxf and transform Min and Max to floats.
1105// mgo.v3: Drop DropDups as it's unsupported past 2.8.
1106
1107type indexKeyInfo struct {
1108	name    string
1109	key     bson.D
1110	weights bson.D
1111}
1112
1113func parseIndexKey(key []string) (*indexKeyInfo, error) {
1114	var keyInfo indexKeyInfo
1115	isText := false
1116	var order interface{}
1117	for _, field := range key {
1118		raw := field
1119		if keyInfo.name != "" {
1120			keyInfo.name += "_"
1121		}
1122		var kind string
1123		if field != "" {
1124			if field[0] == '$' {
1125				if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
1126					kind = field[1:c]
1127					field = field[c+1:]
1128					keyInfo.name += field + "_" + kind
1129				} else {
1130					field = "\x00"
1131				}
1132			}
1133			switch field[0] {
1134			case 0:
1135				// Logic above failed. Reset and error.
1136				field = ""
1137			case '@':
1138				order = "2d"
1139				field = field[1:]
1140				// The shell used to render this field as key_ instead of key_2d,
1141				// and mgo followed suit. This has been fixed in recent server
1142				// releases, and mgo followed as well.
1143				keyInfo.name += field + "_2d"
1144			case '-':
1145				order = -1
1146				field = field[1:]
1147				keyInfo.name += field + "_-1"
1148			case '+':
1149				field = field[1:]
1150				fallthrough
1151			default:
1152				if kind == "" {
1153					order = 1
1154					keyInfo.name += field + "_1"
1155				} else {
1156					order = kind
1157				}
1158			}
1159		}
1160		if field == "" || kind != "" && order != kind {
1161			return nil, fmt.Errorf(`invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw)
1162		}
1163		if kind == "text" {
1164			if !isText {
1165				keyInfo.key = append(keyInfo.key, bson.DocElem{"_fts", "text"}, bson.DocElem{"_ftsx", 1})
1166				isText = true
1167			}
1168			keyInfo.weights = append(keyInfo.weights, bson.DocElem{field, 1})
1169		} else {
1170			keyInfo.key = append(keyInfo.key, bson.DocElem{field, order})
1171		}
1172	}
1173	if keyInfo.name == "" {
1174		return nil, errors.New("invalid index key: no fields provided")
1175	}
1176	return &keyInfo, nil
1177}
1178
1179// EnsureIndexKey ensures an index with the given key exists, creating it
1180// if necessary.
1181//
1182// This example:
1183//
1184//     err := collection.EnsureIndexKey("a", "b")
1185//
1186// Is equivalent to:
1187//
1188//     err := collection.EnsureIndex(mgo.Index{Key: []string{"a", "b"}})
1189//
1190// See the EnsureIndex method for more details.
1191func (c *Collection) EnsureIndexKey(key ...string) error {
1192	return c.EnsureIndex(Index{Key: key})
1193}
1194
1195// EnsureIndex ensures an index with the given key exists, creating it with
1196// the provided parameters if necessary. EnsureIndex does not modify a previously
1197// existent index with a matching key. The old index must be dropped first instead.
1198//
1199// Once EnsureIndex returns successfully, following requests for the same index
1200// will not contact the server unless Collection.DropIndex is used to drop the
1201// same index, or Session.ResetIndexCache is called.
1202//
1203// For example:
1204//
1205//     index := Index{
1206//         Key: []string{"lastname", "firstname"},
1207//         Unique: true,
1208//         DropDups: true,
1209//         Background: true, // See notes.
1210//         Sparse: true,
1211//     }
1212//     err := collection.EnsureIndex(index)
1213//
1214// The Key value determines which fields compose the index. The index ordering
1215// will be ascending by default.  To obtain an index with a descending order,
1216// the field name should be prefixed by a dash (e.g. []string{"-time"}). It can
1217// also be optionally prefixed by an index kind, as in "$text:summary" or
1218// "$2d:-point". The key string format is:
1219//
1220//     [$<kind>:][-]<field name>
1221//
1222// If the Unique field is true, the index must necessarily contain only a single
1223// document per Key.  With DropDups set to true, documents with the same key
1224// as a previously indexed one will be dropped rather than an error returned.
1225//
1226// If Background is true, other connections will be allowed to proceed using
1227// the collection without the index while it's being built. Note that the
1228// session executing EnsureIndex will be blocked for as long as it takes for
1229// the index to be built.
1230//
1231// If Sparse is true, only documents containing the provided Key fields will be
1232// included in the index.  When using a sparse index for sorting, only indexed
1233// documents will be returned.
1234//
1235// If ExpireAfter is non-zero, the server will periodically scan the collection
1236// and remove documents containing an indexed time.Time field with a value
1237// older than ExpireAfter. See the documentation for details:
1238//
1239//     http://docs.mongodb.org/manual/tutorial/expire-data
1240//
1241// Other kinds of indexes are also supported through that API. Here is an example:
1242//
1243//     index := Index{
1244//         Key: []string{"$2d:loc"},
1245//         Bits: 26,
1246//     }
1247//     err := collection.EnsureIndex(index)
1248//
1249// The example above requests the creation of a "2d" index for the "loc" field.
1250//
1251// The 2D index bounds may be changed using the Min and Max attributes of the
1252// Index value.  The default bound setting of (-180, 180) is suitable for
1253// latitude/longitude pairs.
1254//
1255// The Bits parameter sets the precision of the 2D geohash values.  If not
1256// provided, 26 bits are used, which is roughly equivalent to 1 foot of
1257// precision for the default (-180, 180) index bounds.
1258//
1259// Relevant documentation:
1260//
1261//     http://www.mongodb.org/display/DOCS/Indexes
1262//     http://www.mongodb.org/display/DOCS/Indexing+Advice+and+FAQ
1263//     http://www.mongodb.org/display/DOCS/Indexing+as+a+Background+Operation
1264//     http://www.mongodb.org/display/DOCS/Geospatial+Indexing
1265//     http://www.mongodb.org/display/DOCS/Multikeys
1266//
1267func (c *Collection) EnsureIndex(index Index) error {
1268	keyInfo, err := parseIndexKey(index.Key)
1269	if err != nil {
1270		return err
1271	}
1272
1273	session := c.Database.Session
1274	cacheKey := c.FullName + "\x00" + keyInfo.name
1275	if session.cluster().HasCachedIndex(cacheKey) {
1276		return nil
1277	}
1278
1279	spec := indexSpec{
1280		Name:             keyInfo.name,
1281		NS:               c.FullName,
1282		Key:              keyInfo.key,
1283		Unique:           index.Unique,
1284		DropDups:         index.DropDups,
1285		Background:       index.Background,
1286		Sparse:           index.Sparse,
1287		Bits:             index.Bits,
1288		Min:              index.Minf,
1289		Max:              index.Maxf,
1290		BucketSize:       index.BucketSize,
1291		ExpireAfter:      int(index.ExpireAfter / time.Second),
1292		Weights:          keyInfo.weights,
1293		DefaultLanguage:  index.DefaultLanguage,
1294		LanguageOverride: index.LanguageOverride,
1295		Collation:        index.Collation,
1296	}
1297
1298	if spec.Min == 0 && spec.Max == 0 {
1299		spec.Min = float64(index.Min)
1300		spec.Max = float64(index.Max)
1301	}
1302
1303	if index.Name != "" {
1304		spec.Name = index.Name
1305	}
1306
1307NextField:
1308	for name, weight := range index.Weights {
1309		for i, elem := range spec.Weights {
1310			if elem.Name == name {
1311				spec.Weights[i].Value = weight
1312				continue NextField
1313			}
1314		}
1315		panic("weight provided for field that is not part of index key: " + name)
1316	}
1317
1318	cloned := session.Clone()
1319	defer cloned.Close()
1320	cloned.SetMode(Strong, false)
1321	cloned.EnsureSafe(&Safe{})
1322	db := c.Database.With(cloned)
1323
1324	// Try with a command first.
1325	err = db.Run(bson.D{{"createIndexes", c.Name}, {"indexes", []indexSpec{spec}}}, nil)
1326	if isNoCmd(err) {
1327		// Command not yet supported. Insert into the indexes collection instead.
1328		err = db.C("system.indexes").Insert(&spec)
1329	}
1330	if err == nil {
1331		session.cluster().CacheIndex(cacheKey, true)
1332	}
1333	return err
1334}
1335
1336// DropIndex drops the index with the provided key from the c collection.
1337//
1338// See EnsureIndex for details on the accepted key variants.
1339//
1340// For example:
1341//
1342//     err1 := collection.DropIndex("firstField", "-secondField")
1343//     err2 := collection.DropIndex("customIndexName")
1344//
1345func (c *Collection) DropIndex(key ...string) error {
1346	keyInfo, err := parseIndexKey(key)
1347	if err != nil {
1348		return err
1349	}
1350
1351	session := c.Database.Session
1352	cacheKey := c.FullName + "\x00" + keyInfo.name
1353	session.cluster().CacheIndex(cacheKey, false)
1354
1355	session = session.Clone()
1356	defer session.Close()
1357	session.SetMode(Strong, false)
1358
1359	db := c.Database.With(session)
1360	result := struct {
1361		ErrMsg string
1362		Ok     bool
1363	}{}
1364	err = db.Run(bson.D{{"dropIndexes", c.Name}, {"index", keyInfo.name}}, &result)
1365	if err != nil {
1366		return err
1367	}
1368	if !result.Ok {
1369		return errors.New(result.ErrMsg)
1370	}
1371	return nil
1372}
1373
1374// DropIndexName removes the index with the provided index name.
1375//
1376// For example:
1377//
1378//     err := collection.DropIndex("customIndexName")
1379//
1380func (c *Collection) DropIndexName(name string) error {
1381	session := c.Database.Session
1382
1383	session = session.Clone()
1384	defer session.Close()
1385	session.SetMode(Strong, false)
1386
1387	c = c.With(session)
1388
1389	indexes, err := c.Indexes()
1390	if err != nil {
1391		return err
1392	}
1393
1394	var index Index
1395	for _, idx := range indexes {
1396		if idx.Name == name {
1397			index = idx
1398			break
1399		}
1400	}
1401
1402	if index.Name != "" {
1403		keyInfo, err := parseIndexKey(index.Key)
1404		if err != nil {
1405			return err
1406		}
1407
1408		cacheKey := c.FullName + "\x00" + keyInfo.name
1409		session.cluster().CacheIndex(cacheKey, false)
1410	}
1411
1412	result := struct {
1413		ErrMsg string
1414		Ok     bool
1415	}{}
1416	err = c.Database.Run(bson.D{{"dropIndexes", c.Name}, {"index", name}}, &result)
1417	if err != nil {
1418		return err
1419	}
1420	if !result.Ok {
1421		return errors.New(result.ErrMsg)
1422	}
1423	return nil
1424}
1425
1426// nonEventual returns a clone of session and ensures it is not Eventual.
1427// This guarantees that the server that is used for queries may be reused
1428// afterwards when a cursor is received.
1429func (session *Session) nonEventual() *Session {
1430	cloned := session.Clone()
1431	if cloned.consistency == Eventual {
1432		cloned.SetMode(Monotonic, false)
1433	}
1434	return cloned
1435}
1436
1437// Indexes returns a list of all indexes for the collection.
1438//
1439// For example, this snippet would drop all available indexes:
1440//
1441//   indexes, err := collection.Indexes()
1442//   if err != nil {
1443//       return err
1444//   }
1445//   for _, index := range indexes {
1446//       err = collection.DropIndex(index.Key...)
1447//       if err != nil {
1448//           return err
1449//       }
1450//   }
1451//
1452// See the EnsureIndex method for more details on indexes.
1453func (c *Collection) Indexes() (indexes []Index, err error) {
1454	cloned := c.Database.Session.nonEventual()
1455	defer cloned.Close()
1456
1457	batchSize := int(cloned.queryConfig.op.limit)
1458
1459	// Try with a command.
1460	var result struct {
1461		Indexes []bson.Raw
1462		Cursor  cursorData
1463	}
1464	var iter *Iter
1465	err = c.Database.With(cloned).Run(bson.D{{"listIndexes", c.Name}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result)
1466	if err == nil {
1467		firstBatch := result.Indexes
1468		if firstBatch == nil {
1469			firstBatch = result.Cursor.FirstBatch
1470		}
1471		ns := strings.SplitN(result.Cursor.NS, ".", 2)
1472		if len(ns) < 2 {
1473			iter = c.With(cloned).NewIter(nil, firstBatch, result.Cursor.Id, nil)
1474		} else {
1475			iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil)
1476		}
1477	} else if isNoCmd(err) {
1478		// Command not yet supported. Query the database instead.
1479		iter = c.Database.C("system.indexes").Find(bson.M{"ns": c.FullName}).Iter()
1480	} else {
1481		return nil, err
1482	}
1483
1484	var spec indexSpec
1485	for iter.Next(&spec) {
1486		indexes = append(indexes, indexFromSpec(spec))
1487	}
1488	if err = iter.Close(); err != nil {
1489		return nil, err
1490	}
1491	sort.Sort(indexSlice(indexes))
1492	return indexes, nil
1493}
1494
1495func indexFromSpec(spec indexSpec) Index {
1496	index := Index{
1497		Name:             spec.Name,
1498		Key:              simpleIndexKey(spec.Key),
1499		Unique:           spec.Unique,
1500		DropDups:         spec.DropDups,
1501		Background:       spec.Background,
1502		Sparse:           spec.Sparse,
1503		Minf:             spec.Min,
1504		Maxf:             spec.Max,
1505		Bits:             spec.Bits,
1506		BucketSize:       spec.BucketSize,
1507		DefaultLanguage:  spec.DefaultLanguage,
1508		LanguageOverride: spec.LanguageOverride,
1509		ExpireAfter:      time.Duration(spec.ExpireAfter) * time.Second,
1510		Collation:        spec.Collation,
1511	}
1512	if float64(int(spec.Min)) == spec.Min && float64(int(spec.Max)) == spec.Max {
1513		index.Min = int(spec.Min)
1514		index.Max = int(spec.Max)
1515	}
1516	if spec.TextIndexVersion > 0 {
1517		index.Key = make([]string, len(spec.Weights))
1518		index.Weights = make(map[string]int)
1519		for i, elem := range spec.Weights {
1520			index.Key[i] = "$text:" + elem.Name
1521			if w, ok := elem.Value.(int); ok {
1522				index.Weights[elem.Name] = w
1523			}
1524		}
1525	}
1526	return index
1527}
1528
1529type indexSlice []Index
1530
1531func (idxs indexSlice) Len() int           { return len(idxs) }
1532func (idxs indexSlice) Less(i, j int) bool { return idxs[i].Name < idxs[j].Name }
1533func (idxs indexSlice) Swap(i, j int)      { idxs[i], idxs[j] = idxs[j], idxs[i] }
1534
1535func simpleIndexKey(realKey bson.D) (key []string) {
1536	for i := range realKey {
1537		field := realKey[i].Name
1538		vi, ok := realKey[i].Value.(int)
1539		if !ok {
1540			vf, _ := realKey[i].Value.(float64)
1541			vi = int(vf)
1542		}
1543		if vi == 1 {
1544			key = append(key, field)
1545			continue
1546		}
1547		if vi == -1 {
1548			key = append(key, "-"+field)
1549			continue
1550		}
1551		if vs, ok := realKey[i].Value.(string); ok {
1552			key = append(key, "$"+vs+":"+field)
1553			continue
1554		}
1555		panic("Got unknown index key type for field " + field)
1556	}
1557	return
1558}
1559
1560// ResetIndexCache() clears the cache of previously ensured indexes.
1561// Following requests to EnsureIndex will contact the server.
1562func (s *Session) ResetIndexCache() {
1563	s.cluster().ResetIndexCache()
1564}
1565
1566// New creates a new session with the same parameters as the original
1567// session, including consistency, batch size, prefetching, safety mode,
1568// etc. The returned session will use sockets from the pool, so there's
1569// a chance that writes just performed in another session may not yet
1570// be visible.
1571//
1572// Login information from the original session will not be copied over
1573// into the new session unless it was provided through the initial URL
1574// for the Dial function.
1575//
1576// See the Copy and Clone methods.
1577//
1578func (s *Session) New() *Session {
1579	s.m.Lock()
1580	scopy := copySession(s, false)
1581	s.m.Unlock()
1582	scopy.Refresh()
1583	return scopy
1584}
1585
1586// Copy works just like New, but preserves the exact authentication
1587// information from the original session.
1588func (s *Session) Copy() *Session {
1589	s.m.Lock()
1590	scopy := copySession(s, true)
1591	s.m.Unlock()
1592	scopy.Refresh()
1593	return scopy
1594}
1595
1596// Clone works just like Copy, but also reuses the same socket as the original
1597// session, in case it had already reserved one due to its consistency
1598// guarantees.  This behavior ensures that writes performed in the old session
1599// are necessarily observed when using the new session, as long as it was a
1600// strong or monotonic session.  That said, it also means that long operations
1601// may cause other goroutines using the original session to wait.
1602func (s *Session) Clone() *Session {
1603	s.m.Lock()
1604	scopy := copySession(s, true)
1605	s.m.Unlock()
1606	return scopy
1607}
1608
1609// Close terminates the session.  It's a runtime error to use a session
1610// after it has been closed.
1611func (s *Session) Close() {
1612	s.m.Lock()
1613	if s.cluster_ != nil {
1614		debugf("Closing session %p", s)
1615		s.unsetSocket()
1616		s.cluster_.Release()
1617		s.cluster_ = nil
1618	}
1619	s.m.Unlock()
1620}
1621
1622func (s *Session) cluster() *mongoCluster {
1623	if s.cluster_ == nil {
1624		panic("Session already closed")
1625	}
1626	return s.cluster_
1627}
1628
1629// Refresh puts back any reserved sockets in use and restarts the consistency
1630// guarantees according to the current consistency setting for the session.
1631func (s *Session) Refresh() {
1632	s.m.Lock()
1633	s.slaveOk = s.consistency != Strong
1634	s.unsetSocket()
1635	s.m.Unlock()
1636}
1637
1638// SetMode changes the consistency mode for the session.
1639//
1640// The default mode is Strong.
1641//
1642// In the Strong consistency mode reads and writes will always be made to
1643// the primary server using a unique connection so that reads and writes are
1644// fully consistent, ordered, and observing the most up-to-date data.
1645// This offers the least benefits in terms of distributing load, but the
1646// most guarantees.  See also Monotonic and Eventual.
1647//
1648// In the Monotonic consistency mode reads may not be entirely up-to-date,
1649// but they will always see the history of changes moving forward, the data
1650// read will be consistent across sequential queries in the same session,
1651// and modifications made within the session will be observed in following
1652// queries (read-your-writes).
1653//
1654// In practice, the Monotonic mode is obtained by performing initial reads
1655// on a unique connection to an arbitrary secondary, if one is available,
1656// and once the first write happens, the session connection is switched over
1657// to the primary server.  This manages to distribute some of the reading
1658// load with secondaries, while maintaining some useful guarantees.
1659//
1660// In the Eventual consistency mode reads will be made to any secondary in the
1661// cluster, if one is available, and sequential reads will not necessarily
1662// be made with the same connection.  This means that data may be observed
1663// out of order.  Writes will of course be issued to the primary, but
1664// independent writes in the same Eventual session may also be made with
1665// independent connections, so there are also no guarantees in terms of
1666// write ordering (no read-your-writes guarantees either).
1667//
1668// The Eventual mode is the fastest and most resource-friendly, but is
1669// also the one offering the least guarantees about ordering of the data
1670// read and written.
1671//
1672// If refresh is true, in addition to ensuring the session is in the given
1673// consistency mode, the consistency guarantees will also be reset (e.g.
1674// a Monotonic session will be allowed to read from secondaries again).
1675// This is equivalent to calling the Refresh function.
1676//
1677// Shifting between Monotonic and Strong modes will keep a previously
1678// reserved connection for the session unless refresh is true or the
1679// connection is unsuitable (to a secondary server in a Strong session).
1680func (s *Session) SetMode(consistency Mode, refresh bool) {
1681	s.m.Lock()
1682	debugf("Session %p: setting mode %d with refresh=%v (master=%p, slave=%p)", s, consistency, refresh, s.masterSocket, s.slaveSocket)
1683	s.consistency = consistency
1684	if refresh {
1685		s.slaveOk = s.consistency != Strong
1686		s.unsetSocket()
1687	} else if s.consistency == Strong {
1688		s.slaveOk = false
1689	} else if s.masterSocket == nil {
1690		s.slaveOk = true
1691	}
1692	s.m.Unlock()
1693}
1694
1695// Mode returns the current consistency mode for the session.
1696func (s *Session) Mode() Mode {
1697	s.m.RLock()
1698	mode := s.consistency
1699	s.m.RUnlock()
1700	return mode
1701}
1702
1703// SetSyncTimeout sets the amount of time an operation with this session
1704// will wait before returning an error in case a connection to a usable
1705// server can't be established. Set it to zero to wait forever. The
1706// default value is 7 seconds.
1707func (s *Session) SetSyncTimeout(d time.Duration) {
1708	s.m.Lock()
1709	s.syncTimeout = d
1710	s.m.Unlock()
1711}
1712
1713// SetSocketTimeout sets the amount of time to wait for a non-responding
1714// socket to the database before it is forcefully closed.
1715//
1716// The default timeout is 1 minute.
1717func (s *Session) SetSocketTimeout(d time.Duration) {
1718	s.m.Lock()
1719	s.sockTimeout = d
1720	if s.masterSocket != nil {
1721		s.masterSocket.SetTimeout(d)
1722	}
1723	if s.slaveSocket != nil {
1724		s.slaveSocket.SetTimeout(d)
1725	}
1726	s.m.Unlock()
1727}
1728
1729// SetCursorTimeout changes the standard timeout period that the server
1730// enforces on created cursors. The only supported value right now is
1731// 0, which disables the timeout. The standard server timeout is 10 minutes.
1732func (s *Session) SetCursorTimeout(d time.Duration) {
1733	s.m.Lock()
1734	if d == 0 {
1735		s.queryConfig.op.flags |= flagNoCursorTimeout
1736	} else {
1737		panic("SetCursorTimeout: only 0 (disable timeout) supported for now")
1738	}
1739	s.m.Unlock()
1740}
1741
1742// SetPoolLimit sets the maximum number of sockets in use in a single server
1743// before this session will block waiting for a socket to be available.
1744// The default limit is 4096.
1745//
1746// This limit must be set to cover more than any expected workload of the
1747// application. It is a bad practice and an unsupported use case to use the
1748// database driver to define the concurrency limit of an application. Prevent
1749// such concurrency "at the door" instead, by properly restricting the amount
1750// of used resources and number of goroutines before they are created.
1751func (s *Session) SetPoolLimit(limit int) {
1752	s.m.Lock()
1753	s.poolLimit = limit
1754	s.m.Unlock()
1755}
1756
1757// SetBypassValidation sets whether the server should bypass the registered
1758// validation expressions executed when documents are inserted or modified,
1759// in the interest of preserving invariants in the collection being modified.
1760// The default is to not bypass, and thus to perform the validation
1761// expressions registered for modified collections.
1762//
1763// Document validation was introuced in MongoDB 3.2.
1764//
1765// Relevant documentation:
1766//
1767//   https://docs.mongodb.org/manual/release-notes/3.2/#bypass-validation
1768//
1769func (s *Session) SetBypassValidation(bypass bool) {
1770	s.m.Lock()
1771	s.bypassValidation = bypass
1772	s.m.Unlock()
1773}
1774
1775// SetBatch sets the default batch size used when fetching documents from the
1776// database. It's possible to change this setting on a per-query basis as
1777// well, using the Query.Batch method.
1778//
1779// The default batch size is defined by the database itself.  As of this
1780// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
1781// first batch, and 4MB on remaining ones.
1782func (s *Session) SetBatch(n int) {
1783	if n == 1 {
1784		// Server interprets 1 as -1 and closes the cursor (!?)
1785		n = 2
1786	}
1787	s.m.Lock()
1788	s.queryConfig.op.limit = int32(n)
1789	s.m.Unlock()
1790}
1791
1792// SetPrefetch sets the default point at which the next batch of results will be
1793// requested.  When there are p*batch_size remaining documents cached in an
1794// Iter, the next batch will be requested in background. For instance, when
1795// using this:
1796//
1797//     session.SetBatch(200)
1798//     session.SetPrefetch(0.25)
1799//
1800// and there are only 50 documents cached in the Iter to be processed, the
1801// next batch of 200 will be requested. It's possible to change this setting on
1802// a per-query basis as well, using the Prefetch method of Query.
1803//
1804// The default prefetch value is 0.25.
1805func (s *Session) SetPrefetch(p float64) {
1806	s.m.Lock()
1807	s.queryConfig.prefetch = p
1808	s.m.Unlock()
1809}
1810
1811// See SetSafe for details on the Safe type.
1812type Safe struct {
1813	W        int    // Min # of servers to ack before success
1814	WMode    string // Write mode for MongoDB 2.0+ (e.g. "majority")
1815	WTimeout int    // Milliseconds to wait for W before timing out
1816	FSync    bool   // Sync via the journal if present, or via data files sync otherwise
1817	J        bool   // Sync via the journal if present
1818}
1819
1820// Safe returns the current safety mode for the session.
1821func (s *Session) Safe() (safe *Safe) {
1822	s.m.Lock()
1823	defer s.m.Unlock()
1824	if s.safeOp != nil {
1825		cmd := s.safeOp.query.(*getLastError)
1826		safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J}
1827		switch w := cmd.W.(type) {
1828		case string:
1829			safe.WMode = w
1830		case int:
1831			safe.W = w
1832		}
1833	}
1834	return
1835}
1836
1837// SetSafe changes the session safety mode.
1838//
1839// If the safe parameter is nil, the session is put in unsafe mode, and writes
1840// become fire-and-forget, without error checking.  The unsafe mode is faster
1841// since operations won't hold on waiting for a confirmation.
1842//
1843// If the safe parameter is not nil, any changing query (insert, update, ...)
1844// will be followed by a getLastError command with the specified parameters,
1845// to ensure the request was correctly processed.
1846//
1847// The default is &Safe{}, meaning check for errors and use the default
1848// behavior for all fields.
1849//
1850// The safe.W parameter determines how many servers should confirm a write
1851// before the operation is considered successful.  If set to 0 or 1, the
1852// command will return as soon as the primary is done with the request.
1853// If safe.WTimeout is greater than zero, it determines how many milliseconds
1854// to wait for the safe.W servers to respond before returning an error.
1855//
1856// Starting with MongoDB 2.0.0 the safe.WMode parameter can be used instead
1857// of W to request for richer semantics. If set to "majority" the server will
1858// wait for a majority of members from the replica set to respond before
1859// returning. Custom modes may also be defined within the server to create
1860// very detailed placement schemas. See the data awareness documentation in
1861// the links below for more details (note that MongoDB internally reuses the
1862// "w" field name for WMode).
1863//
1864// If safe.J is true, servers will block until write operations have been
1865// committed to the journal. Cannot be used in combination with FSync. Prior
1866// to MongoDB 2.6 this option was ignored if the server was running without
1867// journaling. Starting with MongoDB 2.6 write operations will fail with an
1868// exception if this option is used when the server is running without
1869// journaling.
1870//
1871// If safe.FSync is true and the server is running without journaling, blocks
1872// until the server has synced all data files to disk. If the server is running
1873// with journaling, this acts the same as the J option, blocking until write
1874// operations have been committed to the journal. Cannot be used in
1875// combination with J.
1876//
1877// Since MongoDB 2.0.0, the safe.J option can also be used instead of FSync
1878// to force the server to wait for a group commit in case journaling is
1879// enabled. The option has no effect if the server has journaling disabled.
1880//
1881// For example, the following statement will make the session check for
1882// errors, without imposing further constraints:
1883//
1884//     session.SetSafe(&mgo.Safe{})
1885//
1886// The following statement will force the server to wait for a majority of
1887// members of a replica set to return (MongoDB 2.0+ only):
1888//
1889//     session.SetSafe(&mgo.Safe{WMode: "majority"})
1890//
1891// The following statement, on the other hand, ensures that at least two
1892// servers have flushed the change to disk before confirming the success
1893// of operations:
1894//
1895//     session.EnsureSafe(&mgo.Safe{W: 2, FSync: true})
1896//
1897// The following statement, on the other hand, disables the verification
1898// of errors entirely:
1899//
1900//     session.SetSafe(nil)
1901//
1902// See also the EnsureSafe method.
1903//
1904// Relevant documentation:
1905//
1906//     http://www.mongodb.org/display/DOCS/getLastError+Command
1907//     http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
1908//     http://www.mongodb.org/display/DOCS/Data+Center+Awareness
1909//
1910func (s *Session) SetSafe(safe *Safe) {
1911	s.m.Lock()
1912	s.safeOp = nil
1913	s.ensureSafe(safe)
1914	s.m.Unlock()
1915}
1916
1917// EnsureSafe compares the provided safety parameters with the ones
1918// currently in use by the session and picks the most conservative
1919// choice for each setting.
1920//
1921// That is:
1922//
1923//     - safe.WMode is always used if set.
1924//     - safe.W is used if larger than the current W and WMode is empty.
1925//     - safe.FSync is always used if true.
1926//     - safe.J is used if FSync is false.
1927//     - safe.WTimeout is used if set and smaller than the current WTimeout.
1928//
1929// For example, the following statement will ensure the session is
1930// at least checking for errors, without enforcing further constraints.
1931// If a more conservative SetSafe or EnsureSafe call was previously done,
1932// the following call will be ignored.
1933//
1934//     session.EnsureSafe(&mgo.Safe{})
1935//
1936// See also the SetSafe method for details on what each option means.
1937//
1938// Relevant documentation:
1939//
1940//     http://www.mongodb.org/display/DOCS/getLastError+Command
1941//     http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
1942//     http://www.mongodb.org/display/DOCS/Data+Center+Awareness
1943//
1944func (s *Session) EnsureSafe(safe *Safe) {
1945	s.m.Lock()
1946	s.ensureSafe(safe)
1947	s.m.Unlock()
1948}
1949
1950func (s *Session) ensureSafe(safe *Safe) {
1951	if safe == nil {
1952		return
1953	}
1954
1955	var w interface{}
1956	if safe.WMode != "" {
1957		w = safe.WMode
1958	} else if safe.W > 0 {
1959		w = safe.W
1960	}
1961
1962	var cmd getLastError
1963	if s.safeOp == nil {
1964		cmd = getLastError{1, w, safe.WTimeout, safe.FSync, safe.J}
1965	} else {
1966		// Copy.  We don't want to mutate the existing query.
1967		cmd = *(s.safeOp.query.(*getLastError))
1968		if cmd.W == nil {
1969			cmd.W = w
1970		} else if safe.WMode != "" {
1971			cmd.W = safe.WMode
1972		} else if i, ok := cmd.W.(int); ok && safe.W > i {
1973			cmd.W = safe.W
1974		}
1975		if safe.WTimeout > 0 && safe.WTimeout < cmd.WTimeout {
1976			cmd.WTimeout = safe.WTimeout
1977		}
1978		if safe.FSync {
1979			cmd.FSync = true
1980			cmd.J = false
1981		} else if safe.J && !cmd.FSync {
1982			cmd.J = true
1983		}
1984	}
1985	s.safeOp = &queryOp{
1986		query:      &cmd,
1987		collection: "admin.$cmd",
1988		limit:      -1,
1989	}
1990}
1991
1992// Run issues the provided command on the "admin" database and
1993// and unmarshals its result in the respective argument. The cmd
1994// argument may be either a string with the command name itself, in
1995// which case an empty document of the form bson.M{cmd: 1} will be used,
1996// or it may be a full command document.
1997//
1998// Note that MongoDB considers the first marshalled key as the command
1999// name, so when providing a command with options, it's important to
2000// use an ordering-preserving document, such as a struct value or an
2001// instance of bson.D.  For instance:
2002//
2003//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
2004//
2005// For commands on arbitrary databases, see the Run method in
2006// the Database type.
2007//
2008// Relevant documentation:
2009//
2010//     http://www.mongodb.org/display/DOCS/Commands
2011//     http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
2012//
2013func (s *Session) Run(cmd interface{}, result interface{}) error {
2014	return s.DB("admin").Run(cmd, result)
2015}
2016
2017// SelectServers restricts communication to servers configured with the
2018// given tags. For example, the following statement restricts servers
2019// used for reading operations to those with both tag "disk" set to
2020// "ssd" and tag "rack" set to 1:
2021//
2022//     session.SelectServers(bson.D{{"disk", "ssd"}, {"rack", 1}})
2023//
2024// Multiple sets of tags may be provided, in which case the used server
2025// must match all tags within any one set.
2026//
2027// If a connection was previously assigned to the session due to the
2028// current session mode (see Session.SetMode), the tag selection will
2029// only be enforced after the session is refreshed.
2030//
2031// Relevant documentation:
2032//
2033//     http://docs.mongodb.org/manual/tutorial/configure-replica-set-tag-sets
2034//
2035func (s *Session) SelectServers(tags ...bson.D) {
2036	s.m.Lock()
2037	s.queryConfig.op.serverTags = tags
2038	s.m.Unlock()
2039}
2040
2041// Ping runs a trivial ping command just to get in touch with the server.
2042func (s *Session) Ping() error {
2043	return s.Run("ping", nil)
2044}
2045
2046// Fsync flushes in-memory writes to disk on the server the session
2047// is established with. If async is true, the call returns immediately,
2048// otherwise it returns after the flush has been made.
2049func (s *Session) Fsync(async bool) error {
2050	return s.Run(bson.D{{"fsync", 1}, {"async", async}}, nil)
2051}
2052
2053// FsyncLock locks all writes in the specific server the session is
2054// established with and returns. Any writes attempted to the server
2055// after it is successfully locked will block until FsyncUnlock is
2056// called for the same server.
2057//
2058// This method works on secondaries as well, preventing the oplog from
2059// being flushed while the server is locked, but since only the server
2060// connected to is locked, for locking specific secondaries it may be
2061// necessary to establish a connection directly to the secondary (see
2062// Dial's connect=direct option).
2063//
2064// As an important caveat, note that once a write is attempted and
2065// blocks, follow up reads will block as well due to the way the
2066// lock is internally implemented in the server. More details at:
2067//
2068//     https://jira.mongodb.org/browse/SERVER-4243
2069//
2070// FsyncLock is often used for performing consistent backups of
2071// the database files on disk.
2072//
2073// Relevant documentation:
2074//
2075//     http://www.mongodb.org/display/DOCS/fsync+Command
2076//     http://www.mongodb.org/display/DOCS/Backups
2077//
2078func (s *Session) FsyncLock() error {
2079	return s.Run(bson.D{{"fsync", 1}, {"lock", true}}, nil)
2080}
2081
2082// FsyncUnlock releases the server for writes. See FsyncLock for details.
2083func (s *Session) FsyncUnlock() error {
2084	err := s.Run(bson.D{{"fsyncUnlock", 1}}, nil)
2085	if isNoCmd(err) {
2086		err = s.DB("admin").C("$cmd.sys.unlock").Find(nil).One(nil) // WTF?
2087	}
2088	return err
2089}
2090
2091// Find prepares a query using the provided document.  The document may be a
2092// map or a struct value capable of being marshalled with bson.  The map
2093// may be a generic one using interface{} for its key and/or values, such as
2094// bson.M, or it may be a properly typed map.  Providing nil as the document
2095// is equivalent to providing an empty document such as bson.M{}.
2096//
2097// Further details of the query may be tweaked using the resulting Query value,
2098// and then executed to retrieve results using methods such as One, For,
2099// Iter, or Tail.
2100//
2101// In case the resulting document includes a field named $err or errmsg, which
2102// are standard ways for MongoDB to return query errors, the returned err will
2103// be set to a *QueryError value including the Err message and the Code.  In
2104// those cases, the result argument is still unmarshalled into with the
2105// received document so that any other custom values may be obtained if
2106// desired.
2107//
2108// Relevant documentation:
2109//
2110//     http://www.mongodb.org/display/DOCS/Querying
2111//     http://www.mongodb.org/display/DOCS/Advanced+Queries
2112//
2113func (c *Collection) Find(query interface{}) *Query {
2114	session := c.Database.Session
2115	session.m.RLock()
2116	q := &Query{session: session, query: session.queryConfig}
2117	session.m.RUnlock()
2118	q.op.query = query
2119	q.op.collection = c.FullName
2120	return q
2121}
2122
2123type repairCmd struct {
2124	RepairCursor string           `bson:"repairCursor"`
2125	Cursor       *repairCmdCursor ",omitempty"
2126}
2127
2128type repairCmdCursor struct {
2129	BatchSize int `bson:"batchSize,omitempty"`
2130}
2131
2132// Repair returns an iterator that goes over all recovered documents in the
2133// collection, in a best-effort manner. This is most useful when there are
2134// damaged data files. Multiple copies of the same document may be returned
2135// by the iterator.
2136//
2137// Repair is supported in MongoDB 2.7.8 and later.
2138func (c *Collection) Repair() *Iter {
2139	// Clone session and set it to Monotonic mode so that the server
2140	// used for the query may be safely obtained afterwards, if
2141	// necessary for iteration when a cursor is received.
2142	session := c.Database.Session
2143	cloned := session.nonEventual()
2144	defer cloned.Close()
2145
2146	batchSize := int(cloned.queryConfig.op.limit)
2147
2148	var result struct{ Cursor cursorData }
2149
2150	cmd := repairCmd{
2151		RepairCursor: c.Name,
2152		Cursor:       &repairCmdCursor{batchSize},
2153	}
2154
2155	clonedc := c.With(cloned)
2156	err := clonedc.Database.Run(cmd, &result)
2157	return clonedc.NewIter(session, result.Cursor.FirstBatch, result.Cursor.Id, err)
2158}
2159
2160// FindId is a convenience helper equivalent to:
2161//
2162//     query := collection.Find(bson.M{"_id": id})
2163//
2164// See the Find method for more details.
2165func (c *Collection) FindId(id interface{}) *Query {
2166	return c.Find(bson.D{{"_id", id}})
2167}
2168
2169type Pipe struct {
2170	session    *Session
2171	collection *Collection
2172	pipeline   interface{}
2173	allowDisk  bool
2174	batchSize  int
2175}
2176
2177type pipeCmd struct {
2178	Aggregate string
2179	Pipeline  interface{}
2180	Cursor    *pipeCmdCursor ",omitempty"
2181	Explain   bool           ",omitempty"
2182	AllowDisk bool           "allowDiskUse,omitempty"
2183}
2184
2185type pipeCmdCursor struct {
2186	BatchSize int `bson:"batchSize,omitempty"`
2187}
2188
2189// Pipe prepares a pipeline to aggregate. The pipeline document
2190// must be a slice built in terms of the aggregation framework language.
2191//
2192// For example:
2193//
2194//     pipe := collection.Pipe([]bson.M{{"$match": bson.M{"name": "Otavio"}}})
2195//     iter := pipe.Iter()
2196//
2197// Relevant documentation:
2198//
2199//     http://docs.mongodb.org/manual/reference/aggregation
2200//     http://docs.mongodb.org/manual/applications/aggregation
2201//     http://docs.mongodb.org/manual/tutorial/aggregation-examples
2202//
2203func (c *Collection) Pipe(pipeline interface{}) *Pipe {
2204	session := c.Database.Session
2205	session.m.RLock()
2206	batchSize := int(session.queryConfig.op.limit)
2207	session.m.RUnlock()
2208	return &Pipe{
2209		session:    session,
2210		collection: c,
2211		pipeline:   pipeline,
2212		batchSize:  batchSize,
2213	}
2214}
2215
2216// Iter executes the pipeline and returns an iterator capable of going
2217// over all the generated results.
2218func (p *Pipe) Iter() *Iter {
2219	// Clone session and set it to Monotonic mode so that the server
2220	// used for the query may be safely obtained afterwards, if
2221	// necessary for iteration when a cursor is received.
2222	cloned := p.session.nonEventual()
2223	defer cloned.Close()
2224	c := p.collection.With(cloned)
2225
2226	var result struct {
2227		Result []bson.Raw // 2.4, no cursors.
2228		Cursor cursorData // 2.6+, with cursors.
2229	}
2230
2231	cmd := pipeCmd{
2232		Aggregate: c.Name,
2233		Pipeline:  p.pipeline,
2234		AllowDisk: p.allowDisk,
2235		Cursor:    &pipeCmdCursor{p.batchSize},
2236	}
2237	err := c.Database.Run(cmd, &result)
2238	if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` {
2239		cmd.Cursor = nil
2240		cmd.AllowDisk = false
2241		err = c.Database.Run(cmd, &result)
2242	}
2243	firstBatch := result.Result
2244	if firstBatch == nil {
2245		firstBatch = result.Cursor.FirstBatch
2246	}
2247	return c.NewIter(p.session, firstBatch, result.Cursor.Id, err)
2248}
2249
2250// NewIter returns a newly created iterator with the provided parameters.
2251// Using this method is not recommended unless the desired functionality
2252// is not yet exposed via a more convenient interface (Find, Pipe, etc).
2253//
2254// The optional session parameter associates the lifetime of the returned
2255// iterator to an arbitrary session. If nil, the iterator will be bound to
2256// c's session.
2257//
2258// Documents in firstBatch will be individually provided by the returned
2259// iterator before documents from cursorId are made available. If cursorId
2260// is zero, only the documents in firstBatch are provided.
2261//
2262// If err is not nil, the iterator's Err method will report it after
2263// exhausting documents in firstBatch.
2264//
2265// NewIter must be called right after the cursor id is obtained, and must not
2266// be called on a collection in Eventual mode, because the cursor id is
2267// associated with the specific server that returned it. The provided session
2268// parameter may be in any mode or state, though.
2269//
2270func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId int64, err error) *Iter {
2271	var server *mongoServer
2272	csession := c.Database.Session
2273	csession.m.RLock()
2274	socket := csession.masterSocket
2275	if socket == nil {
2276		socket = csession.slaveSocket
2277	}
2278	if socket != nil {
2279		server = socket.Server()
2280	}
2281	csession.m.RUnlock()
2282
2283	if server == nil {
2284		if csession.Mode() == Eventual {
2285			panic("Collection.NewIter called in Eventual mode")
2286		}
2287		if err == nil {
2288			err = errors.New("server not available")
2289		}
2290	}
2291
2292	if session == nil {
2293		session = csession
2294	}
2295
2296	iter := &Iter{
2297		session: session,
2298		server:  server,
2299		timeout: -1,
2300		err:     err,
2301	}
2302	iter.gotReply.L = &iter.m
2303	for _, doc := range firstBatch {
2304		iter.docData.Push(doc.Data)
2305	}
2306	if cursorId != 0 {
2307		iter.op.cursorId = cursorId
2308		iter.op.collection = c.FullName
2309		iter.op.replyFunc = iter.replyFunc()
2310	}
2311	return iter
2312}
2313
2314// All works like Iter.All.
2315func (p *Pipe) All(result interface{}) error {
2316	return p.Iter().All(result)
2317}
2318
2319// One executes the pipeline and unmarshals the first item from the
2320// result set into the result parameter.
2321// It returns ErrNotFound if no items are generated by the pipeline.
2322func (p *Pipe) One(result interface{}) error {
2323	iter := p.Iter()
2324	if iter.Next(result) {
2325		return nil
2326	}
2327	if err := iter.Err(); err != nil {
2328		return err
2329	}
2330	return ErrNotFound
2331}
2332
2333// Explain returns a number of details about how the MongoDB server would
2334// execute the requested pipeline, such as the number of objects examined,
2335// the number of times the read lock was yielded to allow writes to go in,
2336// and so on.
2337//
2338// For example:
2339//
2340//     var m bson.M
2341//     err := collection.Pipe(pipeline).Explain(&m)
2342//     if err == nil {
2343//         fmt.Printf("Explain: %#v\n", m)
2344//     }
2345//
2346func (p *Pipe) Explain(result interface{}) error {
2347	c := p.collection
2348	cmd := pipeCmd{
2349		Aggregate: c.Name,
2350		Pipeline:  p.pipeline,
2351		AllowDisk: p.allowDisk,
2352		Explain:   true,
2353	}
2354	return c.Database.Run(cmd, result)
2355}
2356
2357// AllowDiskUse enables writing to the "<dbpath>/_tmp" server directory so
2358// that aggregation pipelines do not have to be held entirely in memory.
2359func (p *Pipe) AllowDiskUse() *Pipe {
2360	p.allowDisk = true
2361	return p
2362}
2363
2364// Batch sets the batch size used when fetching documents from the database.
2365// It's possible to change this setting on a per-session basis as well, using
2366// the Batch method of Session.
2367//
2368// The default batch size is defined by the database server.
2369func (p *Pipe) Batch(n int) *Pipe {
2370	p.batchSize = n
2371	return p
2372}
2373
2374// mgo.v3: Use a single user-visible error type.
2375
2376type LastError struct {
2377	Err             string
2378	Code, N, Waited int
2379	FSyncFiles      int `bson:"fsyncFiles"`
2380	WTimeout        bool
2381	UpdatedExisting bool        `bson:"updatedExisting"`
2382	UpsertedId      interface{} `bson:"upserted"`
2383
2384	modified int
2385	ecases   []BulkErrorCase
2386}
2387
2388func (err *LastError) Error() string {
2389	return err.Err
2390}
2391
2392type queryError struct {
2393	Err           string "$err"
2394	ErrMsg        string
2395	Assertion     string
2396	Code          int
2397	AssertionCode int "assertionCode"
2398}
2399
2400type QueryError struct {
2401	Code      int
2402	Message   string
2403	Assertion bool
2404}
2405
2406func (err *QueryError) Error() string {
2407	return err.Message
2408}
2409
2410// IsDup returns whether err informs of a duplicate key error because
2411// a primary key index or a secondary unique index already has an entry
2412// with the given value.
2413func IsDup(err error) bool {
2414	// Besides being handy, helps with MongoDB bugs SERVER-7164 and SERVER-11493.
2415	// What follows makes me sad. Hopefully conventions will be more clear over time.
2416	switch e := err.(type) {
2417	case *LastError:
2418		return e.Code == 11000 || e.Code == 11001 || e.Code == 12582 || e.Code == 16460 && strings.Contains(e.Err, " E11000 ")
2419	case *QueryError:
2420		return e.Code == 11000 || e.Code == 11001 || e.Code == 12582
2421	case *BulkError:
2422		for _, ecase := range e.ecases {
2423			if !IsDup(ecase.Err) {
2424				return false
2425			}
2426		}
2427		return true
2428	}
2429	return false
2430}
2431
2432// Insert inserts one or more documents in the respective collection.  In
2433// case the session is in safe mode (see the SetSafe method) and an error
2434// happens while inserting the provided documents, the returned error will
2435// be of type *LastError.
2436func (c *Collection) Insert(docs ...interface{}) error {
2437	_, err := c.writeOp(&insertOp{c.FullName, docs, 0}, true)
2438	return err
2439}
2440
2441// Update finds a single document matching the provided selector document
2442// and modifies it according to the update document.
2443// If the session is in safe mode (see SetSafe) a ErrNotFound error is
2444// returned if a document isn't found, or a value of type *LastError
2445// when some other error is detected.
2446//
2447// Relevant documentation:
2448//
2449//     http://www.mongodb.org/display/DOCS/Updating
2450//     http://www.mongodb.org/display/DOCS/Atomic+Operations
2451//
2452func (c *Collection) Update(selector interface{}, update interface{}) error {
2453	if selector == nil {
2454		selector = bson.D{}
2455	}
2456	op := updateOp{
2457		Collection: c.FullName,
2458		Selector:   selector,
2459		Update:     update,
2460	}
2461	lerr, err := c.writeOp(&op, true)
2462	if err == nil && lerr != nil && !lerr.UpdatedExisting {
2463		return ErrNotFound
2464	}
2465	return err
2466}
2467
2468// UpdateId is a convenience helper equivalent to:
2469//
2470//     err := collection.Update(bson.M{"_id": id}, update)
2471//
2472// See the Update method for more details.
2473func (c *Collection) UpdateId(id interface{}, update interface{}) error {
2474	return c.Update(bson.D{{"_id", id}}, update)
2475}
2476
2477// ChangeInfo holds details about the outcome of an update operation.
2478type ChangeInfo struct {
2479	// Updated reports the number of existing documents modified.
2480	// Due to server limitations, this reports the same value as the Matched field when
2481	// talking to MongoDB <= 2.4 and on Upsert and Apply (findAndModify) operations.
2482	Updated    int
2483	Removed    int         // Number of documents removed
2484	Matched    int         // Number of documents matched but not necessarily changed
2485	UpsertedId interface{} // Upserted _id field, when not explicitly provided
2486}
2487
2488// UpdateAll finds all documents matching the provided selector document
2489// and modifies them according to the update document.
2490// If the session is in safe mode (see SetSafe) details of the executed
2491// operation are returned in info or an error of type *LastError when
2492// some problem is detected. It is not an error for the update to not be
2493// applied on any documents because the selector doesn't match.
2494//
2495// Relevant documentation:
2496//
2497//     http://www.mongodb.org/display/DOCS/Updating
2498//     http://www.mongodb.org/display/DOCS/Atomic+Operations
2499//
2500func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
2501	if selector == nil {
2502		selector = bson.D{}
2503	}
2504	op := updateOp{
2505		Collection: c.FullName,
2506		Selector:   selector,
2507		Update:     update,
2508		Flags:      2,
2509		Multi:      true,
2510	}
2511	lerr, err := c.writeOp(&op, true)
2512	if err == nil && lerr != nil {
2513		info = &ChangeInfo{Updated: lerr.modified, Matched: lerr.N}
2514	}
2515	return info, err
2516}
2517
2518// Upsert finds a single document matching the provided selector document
2519// and modifies it according to the update document.  If no document matching
2520// the selector is found, the update document is applied to the selector
2521// document and the result is inserted in the collection.
2522// If the session is in safe mode (see SetSafe) details of the executed
2523// operation are returned in info, or an error of type *LastError when
2524// some problem is detected.
2525//
2526// Relevant documentation:
2527//
2528//     http://www.mongodb.org/display/DOCS/Updating
2529//     http://www.mongodb.org/display/DOCS/Atomic+Operations
2530//
2531func (c *Collection) Upsert(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
2532	if selector == nil {
2533		selector = bson.D{}
2534	}
2535	op := updateOp{
2536		Collection: c.FullName,
2537		Selector:   selector,
2538		Update:     update,
2539		Flags:      1,
2540		Upsert:     true,
2541	}
2542	var lerr *LastError
2543	for i := 0; i < maxUpsertRetries; i++ {
2544		lerr, err = c.writeOp(&op, true)
2545		// Retry duplicate key errors on upserts.
2546		// https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes
2547		if !IsDup(err) {
2548			break
2549		}
2550	}
2551	if err == nil && lerr != nil {
2552		info = &ChangeInfo{}
2553		if lerr.UpdatedExisting {
2554			info.Matched = lerr.N
2555			info.Updated = lerr.modified
2556		} else {
2557			info.UpsertedId = lerr.UpsertedId
2558		}
2559	}
2560	return info, err
2561}
2562
2563// UpsertId is a convenience helper equivalent to:
2564//
2565//     info, err := collection.Upsert(bson.M{"_id": id}, update)
2566//
2567// See the Upsert method for more details.
2568func (c *Collection) UpsertId(id interface{}, update interface{}) (info *ChangeInfo, err error) {
2569	return c.Upsert(bson.D{{"_id", id}}, update)
2570}
2571
2572// Remove finds a single document matching the provided selector document
2573// and removes it from the database.
2574// If the session is in safe mode (see SetSafe) a ErrNotFound error is
2575// returned if a document isn't found, or a value of type *LastError
2576// when some other error is detected.
2577//
2578// Relevant documentation:
2579//
2580//     http://www.mongodb.org/display/DOCS/Removing
2581//
2582func (c *Collection) Remove(selector interface{}) error {
2583	if selector == nil {
2584		selector = bson.D{}
2585	}
2586	lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 1, 1}, true)
2587	if err == nil && lerr != nil && lerr.N == 0 {
2588		return ErrNotFound
2589	}
2590	return err
2591}
2592
2593// RemoveId is a convenience helper equivalent to:
2594//
2595//     err := collection.Remove(bson.M{"_id": id})
2596//
2597// See the Remove method for more details.
2598func (c *Collection) RemoveId(id interface{}) error {
2599	return c.Remove(bson.D{{"_id", id}})
2600}
2601
2602// RemoveAll finds all documents matching the provided selector document
2603// and removes them from the database.  In case the session is in safe mode
2604// (see the SetSafe method) and an error happens when attempting the change,
2605// the returned error will be of type *LastError.
2606//
2607// Relevant documentation:
2608//
2609//     http://www.mongodb.org/display/DOCS/Removing
2610//
2611func (c *Collection) RemoveAll(selector interface{}) (info *ChangeInfo, err error) {
2612	if selector == nil {
2613		selector = bson.D{}
2614	}
2615	lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 0, 0}, true)
2616	if err == nil && lerr != nil {
2617		info = &ChangeInfo{Removed: lerr.N, Matched: lerr.N}
2618	}
2619	return info, err
2620}
2621
2622// DropDatabase removes the entire database including all of its collections.
2623func (db *Database) DropDatabase() error {
2624	return db.Run(bson.D{{"dropDatabase", 1}}, nil)
2625}
2626
2627// DropCollection removes the entire collection including all of its documents.
2628func (c *Collection) DropCollection() error {
2629	return c.Database.Run(bson.D{{"drop", c.Name}}, nil)
2630}
2631
2632// The CollectionInfo type holds metadata about a collection.
2633//
2634// Relevant documentation:
2635//
2636//     http://www.mongodb.org/display/DOCS/createCollection+Command
2637//     http://www.mongodb.org/display/DOCS/Capped+Collections
2638//
2639type CollectionInfo struct {
2640	// DisableIdIndex prevents the automatic creation of the index
2641	// on the _id field for the collection.
2642	DisableIdIndex bool
2643
2644	// ForceIdIndex enforces the automatic creation of the index
2645	// on the _id field for the collection. Capped collections,
2646	// for example, do not have such an index by default.
2647	ForceIdIndex bool
2648
2649	// If Capped is true new documents will replace old ones when
2650	// the collection is full. MaxBytes must necessarily be set
2651	// to define the size when the collection wraps around.
2652	// MaxDocs optionally defines the number of documents when it
2653	// wraps, but MaxBytes still needs to be set.
2654	Capped   bool
2655	MaxBytes int
2656	MaxDocs  int
2657
2658	// Validator contains a validation expression that defines which
2659	// documents should be considered valid for this collection.
2660	Validator interface{}
2661
2662	// ValidationLevel may be set to "strict" (the default) to force
2663	// MongoDB to validate all documents on inserts and updates, to
2664	// "moderate" to apply the validation rules only to documents
2665	// that already fulfill the validation criteria, or to "off" for
2666	// disabling validation entirely.
2667	ValidationLevel string
2668
2669	// ValidationAction determines how MongoDB handles documents that
2670	// violate the validation rules. It may be set to "error" (the default)
2671	// to reject inserts or updates that violate the rules, or to "warn"
2672	// to log invalid operations but allow them to proceed.
2673	ValidationAction string
2674
2675	// StorageEngine allows specifying collection options for the
2676	// storage engine in use. The map keys must hold the storage engine
2677	// name for which options are being specified.
2678	StorageEngine interface{}
2679}
2680
2681// Create explicitly creates the c collection with details of info.
2682// MongoDB creates collections automatically on use, so this method
2683// is only necessary when creating collection with non-default
2684// characteristics, such as capped collections.
2685//
2686// Relevant documentation:
2687//
2688//     http://www.mongodb.org/display/DOCS/createCollection+Command
2689//     http://www.mongodb.org/display/DOCS/Capped+Collections
2690//
2691func (c *Collection) Create(info *CollectionInfo) error {
2692	cmd := make(bson.D, 0, 4)
2693	cmd = append(cmd, bson.DocElem{"create", c.Name})
2694	if info.Capped {
2695		if info.MaxBytes < 1 {
2696			return fmt.Errorf("Collection.Create: with Capped, MaxBytes must also be set")
2697		}
2698		cmd = append(cmd, bson.DocElem{"capped", true})
2699		cmd = append(cmd, bson.DocElem{"size", info.MaxBytes})
2700		if info.MaxDocs > 0 {
2701			cmd = append(cmd, bson.DocElem{"max", info.MaxDocs})
2702		}
2703	}
2704	if info.DisableIdIndex {
2705		cmd = append(cmd, bson.DocElem{"autoIndexId", false})
2706	}
2707	if info.ForceIdIndex {
2708		cmd = append(cmd, bson.DocElem{"autoIndexId", true})
2709	}
2710	if info.Validator != nil {
2711		cmd = append(cmd, bson.DocElem{"validator", info.Validator})
2712	}
2713	if info.ValidationLevel != "" {
2714		cmd = append(cmd, bson.DocElem{"validationLevel", info.ValidationLevel})
2715	}
2716	if info.ValidationAction != "" {
2717		cmd = append(cmd, bson.DocElem{"validationAction", info.ValidationAction})
2718	}
2719	if info.StorageEngine != nil {
2720		cmd = append(cmd, bson.DocElem{"storageEngine", info.StorageEngine})
2721	}
2722	return c.Database.Run(cmd, nil)
2723}
2724
2725// Batch sets the batch size used when fetching documents from the database.
2726// It's possible to change this setting on a per-session basis as well, using
2727// the Batch method of Session.
2728
2729// The default batch size is defined by the database itself.  As of this
2730// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
2731// first batch, and 4MB on remaining ones.
2732func (q *Query) Batch(n int) *Query {
2733	if n == 1 {
2734		// Server interprets 1 as -1 and closes the cursor (!?)
2735		n = 2
2736	}
2737	q.m.Lock()
2738	q.op.limit = int32(n)
2739	q.m.Unlock()
2740	return q
2741}
2742
2743// Prefetch sets the point at which the next batch of results will be requested.
2744// When there are p*batch_size remaining documents cached in an Iter, the next
2745// batch will be requested in background. For instance, when using this:
2746//
2747//     query.Batch(200).Prefetch(0.25)
2748//
2749// and there are only 50 documents cached in the Iter to be processed, the
2750// next batch of 200 will be requested. It's possible to change this setting on
2751// a per-session basis as well, using the SetPrefetch method of Session.
2752//
2753// The default prefetch value is 0.25.
2754func (q *Query) Prefetch(p float64) *Query {
2755	q.m.Lock()
2756	q.prefetch = p
2757	q.m.Unlock()
2758	return q
2759}
2760
2761// Skip skips over the n initial documents from the query results.  Note that
2762// this only makes sense with capped collections where documents are naturally
2763// ordered by insertion time, or with sorted results.
2764func (q *Query) Skip(n int) *Query {
2765	q.m.Lock()
2766	q.op.skip = int32(n)
2767	q.m.Unlock()
2768	return q
2769}
2770
2771// Limit restricts the maximum number of documents retrieved to n, and also
2772// changes the batch size to the same value.  Once n documents have been
2773// returned by Next, the following call will return ErrNotFound.
2774func (q *Query) Limit(n int) *Query {
2775	q.m.Lock()
2776	switch {
2777	case n == 1:
2778		q.limit = 1
2779		q.op.limit = -1
2780	case n == math.MinInt32: // -MinInt32 == -MinInt32
2781		q.limit = math.MaxInt32
2782		q.op.limit = math.MinInt32 + 1
2783	case n < 0:
2784		q.limit = int32(-n)
2785		q.op.limit = int32(n)
2786	default:
2787		q.limit = int32(n)
2788		q.op.limit = int32(n)
2789	}
2790	q.m.Unlock()
2791	return q
2792}
2793
2794// Select enables selecting which fields should be retrieved for the results
2795// found. For example, the following query would only retrieve the name field:
2796//
2797//     err := collection.Find(nil).Select(bson.M{"name": 1}).One(&result)
2798//
2799// Relevant documentation:
2800//
2801//     http://www.mongodb.org/display/DOCS/Retrieving+a+Subset+of+Fields
2802//
2803func (q *Query) Select(selector interface{}) *Query {
2804	q.m.Lock()
2805	q.op.selector = selector
2806	q.m.Unlock()
2807	return q
2808}
2809
2810// Sort asks the database to order returned documents according to the
2811// provided field names. A field name may be prefixed by - (minus) for
2812// it to be sorted in reverse order.
2813//
2814// For example:
2815//
2816//     query1 := collection.Find(nil).Sort("firstname", "lastname")
2817//     query2 := collection.Find(nil).Sort("-age")
2818//     query3 := collection.Find(nil).Sort("$natural")
2819//     query4 := collection.Find(nil).Select(bson.M{"score": bson.M{"$meta": "textScore"}}).Sort("$textScore:score")
2820//
2821// Relevant documentation:
2822//
2823//     http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
2824//
2825func (q *Query) Sort(fields ...string) *Query {
2826	q.m.Lock()
2827	var order bson.D
2828	for _, field := range fields {
2829		n := 1
2830		var kind string
2831		if field != "" {
2832			if field[0] == '$' {
2833				if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
2834					kind = field[1:c]
2835					field = field[c+1:]
2836				}
2837			}
2838			switch field[0] {
2839			case '+':
2840				field = field[1:]
2841			case '-':
2842				n = -1
2843				field = field[1:]
2844			}
2845		}
2846		if field == "" {
2847			panic("Sort: empty field name")
2848		}
2849		if kind == "textScore" {
2850			order = append(order, bson.DocElem{field, bson.M{"$meta": kind}})
2851		} else {
2852			order = append(order, bson.DocElem{field, n})
2853		}
2854	}
2855	q.op.options.OrderBy = order
2856	q.op.hasOptions = true
2857	q.m.Unlock()
2858	return q
2859}
2860
2861// Explain returns a number of details about how the MongoDB server would
2862// execute the requested query, such as the number of objects examined,
2863// the number of times the read lock was yielded to allow writes to go in,
2864// and so on.
2865//
2866// For example:
2867//
2868//     m := bson.M{}
2869//     err := collection.Find(bson.M{"filename": name}).Explain(m)
2870//     if err == nil {
2871//         fmt.Printf("Explain: %#v\n", m)
2872//     }
2873//
2874// Relevant documentation:
2875//
2876//     http://www.mongodb.org/display/DOCS/Optimization
2877//     http://www.mongodb.org/display/DOCS/Query+Optimizer
2878//
2879func (q *Query) Explain(result interface{}) error {
2880	q.m.Lock()
2881	clone := &Query{session: q.session, query: q.query}
2882	q.m.Unlock()
2883	clone.op.options.Explain = true
2884	clone.op.hasOptions = true
2885	if clone.op.limit > 0 {
2886		clone.op.limit = -q.op.limit
2887	}
2888	iter := clone.Iter()
2889	if iter.Next(result) {
2890		return nil
2891	}
2892	return iter.Close()
2893}
2894
2895// TODO: Add Collection.Explain. See https://goo.gl/1MDlvz.
2896
2897// Hint will include an explicit "hint" in the query to force the server
2898// to use a specified index, potentially improving performance in some
2899// situations.  The provided parameters are the fields that compose the
2900// key of the index to be used.  For details on how the indexKey may be
2901// built, see the EnsureIndex method.
2902//
2903// For example:
2904//
2905//     query := collection.Find(bson.M{"firstname": "Joe", "lastname": "Winter"})
2906//     query.Hint("lastname", "firstname")
2907//
2908// Relevant documentation:
2909//
2910//     http://www.mongodb.org/display/DOCS/Optimization
2911//     http://www.mongodb.org/display/DOCS/Query+Optimizer
2912//
2913func (q *Query) Hint(indexKey ...string) *Query {
2914	q.m.Lock()
2915	keyInfo, err := parseIndexKey(indexKey)
2916	q.op.options.Hint = keyInfo.key
2917	q.op.hasOptions = true
2918	q.m.Unlock()
2919	if err != nil {
2920		panic(err)
2921	}
2922	return q
2923}
2924
2925// SetMaxScan constrains the query to stop after scanning the specified
2926// number of documents.
2927//
2928// This modifier is generally used to prevent potentially long running
2929// queries from disrupting performance by scanning through too much data.
2930func (q *Query) SetMaxScan(n int) *Query {
2931	q.m.Lock()
2932	q.op.options.MaxScan = n
2933	q.op.hasOptions = true
2934	q.m.Unlock()
2935	return q
2936}
2937
2938// SetMaxTime constrains the query to stop after running for the specified time.
2939//
2940// When the time limit is reached MongoDB automatically cancels the query.
2941// This can be used to efficiently prevent and identify unexpectedly slow queries.
2942//
2943// A few important notes about the mechanism enforcing this limit:
2944//
2945//  - Requests can block behind locking operations on the server, and that blocking
2946//    time is not accounted for. In other words, the timer starts ticking only after
2947//    the actual start of the query when it initially acquires the appropriate lock;
2948//
2949//  - Operations are interrupted only at interrupt points where an operation can be
2950//    safely aborted – the total execution time may exceed the specified value;
2951//
2952//  - The limit can be applied to both CRUD operations and commands, but not all
2953//    commands are interruptible;
2954//
2955//  - While iterating over results, computing follow up batches is included in the
2956//    total time and the iteration continues until the alloted time is over, but
2957//    network roundtrips are not taken into account for the limit.
2958//
2959//  - This limit does not override the inactive cursor timeout for idle cursors
2960//    (default is 10 min).
2961//
2962// This mechanism was introduced in MongoDB 2.6.
2963//
2964// Relevant documentation:
2965//
2966//   http://blog.mongodb.org/post/83621787773/maxtimems-and-query-optimizer-introspection-in
2967//
2968func (q *Query) SetMaxTime(d time.Duration) *Query {
2969	q.m.Lock()
2970	q.op.options.MaxTimeMS = int(d / time.Millisecond)
2971	q.op.hasOptions = true
2972	q.m.Unlock()
2973	return q
2974}
2975
2976// Snapshot will force the performed query to make use of an available
2977// index on the _id field to prevent the same document from being returned
2978// more than once in a single iteration. This might happen without this
2979// setting in situations when the document changes in size and thus has to
2980// be moved while the iteration is running.
2981//
2982// Because snapshot mode traverses the _id index, it may not be used with
2983// sorting or explicit hints. It also cannot use any other index for the
2984// query.
2985//
2986// Even with snapshot mode, items inserted or deleted during the query may
2987// or may not be returned; that is, this mode is not a true point-in-time
2988// snapshot.
2989//
2990// The same effect of Snapshot may be obtained by using any unique index on
2991// field(s) that will not be modified (best to use Hint explicitly too).
2992// A non-unique index (such as creation time) may be made unique by
2993// appending _id to the index when creating it.
2994//
2995// Relevant documentation:
2996//
2997//     http://www.mongodb.org/display/DOCS/How+to+do+Snapshotted+Queries+in+the+Mongo+Database
2998//
2999func (q *Query) Snapshot() *Query {
3000	q.m.Lock()
3001	q.op.options.Snapshot = true
3002	q.op.hasOptions = true
3003	q.m.Unlock()
3004	return q
3005}
3006
3007// Comment adds a comment to the query to identify it in the database profiler output.
3008//
3009// Relevant documentation:
3010//
3011//     http://docs.mongodb.org/manual/reference/operator/meta/comment
3012//     http://docs.mongodb.org/manual/reference/command/profile
3013//     http://docs.mongodb.org/manual/administration/analyzing-mongodb-performance/#database-profiling
3014//
3015func (q *Query) Comment(comment string) *Query {
3016	q.m.Lock()
3017	q.op.options.Comment = comment
3018	q.op.hasOptions = true
3019	q.m.Unlock()
3020	return q
3021}
3022
3023// LogReplay enables an option that optimizes queries that are typically
3024// made on the MongoDB oplog for replaying it. This is an internal
3025// implementation aspect and most likely uninteresting for other uses.
3026// It has seen at least one use case, though, so it's exposed via the API.
3027func (q *Query) LogReplay() *Query {
3028	q.m.Lock()
3029	q.op.flags |= flagLogReplay
3030	q.m.Unlock()
3031	return q
3032}
3033
3034func checkQueryError(fullname string, d []byte) error {
3035	l := len(d)
3036	if l < 16 {
3037		return nil
3038	}
3039	if d[5] == '$' && d[6] == 'e' && d[7] == 'r' && d[8] == 'r' && d[9] == '\x00' && d[4] == '\x02' {
3040		goto Error
3041	}
3042	if len(fullname) < 5 || fullname[len(fullname)-5:] != ".$cmd" {
3043		return nil
3044	}
3045	for i := 0; i+8 < l; i++ {
3046		if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
3047			goto Error
3048		}
3049	}
3050	return nil
3051
3052Error:
3053	result := &queryError{}
3054	bson.Unmarshal(d, result)
3055	if result.Err == "" && result.ErrMsg == "" {
3056		return nil
3057	}
3058	if result.AssertionCode != 0 && result.Assertion != "" {
3059		return &QueryError{Code: result.AssertionCode, Message: result.Assertion, Assertion: true}
3060	}
3061	if result.Err != "" {
3062		return &QueryError{Code: result.Code, Message: result.Err}
3063	}
3064	return &QueryError{Code: result.Code, Message: result.ErrMsg}
3065}
3066
3067// One executes the query and unmarshals the first obtained document into the
3068// result argument.  The result must be a struct or map value capable of being
3069// unmarshalled into by gobson.  This function blocks until either a result
3070// is available or an error happens.  For example:
3071//
3072//     err := collection.Find(bson.M{"a": 1}).One(&result)
3073//
3074// In case the resulting document includes a field named $err or errmsg, which
3075// are standard ways for MongoDB to return query errors, the returned err will
3076// be set to a *QueryError value including the Err message and the Code.  In
3077// those cases, the result argument is still unmarshalled into with the
3078// received document so that any other custom values may be obtained if
3079// desired.
3080//
3081func (q *Query) One(result interface{}) (err error) {
3082	q.m.Lock()
3083	session := q.session
3084	op := q.op // Copy.
3085	q.m.Unlock()
3086
3087	socket, err := session.acquireSocket(true)
3088	if err != nil {
3089		return err
3090	}
3091	defer socket.Release()
3092
3093	op.limit = -1
3094
3095	session.prepareQuery(&op)
3096
3097	expectFindReply := prepareFindOp(socket, &op, 1)
3098
3099	data, err := socket.SimpleQuery(&op)
3100	if err != nil {
3101		return err
3102	}
3103	if data == nil {
3104		return ErrNotFound
3105	}
3106	if expectFindReply {
3107		var findReply struct {
3108			Ok     bool
3109			Code   int
3110			Errmsg string
3111			Cursor cursorData
3112		}
3113		err = bson.Unmarshal(data, &findReply)
3114		if err != nil {
3115			return err
3116		}
3117		if !findReply.Ok && findReply.Errmsg != "" {
3118			return &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
3119		}
3120		if len(findReply.Cursor.FirstBatch) == 0 {
3121			return ErrNotFound
3122		}
3123		data = findReply.Cursor.FirstBatch[0].Data
3124	}
3125	if result != nil {
3126		err = bson.Unmarshal(data, result)
3127		if err == nil {
3128			debugf("Query %p document unmarshaled: %#v", q, result)
3129		} else {
3130			debugf("Query %p document unmarshaling failed: %#v", q, err)
3131			return err
3132		}
3133	}
3134	return checkQueryError(op.collection, data)
3135}
3136
3137// prepareFindOp translates op from being an old-style wire protocol query into
3138// a new-style find command if that's supported by the MongoDB server (3.2+).
3139// It returns whether to expect a find command result or not. Note op may be
3140// translated into an explain command, in which case the function returns false.
3141func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool {
3142	if socket.ServerInfo().MaxWireVersion < 4 || op.collection == "admin.$cmd" {
3143		return false
3144	}
3145
3146	nameDot := strings.Index(op.collection, ".")
3147	if nameDot < 0 {
3148		panic("invalid query collection name: " + op.collection)
3149	}
3150
3151	find := findCmd{
3152		Collection:  op.collection[nameDot+1:],
3153		Filter:      op.query,
3154		Projection:  op.selector,
3155		Sort:        op.options.OrderBy,
3156		Skip:        op.skip,
3157		Limit:       limit,
3158		MaxTimeMS:   op.options.MaxTimeMS,
3159		MaxScan:     op.options.MaxScan,
3160		Hint:        op.options.Hint,
3161		Comment:     op.options.Comment,
3162		Snapshot:    op.options.Snapshot,
3163		OplogReplay: op.flags&flagLogReplay != 0,
3164	}
3165	if op.limit < 0 {
3166		find.BatchSize = -op.limit
3167		find.SingleBatch = true
3168	} else {
3169		find.BatchSize = op.limit
3170	}
3171
3172	explain := op.options.Explain
3173
3174	op.collection = op.collection[:nameDot] + ".$cmd"
3175	op.query = &find
3176	op.skip = 0
3177	op.limit = -1
3178	op.options = queryWrapper{}
3179	op.hasOptions = false
3180
3181	if explain {
3182		op.query = bson.D{{"explain", op.query}}
3183		return false
3184	}
3185	return true
3186}
3187
3188type cursorData struct {
3189	FirstBatch []bson.Raw "firstBatch"
3190	NextBatch  []bson.Raw "nextBatch"
3191	NS         string
3192	Id         int64
3193}
3194
3195// findCmd holds the command used for performing queries on MongoDB 3.2+.
3196//
3197// Relevant documentation:
3198//
3199//     https://docs.mongodb.org/master/reference/command/find/#dbcmd.find
3200//
3201type findCmd struct {
3202	Collection          string      `bson:"find"`
3203	Filter              interface{} `bson:"filter,omitempty"`
3204	Sort                interface{} `bson:"sort,omitempty"`
3205	Projection          interface{} `bson:"projection,omitempty"`
3206	Hint                interface{} `bson:"hint,omitempty"`
3207	Skip                interface{} `bson:"skip,omitempty"`
3208	Limit               int32       `bson:"limit,omitempty"`
3209	BatchSize           int32       `bson:"batchSize,omitempty"`
3210	SingleBatch         bool        `bson:"singleBatch,omitempty"`
3211	Comment             string      `bson:"comment,omitempty"`
3212	MaxScan             int         `bson:"maxScan,omitempty"`
3213	MaxTimeMS           int         `bson:"maxTimeMS,omitempty"`
3214	ReadConcern         interface{} `bson:"readConcern,omitempty"`
3215	Max                 interface{} `bson:"max,omitempty"`
3216	Min                 interface{} `bson:"min,omitempty"`
3217	ReturnKey           bool        `bson:"returnKey,omitempty"`
3218	ShowRecordId        bool        `bson:"showRecordId,omitempty"`
3219	Snapshot            bool        `bson:"snapshot,omitempty"`
3220	Tailable            bool        `bson:"tailable,omitempty"`
3221	AwaitData           bool        `bson:"awaitData,omitempty"`
3222	OplogReplay         bool        `bson:"oplogReplay,omitempty"`
3223	NoCursorTimeout     bool        `bson:"noCursorTimeout,omitempty"`
3224	AllowPartialResults bool        `bson:"allowPartialResults,omitempty"`
3225}
3226
3227// getMoreCmd holds the command used for requesting more query results on MongoDB 3.2+.
3228//
3229// Relevant documentation:
3230//
3231//     https://docs.mongodb.org/master/reference/command/getMore/#dbcmd.getMore
3232//
3233type getMoreCmd struct {
3234	CursorId   int64  `bson:"getMore"`
3235	Collection string `bson:"collection"`
3236	BatchSize  int32  `bson:"batchSize,omitempty"`
3237	MaxTimeMS  int64  `bson:"maxTimeMS,omitempty"`
3238}
3239
3240// run duplicates the behavior of collection.Find(query).One(&result)
3241// as performed by Database.Run, specializing the logic for running
3242// database commands on a given socket.
3243func (db *Database) run(socket *mongoSocket, cmd, result interface{}) (err error) {
3244	// Database.Run:
3245	if name, ok := cmd.(string); ok {
3246		cmd = bson.D{{name, 1}}
3247	}
3248
3249	// Collection.Find:
3250	session := db.Session
3251	session.m.RLock()
3252	op := session.queryConfig.op // Copy.
3253	session.m.RUnlock()
3254	op.query = cmd
3255	op.collection = db.Name + ".$cmd"
3256
3257	// Query.One:
3258	session.prepareQuery(&op)
3259	op.limit = -1
3260
3261	data, err := socket.SimpleQuery(&op)
3262	if err != nil {
3263		return err
3264	}
3265	if data == nil {
3266		return ErrNotFound
3267	}
3268	if result != nil {
3269		err = bson.Unmarshal(data, result)
3270		if err != nil {
3271			debugf("Run command unmarshaling failed: %#v", op, err)
3272			return err
3273		}
3274		if globalDebug && globalLogger != nil {
3275			var res bson.M
3276			bson.Unmarshal(data, &res)
3277			debugf("Run command unmarshaled: %#v, result: %#v", op, res)
3278		}
3279	}
3280	return checkQueryError(op.collection, data)
3281}
3282
3283// The DBRef type implements support for the database reference MongoDB
3284// convention as supported by multiple drivers.  This convention enables
3285// cross-referencing documents between collections and databases using
3286// a structure which includes a collection name, a document id, and
3287// optionally a database name.
3288//
3289// See the FindRef methods on Session and on Database.
3290//
3291// Relevant documentation:
3292//
3293//     http://www.mongodb.org/display/DOCS/Database+References
3294//
3295type DBRef struct {
3296	Collection string      `bson:"$ref"`
3297	Id         interface{} `bson:"$id"`
3298	Database   string      `bson:"$db,omitempty"`
3299}
3300
3301// NOTE: Order of fields for DBRef above does matter, per documentation.
3302
3303// FindRef returns a query that looks for the document in the provided
3304// reference. If the reference includes the DB field, the document will
3305// be retrieved from the respective database.
3306//
3307// See also the DBRef type and the FindRef method on Session.
3308//
3309// Relevant documentation:
3310//
3311//     http://www.mongodb.org/display/DOCS/Database+References
3312//
3313func (db *Database) FindRef(ref *DBRef) *Query {
3314	var c *Collection
3315	if ref.Database == "" {
3316		c = db.C(ref.Collection)
3317	} else {
3318		c = db.Session.DB(ref.Database).C(ref.Collection)
3319	}
3320	return c.FindId(ref.Id)
3321}
3322
3323// FindRef returns a query that looks for the document in the provided
3324// reference. For a DBRef to be resolved correctly at the session level
3325// it must necessarily have the optional DB field defined.
3326//
3327// See also the DBRef type and the FindRef method on Database.
3328//
3329// Relevant documentation:
3330//
3331//     http://www.mongodb.org/display/DOCS/Database+References
3332//
3333func (s *Session) FindRef(ref *DBRef) *Query {
3334	if ref.Database == "" {
3335		panic(errors.New(fmt.Sprintf("Can't resolve database for %#v", ref)))
3336	}
3337	c := s.DB(ref.Database).C(ref.Collection)
3338	return c.FindId(ref.Id)
3339}
3340
3341// CollectionNames returns the collection names present in the db database.
3342func (db *Database) CollectionNames() (names []string, err error) {
3343	// Clone session and set it to Monotonic mode so that the server
3344	// used for the query may be safely obtained afterwards, if
3345	// necessary for iteration when a cursor is received.
3346	cloned := db.Session.nonEventual()
3347	defer cloned.Close()
3348
3349	batchSize := int(cloned.queryConfig.op.limit)
3350
3351	// Try with a command.
3352	var result struct {
3353		Collections []bson.Raw
3354		Cursor      cursorData
3355	}
3356	err = db.With(cloned).Run(bson.D{{"listCollections", 1}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result)
3357	if err == nil {
3358		firstBatch := result.Collections
3359		if firstBatch == nil {
3360			firstBatch = result.Cursor.FirstBatch
3361		}
3362		var iter *Iter
3363		ns := strings.SplitN(result.Cursor.NS, ".", 2)
3364		if len(ns) < 2 {
3365			iter = db.With(cloned).C("").NewIter(nil, firstBatch, result.Cursor.Id, nil)
3366		} else {
3367			iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil)
3368		}
3369		var coll struct{ Name string }
3370		for iter.Next(&coll) {
3371			names = append(names, coll.Name)
3372		}
3373		if err := iter.Close(); err != nil {
3374			return nil, err
3375		}
3376		sort.Strings(names)
3377		return names, err
3378	}
3379	if err != nil && !isNoCmd(err) {
3380		return nil, err
3381	}
3382
3383	// Command not yet supported. Query the database instead.
3384	nameIndex := len(db.Name) + 1
3385	iter := db.C("system.namespaces").Find(nil).Iter()
3386	var coll struct{ Name string }
3387	for iter.Next(&coll) {
3388		if strings.Index(coll.Name, "$") < 0 || strings.Index(coll.Name, ".oplog.$") >= 0 {
3389			names = append(names, coll.Name[nameIndex:])
3390		}
3391	}
3392	if err := iter.Close(); err != nil {
3393		return nil, err
3394	}
3395	sort.Strings(names)
3396	return names, nil
3397}
3398
3399type dbNames struct {
3400	Databases []struct {
3401		Name  string
3402		Empty bool
3403	}
3404}
3405
3406// DatabaseNames returns the names of non-empty databases present in the cluster.
3407func (s *Session) DatabaseNames() (names []string, err error) {
3408	var result dbNames
3409	err = s.Run("listDatabases", &result)
3410	if err != nil {
3411		return nil, err
3412	}
3413	for _, db := range result.Databases {
3414		if !db.Empty {
3415			names = append(names, db.Name)
3416		}
3417	}
3418	sort.Strings(names)
3419	return names, nil
3420}
3421
3422// Iter executes the query and returns an iterator capable of going over all
3423// the results. Results will be returned in batches of configurable
3424// size (see the Batch method) and more documents will be requested when a
3425// configurable number of documents is iterated over (see the Prefetch method).
3426func (q *Query) Iter() *Iter {
3427	q.m.Lock()
3428	session := q.session
3429	op := q.op
3430	prefetch := q.prefetch
3431	limit := q.limit
3432	q.m.Unlock()
3433
3434	iter := &Iter{
3435		session:  session,
3436		prefetch: prefetch,
3437		limit:    limit,
3438		timeout:  -1,
3439	}
3440	iter.gotReply.L = &iter.m
3441	iter.op.collection = op.collection
3442	iter.op.limit = op.limit
3443	iter.op.replyFunc = iter.replyFunc()
3444	iter.docsToReceive++
3445
3446	socket, err := session.acquireSocket(true)
3447	if err != nil {
3448		iter.err = err
3449		return iter
3450	}
3451	defer socket.Release()
3452
3453	session.prepareQuery(&op)
3454	op.replyFunc = iter.op.replyFunc
3455
3456	if prepareFindOp(socket, &op, limit) {
3457		iter.findCmd = true
3458	}
3459
3460	iter.server = socket.Server()
3461	err = socket.Query(&op)
3462	if err != nil {
3463		// Must lock as the query is already out and it may call replyFunc.
3464		iter.m.Lock()
3465		iter.err = err
3466		iter.m.Unlock()
3467	}
3468
3469	return iter
3470}
3471
3472// Tail returns a tailable iterator. Unlike a normal iterator, a
3473// tailable iterator may wait for new values to be inserted in the
3474// collection once the end of the current result set is reached,
3475// A tailable iterator may only be used with capped collections.
3476//
3477// The timeout parameter indicates how long Next will block waiting
3478// for a result before timing out.  If set to -1, Next will not
3479// timeout, and will continue waiting for a result for as long as
3480// the cursor is valid and the session is not closed. If set to 0,
3481// Next times out as soon as it reaches the end of the result set.
3482// Otherwise, Next will wait for at least the given number of
3483// seconds for a new document to be available before timing out.
3484//
3485// On timeouts, Next will unblock and return false, and the Timeout
3486// method will return true if called. In these cases, Next may still
3487// be called again on the same iterator to check if a new value is
3488// available at the current cursor position, and again it will block
3489// according to the specified timeoutSecs. If the cursor becomes
3490// invalid, though, both Next and Timeout will return false and
3491// the query must be restarted.
3492//
3493// The following example demonstrates timeout handling and query
3494// restarting:
3495//
3496//    iter := collection.Find(nil).Sort("$natural").Tail(5 * time.Second)
3497//    for {
3498//         for iter.Next(&result) {
3499//             fmt.Println(result.Id)
3500//             lastId = result.Id
3501//         }
3502//         if iter.Err() != nil {
3503//             return iter.Close()
3504//         }
3505//         if iter.Timeout() {
3506//             continue
3507//         }
3508//         query := collection.Find(bson.M{"_id": bson.M{"$gt": lastId}})
3509//         iter = query.Sort("$natural").Tail(5 * time.Second)
3510//    }
3511//    iter.Close()
3512//
3513// Relevant documentation:
3514//
3515//     http://www.mongodb.org/display/DOCS/Tailable+Cursors
3516//     http://www.mongodb.org/display/DOCS/Capped+Collections
3517//     http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
3518//
3519func (q *Query) Tail(timeout time.Duration) *Iter {
3520	q.m.Lock()
3521	session := q.session
3522	op := q.op
3523	prefetch := q.prefetch
3524	q.m.Unlock()
3525
3526	iter := &Iter{session: session, prefetch: prefetch}
3527	iter.gotReply.L = &iter.m
3528	iter.timeout = timeout
3529	iter.op.collection = op.collection
3530	iter.op.limit = op.limit
3531	iter.op.replyFunc = iter.replyFunc()
3532	iter.docsToReceive++
3533	session.prepareQuery(&op)
3534	op.replyFunc = iter.op.replyFunc
3535	op.flags |= flagTailable | flagAwaitData
3536
3537	socket, err := session.acquireSocket(true)
3538	if err != nil {
3539		iter.err = err
3540	} else {
3541		iter.server = socket.Server()
3542		err = socket.Query(&op)
3543		if err != nil {
3544			// Must lock as the query is already out and it may call replyFunc.
3545			iter.m.Lock()
3546			iter.err = err
3547			iter.m.Unlock()
3548		}
3549		socket.Release()
3550	}
3551	return iter
3552}
3553
3554func (s *Session) prepareQuery(op *queryOp) {
3555	s.m.RLock()
3556	op.mode = s.consistency
3557	if s.slaveOk {
3558		op.flags |= flagSlaveOk
3559	}
3560	s.m.RUnlock()
3561	return
3562}
3563
3564// Err returns nil if no errors happened during iteration, or the actual
3565// error otherwise.
3566//
3567// In case a resulting document included a field named $err or errmsg, which are
3568// standard ways for MongoDB to report an improper query, the returned value has
3569// a *QueryError type, and includes the Err message and the Code.
3570func (iter *Iter) Err() error {
3571	iter.m.Lock()
3572	err := iter.err
3573	iter.m.Unlock()
3574	if err == ErrNotFound {
3575		return nil
3576	}
3577	return err
3578}
3579
3580// Close kills the server cursor used by the iterator, if any, and returns
3581// nil if no errors happened during iteration, or the actual error otherwise.
3582//
3583// Server cursors are automatically closed at the end of an iteration, which
3584// means close will do nothing unless the iteration was interrupted before
3585// the server finished sending results to the driver. If Close is not called
3586// in such a situation, the cursor will remain available at the server until
3587// the default cursor timeout period is reached. No further problems arise.
3588//
3589// Close is idempotent. That means it can be called repeatedly and will
3590// return the same result every time.
3591//
3592// In case a resulting document included a field named $err or errmsg, which are
3593// standard ways for MongoDB to report an improper query, the returned value has
3594// a *QueryError type.
3595func (iter *Iter) Close() error {
3596	iter.m.Lock()
3597	cursorId := iter.op.cursorId
3598	iter.op.cursorId = 0
3599	err := iter.err
3600	iter.m.Unlock()
3601	if cursorId == 0 {
3602		if err == ErrNotFound {
3603			return nil
3604		}
3605		return err
3606	}
3607	socket, err := iter.acquireSocket()
3608	if err == nil {
3609		// TODO Batch kills.
3610		err = socket.Query(&killCursorsOp{[]int64{cursorId}})
3611		socket.Release()
3612	}
3613
3614	iter.m.Lock()
3615	if err != nil && (iter.err == nil || iter.err == ErrNotFound) {
3616		iter.err = err
3617	} else if iter.err != ErrNotFound {
3618		err = iter.err
3619	}
3620	iter.m.Unlock()
3621	return err
3622}
3623
3624// Done returns true only if a follow up Next call is guaranteed
3625// to return false.
3626//
3627// For an iterator created with Tail, Done may return false for
3628// an iterator that has no more data. Otherwise it's guaranteed
3629// to return false only if there is data or an error happened.
3630//
3631// Done may block waiting for a pending query to verify whether
3632// more data is actually available or not.
3633func (iter *Iter) Done() bool {
3634	iter.m.Lock()
3635	defer iter.m.Unlock()
3636
3637	for {
3638		if iter.docData.Len() > 0 {
3639			return false
3640		}
3641		if iter.docsToReceive > 1 {
3642			return true
3643		}
3644		if iter.docsToReceive > 0 {
3645			iter.gotReply.Wait()
3646			continue
3647		}
3648		return iter.op.cursorId == 0
3649	}
3650}
3651
3652// Timeout returns true if Next returned false due to a timeout of
3653// a tailable cursor. In those cases, Next may be called again to continue
3654// the iteration at the previous cursor position.
3655func (iter *Iter) Timeout() bool {
3656	iter.m.Lock()
3657	result := iter.timedout
3658	iter.m.Unlock()
3659	return result
3660}
3661
3662// Next retrieves the next document from the result set, blocking if necessary.
3663// This method will also automatically retrieve another batch of documents from
3664// the server when the current one is exhausted, or before that in background
3665// if pre-fetching is enabled (see the Query.Prefetch and Session.SetPrefetch
3666// methods).
3667//
3668// Next returns true if a document was successfully unmarshalled onto result,
3669// and false at the end of the result set or if an error happened.
3670// When Next returns false, the Err method should be called to verify if
3671// there was an error during iteration.
3672//
3673// For example:
3674//
3675//    iter := collection.Find(nil).Iter()
3676//    for iter.Next(&result) {
3677//        fmt.Printf("Result: %v\n", result.Id)
3678//    }
3679//    if err := iter.Close(); err != nil {
3680//        return err
3681//    }
3682//
3683func (iter *Iter) Next(result interface{}) bool {
3684	iter.m.Lock()
3685	iter.timedout = false
3686	timeout := time.Time{}
3687	for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) {
3688		if iter.docsToReceive == 0 {
3689			if iter.timeout >= 0 {
3690				if timeout.IsZero() {
3691					timeout = time.Now().Add(iter.timeout)
3692				}
3693				if time.Now().After(timeout) {
3694					iter.timedout = true
3695					iter.m.Unlock()
3696					return false
3697				}
3698			}
3699			iter.getMore()
3700			if iter.err != nil {
3701				break
3702			}
3703		}
3704		iter.gotReply.Wait()
3705	}
3706
3707	// Exhaust available data before reporting any errors.
3708	if docData, ok := iter.docData.Pop().([]byte); ok {
3709		close := false
3710		if iter.limit > 0 {
3711			iter.limit--
3712			if iter.limit == 0 {
3713				if iter.docData.Len() > 0 {
3714					iter.m.Unlock()
3715					panic(fmt.Errorf("data remains after limit exhausted: %d", iter.docData.Len()))
3716				}
3717				iter.err = ErrNotFound
3718				close = true
3719			}
3720		}
3721		if iter.op.cursorId != 0 && iter.err == nil {
3722			iter.docsBeforeMore--
3723			if iter.docsBeforeMore == -1 {
3724				iter.getMore()
3725			}
3726		}
3727		iter.m.Unlock()
3728
3729		if close {
3730			iter.Close()
3731		}
3732		err := bson.Unmarshal(docData, result)
3733		if err != nil {
3734			debugf("Iter %p document unmarshaling failed: %#v", iter, err)
3735			iter.m.Lock()
3736			if iter.err == nil {
3737				iter.err = err
3738			}
3739			iter.m.Unlock()
3740			return false
3741		}
3742		debugf("Iter %p document unmarshaled: %#v", iter, result)
3743		// XXX Only have to check first document for a query error?
3744		err = checkQueryError(iter.op.collection, docData)
3745		if err != nil {
3746			iter.m.Lock()
3747			if iter.err == nil {
3748				iter.err = err
3749			}
3750			iter.m.Unlock()
3751			return false
3752		}
3753		return true
3754	} else if iter.err != nil {
3755		debugf("Iter %p returning false: %s", iter, iter.err)
3756		iter.m.Unlock()
3757		return false
3758	} else if iter.op.cursorId == 0 {
3759		iter.err = ErrNotFound
3760		debugf("Iter %p exhausted with cursor=0", iter)
3761		iter.m.Unlock()
3762		return false
3763	}
3764
3765	panic("unreachable")
3766}
3767
3768// All retrieves all documents from the result set into the provided slice
3769// and closes the iterator.
3770//
3771// The result argument must necessarily be the address for a slice. The slice
3772// may be nil or previously allocated.
3773//
3774// WARNING: Obviously, All must not be used with result sets that may be
3775// potentially large, since it may consume all memory until the system
3776// crashes. Consider building the query with a Limit clause to ensure the
3777// result size is bounded.
3778//
3779// For instance:
3780//
3781//    var result []struct{ Value int }
3782//    iter := collection.Find(nil).Limit(100).Iter()
3783//    err := iter.All(&result)
3784//    if err != nil {
3785//        return err
3786//    }
3787//
3788func (iter *Iter) All(result interface{}) error {
3789	resultv := reflect.ValueOf(result)
3790	if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice {
3791		panic("result argument must be a slice address")
3792	}
3793	slicev := resultv.Elem()
3794	slicev = slicev.Slice(0, slicev.Cap())
3795	elemt := slicev.Type().Elem()
3796	i := 0
3797	for {
3798		if slicev.Len() == i {
3799			elemp := reflect.New(elemt)
3800			if !iter.Next(elemp.Interface()) {
3801				break
3802			}
3803			slicev = reflect.Append(slicev, elemp.Elem())
3804			slicev = slicev.Slice(0, slicev.Cap())
3805		} else {
3806			if !iter.Next(slicev.Index(i).Addr().Interface()) {
3807				break
3808			}
3809		}
3810		i++
3811	}
3812	resultv.Elem().Set(slicev.Slice(0, i))
3813	return iter.Close()
3814}
3815
3816// All works like Iter.All.
3817func (q *Query) All(result interface{}) error {
3818	return q.Iter().All(result)
3819}
3820
3821// The For method is obsolete and will be removed in a future release.
3822// See Iter as an elegant replacement.
3823func (q *Query) For(result interface{}, f func() error) error {
3824	return q.Iter().For(result, f)
3825}
3826
3827// The For method is obsolete and will be removed in a future release.
3828// See Iter as an elegant replacement.
3829func (iter *Iter) For(result interface{}, f func() error) (err error) {
3830	valid := false
3831	v := reflect.ValueOf(result)
3832	if v.Kind() == reflect.Ptr {
3833		v = v.Elem()
3834		switch v.Kind() {
3835		case reflect.Map, reflect.Ptr, reflect.Interface, reflect.Slice:
3836			valid = v.IsNil()
3837		}
3838	}
3839	if !valid {
3840		panic("For needs a pointer to nil reference value.  See the documentation.")
3841	}
3842	zero := reflect.Zero(v.Type())
3843	for {
3844		v.Set(zero)
3845		if !iter.Next(result) {
3846			break
3847		}
3848		err = f()
3849		if err != nil {
3850			return err
3851		}
3852	}
3853	return iter.Err()
3854}
3855
3856// acquireSocket acquires a socket from the same server that the iterator
3857// cursor was obtained from.
3858//
3859// WARNING: This method must not be called with iter.m locked. Acquiring the
3860// socket depends on the cluster sync loop, and the cluster sync loop might
3861// attempt actions which cause replyFunc to be called, inducing a deadlock.
3862func (iter *Iter) acquireSocket() (*mongoSocket, error) {
3863	socket, err := iter.session.acquireSocket(true)
3864	if err != nil {
3865		return nil, err
3866	}
3867	if socket.Server() != iter.server {
3868		// Socket server changed during iteration. This may happen
3869		// with Eventual sessions, if a Refresh is done, or if a
3870		// monotonic session gets a write and shifts from secondary
3871		// to primary. Our cursor is in a specific server, though.
3872		iter.session.m.Lock()
3873		sockTimeout := iter.session.sockTimeout
3874		iter.session.m.Unlock()
3875		socket.Release()
3876		socket, _, err = iter.server.AcquireSocket(0, sockTimeout)
3877		if err != nil {
3878			return nil, err
3879		}
3880		err := iter.session.socketLogin(socket)
3881		if err != nil {
3882			socket.Release()
3883			return nil, err
3884		}
3885	}
3886	return socket, nil
3887}
3888
3889func (iter *Iter) getMore() {
3890	// Increment now so that unlocking the iterator won't cause a
3891	// different goroutine to get here as well.
3892	iter.docsToReceive++
3893	iter.m.Unlock()
3894	socket, err := iter.acquireSocket()
3895	iter.m.Lock()
3896	if err != nil {
3897		iter.err = err
3898		return
3899	}
3900	defer socket.Release()
3901
3902	debugf("Iter %p requesting more documents", iter)
3903	if iter.limit > 0 {
3904		// The -1 below accounts for the fact docsToReceive was incremented above.
3905		limit := iter.limit - int32(iter.docsToReceive-1) - int32(iter.docData.Len())
3906		if limit < iter.op.limit {
3907			iter.op.limit = limit
3908		}
3909	}
3910	var op interface{}
3911	if iter.findCmd {
3912		op = iter.getMoreCmd()
3913	} else {
3914		op = &iter.op
3915	}
3916	if err := socket.Query(op); err != nil {
3917		iter.docsToReceive--
3918		iter.err = err
3919	}
3920}
3921
3922func (iter *Iter) getMoreCmd() *queryOp {
3923	// TODO: Define the query statically in the Iter type, next to getMoreOp.
3924	nameDot := strings.Index(iter.op.collection, ".")
3925	if nameDot < 0 {
3926		panic("invalid query collection name: " + iter.op.collection)
3927	}
3928
3929	getMore := getMoreCmd{
3930		CursorId:   iter.op.cursorId,
3931		Collection: iter.op.collection[nameDot+1:],
3932		BatchSize:  iter.op.limit,
3933	}
3934
3935	var op queryOp
3936	op.collection = iter.op.collection[:nameDot] + ".$cmd"
3937	op.query = &getMore
3938	op.limit = -1
3939	op.replyFunc = iter.op.replyFunc
3940	return &op
3941}
3942
3943type countCmd struct {
3944	Count string
3945	Query interface{}
3946	Limit int32 ",omitempty"
3947	Skip  int32 ",omitempty"
3948}
3949
3950// Count returns the total number of documents in the result set.
3951func (q *Query) Count() (n int, err error) {
3952	q.m.Lock()
3953	session := q.session
3954	op := q.op
3955	limit := q.limit
3956	q.m.Unlock()
3957
3958	c := strings.Index(op.collection, ".")
3959	if c < 0 {
3960		return 0, errors.New("Bad collection name: " + op.collection)
3961	}
3962
3963	dbname := op.collection[:c]
3964	cname := op.collection[c+1:]
3965	query := op.query
3966	if query == nil {
3967		query = bson.D{}
3968	}
3969	result := struct{ N int }{}
3970	err = session.DB(dbname).Run(countCmd{cname, query, limit, op.skip}, &result)
3971	return result.N, err
3972}
3973
3974// Count returns the total number of documents in the collection.
3975func (c *Collection) Count() (n int, err error) {
3976	return c.Find(nil).Count()
3977}
3978
3979type distinctCmd struct {
3980	Collection string "distinct"
3981	Key        string
3982	Query      interface{} ",omitempty"
3983}
3984
3985// Distinct unmarshals into result the list of distinct values for the given key.
3986//
3987// For example:
3988//
3989//     var result []int
3990//     err := collection.Find(bson.M{"gender": "F"}).Distinct("age", &result)
3991//
3992// Relevant documentation:
3993//
3994//     http://www.mongodb.org/display/DOCS/Aggregation
3995//
3996func (q *Query) Distinct(key string, result interface{}) error {
3997	q.m.Lock()
3998	session := q.session
3999	op := q.op // Copy.
4000	q.m.Unlock()
4001
4002	c := strings.Index(op.collection, ".")
4003	if c < 0 {
4004		return errors.New("Bad collection name: " + op.collection)
4005	}
4006
4007	dbname := op.collection[:c]
4008	cname := op.collection[c+1:]
4009
4010	var doc struct{ Values bson.Raw }
4011	err := session.DB(dbname).Run(distinctCmd{cname, key, op.query}, &doc)
4012	if err != nil {
4013		return err
4014	}
4015	return doc.Values.Unmarshal(result)
4016}
4017
4018type mapReduceCmd struct {
4019	Collection string "mapreduce"
4020	Map        string ",omitempty"
4021	Reduce     string ",omitempty"
4022	Finalize   string ",omitempty"
4023	Limit      int32  ",omitempty"
4024	Out        interface{}
4025	Query      interface{} ",omitempty"
4026	Sort       interface{} ",omitempty"
4027	Scope      interface{} ",omitempty"
4028	Verbose    bool        ",omitempty"
4029}
4030
4031type mapReduceResult struct {
4032	Results    bson.Raw
4033	Result     bson.Raw
4034	TimeMillis int64 "timeMillis"
4035	Counts     struct{ Input, Emit, Output int }
4036	Ok         bool
4037	Err        string
4038	Timing     *MapReduceTime
4039}
4040
4041type MapReduce struct {
4042	Map      string      // Map Javascript function code (required)
4043	Reduce   string      // Reduce Javascript function code (required)
4044	Finalize string      // Finalize Javascript function code (optional)
4045	Out      interface{} // Output collection name or document. If nil, results are inlined into the result parameter.
4046	Scope    interface{} // Optional global scope for Javascript functions
4047	Verbose  bool
4048}
4049
4050type MapReduceInfo struct {
4051	InputCount  int            // Number of documents mapped
4052	EmitCount   int            // Number of times reduce called emit
4053	OutputCount int            // Number of documents in resulting collection
4054	Database    string         // Output database, if results are not inlined
4055	Collection  string         // Output collection, if results are not inlined
4056	Time        int64          // Time to run the job, in nanoseconds
4057	VerboseTime *MapReduceTime // Only defined if Verbose was true
4058}
4059
4060type MapReduceTime struct {
4061	Total    int64 // Total time, in nanoseconds
4062	Map      int64 "mapTime"  // Time within map function, in nanoseconds
4063	EmitLoop int64 "emitLoop" // Time within the emit/map loop, in nanoseconds
4064}
4065
4066// MapReduce executes a map/reduce job for documents covered by the query.
4067// That kind of job is suitable for very flexible bulk aggregation of data
4068// performed at the server side via Javascript functions.
4069//
4070// Results from the job may be returned as a result of the query itself
4071// through the result parameter in case they'll certainly fit in memory
4072// and in a single document.  If there's the possibility that the amount
4073// of data might be too large, results must be stored back in an alternative
4074// collection or even a separate database, by setting the Out field of the
4075// provided MapReduce job.  In that case, provide nil as the result parameter.
4076//
4077// These are some of the ways to set Out:
4078//
4079//     nil
4080//         Inline results into the result parameter.
4081//
4082//     bson.M{"replace": "mycollection"}
4083//         The output will be inserted into a collection which replaces any
4084//         existing collection with the same name.
4085//
4086//     bson.M{"merge": "mycollection"}
4087//         This option will merge new data into the old output collection. In
4088//         other words, if the same key exists in both the result set and the
4089//         old collection, the new key will overwrite the old one.
4090//
4091//     bson.M{"reduce": "mycollection"}
4092//         If documents exist for a given key in the result set and in the old
4093//         collection, then a reduce operation (using the specified reduce
4094//         function) will be performed on the two values and the result will be
4095//         written to the output collection. If a finalize function was
4096//         provided, this will be run after the reduce as well.
4097//
4098//     bson.M{...., "db": "mydb"}
4099//         Any of the above options can have the "db" key included for doing
4100//         the respective action in a separate database.
4101//
4102// The following is a trivial example which will count the number of
4103// occurrences of a field named n on each document in a collection, and
4104// will return results inline:
4105//
4106//     job := &mgo.MapReduce{
4107//             Map:      "function() { emit(this.n, 1) }",
4108//             Reduce:   "function(key, values) { return Array.sum(values) }",
4109//     }
4110//     var result []struct { Id int "_id"; Value int }
4111//     _, err := collection.Find(nil).MapReduce(job, &result)
4112//     if err != nil {
4113//         return err
4114//     }
4115//     for _, item := range result {
4116//         fmt.Println(item.Value)
4117//     }
4118//
4119// This function is compatible with MongoDB 1.7.4+.
4120//
4121// Relevant documentation:
4122//
4123//     http://www.mongodb.org/display/DOCS/MapReduce
4124//
4125func (q *Query) MapReduce(job *MapReduce, result interface{}) (info *MapReduceInfo, err error) {
4126	q.m.Lock()
4127	session := q.session
4128	op := q.op // Copy.
4129	limit := q.limit
4130	q.m.Unlock()
4131
4132	c := strings.Index(op.collection, ".")
4133	if c < 0 {
4134		return nil, errors.New("Bad collection name: " + op.collection)
4135	}
4136
4137	dbname := op.collection[:c]
4138	cname := op.collection[c+1:]
4139
4140	cmd := mapReduceCmd{
4141		Collection: cname,
4142		Map:        job.Map,
4143		Reduce:     job.Reduce,
4144		Finalize:   job.Finalize,
4145		Out:        fixMROut(job.Out),
4146		Scope:      job.Scope,
4147		Verbose:    job.Verbose,
4148		Query:      op.query,
4149		Sort:       op.options.OrderBy,
4150		Limit:      limit,
4151	}
4152
4153	if cmd.Out == nil {
4154		cmd.Out = bson.D{{"inline", 1}}
4155	}
4156
4157	var doc mapReduceResult
4158	err = session.DB(dbname).Run(&cmd, &doc)
4159	if err != nil {
4160		return nil, err
4161	}
4162	if doc.Err != "" {
4163		return nil, errors.New(doc.Err)
4164	}
4165
4166	info = &MapReduceInfo{
4167		InputCount:  doc.Counts.Input,
4168		EmitCount:   doc.Counts.Emit,
4169		OutputCount: doc.Counts.Output,
4170		Time:        doc.TimeMillis * 1e6,
4171	}
4172
4173	if doc.Result.Kind == 0x02 {
4174		err = doc.Result.Unmarshal(&info.Collection)
4175		info.Database = dbname
4176	} else if doc.Result.Kind == 0x03 {
4177		var v struct{ Collection, Db string }
4178		err = doc.Result.Unmarshal(&v)
4179		info.Collection = v.Collection
4180		info.Database = v.Db
4181	}
4182
4183	if doc.Timing != nil {
4184		info.VerboseTime = doc.Timing
4185		info.VerboseTime.Total *= 1e6
4186		info.VerboseTime.Map *= 1e6
4187		info.VerboseTime.EmitLoop *= 1e6
4188	}
4189
4190	if err != nil {
4191		return nil, err
4192	}
4193	if result != nil {
4194		return info, doc.Results.Unmarshal(result)
4195	}
4196	return info, nil
4197}
4198
4199// The "out" option in the MapReduce command must be ordered. This was
4200// found after the implementation was accepting maps for a long time,
4201// so rather than breaking the API, we'll fix the order if necessary.
4202// Details about the order requirement may be seen in MongoDB's code:
4203//
4204//     http://goo.gl/L8jwJX
4205//
4206func fixMROut(out interface{}) interface{} {
4207	outv := reflect.ValueOf(out)
4208	if outv.Kind() != reflect.Map || outv.Type().Key() != reflect.TypeOf("") {
4209		return out
4210	}
4211	outs := make(bson.D, outv.Len())
4212
4213	outTypeIndex := -1
4214	for i, k := range outv.MapKeys() {
4215		ks := k.String()
4216		outs[i].Name = ks
4217		outs[i].Value = outv.MapIndex(k).Interface()
4218		switch ks {
4219		case "normal", "replace", "merge", "reduce", "inline":
4220			outTypeIndex = i
4221		}
4222	}
4223	if outTypeIndex > 0 {
4224		outs[0], outs[outTypeIndex] = outs[outTypeIndex], outs[0]
4225	}
4226	return outs
4227}
4228
4229// Change holds fields for running a findAndModify MongoDB command via
4230// the Query.Apply method.
4231type Change struct {
4232	Update    interface{} // The update document
4233	Upsert    bool        // Whether to insert in case the document isn't found
4234	Remove    bool        // Whether to remove the document found rather than updating
4235	ReturnNew bool        // Should the modified document be returned rather than the old one
4236}
4237
4238type findModifyCmd struct {
4239	Collection                  string      "findAndModify"
4240	Query, Update, Sort, Fields interface{} ",omitempty"
4241	Upsert, Remove, New         bool        ",omitempty"
4242}
4243
4244type valueResult struct {
4245	Value     bson.Raw
4246	LastError LastError "lastErrorObject"
4247}
4248
4249// Apply runs the findAndModify MongoDB command, which allows updating, upserting
4250// or removing a document matching a query and atomically returning either the old
4251// version (the default) or the new version of the document (when ReturnNew is true).
4252// If no objects are found Apply returns ErrNotFound.
4253//
4254// The Sort and Select query methods affect the result of Apply.  In case
4255// multiple documents match the query, Sort enables selecting which document to
4256// act upon by ordering it first.  Select enables retrieving only a selection
4257// of fields of the new or old document.
4258//
4259// This simple example increments a counter and prints its new value:
4260//
4261//     change := mgo.Change{
4262//             Update: bson.M{"$inc": bson.M{"n": 1}},
4263//             ReturnNew: true,
4264//     }
4265//     info, err = col.Find(M{"_id": id}).Apply(change, &doc)
4266//     fmt.Println(doc.N)
4267//
4268// This method depends on MongoDB >= 2.0 to work properly.
4269//
4270// Relevant documentation:
4271//
4272//     http://www.mongodb.org/display/DOCS/findAndModify+Command
4273//     http://www.mongodb.org/display/DOCS/Updating
4274//     http://www.mongodb.org/display/DOCS/Atomic+Operations
4275//
4276func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err error) {
4277	q.m.Lock()
4278	session := q.session
4279	op := q.op // Copy.
4280	q.m.Unlock()
4281
4282	c := strings.Index(op.collection, ".")
4283	if c < 0 {
4284		return nil, errors.New("bad collection name: " + op.collection)
4285	}
4286
4287	dbname := op.collection[:c]
4288	cname := op.collection[c+1:]
4289
4290	cmd := findModifyCmd{
4291		Collection: cname,
4292		Update:     change.Update,
4293		Upsert:     change.Upsert,
4294		Remove:     change.Remove,
4295		New:        change.ReturnNew,
4296		Query:      op.query,
4297		Sort:       op.options.OrderBy,
4298		Fields:     op.selector,
4299	}
4300
4301	session = session.Clone()
4302	defer session.Close()
4303	session.SetMode(Strong, false)
4304
4305	var doc valueResult
4306	for i := 0; i < maxUpsertRetries; i++ {
4307		err = session.DB(dbname).Run(&cmd, &doc)
4308		if err == nil {
4309			break
4310		}
4311		if change.Upsert && IsDup(err) && i+1 < maxUpsertRetries {
4312			// Retry duplicate key errors on upserts.
4313			// https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes
4314			continue
4315		}
4316		if qerr, ok := err.(*QueryError); ok && qerr.Message == "No matching object found" {
4317			return nil, ErrNotFound
4318		}
4319		return nil, err
4320	}
4321	if doc.LastError.N == 0 {
4322		return nil, ErrNotFound
4323	}
4324	if doc.Value.Kind != 0x0A && result != nil {
4325		err = doc.Value.Unmarshal(result)
4326		if err != nil {
4327			return nil, err
4328		}
4329	}
4330	info = &ChangeInfo{}
4331	lerr := &doc.LastError
4332	if lerr.UpdatedExisting {
4333		info.Updated = lerr.N
4334		info.Matched = lerr.N
4335	} else if change.Remove {
4336		info.Removed = lerr.N
4337		info.Matched = lerr.N
4338	} else if change.Upsert {
4339		info.UpsertedId = lerr.UpsertedId
4340	}
4341	return info, nil
4342}
4343
4344// The BuildInfo type encapsulates details about the running MongoDB server.
4345//
4346// Note that the VersionArray field was introduced in MongoDB 2.0+, but it is
4347// internally assembled from the Version information for previous versions.
4348// In both cases, VersionArray is guaranteed to have at least 4 entries.
4349type BuildInfo struct {
4350	Version        string
4351	VersionArray   []int  `bson:"versionArray"` // On MongoDB 2.0+; assembled from Version otherwise
4352	GitVersion     string `bson:"gitVersion"`
4353	OpenSSLVersion string `bson:"OpenSSLVersion"`
4354	SysInfo        string `bson:"sysInfo"` // Deprecated and empty on MongoDB 3.2+.
4355	Bits           int
4356	Debug          bool
4357	MaxObjectSize  int `bson:"maxBsonObjectSize"`
4358}
4359
4360// VersionAtLeast returns whether the BuildInfo version is greater than or
4361// equal to the provided version number. If more than one number is
4362// provided, numbers will be considered as major, minor, and so on.
4363func (bi *BuildInfo) VersionAtLeast(version ...int) bool {
4364	for i, vi := range version {
4365		if i == len(bi.VersionArray) {
4366			return false
4367		}
4368		if bivi := bi.VersionArray[i]; bivi != vi {
4369			return bivi >= vi
4370		}
4371	}
4372	return true
4373}
4374
4375// BuildInfo retrieves the version and other details about the
4376// running MongoDB server.
4377func (s *Session) BuildInfo() (info BuildInfo, err error) {
4378	err = s.Run(bson.D{{"buildInfo", "1"}}, &info)
4379	if len(info.VersionArray) == 0 {
4380		for _, a := range strings.Split(info.Version, ".") {
4381			i, err := strconv.Atoi(a)
4382			if err != nil {
4383				break
4384			}
4385			info.VersionArray = append(info.VersionArray, i)
4386		}
4387	}
4388	for len(info.VersionArray) < 4 {
4389		info.VersionArray = append(info.VersionArray, 0)
4390	}
4391	if i := strings.IndexByte(info.GitVersion, ' '); i >= 0 {
4392		// Strip off the " modules: enterprise" suffix. This is a _git version_.
4393		// That information may be moved to another field if people need it.
4394		info.GitVersion = info.GitVersion[:i]
4395	}
4396	if info.SysInfo == "deprecated" {
4397		info.SysInfo = ""
4398	}
4399	return
4400}
4401
4402// ---------------------------------------------------------------------------
4403// Internal session handling helpers.
4404
4405func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
4406
4407	// Read-only lock to check for previously reserved socket.
4408	s.m.RLock()
4409	// If there is a slave socket reserved and its use is acceptable, take it as long
4410	// as there isn't a master socket which would be preferred by the read preference mode.
4411	if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
4412		socket := s.slaveSocket
4413		socket.Acquire()
4414		s.m.RUnlock()
4415		return socket, nil
4416	}
4417	if s.masterSocket != nil {
4418		socket := s.masterSocket
4419		socket.Acquire()
4420		s.m.RUnlock()
4421		return socket, nil
4422	}
4423	s.m.RUnlock()
4424
4425	// No go.  We may have to request a new socket and change the session,
4426	// so try again but with an exclusive lock now.
4427	s.m.Lock()
4428	defer s.m.Unlock()
4429
4430	if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
4431		s.slaveSocket.Acquire()
4432		return s.slaveSocket, nil
4433	}
4434	if s.masterSocket != nil {
4435		s.masterSocket.Acquire()
4436		return s.masterSocket, nil
4437	}
4438
4439	// Still not good.  We need a new socket.
4440	sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
4441	if err != nil {
4442		return nil, err
4443	}
4444
4445	// Authenticate the new socket.
4446	if err = s.socketLogin(sock); err != nil {
4447		sock.Release()
4448		return nil, err
4449	}
4450
4451	// Keep track of the new socket, if necessary.
4452	// Note that, as a special case, if the Eventual session was
4453	// not refreshed (s.slaveSocket != nil), it means the developer
4454	// asked to preserve an existing reserved socket, so we'll
4455	// keep a master one around too before a Refresh happens.
4456	if s.consistency != Eventual || s.slaveSocket != nil {
4457		s.setSocket(sock)
4458	}
4459
4460	// Switch over a Monotonic session to the master.
4461	if !slaveOk && s.consistency == Monotonic {
4462		s.slaveOk = false
4463	}
4464
4465	return sock, nil
4466}
4467
4468// setSocket binds socket to this section.
4469func (s *Session) setSocket(socket *mongoSocket) {
4470	info := socket.Acquire()
4471	if info.Master {
4472		if s.masterSocket != nil {
4473			panic("setSocket(master) with existing master socket reserved")
4474		}
4475		s.masterSocket = socket
4476	} else {
4477		if s.slaveSocket != nil {
4478			panic("setSocket(slave) with existing slave socket reserved")
4479		}
4480		s.slaveSocket = socket
4481	}
4482}
4483
4484// unsetSocket releases any slave and/or master sockets reserved.
4485func (s *Session) unsetSocket() {
4486	if s.masterSocket != nil {
4487		s.masterSocket.Release()
4488	}
4489	if s.slaveSocket != nil {
4490		s.slaveSocket.Release()
4491	}
4492	s.masterSocket = nil
4493	s.slaveSocket = nil
4494}
4495
4496func (iter *Iter) replyFunc() replyFunc {
4497	return func(err error, op *replyOp, docNum int, docData []byte) {
4498		iter.m.Lock()
4499		iter.docsToReceive--
4500		if err != nil {
4501			iter.err = err
4502			debugf("Iter %p received an error: %s", iter, err.Error())
4503		} else if docNum == -1 {
4504			debugf("Iter %p received no documents (cursor=%d).", iter, op.cursorId)
4505			if op != nil && op.cursorId != 0 {
4506				// It's a tailable cursor.
4507				iter.op.cursorId = op.cursorId
4508			} else if op != nil && op.cursorId == 0 && op.flags&1 == 1 {
4509				// Cursor likely timed out.
4510				iter.err = ErrCursor
4511			} else {
4512				iter.err = ErrNotFound
4513			}
4514		} else if iter.findCmd {
4515			debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, int(op.replyDocs), op.cursorId)
4516			var findReply struct {
4517				Ok     bool
4518				Code   int
4519				Errmsg string
4520				Cursor cursorData
4521			}
4522			if err := bson.Unmarshal(docData, &findReply); err != nil {
4523				iter.err = err
4524			} else if !findReply.Ok && findReply.Errmsg != "" {
4525				iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
4526			} else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 {
4527				iter.err = ErrNotFound
4528			} else {
4529				batch := findReply.Cursor.FirstBatch
4530				if len(batch) == 0 {
4531					batch = findReply.Cursor.NextBatch
4532				}
4533				rdocs := len(batch)
4534				for _, raw := range batch {
4535					iter.docData.Push(raw.Data)
4536				}
4537				iter.docsToReceive = 0
4538				docsToProcess := iter.docData.Len()
4539				if iter.limit == 0 || int32(docsToProcess) < iter.limit {
4540					iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
4541				} else {
4542					iter.docsBeforeMore = -1
4543				}
4544				iter.op.cursorId = findReply.Cursor.Id
4545			}
4546		} else {
4547			rdocs := int(op.replyDocs)
4548			if docNum == 0 {
4549				iter.docsToReceive += rdocs - 1
4550				docsToProcess := iter.docData.Len() + rdocs
4551				if iter.limit == 0 || int32(docsToProcess) < iter.limit {
4552					iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
4553				} else {
4554					iter.docsBeforeMore = -1
4555				}
4556				iter.op.cursorId = op.cursorId
4557			}
4558			debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, rdocs, op.cursorId)
4559			iter.docData.Push(docData)
4560		}
4561		iter.gotReply.Broadcast()
4562		iter.m.Unlock()
4563	}
4564}
4565
4566type writeCmdResult struct {
4567	Ok        bool
4568	N         int
4569	NModified int `bson:"nModified"`
4570	Upserted  []struct {
4571		Index int
4572		Id    interface{} `_id`
4573	}
4574	ConcernError writeConcernError `bson:"writeConcernError"`
4575	Errors       []writeCmdError   `bson:"writeErrors"`
4576}
4577
4578type writeConcernError struct {
4579	Code   int
4580	ErrMsg string
4581}
4582
4583type writeCmdError struct {
4584	Index  int
4585	Code   int
4586	ErrMsg string
4587}
4588
4589func (r *writeCmdResult) BulkErrorCases() []BulkErrorCase {
4590	ecases := make([]BulkErrorCase, len(r.Errors))
4591	for i, err := range r.Errors {
4592		ecases[i] = BulkErrorCase{err.Index, &QueryError{Code: err.Code, Message: err.ErrMsg}}
4593	}
4594	return ecases
4595}
4596
4597// writeOp runs the given modifying operation, potentially followed up
4598// by a getLastError command in case the session is in safe mode.  The
4599// LastError result is made available in lerr, and if lerr.Err is set it
4600// will also be returned as err.
4601func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err error) {
4602	s := c.Database.Session
4603	socket, err := s.acquireSocket(c.Database.Name == "local")
4604	if err != nil {
4605		return nil, err
4606	}
4607	defer socket.Release()
4608
4609	s.m.RLock()
4610	safeOp := s.safeOp
4611	bypassValidation := s.bypassValidation
4612	s.m.RUnlock()
4613
4614	if socket.ServerInfo().MaxWireVersion >= 2 {
4615		// Servers with a more recent write protocol benefit from write commands.
4616		if op, ok := op.(*insertOp); ok && len(op.documents) > 1000 {
4617			var lerr LastError
4618
4619			// Maximum batch size is 1000. Must split out in separate operations for compatibility.
4620			all := op.documents
4621			for i := 0; i < len(all); i += 1000 {
4622				l := i + 1000
4623				if l > len(all) {
4624					l = len(all)
4625				}
4626				op.documents = all[i:l]
4627				oplerr, err := c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
4628				lerr.N += oplerr.N
4629				lerr.modified += oplerr.modified
4630				if err != nil {
4631					for ei := range oplerr.ecases {
4632						oplerr.ecases[ei].Index += i
4633					}
4634					lerr.ecases = append(lerr.ecases, oplerr.ecases...)
4635					if op.flags&1 == 0 {
4636						return &lerr, err
4637					}
4638				}
4639			}
4640			if len(lerr.ecases) != 0 {
4641				return &lerr, lerr.ecases[0].Err
4642			}
4643			return &lerr, nil
4644		}
4645		return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
4646	} else if updateOps, ok := op.(bulkUpdateOp); ok {
4647		var lerr LastError
4648		for i, updateOp := range updateOps {
4649			oplerr, err := c.writeOpQuery(socket, safeOp, updateOp, ordered)
4650			lerr.N += oplerr.N
4651			lerr.modified += oplerr.modified
4652			if err != nil {
4653				lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err})
4654				if ordered {
4655					break
4656				}
4657			}
4658		}
4659		if len(lerr.ecases) != 0 {
4660			return &lerr, lerr.ecases[0].Err
4661		}
4662		return &lerr, nil
4663	} else if deleteOps, ok := op.(bulkDeleteOp); ok {
4664		var lerr LastError
4665		for i, deleteOp := range deleteOps {
4666			oplerr, err := c.writeOpQuery(socket, safeOp, deleteOp, ordered)
4667			lerr.N += oplerr.N
4668			lerr.modified += oplerr.modified
4669			if err != nil {
4670				lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err})
4671				if ordered {
4672					break
4673				}
4674			}
4675		}
4676		if len(lerr.ecases) != 0 {
4677			return &lerr, lerr.ecases[0].Err
4678		}
4679		return &lerr, nil
4680	}
4681	return c.writeOpQuery(socket, safeOp, op, ordered)
4682}
4683
4684func (c *Collection) writeOpQuery(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered bool) (lerr *LastError, err error) {
4685	if safeOp == nil {
4686		return nil, socket.Query(op)
4687	}
4688
4689	var mutex sync.Mutex
4690	var replyData []byte
4691	var replyErr error
4692	mutex.Lock()
4693	query := *safeOp // Copy the data.
4694	query.collection = c.Database.Name + ".$cmd"
4695	query.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
4696		replyData = docData
4697		replyErr = err
4698		mutex.Unlock()
4699	}
4700	err = socket.Query(op, &query)
4701	if err != nil {
4702		return nil, err
4703	}
4704	mutex.Lock() // Wait.
4705	if replyErr != nil {
4706		return nil, replyErr // XXX TESTME
4707	}
4708	if hasErrMsg(replyData) {
4709		// Looks like getLastError itself failed.
4710		err = checkQueryError(query.collection, replyData)
4711		if err != nil {
4712			return nil, err
4713		}
4714	}
4715	result := &LastError{}
4716	bson.Unmarshal(replyData, &result)
4717	debugf("Result from writing query: %#v", result)
4718	if result.Err != "" {
4719		result.ecases = []BulkErrorCase{{Index: 0, Err: result}}
4720		if insert, ok := op.(*insertOp); ok && len(insert.documents) > 1 {
4721			result.ecases[0].Index = -1
4722		}
4723		return result, result
4724	}
4725	// With MongoDB <2.6 we don't know how many actually changed, so make it the same as matched.
4726	result.modified = result.N
4727	return result, nil
4728}
4729
4730func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered, bypassValidation bool) (lerr *LastError, err error) {
4731	var writeConcern interface{}
4732	if safeOp == nil {
4733		writeConcern = bson.D{{"w", 0}}
4734	} else {
4735		writeConcern = safeOp.query.(*getLastError)
4736	}
4737
4738	var cmd bson.D
4739	switch op := op.(type) {
4740	case *insertOp:
4741		// http://docs.mongodb.org/manual/reference/command/insert
4742		cmd = bson.D{
4743			{"insert", c.Name},
4744			{"documents", op.documents},
4745			{"writeConcern", writeConcern},
4746			{"ordered", op.flags&1 == 0},
4747		}
4748	case *updateOp:
4749		// http://docs.mongodb.org/manual/reference/command/update
4750		cmd = bson.D{
4751			{"update", c.Name},
4752			{"updates", []interface{}{op}},
4753			{"writeConcern", writeConcern},
4754			{"ordered", ordered},
4755		}
4756	case bulkUpdateOp:
4757		// http://docs.mongodb.org/manual/reference/command/update
4758		cmd = bson.D{
4759			{"update", c.Name},
4760			{"updates", op},
4761			{"writeConcern", writeConcern},
4762			{"ordered", ordered},
4763		}
4764	case *deleteOp:
4765		// http://docs.mongodb.org/manual/reference/command/delete
4766		cmd = bson.D{
4767			{"delete", c.Name},
4768			{"deletes", []interface{}{op}},
4769			{"writeConcern", writeConcern},
4770			{"ordered", ordered},
4771		}
4772	case bulkDeleteOp:
4773		// http://docs.mongodb.org/manual/reference/command/delete
4774		cmd = bson.D{
4775			{"delete", c.Name},
4776			{"deletes", op},
4777			{"writeConcern", writeConcern},
4778			{"ordered", ordered},
4779		}
4780	}
4781	if bypassValidation {
4782		cmd = append(cmd, bson.DocElem{"bypassDocumentValidation", true})
4783	}
4784
4785	var result writeCmdResult
4786	err = c.Database.run(socket, cmd, &result)
4787	debugf("Write command result: %#v (err=%v)", result, err)
4788	ecases := result.BulkErrorCases()
4789	lerr = &LastError{
4790		UpdatedExisting: result.N > 0 && len(result.Upserted) == 0,
4791		N:               result.N,
4792
4793		modified: result.NModified,
4794		ecases:   ecases,
4795	}
4796	if len(result.Upserted) > 0 {
4797		lerr.UpsertedId = result.Upserted[0].Id
4798	}
4799	if len(result.Errors) > 0 {
4800		e := result.Errors[0]
4801		lerr.Code = e.Code
4802		lerr.Err = e.ErrMsg
4803		err = lerr
4804	} else if result.ConcernError.Code != 0 {
4805		e := result.ConcernError
4806		lerr.Code = e.Code
4807		lerr.Err = e.ErrMsg
4808		err = lerr
4809	}
4810
4811	if err == nil && safeOp == nil {
4812		return nil, nil
4813	}
4814	return lerr, err
4815}
4816
4817func hasErrMsg(d []byte) bool {
4818	l := len(d)
4819	for i := 0; i+8 < l; i++ {
4820		if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
4821			return true
4822		}
4823	}
4824	return false
4825}
4826