1// mgo - MongoDB driver for Go
2//
3// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
4//
5// All rights reserved.
6//
7// Redistribution and use in source and binary forms, with or without
8// modification, are permitted provided that the following conditions are met:
9//
10// 1. Redistributions of source code must retain the above copyright notice, this
11//    list of conditions and the following disclaimer.
12// 2. Redistributions in binary form must reproduce the above copyright notice,
13//    this list of conditions and the following disclaimer in the documentation
14//    and/or other materials provided with the distribution.
15//
16// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
20// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27package mgo_test
28
29import (
30	"fmt"
31	"io"
32	"net"
33	"strings"
34	"sync"
35	"time"
36
37	. "gopkg.in/check.v1"
38	"gopkg.in/mgo.v2"
39	"gopkg.in/mgo.v2/bson"
40)
41
42func (s *S) TestNewSession(c *C) {
43	session, err := mgo.Dial("localhost:40001")
44	c.Assert(err, IsNil)
45	defer session.Close()
46
47	// Do a dummy operation to wait for connection.
48	coll := session.DB("mydb").C("mycoll")
49	err = coll.Insert(M{"_id": 1})
50	c.Assert(err, IsNil)
51
52	// Tweak safety and query settings to ensure other has copied those.
53	session.SetSafe(nil)
54	session.SetBatch(-1)
55	other := session.New()
56	defer other.Close()
57	session.SetSafe(&mgo.Safe{})
58
59	// Clone was copied while session was unsafe, so no errors.
60	otherColl := other.DB("mydb").C("mycoll")
61	err = otherColl.Insert(M{"_id": 1})
62	c.Assert(err, IsNil)
63
64	// Original session was made safe again.
65	err = coll.Insert(M{"_id": 1})
66	c.Assert(err, NotNil)
67
68	// With New(), each session has its own socket now.
69	stats := mgo.GetStats()
70	c.Assert(stats.MasterConns, Equals, 2)
71	c.Assert(stats.SocketsInUse, Equals, 2)
72
73	// Ensure query parameters were cloned.
74	err = otherColl.Insert(M{"_id": 2})
75	c.Assert(err, IsNil)
76
77	// Ping the database to ensure the nonce has been received already.
78	c.Assert(other.Ping(), IsNil)
79
80	mgo.ResetStats()
81
82	iter := otherColl.Find(M{}).Iter()
83	c.Assert(err, IsNil)
84
85	m := M{}
86	ok := iter.Next(m)
87	c.Assert(ok, Equals, true)
88	err = iter.Close()
89	c.Assert(err, IsNil)
90
91	// If Batch(-1) is in effect, a single document must have been received.
92	stats = mgo.GetStats()
93	c.Assert(stats.ReceivedDocs, Equals, 1)
94}
95
96func (s *S) TestCloneSession(c *C) {
97	session, err := mgo.Dial("localhost:40001")
98	c.Assert(err, IsNil)
99	defer session.Close()
100
101	// Do a dummy operation to wait for connection.
102	coll := session.DB("mydb").C("mycoll")
103	err = coll.Insert(M{"_id": 1})
104	c.Assert(err, IsNil)
105
106	// Tweak safety and query settings to ensure clone is copying those.
107	session.SetSafe(nil)
108	session.SetBatch(-1)
109	clone := session.Clone()
110	defer clone.Close()
111	session.SetSafe(&mgo.Safe{})
112
113	// Clone was copied while session was unsafe, so no errors.
114	cloneColl := clone.DB("mydb").C("mycoll")
115	err = cloneColl.Insert(M{"_id": 1})
116	c.Assert(err, IsNil)
117
118	// Original session was made safe again.
119	err = coll.Insert(M{"_id": 1})
120	c.Assert(err, NotNil)
121
122	// With Clone(), same socket is shared between sessions now.
123	stats := mgo.GetStats()
124	c.Assert(stats.SocketsInUse, Equals, 1)
125	c.Assert(stats.SocketRefs, Equals, 2)
126
127	// Refreshing one of them should let the original socket go,
128	// while preserving the safety settings.
129	clone.Refresh()
130	err = cloneColl.Insert(M{"_id": 1})
131	c.Assert(err, IsNil)
132
133	// Must have used another connection now.
134	stats = mgo.GetStats()
135	c.Assert(stats.SocketsInUse, Equals, 2)
136	c.Assert(stats.SocketRefs, Equals, 2)
137
138	// Ensure query parameters were cloned.
139	err = cloneColl.Insert(M{"_id": 2})
140	c.Assert(err, IsNil)
141
142	// Ping the database to ensure the nonce has been received already.
143	c.Assert(clone.Ping(), IsNil)
144
145	mgo.ResetStats()
146
147	iter := cloneColl.Find(M{}).Iter()
148	c.Assert(err, IsNil)
149
150	m := M{}
151	ok := iter.Next(m)
152	c.Assert(ok, Equals, true)
153	err = iter.Close()
154	c.Assert(err, IsNil)
155
156	// If Batch(-1) is in effect, a single document must have been received.
157	stats = mgo.GetStats()
158	c.Assert(stats.ReceivedDocs, Equals, 1)
159}
160
161func (s *S) TestModeStrong(c *C) {
162	session, err := mgo.Dial("localhost:40012")
163	c.Assert(err, IsNil)
164	defer session.Close()
165
166	session.SetMode(mgo.Monotonic, false)
167	session.SetMode(mgo.Strong, false)
168
169	c.Assert(session.Mode(), Equals, mgo.Strong)
170
171	result := M{}
172	cmd := session.DB("admin").C("$cmd")
173	err = cmd.Find(M{"ismaster": 1}).One(&result)
174	c.Assert(err, IsNil)
175	c.Assert(result["ismaster"], Equals, true)
176
177	coll := session.DB("mydb").C("mycoll")
178	err = coll.Insert(M{"a": 1})
179	c.Assert(err, IsNil)
180
181	// Wait since the sync also uses sockets.
182	for len(session.LiveServers()) != 3 {
183		c.Log("Waiting for cluster sync to finish...")
184		time.Sleep(5e8)
185	}
186
187	stats := mgo.GetStats()
188	c.Assert(stats.MasterConns, Equals, 1)
189	c.Assert(stats.SlaveConns, Equals, 2)
190	c.Assert(stats.SocketsInUse, Equals, 1)
191
192	session.SetMode(mgo.Strong, true)
193
194	stats = mgo.GetStats()
195	c.Assert(stats.SocketsInUse, Equals, 0)
196}
197
198func (s *S) TestModeMonotonic(c *C) {
199	// Must necessarily connect to a slave, otherwise the
200	// master connection will be available first.
201	session, err := mgo.Dial("localhost:40012")
202	c.Assert(err, IsNil)
203	defer session.Close()
204
205	session.SetMode(mgo.Monotonic, false)
206
207	c.Assert(session.Mode(), Equals, mgo.Monotonic)
208
209	var result struct{ IsMaster bool }
210	cmd := session.DB("admin").C("$cmd")
211	err = cmd.Find(M{"ismaster": 1}).One(&result)
212	c.Assert(err, IsNil)
213	c.Assert(result.IsMaster, Equals, false)
214
215	coll := session.DB("mydb").C("mycoll")
216	err = coll.Insert(M{"a": 1})
217	c.Assert(err, IsNil)
218
219	err = cmd.Find(M{"ismaster": 1}).One(&result)
220	c.Assert(err, IsNil)
221	c.Assert(result.IsMaster, Equals, true)
222
223	// Wait since the sync also uses sockets.
224	for len(session.LiveServers()) != 3 {
225		c.Log("Waiting for cluster sync to finish...")
226		time.Sleep(5e8)
227	}
228
229	stats := mgo.GetStats()
230	c.Assert(stats.MasterConns, Equals, 1)
231	c.Assert(stats.SlaveConns, Equals, 2)
232	c.Assert(stats.SocketsInUse, Equals, 2)
233
234	session.SetMode(mgo.Monotonic, true)
235
236	stats = mgo.GetStats()
237	c.Assert(stats.SocketsInUse, Equals, 0)
238}
239
240func (s *S) TestModeMonotonicAfterStrong(c *C) {
241	// Test that a strong session shifting to a monotonic
242	// one preserves the socket untouched.
243
244	session, err := mgo.Dial("localhost:40012")
245	c.Assert(err, IsNil)
246	defer session.Close()
247
248	// Insert something to force a connection to the master.
249	coll := session.DB("mydb").C("mycoll")
250	err = coll.Insert(M{"a": 1})
251	c.Assert(err, IsNil)
252
253	session.SetMode(mgo.Monotonic, false)
254
255	// Wait since the sync also uses sockets.
256	for len(session.LiveServers()) != 3 {
257		c.Log("Waiting for cluster sync to finish...")
258		time.Sleep(5e8)
259	}
260
261	// Master socket should still be reserved.
262	stats := mgo.GetStats()
263	c.Assert(stats.SocketsInUse, Equals, 1)
264
265	// Confirm it's the master even though it's Monotonic by now.
266	result := M{}
267	cmd := session.DB("admin").C("$cmd")
268	err = cmd.Find(M{"ismaster": 1}).One(&result)
269	c.Assert(err, IsNil)
270	c.Assert(result["ismaster"], Equals, true)
271}
272
273func (s *S) TestModeStrongAfterMonotonic(c *C) {
274	// Test that shifting from Monotonic to Strong while
275	// using a slave socket will keep the socket reserved
276	// until the master socket is necessary, so that no
277	// switch over occurs unless it's actually necessary.
278
279	// Must necessarily connect to a slave, otherwise the
280	// master connection will be available first.
281	session, err := mgo.Dial("localhost:40012")
282	c.Assert(err, IsNil)
283	defer session.Close()
284
285	session.SetMode(mgo.Monotonic, false)
286
287	// Ensure we're talking to a slave, and reserve the socket.
288	result := M{}
289	err = session.Run("ismaster", &result)
290	c.Assert(err, IsNil)
291	c.Assert(result["ismaster"], Equals, false)
292
293	// Switch to a Strong session.
294	session.SetMode(mgo.Strong, false)
295
296	// Wait since the sync also uses sockets.
297	for len(session.LiveServers()) != 3 {
298		c.Log("Waiting for cluster sync to finish...")
299		time.Sleep(5e8)
300	}
301
302	// Slave socket should still be reserved.
303	stats := mgo.GetStats()
304	c.Assert(stats.SocketsInUse, Equals, 1)
305
306	// But any operation will switch it to the master.
307	result = M{}
308	err = session.Run("ismaster", &result)
309	c.Assert(err, IsNil)
310	c.Assert(result["ismaster"], Equals, true)
311}
312
313func (s *S) TestModeMonotonicWriteOnIteration(c *C) {
314	// Must necessarily connect to a slave, otherwise the
315	// master connection will be available first.
316	session, err := mgo.Dial("localhost:40012")
317	c.Assert(err, IsNil)
318	defer session.Close()
319
320	session.SetMode(mgo.Monotonic, false)
321
322	c.Assert(session.Mode(), Equals, mgo.Monotonic)
323
324	coll1 := session.DB("mydb").C("mycoll1")
325	coll2 := session.DB("mydb").C("mycoll2")
326
327	ns := []int{40, 41, 42, 43, 44, 45, 46}
328	for _, n := range ns {
329		err := coll1.Insert(M{"n": n})
330		c.Assert(err, IsNil)
331	}
332
333	// Release master so we can grab a slave again.
334	session.Refresh()
335
336	// Wait until synchronization is done.
337	for {
338		n, err := coll1.Count()
339		c.Assert(err, IsNil)
340		if n == len(ns) {
341			break
342		}
343	}
344
345	iter := coll1.Find(nil).Batch(2).Iter()
346	i := 0
347	m := M{}
348	for iter.Next(&m) {
349		i++
350		if i > 3 {
351			err := coll2.Insert(M{"n": 47 + i})
352			c.Assert(err, IsNil)
353		}
354	}
355	c.Assert(i, Equals, len(ns))
356}
357
358func (s *S) TestModeEventual(c *C) {
359	// Must necessarily connect to a slave, otherwise the
360	// master connection will be available first.
361	session, err := mgo.Dial("localhost:40012")
362	c.Assert(err, IsNil)
363	defer session.Close()
364
365	session.SetMode(mgo.Eventual, false)
366
367	c.Assert(session.Mode(), Equals, mgo.Eventual)
368
369	result := M{}
370	err = session.Run("ismaster", &result)
371	c.Assert(err, IsNil)
372	c.Assert(result["ismaster"], Equals, false)
373
374	coll := session.DB("mydb").C("mycoll")
375	err = coll.Insert(M{"a": 1})
376	c.Assert(err, IsNil)
377
378	result = M{}
379	err = session.Run("ismaster", &result)
380	c.Assert(err, IsNil)
381	c.Assert(result["ismaster"], Equals, false)
382
383	// Wait since the sync also uses sockets.
384	for len(session.LiveServers()) != 3 {
385		c.Log("Waiting for cluster sync to finish...")
386		time.Sleep(5e8)
387	}
388
389	stats := mgo.GetStats()
390	c.Assert(stats.MasterConns, Equals, 1)
391	c.Assert(stats.SlaveConns, Equals, 2)
392	c.Assert(stats.SocketsInUse, Equals, 0)
393}
394
395func (s *S) TestModeEventualAfterStrong(c *C) {
396	// Test that a strong session shifting to an eventual
397	// one preserves the socket untouched.
398
399	session, err := mgo.Dial("localhost:40012")
400	c.Assert(err, IsNil)
401	defer session.Close()
402
403	// Insert something to force a connection to the master.
404	coll := session.DB("mydb").C("mycoll")
405	err = coll.Insert(M{"a": 1})
406	c.Assert(err, IsNil)
407
408	session.SetMode(mgo.Eventual, false)
409
410	// Wait since the sync also uses sockets.
411	for len(session.LiveServers()) != 3 {
412		c.Log("Waiting for cluster sync to finish...")
413		time.Sleep(5e8)
414	}
415
416	// Master socket should still be reserved.
417	stats := mgo.GetStats()
418	c.Assert(stats.SocketsInUse, Equals, 1)
419
420	// Confirm it's the master even though it's Eventual by now.
421	result := M{}
422	cmd := session.DB("admin").C("$cmd")
423	err = cmd.Find(M{"ismaster": 1}).One(&result)
424	c.Assert(err, IsNil)
425	c.Assert(result["ismaster"], Equals, true)
426
427	session.SetMode(mgo.Eventual, true)
428
429	stats = mgo.GetStats()
430	c.Assert(stats.SocketsInUse, Equals, 0)
431}
432
433func (s *S) TestModeStrongFallover(c *C) {
434	if *fast {
435		c.Skip("-fast")
436	}
437
438	session, err := mgo.Dial("localhost:40021")
439	c.Assert(err, IsNil)
440	defer session.Close()
441
442	// With strong consistency, this will open a socket to the master.
443	result := &struct{ Host string }{}
444	err = session.Run("serverStatus", result)
445	c.Assert(err, IsNil)
446
447	// Kill the master.
448	host := result.Host
449	s.Stop(host)
450
451	// This must fail, since the connection was broken.
452	err = session.Run("serverStatus", result)
453	c.Assert(err, Equals, io.EOF)
454
455	// With strong consistency, it fails again until reset.
456	err = session.Run("serverStatus", result)
457	c.Assert(err, Equals, io.EOF)
458
459	session.Refresh()
460
461	// Now we should be able to talk to the new master.
462	// Increase the timeout since this may take quite a while.
463	session.SetSyncTimeout(3 * time.Minute)
464
465	err = session.Run("serverStatus", result)
466	c.Assert(err, IsNil)
467	c.Assert(result.Host, Not(Equals), host)
468
469	// Insert some data to confirm it's indeed a master.
470	err = session.DB("mydb").C("mycoll").Insert(M{"n": 42})
471	c.Assert(err, IsNil)
472}
473
474func (s *S) TestModePrimaryHiccup(c *C) {
475	if *fast {
476		c.Skip("-fast")
477	}
478
479	session, err := mgo.Dial("localhost:40021")
480	c.Assert(err, IsNil)
481	defer session.Close()
482
483	// With strong consistency, this will open a socket to the master.
484	result := &struct{ Host string }{}
485	err = session.Run("serverStatus", result)
486	c.Assert(err, IsNil)
487
488	// Establish a few extra sessions to create spare sockets to
489	// the master. This increases a bit the chances of getting an
490	// incorrect cached socket.
491	var sessions []*mgo.Session
492	for i := 0; i < 20; i++ {
493		sessions = append(sessions, session.Copy())
494		err = sessions[len(sessions)-1].Run("serverStatus", result)
495		c.Assert(err, IsNil)
496	}
497	for i := range sessions {
498		sessions[i].Close()
499	}
500
501	// Kill the master, but bring it back immediatelly.
502	host := result.Host
503	s.Stop(host)
504	s.StartAll()
505
506	// This must fail, since the connection was broken.
507	err = session.Run("serverStatus", result)
508	c.Assert(err, Equals, io.EOF)
509
510	// With strong consistency, it fails again until reset.
511	err = session.Run("serverStatus", result)
512	c.Assert(err, Equals, io.EOF)
513
514	session.Refresh()
515
516	// Now we should be able to talk to the new master.
517	// Increase the timeout since this may take quite a while.
518	session.SetSyncTimeout(3 * time.Minute)
519
520	// Insert some data to confirm it's indeed a master.
521	err = session.DB("mydb").C("mycoll").Insert(M{"n": 42})
522	c.Assert(err, IsNil)
523}
524
525func (s *S) TestModeMonotonicFallover(c *C) {
526	if *fast {
527		c.Skip("-fast")
528	}
529
530	session, err := mgo.Dial("localhost:40021")
531	c.Assert(err, IsNil)
532	defer session.Close()
533
534	session.SetMode(mgo.Monotonic, true)
535
536	// Insert something to force a switch to the master.
537	coll := session.DB("mydb").C("mycoll")
538	err = coll.Insert(M{"a": 1})
539	c.Assert(err, IsNil)
540
541	// Wait a bit for this to be synchronized to slaves.
542	time.Sleep(3 * time.Second)
543
544	result := &struct{ Host string }{}
545	err = session.Run("serverStatus", result)
546	c.Assert(err, IsNil)
547
548	// Kill the master.
549	host := result.Host
550	s.Stop(host)
551
552	// This must fail, since the connection was broken.
553	err = session.Run("serverStatus", result)
554	c.Assert(err, Equals, io.EOF)
555
556	// With monotonic consistency, it fails again until reset.
557	err = session.Run("serverStatus", result)
558	c.Assert(err, Equals, io.EOF)
559
560	session.Refresh()
561
562	// Now we should be able to talk to the new master.
563	err = session.Run("serverStatus", result)
564	c.Assert(err, IsNil)
565	c.Assert(result.Host, Not(Equals), host)
566}
567
568func (s *S) TestModeMonotonicWithSlaveFallover(c *C) {
569	if *fast {
570		c.Skip("-fast")
571	}
572
573	session, err := mgo.Dial("localhost:40021")
574	c.Assert(err, IsNil)
575	defer session.Close()
576
577	ssresult := &struct{ Host string }{}
578	imresult := &struct{ IsMaster bool }{}
579
580	// Figure the master while still using the strong session.
581	err = session.Run("serverStatus", ssresult)
582	c.Assert(err, IsNil)
583	err = session.Run("isMaster", imresult)
584	c.Assert(err, IsNil)
585	master := ssresult.Host
586	c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
587
588	// Create new monotonic session with an explicit address to ensure
589	// a slave is synchronized before the master, otherwise a connection
590	// with the master may be used below for lack of other options.
591	var addr string
592	switch {
593	case strings.HasSuffix(ssresult.Host, ":40021"):
594		addr = "localhost:40022"
595	case strings.HasSuffix(ssresult.Host, ":40022"):
596		addr = "localhost:40021"
597	case strings.HasSuffix(ssresult.Host, ":40023"):
598		addr = "localhost:40021"
599	default:
600		c.Fatal("Unknown host: ", ssresult.Host)
601	}
602
603	session, err = mgo.Dial(addr)
604	c.Assert(err, IsNil)
605	defer session.Close()
606
607	session.SetMode(mgo.Monotonic, true)
608
609	// Check the address of the socket associated with the monotonic session.
610	c.Log("Running serverStatus and isMaster with monotonic session")
611	err = session.Run("serverStatus", ssresult)
612	c.Assert(err, IsNil)
613	err = session.Run("isMaster", imresult)
614	c.Assert(err, IsNil)
615	slave := ssresult.Host
616	c.Assert(imresult.IsMaster, Equals, false, Commentf("%s is not a slave", slave))
617
618	c.Assert(master, Not(Equals), slave)
619
620	// Kill the master.
621	s.Stop(master)
622
623	// Session must still be good, since we were talking to a slave.
624	err = session.Run("serverStatus", ssresult)
625	c.Assert(err, IsNil)
626
627	c.Assert(ssresult.Host, Equals, slave,
628		Commentf("Monotonic session moved from %s to %s", slave, ssresult.Host))
629
630	// If we try to insert something, it'll have to hold until the new
631	// master is available to move the connection, and work correctly.
632	coll := session.DB("mydb").C("mycoll")
633	err = coll.Insert(M{"a": 1})
634	c.Assert(err, IsNil)
635
636	// Must now be talking to the new master.
637	err = session.Run("serverStatus", ssresult)
638	c.Assert(err, IsNil)
639	err = session.Run("isMaster", imresult)
640	c.Assert(err, IsNil)
641	c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
642
643	// ... which is not the old one, since it's still dead.
644	c.Assert(ssresult.Host, Not(Equals), master)
645}
646
647func (s *S) TestModeEventualFallover(c *C) {
648	if *fast {
649		c.Skip("-fast")
650	}
651
652	session, err := mgo.Dial("localhost:40021")
653	c.Assert(err, IsNil)
654	defer session.Close()
655
656	result := &struct{ Host string }{}
657	err = session.Run("serverStatus", result)
658	c.Assert(err, IsNil)
659	master := result.Host
660
661	session.SetMode(mgo.Eventual, true)
662
663	// Should connect to the master when needed.
664	coll := session.DB("mydb").C("mycoll")
665	err = coll.Insert(M{"a": 1})
666	c.Assert(err, IsNil)
667
668	// Wait a bit for this to be synchronized to slaves.
669	time.Sleep(3 * time.Second)
670
671	// Kill the master.
672	s.Stop(master)
673
674	// Should still work, with the new master now.
675	coll = session.DB("mydb").C("mycoll")
676	err = coll.Insert(M{"a": 1})
677	c.Assert(err, IsNil)
678
679	err = session.Run("serverStatus", result)
680	c.Assert(err, IsNil)
681	c.Assert(result.Host, Not(Equals), master)
682}
683
684func (s *S) TestModeSecondaryJustPrimary(c *C) {
685	if *fast {
686		c.Skip("-fast")
687	}
688
689	session, err := mgo.Dial("localhost:40001")
690	c.Assert(err, IsNil)
691	defer session.Close()
692
693	session.SetMode(mgo.Secondary, true)
694
695	err = session.Ping()
696	c.Assert(err, ErrorMatches, "no reachable servers")
697}
698
699func (s *S) TestModeSecondaryPreferredJustPrimary(c *C) {
700	if *fast {
701		c.Skip("-fast")
702	}
703
704	session, err := mgo.Dial("localhost:40001")
705	c.Assert(err, IsNil)
706	defer session.Close()
707
708	session.SetMode(mgo.SecondaryPreferred, true)
709
710	result := &struct{ Host string }{}
711	err = session.Run("serverStatus", result)
712	c.Assert(err, IsNil)
713}
714
715func (s *S) TestModeSecondaryPreferredFallover(c *C) {
716	if *fast {
717		c.Skip("-fast")
718	}
719
720	session, err := mgo.Dial("localhost:40011")
721	c.Assert(err, IsNil)
722	defer session.Close()
723
724	// Ensure secondaries are available for being picked up.
725	for len(session.LiveServers()) != 3 {
726		c.Log("Waiting for cluster sync to finish...")
727		time.Sleep(5e8)
728	}
729
730	session.SetMode(mgo.SecondaryPreferred, true)
731
732	result := &struct{ Host string }{}
733	err = session.Run("serverStatus", result)
734	c.Assert(err, IsNil)
735	c.Assert(supvName(result.Host), Not(Equals), "rs1a")
736	secondary := result.Host
737
738	// Should connect to the primary when needed.
739	coll := session.DB("mydb").C("mycoll")
740	err = coll.Insert(M{"a": 1})
741	c.Assert(err, IsNil)
742
743	// Wait a bit for this to be synchronized to slaves.
744	time.Sleep(3 * time.Second)
745
746	// Kill the primary.
747	s.Stop("localhost:40011")
748
749	// It can still talk to the selected secondary.
750	err = session.Run("serverStatus", result)
751	c.Assert(err, IsNil)
752	c.Assert(result.Host, Equals, secondary)
753
754	// But cannot speak to the primary until reset.
755	coll = session.DB("mydb").C("mycoll")
756	err = coll.Insert(M{"a": 1})
757	c.Assert(err, Equals, io.EOF)
758
759	session.Refresh()
760
761	// Can still talk to a secondary.
762	err = session.Run("serverStatus", result)
763	c.Assert(err, IsNil)
764	c.Assert(supvName(result.Host), Not(Equals), "rs1a")
765
766	s.StartAll()
767
768	// Should now be able to talk to the primary again.
769	coll = session.DB("mydb").C("mycoll")
770	err = coll.Insert(M{"a": 1})
771	c.Assert(err, IsNil)
772}
773
774func (s *S) TestModePrimaryPreferredFallover(c *C) {
775	if *fast {
776		c.Skip("-fast")
777	}
778
779	session, err := mgo.Dial("localhost:40011")
780	c.Assert(err, IsNil)
781	defer session.Close()
782
783	session.SetMode(mgo.PrimaryPreferred, true)
784
785	result := &struct{ Host string }{}
786	err = session.Run("serverStatus", result)
787	c.Assert(err, IsNil)
788	c.Assert(supvName(result.Host), Equals, "rs1a")
789
790	// Kill the primary.
791	s.Stop("localhost:40011")
792
793	// Should now fail as there was a primary socket in use already.
794	err = session.Run("serverStatus", result)
795	c.Assert(err, Equals, io.EOF)
796
797	// Refresh so the reserved primary socket goes away.
798	session.Refresh()
799
800	// Should be able to talk to the secondary.
801	err = session.Run("serverStatus", result)
802	c.Assert(err, IsNil)
803
804	s.StartAll()
805
806	// Should wait for the new primary to become available.
807	coll := session.DB("mydb").C("mycoll")
808	err = coll.Insert(M{"a": 1})
809	c.Assert(err, IsNil)
810
811	// And should use the new primary in general, as it is preferred.
812	err = session.Run("serverStatus", result)
813	c.Assert(err, IsNil)
814	c.Assert(supvName(result.Host), Equals, "rs1a")
815}
816
817func (s *S) TestModePrimaryFallover(c *C) {
818	if *fast {
819		c.Skip("-fast")
820	}
821
822	session, err := mgo.Dial("localhost:40011")
823	c.Assert(err, IsNil)
824	defer session.Close()
825
826	session.SetSyncTimeout(3 * time.Second)
827
828	session.SetMode(mgo.Primary, true)
829
830	result := &struct{ Host string }{}
831	err = session.Run("serverStatus", result)
832	c.Assert(err, IsNil)
833	c.Assert(supvName(result.Host), Equals, "rs1a")
834
835	// Kill the primary.
836	s.Stop("localhost:40011")
837
838	session.Refresh()
839
840	err = session.Ping()
841	c.Assert(err, ErrorMatches, "no reachable servers")
842}
843
844func (s *S) TestModeSecondary(c *C) {
845	if *fast {
846		c.Skip("-fast")
847	}
848
849	session, err := mgo.Dial("localhost:40011")
850	c.Assert(err, IsNil)
851	defer session.Close()
852
853	session.SetMode(mgo.Secondary, true)
854
855	result := &struct{ Host string }{}
856	err = session.Run("serverStatus", result)
857	c.Assert(err, IsNil)
858	c.Assert(supvName(result.Host), Not(Equals), "rs1a")
859	secondary := result.Host
860
861	coll := session.DB("mydb").C("mycoll")
862	err = coll.Insert(M{"a": 1})
863	c.Assert(err, IsNil)
864
865	err = session.Run("serverStatus", result)
866	c.Assert(err, IsNil)
867	c.Assert(result.Host, Equals, secondary)
868}
869
870func (s *S) TestPreserveSocketCountOnSync(c *C) {
871	if *fast {
872		c.Skip("-fast")
873	}
874
875	session, err := mgo.Dial("localhost:40011")
876	c.Assert(err, IsNil)
877	defer session.Close()
878
879	stats := mgo.GetStats()
880	for stats.SocketsAlive != 3 {
881		c.Logf("Waiting for all connections to be established (sockets alive currently %d)...", stats.SocketsAlive)
882		stats = mgo.GetStats()
883		time.Sleep(5e8)
884	}
885
886	c.Assert(stats.SocketsAlive, Equals, 3)
887
888	// Kill the master (with rs1, 'a' is always the master).
889	s.Stop("localhost:40011")
890
891	// Wait for the logic to run for a bit and bring it back.
892	startedAll := make(chan bool)
893	go func() {
894		time.Sleep(5e9)
895		s.StartAll()
896		startedAll <- true
897	}()
898
899	// Do not allow the test to return before the goroutine above is done.
900	defer func() {
901		<-startedAll
902	}()
903
904	// Do an action to kick the resync logic in, and also to
905	// wait until the cluster recognizes the server is back.
906	result := struct{ Ok bool }{}
907	err = session.Run("getLastError", &result)
908	c.Assert(err, IsNil)
909	c.Assert(result.Ok, Equals, true)
910
911	for i := 0; i != 20; i++ {
912		stats = mgo.GetStats()
913		if stats.SocketsAlive == 3 {
914			break
915		}
916		c.Logf("Waiting for 3 sockets alive, have %d", stats.SocketsAlive)
917		time.Sleep(5e8)
918	}
919
920	// Ensure the number of sockets is preserved after syncing.
921	stats = mgo.GetStats()
922	c.Assert(stats.SocketsAlive, Equals, 3)
923	c.Assert(stats.SocketsInUse, Equals, 1)
924	c.Assert(stats.SocketRefs, Equals, 1)
925}
926
927// Connect to the master of a deployment with a single server,
928// run an insert, and then ensure the insert worked and that a
929// single connection was established.
930func (s *S) TestTopologySyncWithSingleMaster(c *C) {
931	// Use hostname here rather than IP, to make things trickier.
932	session, err := mgo.Dial("localhost:40001")
933	c.Assert(err, IsNil)
934	defer session.Close()
935
936	coll := session.DB("mydb").C("mycoll")
937	err = coll.Insert(M{"a": 1, "b": 2})
938	c.Assert(err, IsNil)
939
940	// One connection used for discovery. Master socket recycled for
941	// insert. Socket is reserved after insert.
942	stats := mgo.GetStats()
943	c.Assert(stats.MasterConns, Equals, 1)
944	c.Assert(stats.SlaveConns, Equals, 0)
945	c.Assert(stats.SocketsInUse, Equals, 1)
946
947	// Refresh session and socket must be released.
948	session.Refresh()
949	stats = mgo.GetStats()
950	c.Assert(stats.SocketsInUse, Equals, 0)
951}
952
953func (s *S) TestTopologySyncWithSlaveSeed(c *C) {
954	// That's supposed to be a slave. Must run discovery
955	// and find out master to insert successfully.
956	session, err := mgo.Dial("localhost:40012")
957	c.Assert(err, IsNil)
958	defer session.Close()
959
960	coll := session.DB("mydb").C("mycoll")
961	coll.Insert(M{"a": 1, "b": 2})
962
963	result := struct{ Ok bool }{}
964	err = session.Run("getLastError", &result)
965	c.Assert(err, IsNil)
966	c.Assert(result.Ok, Equals, true)
967
968	// One connection to each during discovery. Master
969	// socket recycled for insert.
970	stats := mgo.GetStats()
971	c.Assert(stats.MasterConns, Equals, 1)
972	c.Assert(stats.SlaveConns, Equals, 2)
973
974	// Only one socket reference alive, in the master socket owned
975	// by the above session.
976	c.Assert(stats.SocketsInUse, Equals, 1)
977
978	// Refresh it, and it must be gone.
979	session.Refresh()
980	stats = mgo.GetStats()
981	c.Assert(stats.SocketsInUse, Equals, 0)
982}
983
984func (s *S) TestSyncTimeout(c *C) {
985	if *fast {
986		c.Skip("-fast")
987	}
988
989	session, err := mgo.Dial("localhost:40001")
990	c.Assert(err, IsNil)
991	defer session.Close()
992
993	s.Stop("localhost:40001")
994
995	timeout := 3 * time.Second
996	session.SetSyncTimeout(timeout)
997	started := time.Now()
998
999	// Do something.
1000	result := struct{ Ok bool }{}
1001	err = session.Run("getLastError", &result)
1002	c.Assert(err, ErrorMatches, "no reachable servers")
1003	c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
1004	c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
1005}
1006
1007func (s *S) TestDialWithTimeout(c *C) {
1008	if *fast {
1009		c.Skip("-fast")
1010	}
1011
1012	timeout := 2 * time.Second
1013	started := time.Now()
1014
1015	// 40009 isn't used by the test servers.
1016	session, err := mgo.DialWithTimeout("localhost:40009", timeout)
1017	if session != nil {
1018		session.Close()
1019	}
1020	c.Assert(err, ErrorMatches, "no reachable servers")
1021	c.Assert(session, IsNil)
1022	c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
1023	c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
1024}
1025
1026func (s *S) TestSocketTimeout(c *C) {
1027	if *fast {
1028		c.Skip("-fast")
1029	}
1030
1031	session, err := mgo.Dial("localhost:40001")
1032	c.Assert(err, IsNil)
1033	defer session.Close()
1034
1035	s.Freeze("localhost:40001")
1036
1037	timeout := 3 * time.Second
1038	session.SetSocketTimeout(timeout)
1039	started := time.Now()
1040
1041	// Do something.
1042	result := struct{ Ok bool }{}
1043	err = session.Run("getLastError", &result)
1044	c.Assert(err, ErrorMatches, ".*: i/o timeout")
1045	c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
1046	c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
1047}
1048
1049func (s *S) TestSocketTimeoutOnDial(c *C) {
1050	if *fast {
1051		c.Skip("-fast")
1052	}
1053
1054	timeout := 1 * time.Second
1055
1056	defer mgo.HackSyncSocketTimeout(timeout)()
1057
1058	s.Freeze("localhost:40001")
1059
1060	started := time.Now()
1061
1062	session, err := mgo.DialWithTimeout("localhost:40001", timeout)
1063	c.Assert(err, ErrorMatches, "no reachable servers")
1064	c.Assert(session, IsNil)
1065
1066	c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
1067	c.Assert(started.After(time.Now().Add(-20*time.Second)), Equals, true)
1068}
1069
1070func (s *S) TestSocketTimeoutOnInactiveSocket(c *C) {
1071	if *fast {
1072		c.Skip("-fast")
1073	}
1074
1075	session, err := mgo.Dial("localhost:40001")
1076	c.Assert(err, IsNil)
1077	defer session.Close()
1078
1079	timeout := 2 * time.Second
1080	session.SetSocketTimeout(timeout)
1081
1082	// Do something that relies on the timeout and works.
1083	c.Assert(session.Ping(), IsNil)
1084
1085	// Freeze and wait for the timeout to go by.
1086	s.Freeze("localhost:40001")
1087	time.Sleep(timeout + 500*time.Millisecond)
1088	s.Thaw("localhost:40001")
1089
1090	// Do something again. The timeout above should not have killed
1091	// the socket as there was nothing to be done.
1092	c.Assert(session.Ping(), IsNil)
1093}
1094
1095func (s *S) TestDialWithReplicaSetName(c *C) {
1096	seedLists := [][]string{
1097		// rs1 primary and rs2 primary
1098		[]string{"localhost:40011", "localhost:40021"},
1099		// rs1 primary and rs2 secondary
1100		[]string{"localhost:40011", "localhost:40022"},
1101		// rs1 secondary and rs2 primary
1102		[]string{"localhost:40012", "localhost:40021"},
1103		// rs1 secondary and rs2 secondary
1104		[]string{"localhost:40012", "localhost:40022"},
1105	}
1106
1107	rs2Members := []string{":40021", ":40022", ":40023"}
1108
1109	verifySyncedServers := func(session *mgo.Session, numServers int) {
1110		// wait for the server(s) to be synced
1111		for len(session.LiveServers()) != numServers {
1112			c.Log("Waiting for cluster sync to finish...")
1113			time.Sleep(5e8)
1114		}
1115
1116		// ensure none of the rs2 set members are communicated with
1117		for _, addr := range session.LiveServers() {
1118			for _, rs2Member := range rs2Members {
1119				c.Assert(strings.HasSuffix(addr, rs2Member), Equals, false)
1120			}
1121		}
1122	}
1123
1124	// only communication with rs1 members is expected
1125	for _, seedList := range seedLists {
1126		info := mgo.DialInfo{
1127			Addrs:          seedList,
1128			Timeout:        5 * time.Second,
1129			ReplicaSetName: "rs1",
1130		}
1131
1132		session, err := mgo.DialWithInfo(&info)
1133		c.Assert(err, IsNil)
1134		verifySyncedServers(session, 3)
1135		session.Close()
1136
1137		info.Direct = true
1138		session, err = mgo.DialWithInfo(&info)
1139		c.Assert(err, IsNil)
1140		verifySyncedServers(session, 1)
1141		session.Close()
1142
1143		connectionUrl := fmt.Sprintf("mongodb://%v/?replicaSet=rs1", strings.Join(seedList, ","))
1144		session, err = mgo.Dial(connectionUrl)
1145		c.Assert(err, IsNil)
1146		verifySyncedServers(session, 3)
1147		session.Close()
1148
1149		connectionUrl += "&connect=direct"
1150		session, err = mgo.Dial(connectionUrl)
1151		c.Assert(err, IsNil)
1152		verifySyncedServers(session, 1)
1153		session.Close()
1154	}
1155
1156}
1157
1158func (s *S) TestDirect(c *C) {
1159	session, err := mgo.Dial("localhost:40012?connect=direct")
1160	c.Assert(err, IsNil)
1161	defer session.Close()
1162
1163	// We know that server is a slave.
1164	session.SetMode(mgo.Monotonic, true)
1165
1166	result := &struct{ Host string }{}
1167	err = session.Run("serverStatus", result)
1168	c.Assert(err, IsNil)
1169	c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true)
1170
1171	stats := mgo.GetStats()
1172	c.Assert(stats.SocketsAlive, Equals, 1)
1173	c.Assert(stats.SocketsInUse, Equals, 1)
1174	c.Assert(stats.SocketRefs, Equals, 1)
1175
1176	// We've got no master, so it'll timeout.
1177	session.SetSyncTimeout(5e8 * time.Nanosecond)
1178
1179	coll := session.DB("mydb").C("mycoll")
1180	err = coll.Insert(M{"test": 1})
1181	c.Assert(err, ErrorMatches, "no reachable servers")
1182
1183	// Writing to the local database is okay.
1184	coll = session.DB("local").C("mycoll")
1185	defer coll.RemoveAll(nil)
1186	id := bson.NewObjectId()
1187	err = coll.Insert(M{"_id": id})
1188	c.Assert(err, IsNil)
1189
1190	// Data was stored in the right server.
1191	n, err := coll.Find(M{"_id": id}).Count()
1192	c.Assert(err, IsNil)
1193	c.Assert(n, Equals, 1)
1194
1195	// Server hasn't changed.
1196	result.Host = ""
1197	err = session.Run("serverStatus", result)
1198	c.Assert(err, IsNil)
1199	c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true)
1200}
1201
1202func (s *S) TestDirectToUnknownStateMember(c *C) {
1203	session, err := mgo.Dial("localhost:40041?connect=direct")
1204	c.Assert(err, IsNil)
1205	defer session.Close()
1206
1207	session.SetMode(mgo.Monotonic, true)
1208
1209	result := &struct{ Host string }{}
1210	err = session.Run("serverStatus", result)
1211	c.Assert(err, IsNil)
1212	c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true)
1213
1214	// We've got no master, so it'll timeout.
1215	session.SetSyncTimeout(5e8 * time.Nanosecond)
1216
1217	coll := session.DB("mydb").C("mycoll")
1218	err = coll.Insert(M{"test": 1})
1219	c.Assert(err, ErrorMatches, "no reachable servers")
1220
1221	// Slave is still reachable.
1222	result.Host = ""
1223	err = session.Run("serverStatus", result)
1224	c.Assert(err, IsNil)
1225	c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true)
1226}
1227
1228func (s *S) TestFailFast(c *C) {
1229	info := mgo.DialInfo{
1230		Addrs:    []string{"localhost:99999"},
1231		Timeout:  5 * time.Second,
1232		FailFast: true,
1233	}
1234
1235	started := time.Now()
1236
1237	_, err := mgo.DialWithInfo(&info)
1238	c.Assert(err, ErrorMatches, "no reachable servers")
1239
1240	c.Assert(started.After(time.Now().Add(-time.Second)), Equals, true)
1241}
1242
1243func (s *S) countQueries(c *C, server string) (n int) {
1244	defer func() { c.Logf("Queries for %q: %d", server, n) }()
1245	session, err := mgo.Dial(server + "?connect=direct")
1246	c.Assert(err, IsNil)
1247	defer session.Close()
1248	session.SetMode(mgo.Monotonic, true)
1249	var result struct {
1250		OpCounters struct {
1251			Query int
1252		}
1253		Metrics struct {
1254			Commands struct{ Find struct{ Total int } }
1255		}
1256	}
1257	err = session.Run("serverStatus", &result)
1258	c.Assert(err, IsNil)
1259	if s.versionAtLeast(3, 2) {
1260		return result.Metrics.Commands.Find.Total
1261	}
1262	return result.OpCounters.Query
1263}
1264
1265func (s *S) countCommands(c *C, server, commandName string) (n int) {
1266	defer func() { c.Logf("Queries for %q: %d", server, n) }()
1267	session, err := mgo.Dial(server + "?connect=direct")
1268	c.Assert(err, IsNil)
1269	defer session.Close()
1270	session.SetMode(mgo.Monotonic, true)
1271	var result struct {
1272		Metrics struct {
1273			Commands map[string]struct{ Total int }
1274		}
1275	}
1276	err = session.Run("serverStatus", &result)
1277	c.Assert(err, IsNil)
1278	return result.Metrics.Commands[commandName].Total
1279}
1280
1281func (s *S) TestMonotonicSlaveOkFlagWithMongos(c *C) {
1282	session, err := mgo.Dial("localhost:40021")
1283	c.Assert(err, IsNil)
1284	defer session.Close()
1285
1286	ssresult := &struct{ Host string }{}
1287	imresult := &struct{ IsMaster bool }{}
1288
1289	// Figure the master while still using the strong session.
1290	err = session.Run("serverStatus", ssresult)
1291	c.Assert(err, IsNil)
1292	err = session.Run("isMaster", imresult)
1293	c.Assert(err, IsNil)
1294	master := ssresult.Host
1295	c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
1296
1297	// Ensure mongos is aware about the current topology.
1298	s.Stop(":40201")
1299	s.StartAll()
1300
1301	mongos, err := mgo.Dial("localhost:40202")
1302	c.Assert(err, IsNil)
1303	defer mongos.Close()
1304
1305	// Insert some data as otherwise 3.2+ doesn't seem to run the query at all.
1306	err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1})
1307	c.Assert(err, IsNil)
1308
1309	// Wait until all servers see the data.
1310	for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} {
1311		session, err := mgo.Dial(addr + "?connect=direct")
1312		c.Assert(err, IsNil)
1313		defer session.Close()
1314		session.SetMode(mgo.Monotonic, true)
1315		for i := 300; i >= 0; i-- {
1316			n, err := session.DB("mydb").C("mycoll").Find(nil).Count()
1317			c.Assert(err, IsNil)
1318			if n == 1 {
1319				break
1320			}
1321			if i == 0 {
1322				c.Fatalf("Inserted data never reached " + addr)
1323			}
1324			time.Sleep(100 * time.Millisecond)
1325		}
1326	}
1327
1328	// Collect op counters for everyone.
1329	q21a := s.countQueries(c, "localhost:40021")
1330	q22a := s.countQueries(c, "localhost:40022")
1331	q23a := s.countQueries(c, "localhost:40023")
1332
1333	// Do a SlaveOk query through MongoS
1334
1335	mongos.SetMode(mgo.Monotonic, true)
1336
1337	coll := mongos.DB("mydb").C("mycoll")
1338	var result struct{ N int }
1339	for i := 0; i != 5; i++ {
1340		err = coll.Find(nil).One(&result)
1341		c.Assert(err, IsNil)
1342		c.Assert(result.N, Equals, 1)
1343	}
1344
1345	// Collect op counters for everyone again.
1346	q21b := s.countQueries(c, "localhost:40021")
1347	q22b := s.countQueries(c, "localhost:40022")
1348	q23b := s.countQueries(c, "localhost:40023")
1349
1350	var masterDelta, slaveDelta int
1351	switch hostPort(master) {
1352	case "40021":
1353		masterDelta = q21b - q21a
1354		slaveDelta = (q22b - q22a) + (q23b - q23a)
1355	case "40022":
1356		masterDelta = q22b - q22a
1357		slaveDelta = (q21b - q21a) + (q23b - q23a)
1358	case "40023":
1359		masterDelta = q23b - q23a
1360		slaveDelta = (q21b - q21a) + (q22b - q22a)
1361	default:
1362		c.Fatal("Uh?")
1363	}
1364
1365	c.Check(masterDelta, Equals, 0) // Just the counting itself.
1366	c.Check(slaveDelta, Equals, 5)  // The counting for both, plus 5 queries above.
1367}
1368
1369func (s *S) TestSecondaryModeWithMongos(c *C) {
1370	session, err := mgo.Dial("localhost:40021")
1371	c.Assert(err, IsNil)
1372	defer session.Close()
1373
1374	ssresult := &struct{ Host string }{}
1375	imresult := &struct{ IsMaster bool }{}
1376
1377	// Figure the master while still using the strong session.
1378	err = session.Run("serverStatus", ssresult)
1379	c.Assert(err, IsNil)
1380	err = session.Run("isMaster", imresult)
1381	c.Assert(err, IsNil)
1382	master := ssresult.Host
1383	c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
1384
1385	// Ensure mongos is aware about the current topology.
1386	s.Stop(":40201")
1387	s.StartAll()
1388
1389	mongos, err := mgo.Dial("localhost:40202")
1390	c.Assert(err, IsNil)
1391	defer mongos.Close()
1392
1393	mongos.SetSyncTimeout(5 * time.Second)
1394
1395	// Insert some data as otherwise 3.2+ doesn't seem to run the query at all.
1396	err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1})
1397	c.Assert(err, IsNil)
1398
1399	// Wait until all servers see the data.
1400	for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} {
1401		session, err := mgo.Dial(addr + "?connect=direct")
1402		c.Assert(err, IsNil)
1403		defer session.Close()
1404		session.SetMode(mgo.Monotonic, true)
1405		for i := 300; i >= 0; i-- {
1406			n, err := session.DB("mydb").C("mycoll").Find(nil).Count()
1407			c.Assert(err, IsNil)
1408			if n == 1 {
1409				break
1410			}
1411			if i == 0 {
1412				c.Fatalf("Inserted data never reached " + addr)
1413			}
1414			time.Sleep(100 * time.Millisecond)
1415		}
1416	}
1417
1418	// Collect op counters for everyone.
1419	q21a := s.countQueries(c, "localhost:40021")
1420	q22a := s.countQueries(c, "localhost:40022")
1421	q23a := s.countQueries(c, "localhost:40023")
1422
1423	// Do a Secondary query through MongoS
1424
1425	mongos.SetMode(mgo.Secondary, true)
1426
1427	coll := mongos.DB("mydb").C("mycoll")
1428	var result struct{ N int }
1429	for i := 0; i != 5; i++ {
1430		err = coll.Find(nil).One(&result)
1431		c.Assert(err, IsNil)
1432		c.Assert(result.N, Equals, 1)
1433	}
1434
1435	// Collect op counters for everyone again.
1436	q21b := s.countQueries(c, "localhost:40021")
1437	q22b := s.countQueries(c, "localhost:40022")
1438	q23b := s.countQueries(c, "localhost:40023")
1439
1440	var masterDelta, slaveDelta int
1441	switch hostPort(master) {
1442	case "40021":
1443		masterDelta = q21b - q21a
1444		slaveDelta = (q22b - q22a) + (q23b - q23a)
1445	case "40022":
1446		masterDelta = q22b - q22a
1447		slaveDelta = (q21b - q21a) + (q23b - q23a)
1448	case "40023":
1449		masterDelta = q23b - q23a
1450		slaveDelta = (q21b - q21a) + (q22b - q22a)
1451	default:
1452		c.Fatal("Uh?")
1453	}
1454
1455	c.Check(masterDelta, Equals, 0) // Just the counting itself.
1456	c.Check(slaveDelta, Equals, 5)  // The counting for both, plus 5 queries above.
1457}
1458
1459func (s *S) TestSecondaryModeWithMongosInsert(c *C) {
1460	if *fast {
1461		c.Skip("-fast")
1462	}
1463
1464	session, err := mgo.Dial("localhost:40202")
1465	c.Assert(err, IsNil)
1466	defer session.Close()
1467
1468	session.SetMode(mgo.Secondary, true)
1469	session.SetSyncTimeout(4 * time.Second)
1470
1471	coll := session.DB("mydb").C("mycoll")
1472	err = coll.Insert(M{"a": 1})
1473	c.Assert(err, IsNil)
1474
1475	var result struct{ A int }
1476	coll.Find(nil).One(&result)
1477	c.Assert(result.A, Equals, 1)
1478}
1479
1480
1481func (s *S) TestRemovalOfClusterMember(c *C) {
1482	if *fast {
1483		c.Skip("-fast")
1484	}
1485
1486	master, err := mgo.Dial("localhost:40021")
1487	c.Assert(err, IsNil)
1488	defer master.Close()
1489
1490	// Wait for cluster to fully sync up.
1491	for i := 0; i < 10; i++ {
1492		if len(master.LiveServers()) == 3 {
1493			break
1494		}
1495		time.Sleep(5e8)
1496	}
1497	if len(master.LiveServers()) != 3 {
1498		c.Fatalf("Test started with bad cluster state: %v", master.LiveServers())
1499	}
1500
1501	result := &struct {
1502		IsMaster bool
1503		Me       string
1504	}{}
1505	slave := master.Copy()
1506	slave.SetMode(mgo.Monotonic, true) // Monotonic can hold a non-master socket persistently.
1507	err = slave.Run("isMaster", result)
1508	c.Assert(err, IsNil)
1509	c.Assert(result.IsMaster, Equals, false)
1510	slaveAddr := result.Me
1511
1512	defer func() {
1513		config := map[string]string{
1514			"40021": `{_id: 1, host: "127.0.0.1:40021", priority: 1, tags: {rs2: "a"}}`,
1515			"40022": `{_id: 2, host: "127.0.0.1:40022", priority: 0, tags: {rs2: "b"}}`,
1516			"40023": `{_id: 3, host: "127.0.0.1:40023", priority: 0, tags: {rs2: "c"}}`,
1517		}
1518		master.Refresh()
1519		master.Run(bson.D{{"$eval", `rs.add(` + config[hostPort(slaveAddr)] + `)`}}, nil)
1520		master.Close()
1521		slave.Close()
1522
1523		// Ensure suite syncs up with the changes before next test.
1524		s.Stop(":40201")
1525		s.StartAll()
1526		time.Sleep(8 * time.Second)
1527		// TODO Find a better way to find out when mongos is fully aware that all
1528		// servers are up. Without that follow up tests that depend on mongos will
1529		// break due to their expectation of things being in a working state.
1530	}()
1531
1532	c.Logf("========== Removing slave: %s ==========", slaveAddr)
1533
1534	master.Run(bson.D{{"$eval", `rs.remove("` + slaveAddr + `")`}}, nil)
1535
1536	master.Refresh()
1537
1538	// Give the cluster a moment to catch up by doing a roundtrip to the master.
1539	err = master.Ping()
1540	c.Assert(err, IsNil)
1541
1542	time.Sleep(3e9)
1543
1544	// This must fail since the slave has been taken off the cluster.
1545	err = slave.Ping()
1546	c.Assert(err, NotNil)
1547
1548	for i := 0; i < 15; i++ {
1549		if len(master.LiveServers()) == 2 {
1550			break
1551		}
1552		time.Sleep(time.Second)
1553	}
1554	live := master.LiveServers()
1555	if len(live) != 2 {
1556		c.Errorf("Removed server still considered live: %#s", live)
1557	}
1558
1559	c.Log("========== Test succeeded. ==========")
1560}
1561
1562func (s *S) TestPoolLimitSimple(c *C) {
1563	for test := 0; test < 2; test++ {
1564		var session *mgo.Session
1565		var err error
1566		if test == 0 {
1567			session, err = mgo.Dial("localhost:40001")
1568			c.Assert(err, IsNil)
1569			session.SetPoolLimit(1)
1570		} else {
1571			session, err = mgo.Dial("localhost:40001?maxPoolSize=1")
1572			c.Assert(err, IsNil)
1573		}
1574		defer session.Close()
1575
1576		// Put one socket in use.
1577		c.Assert(session.Ping(), IsNil)
1578
1579		done := make(chan time.Duration)
1580
1581		// Now block trying to get another one due to the pool limit.
1582		go func() {
1583			copy := session.Copy()
1584			defer copy.Close()
1585			started := time.Now()
1586			c.Check(copy.Ping(), IsNil)
1587			done <- time.Now().Sub(started)
1588		}()
1589
1590		time.Sleep(300 * time.Millisecond)
1591
1592		// Put the one socket back in the pool, freeing it for the copy.
1593		session.Refresh()
1594		delay := <-done
1595		c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
1596	}
1597}
1598
1599func (s *S) TestPoolLimitMany(c *C) {
1600	if *fast {
1601		c.Skip("-fast")
1602	}
1603
1604	session, err := mgo.Dial("localhost:40011")
1605	c.Assert(err, IsNil)
1606	defer session.Close()
1607
1608	stats := mgo.GetStats()
1609	for stats.SocketsAlive != 3 {
1610		c.Logf("Waiting for all connections to be established (sockets alive currently %d)...", stats.SocketsAlive)
1611		stats = mgo.GetStats()
1612		time.Sleep(5e8)
1613	}
1614
1615	const poolLimit = 64
1616	session.SetPoolLimit(poolLimit)
1617
1618	// Consume the whole limit for the master.
1619	var master []*mgo.Session
1620	for i := 0; i < poolLimit; i++ {
1621		s := session.Copy()
1622		defer s.Close()
1623		c.Assert(s.Ping(), IsNil)
1624		master = append(master, s)
1625	}
1626
1627	before := time.Now()
1628	go func() {
1629		time.Sleep(3e9)
1630		master[0].Refresh()
1631	}()
1632
1633	// Then, a single ping must block, since it would need another
1634	// connection to the master, over the limit. Once the goroutine
1635	// above releases its socket, it should move on.
1636	session.Ping()
1637	delay := time.Now().Sub(before)
1638	c.Assert(delay > 3e9, Equals, true)
1639	c.Assert(delay < 6e9, Equals, true)
1640}
1641
1642func (s *S) TestSetModeEventualIterBug(c *C) {
1643	session1, err := mgo.Dial("localhost:40011")
1644	c.Assert(err, IsNil)
1645	defer session1.Close()
1646
1647	session1.SetMode(mgo.Eventual, false)
1648
1649	coll1 := session1.DB("mydb").C("mycoll")
1650
1651	const N = 100
1652	for i := 0; i < N; i++ {
1653		err = coll1.Insert(M{"_id": i})
1654		c.Assert(err, IsNil)
1655	}
1656
1657	c.Logf("Waiting until secondary syncs")
1658	for {
1659		n, err := coll1.Count()
1660		c.Assert(err, IsNil)
1661		if n == N {
1662			c.Logf("Found all")
1663			break
1664		}
1665	}
1666
1667	session2, err := mgo.Dial("localhost:40011")
1668	c.Assert(err, IsNil)
1669	defer session2.Close()
1670
1671	session2.SetMode(mgo.Eventual, false)
1672
1673	coll2 := session2.DB("mydb").C("mycoll")
1674
1675	i := 0
1676	iter := coll2.Find(nil).Batch(10).Iter()
1677	var result struct{}
1678	for iter.Next(&result) {
1679		i++
1680	}
1681	c.Assert(iter.Close(), Equals, nil)
1682	c.Assert(i, Equals, N)
1683}
1684
1685func (s *S) TestCustomDialOld(c *C) {
1686	dials := make(chan bool, 16)
1687	dial := func(addr net.Addr) (net.Conn, error) {
1688		tcpaddr, ok := addr.(*net.TCPAddr)
1689		if !ok {
1690			return nil, fmt.Errorf("unexpected address type: %T", addr)
1691		}
1692		dials <- true
1693		return net.DialTCP("tcp", nil, tcpaddr)
1694	}
1695	info := mgo.DialInfo{
1696		Addrs: []string{"localhost:40012"},
1697		Dial:  dial,
1698	}
1699
1700	// Use hostname here rather than IP, to make things trickier.
1701	session, err := mgo.DialWithInfo(&info)
1702	c.Assert(err, IsNil)
1703	defer session.Close()
1704
1705	const N = 3
1706	for i := 0; i < N; i++ {
1707		select {
1708		case <-dials:
1709		case <-time.After(5 * time.Second):
1710			c.Fatalf("expected %d dials, got %d", N, i)
1711		}
1712	}
1713	select {
1714	case <-dials:
1715		c.Fatalf("got more dials than expected")
1716	case <-time.After(100 * time.Millisecond):
1717	}
1718}
1719
1720func (s *S) TestCustomDialNew(c *C) {
1721	dials := make(chan bool, 16)
1722	dial := func(addr *mgo.ServerAddr) (net.Conn, error) {
1723		dials <- true
1724		if addr.TCPAddr().Port == 40012 {
1725			c.Check(addr.String(), Equals, "localhost:40012")
1726		}
1727		return net.DialTCP("tcp", nil, addr.TCPAddr())
1728	}
1729	info := mgo.DialInfo{
1730		Addrs:      []string{"localhost:40012"},
1731		DialServer: dial,
1732	}
1733
1734	// Use hostname here rather than IP, to make things trickier.
1735	session, err := mgo.DialWithInfo(&info)
1736	c.Assert(err, IsNil)
1737	defer session.Close()
1738
1739	const N = 3
1740	for i := 0; i < N; i++ {
1741		select {
1742		case <-dials:
1743		case <-time.After(5 * time.Second):
1744			c.Fatalf("expected %d dials, got %d", N, i)
1745		}
1746	}
1747	select {
1748	case <-dials:
1749		c.Fatalf("got more dials than expected")
1750	case <-time.After(100 * time.Millisecond):
1751	}
1752}
1753
1754func (s *S) TestPrimaryShutdownOnAuthShard(c *C) {
1755	if *fast {
1756		c.Skip("-fast")
1757	}
1758
1759	// Dial the shard.
1760	session, err := mgo.Dial("localhost:40203")
1761	c.Assert(err, IsNil)
1762	defer session.Close()
1763
1764	// Login and insert something to make it more realistic.
1765	session.DB("admin").Login("root", "rapadura")
1766	coll := session.DB("mydb").C("mycoll")
1767	err = coll.Insert(bson.M{"n": 1})
1768	c.Assert(err, IsNil)
1769
1770	// Dial the replica set to figure the master out.
1771	rs, err := mgo.Dial("root:rapadura@localhost:40031")
1772	c.Assert(err, IsNil)
1773	defer rs.Close()
1774
1775	// With strong consistency, this will open a socket to the master.
1776	result := &struct{ Host string }{}
1777	err = rs.Run("serverStatus", result)
1778	c.Assert(err, IsNil)
1779
1780	// Kill the master.
1781	host := result.Host
1782	s.Stop(host)
1783
1784	// This must fail, since the connection was broken.
1785	err = rs.Run("serverStatus", result)
1786	c.Assert(err, Equals, io.EOF)
1787
1788	// This won't work because the master just died.
1789	err = coll.Insert(bson.M{"n": 2})
1790	c.Assert(err, NotNil)
1791
1792	// Refresh session and wait for re-election.
1793	session.Refresh()
1794	for i := 0; i < 60; i++ {
1795		err = coll.Insert(bson.M{"n": 3})
1796		if err == nil {
1797			break
1798		}
1799		c.Logf("Waiting for replica set to elect a new master. Last error: %v", err)
1800		time.Sleep(500 * time.Millisecond)
1801	}
1802	c.Assert(err, IsNil)
1803
1804	count, err := coll.Count()
1805	c.Assert(count > 1, Equals, true)
1806}
1807
1808func (s *S) TestNearestSecondary(c *C) {
1809	defer mgo.HackPingDelay(300 * time.Millisecond)()
1810
1811	rs1a := "127.0.0.1:40011"
1812	rs1b := "127.0.0.1:40012"
1813	rs1c := "127.0.0.1:40013"
1814	s.Freeze(rs1b)
1815
1816	session, err := mgo.Dial(rs1a)
1817	c.Assert(err, IsNil)
1818	defer session.Close()
1819
1820	// Wait for the sync up to run through the first couple of servers.
1821	for len(session.LiveServers()) != 2 {
1822		c.Log("Waiting for two servers to be alive...")
1823		time.Sleep(100 * time.Millisecond)
1824	}
1825
1826	// Extra delay to ensure the third server gets penalized.
1827	time.Sleep(500 * time.Millisecond)
1828
1829	// Release third server.
1830	s.Thaw(rs1b)
1831
1832	// Wait for it to come up.
1833	for len(session.LiveServers()) != 3 {
1834		c.Log("Waiting for all servers to be alive...")
1835		time.Sleep(100 * time.Millisecond)
1836	}
1837
1838	session.SetMode(mgo.Monotonic, true)
1839	var result struct{ Host string }
1840
1841	// See which slave picks the line, several times to avoid chance.
1842	for i := 0; i < 10; i++ {
1843		session.Refresh()
1844		err = session.Run("serverStatus", &result)
1845		c.Assert(err, IsNil)
1846		c.Assert(hostPort(result.Host), Equals, hostPort(rs1c))
1847	}
1848
1849	if *fast {
1850		// Don't hold back for several seconds.
1851		return
1852	}
1853
1854	// Now hold the other server for long enough to penalize it.
1855	s.Freeze(rs1c)
1856	time.Sleep(5 * time.Second)
1857	s.Thaw(rs1c)
1858
1859	// Wait for the ping to be processed.
1860	time.Sleep(500 * time.Millisecond)
1861
1862	// Repeating the test should now pick the former server consistently.
1863	for i := 0; i < 10; i++ {
1864		session.Refresh()
1865		err = session.Run("serverStatus", &result)
1866		c.Assert(err, IsNil)
1867		c.Assert(hostPort(result.Host), Equals, hostPort(rs1b))
1868	}
1869}
1870
1871func (s *S) TestNearestServer(c *C) {
1872	defer mgo.HackPingDelay(300 * time.Millisecond)()
1873
1874	rs1a := "127.0.0.1:40011"
1875	rs1b := "127.0.0.1:40012"
1876	rs1c := "127.0.0.1:40013"
1877
1878	session, err := mgo.Dial(rs1a)
1879	c.Assert(err, IsNil)
1880	defer session.Close()
1881
1882	s.Freeze(rs1a)
1883	s.Freeze(rs1b)
1884
1885	// Extra delay to ensure the first two servers get penalized.
1886	time.Sleep(500 * time.Millisecond)
1887
1888	// Release them.
1889	s.Thaw(rs1a)
1890	s.Thaw(rs1b)
1891
1892	// Wait for everyone to come up.
1893	for len(session.LiveServers()) != 3 {
1894		c.Log("Waiting for all servers to be alive...")
1895		time.Sleep(100 * time.Millisecond)
1896	}
1897
1898	session.SetMode(mgo.Nearest, true)
1899	var result struct{ Host string }
1900
1901	// See which server picks the line, several times to avoid chance.
1902	for i := 0; i < 10; i++ {
1903		session.Refresh()
1904		err = session.Run("serverStatus", &result)
1905		c.Assert(err, IsNil)
1906		c.Assert(hostPort(result.Host), Equals, hostPort(rs1c))
1907	}
1908
1909	if *fast {
1910		// Don't hold back for several seconds.
1911		return
1912	}
1913
1914	// Now hold the two secondaries for long enough to penalize them.
1915	s.Freeze(rs1b)
1916	s.Freeze(rs1c)
1917	time.Sleep(5 * time.Second)
1918	s.Thaw(rs1b)
1919	s.Thaw(rs1c)
1920
1921	// Wait for the ping to be processed.
1922	time.Sleep(500 * time.Millisecond)
1923
1924	// Repeating the test should now pick the primary server consistently.
1925	for i := 0; i < 10; i++ {
1926		session.Refresh()
1927		err = session.Run("serverStatus", &result)
1928		c.Assert(err, IsNil)
1929		c.Assert(hostPort(result.Host), Equals, hostPort(rs1a))
1930	}
1931}
1932
1933func (s *S) TestConnectCloseConcurrency(c *C) {
1934	restore := mgo.HackPingDelay(500 * time.Millisecond)
1935	defer restore()
1936	var wg sync.WaitGroup
1937	const n = 500
1938	wg.Add(n)
1939	for i := 0; i < n; i++ {
1940		go func() {
1941			defer wg.Done()
1942			session, err := mgo.Dial("localhost:40001")
1943			if err != nil {
1944				c.Fatal(err)
1945			}
1946			time.Sleep(1)
1947			session.Close()
1948		}()
1949	}
1950	wg.Wait()
1951}
1952
1953func (s *S) TestSelectServers(c *C) {
1954	if !s.versionAtLeast(2, 2) {
1955		c.Skip("read preferences introduced in 2.2")
1956	}
1957
1958	session, err := mgo.Dial("localhost:40011")
1959	c.Assert(err, IsNil)
1960	defer session.Close()
1961
1962	session.SetMode(mgo.Eventual, true)
1963
1964	var result struct{ Host string }
1965
1966	session.Refresh()
1967	session.SelectServers(bson.D{{"rs1", "b"}})
1968	err = session.Run("serverStatus", &result)
1969	c.Assert(err, IsNil)
1970	c.Assert(hostPort(result.Host), Equals, "40012")
1971
1972	session.Refresh()
1973	session.SelectServers(bson.D{{"rs1", "c"}})
1974	err = session.Run("serverStatus", &result)
1975	c.Assert(err, IsNil)
1976	c.Assert(hostPort(result.Host), Equals, "40013")
1977}
1978
1979func (s *S) TestSelectServersWithMongos(c *C) {
1980	if !s.versionAtLeast(2, 2) {
1981		c.Skip("read preferences introduced in 2.2")
1982	}
1983
1984	session, err := mgo.Dial("localhost:40021")
1985	c.Assert(err, IsNil)
1986	defer session.Close()
1987
1988	ssresult := &struct{ Host string }{}
1989	imresult := &struct{ IsMaster bool }{}
1990
1991	// Figure the master while still using the strong session.
1992	err = session.Run("serverStatus", ssresult)
1993	c.Assert(err, IsNil)
1994	err = session.Run("isMaster", imresult)
1995	c.Assert(err, IsNil)
1996	master := ssresult.Host
1997	c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
1998
1999	var slave1, slave2 string
2000	switch hostPort(master) {
2001	case "40021":
2002		slave1, slave2 = "b", "c"
2003	case "40022":
2004		slave1, slave2 = "a", "c"
2005	case "40023":
2006		slave1, slave2 = "a", "b"
2007	}
2008
2009	// Collect op counters for everyone.
2010	q21a := s.countQueries(c, "localhost:40021")
2011	q22a := s.countQueries(c, "localhost:40022")
2012	q23a := s.countQueries(c, "localhost:40023")
2013
2014	// Do a SlaveOk query through MongoS
2015	mongos, err := mgo.Dial("localhost:40202")
2016	c.Assert(err, IsNil)
2017	defer mongos.Close()
2018
2019	mongos.SetMode(mgo.Monotonic, true)
2020
2021	mongos.Refresh()
2022	mongos.SelectServers(bson.D{{"rs2", slave1}})
2023	coll := mongos.DB("mydb").C("mycoll")
2024	result := &struct{}{}
2025	for i := 0; i != 5; i++ {
2026		err := coll.Find(nil).One(result)
2027		c.Assert(err, Equals, mgo.ErrNotFound)
2028	}
2029
2030	mongos.Refresh()
2031	mongos.SelectServers(bson.D{{"rs2", slave2}})
2032	coll = mongos.DB("mydb").C("mycoll")
2033	for i := 0; i != 7; i++ {
2034		err := coll.Find(nil).One(result)
2035		c.Assert(err, Equals, mgo.ErrNotFound)
2036	}
2037
2038	// Collect op counters for everyone again.
2039	q21b := s.countQueries(c, "localhost:40021")
2040	q22b := s.countQueries(c, "localhost:40022")
2041	q23b := s.countQueries(c, "localhost:40023")
2042
2043	switch hostPort(master) {
2044	case "40021":
2045		c.Check(q21b-q21a, Equals, 0)
2046		c.Check(q22b-q22a, Equals, 5)
2047		c.Check(q23b-q23a, Equals, 7)
2048	case "40022":
2049		c.Check(q21b-q21a, Equals, 5)
2050		c.Check(q22b-q22a, Equals, 0)
2051		c.Check(q23b-q23a, Equals, 7)
2052	case "40023":
2053		c.Check(q21b-q21a, Equals, 5)
2054		c.Check(q22b-q22a, Equals, 7)
2055		c.Check(q23b-q23a, Equals, 0)
2056	default:
2057		c.Fatal("Uh?")
2058	}
2059}
2060
2061func (s *S) TestDoNotFallbackToMonotonic(c *C) {
2062	// There was a bug at some point that some functions were
2063	// falling back to Monotonic mode. This test ensures all listIndexes
2064	// commands go to the primary, as should happen since the session is
2065	// in Strong mode.
2066	if !s.versionAtLeast(3, 0) {
2067		c.Skip("command-counting logic depends on 3.0+")
2068	}
2069
2070	session, err := mgo.Dial("localhost:40012")
2071	c.Assert(err, IsNil)
2072	defer session.Close()
2073
2074	for i := 0; i < 15; i++ {
2075		q11a := s.countCommands(c, "localhost:40011", "listIndexes")
2076		q12a := s.countCommands(c, "localhost:40012", "listIndexes")
2077		q13a := s.countCommands(c, "localhost:40013", "listIndexes")
2078
2079		_, err := session.DB("local").C("system.indexes").Indexes()
2080		c.Assert(err, IsNil)
2081
2082		q11b := s.countCommands(c, "localhost:40011", "listIndexes")
2083		q12b := s.countCommands(c, "localhost:40012", "listIndexes")
2084		q13b := s.countCommands(c, "localhost:40013", "listIndexes")
2085
2086		c.Assert(q11b, Equals, q11a+1)
2087		c.Assert(q12b, Equals, q12a)
2088		c.Assert(q13b, Equals, q13a)
2089	}
2090}
2091