1package gocbcore
2
3import (
4	"crypto/x509"
5	"encoding/json"
6	"errors"
7	"fmt"
8	"io/ioutil"
9	"log"
10	"net/url"
11	"strconv"
12	"strings"
13	"time"
14
15	"github.com/couchbase/gocbcore/v9/memd"
16)
17
18func (suite *StandardTestSuite) TestCidRetries() {
19	suite.EnsureSupportsFeature(TestFeatureCollections)
20
21	agent, s := suite.GetAgentAndHarness()
22
23	bucketName := suite.BucketName
24	scopeName := suite.ScopeName
25	collectionName := "testCidRetries"
26
27	_, err := testCreateCollection(collectionName, scopeName, bucketName, agent)
28	if err != nil {
29		suite.T().Logf("Failed to create collection: %v", err)
30	}
31
32	// prime the cid map cache
33	s.PushOp(agent.GetCollectionID(scopeName, collectionName, GetCollectionIDOptions{},
34		func(result *GetCollectionIDResult, err error) {
35			s.Wrap(func() {
36				if err != nil {
37					s.Fatalf("Get CID operation failed: %v", err)
38				}
39			})
40		}),
41	)
42	s.Wait(0)
43
44	// delete the collection
45	_, err = testDeleteCollection(collectionName, scopeName, bucketName, agent, true)
46	if err != nil {
47		suite.T().Fatalf("Failed to delete collection: %v", err)
48	}
49
50	// recreate
51	_, err = testCreateCollection(collectionName, scopeName, bucketName, agent)
52	if err != nil {
53		suite.T().Fatalf("Failed to create collection: %v", err)
54	}
55
56	// Set should succeed as we detect cid unknown, fetch the cid and then retry again. This should happen
57	// even if we don't set a retry strategy.
58	s.PushOp(agent.Set(SetOptions{
59		Key:            []byte("test"),
60		Value:          []byte("{}"),
61		CollectionName: collectionName,
62		ScopeName:      scopeName,
63	}, func(res *StoreResult, err error) {
64		s.Wrap(func() {
65			if err != nil {
66				s.Fatalf("Set operation failed: %v", err)
67			}
68			if res.Cas == Cas(0) {
69				s.Fatalf("Invalid cas received")
70			}
71		})
72	}))
73	s.Wait(0)
74
75	// Get
76	s.PushOp(agent.Get(GetOptions{
77		Key:            []byte("test"),
78		CollectionName: collectionName,
79		ScopeName:      scopeName,
80	}, func(res *GetResult, err error) {
81		s.Wrap(func() {
82			if err != nil {
83				s.Fatalf("Get operation failed: %v", err)
84			}
85			if res.Cas == Cas(0) {
86				s.Fatalf("Invalid cas received")
87			}
88		})
89	}))
90	s.Wait(0)
91}
92
93func (suite *StandardTestSuite) TestBasicOps() {
94	agent, s := suite.GetAgentAndHarness()
95
96	// Set
97	s.PushOp(agent.Set(SetOptions{
98		Key:            []byte("test"),
99		Value:          []byte("{}"),
100		CollectionName: suite.CollectionName,
101		ScopeName:      suite.ScopeName,
102	}, func(res *StoreResult, err error) {
103		s.Wrap(func() {
104			if err != nil {
105				s.Fatalf("Set operation failed: %v", err)
106			}
107			if res.Cas == Cas(0) {
108				s.Fatalf("Invalid cas received")
109			}
110		})
111	}))
112	s.Wait(0)
113
114	// Get
115	s.PushOp(agent.Get(GetOptions{
116		Key:            []byte("test"),
117		CollectionName: suite.CollectionName,
118		ScopeName:      suite.ScopeName,
119	}, func(res *GetResult, err error) {
120		s.Wrap(func() {
121			if err != nil {
122				s.Fatalf("Get operation failed: %v", err)
123			}
124			if res.Cas == Cas(0) {
125				s.Fatalf("Invalid cas received")
126			}
127		})
128	}))
129	s.Wait(0)
130}
131
132func (suite *StandardTestSuite) TestCasMismatch() {
133	agent, s := suite.GetAgentAndHarness()
134
135	// Set
136	var cas Cas
137	s.PushOp(agent.Set(SetOptions{
138		Key:            []byte("testCasMismatch"),
139		Value:          []byte("{}"),
140		CollectionName: suite.CollectionName,
141		ScopeName:      suite.ScopeName,
142	}, func(res *StoreResult, err error) {
143		s.Wrap(func() {
144			if err != nil {
145				s.Fatalf("Set operation failed: %v", err)
146			}
147			if res.Cas == Cas(0) {
148				s.Fatalf("Invalid cas received")
149			}
150			cas = res.Cas
151		})
152	}))
153	s.Wait(0)
154
155	// Replace to change cas on the server
156	s.PushOp(agent.Replace(ReplaceOptions{
157		Key:            []byte("testCasMismatch"),
158		Value:          []byte("{\"key\":\"value\"}"),
159		CollectionName: suite.CollectionName,
160		ScopeName:      suite.ScopeName,
161	}, func(res *StoreResult, err error) {
162		s.Wrap(func() {
163			if err != nil {
164				s.Fatalf("Replace operation failed: %v", err)
165			}
166			if res.Cas == Cas(0) {
167				s.Fatalf("Invalid cas received")
168			}
169		})
170	}))
171	s.Wait(0)
172
173	// Replace which should fail with a cas mismatch
174	s.PushOp(agent.Replace(ReplaceOptions{
175		Key:            []byte("testCasMismatch"),
176		Value:          []byte("{\"key\":\"value2\"}"),
177		CollectionName: suite.CollectionName,
178		ScopeName:      suite.ScopeName,
179		Cas:            cas,
180	}, func(res *StoreResult, err error) {
181		s.Wrap(func() {
182			if err == nil {
183				s.Fatalf("Set operation succeeded but should have failed")
184			}
185
186			if !errors.Is(err, ErrCasMismatch) {
187				suite.T().Fatalf("Expected CasMismatch error but was %v", err)
188			}
189		})
190	}))
191	s.Wait(0)
192}
193
194func (suite *StandardTestSuite) TestGetReplica() {
195	suite.EnsureSupportsFeature(TestFeatureReplicas)
196	agent, s := suite.GetAgentAndHarness()
197
198	// Set
199	s.PushOp(agent.Set(SetOptions{
200		Key:            []byte("testReplica"),
201		Value:          []byte("{}"),
202		CollectionName: suite.CollectionName,
203		ScopeName:      suite.ScopeName,
204	}, func(res *StoreResult, err error) {
205		s.Wrap(func() {
206			if err != nil {
207				s.Fatalf("Set operation failed: %v", err)
208			}
209			if res.Cas == Cas(0) {
210				s.Fatalf("Invalid cas received")
211			}
212		})
213	}))
214	s.Wait(0)
215
216	retries := 0
217	keyExists := false
218	for {
219		s.PushOp(agent.GetOneReplica(GetOneReplicaOptions{
220			Key:            []byte("testReplica"),
221			ReplicaIdx:     1,
222			CollectionName: suite.CollectionName,
223			ScopeName:      suite.ScopeName,
224		}, func(res *GetReplicaResult, err error) {
225			s.Wrap(func() {
226				keyNotFound := errors.Is(err, ErrDocumentNotFound)
227				if err == nil {
228					keyExists = true
229				} else if err != nil && !keyNotFound {
230					s.Fatalf("GetReplica specific returned error that was not document not found: %v", err)
231				}
232				if !keyNotFound && res.Cas == Cas(0) {
233					s.Fatalf("Invalid cas received")
234				}
235			})
236		}))
237		s.Wait(0)
238		if keyExists {
239			break
240		}
241		retries++
242		if retries >= 5 {
243			suite.T().Fatalf("GetReplica could not locate key")
244		}
245		time.Sleep(50 * time.Millisecond)
246	}
247}
248
249func (suite *StandardTestSuite) TestDurableWriteGetReplica() {
250	suite.EnsureSupportsFeature(TestFeatureReplicas)
251	suite.EnsureSupportsFeature(TestFeatureEnhancedDurability)
252	agent, s := suite.GetAgentAndHarness()
253
254	// Set
255	s.PushOp(agent.Set(SetOptions{
256		Key:                    []byte("testDurableReplica"),
257		Value:                  []byte("{}"),
258		CollectionName:         suite.CollectionName,
259		ScopeName:              suite.ScopeName,
260		DurabilityLevel:        memd.DurabilityLevelMajority,
261		DurabilityLevelTimeout: 10 * time.Second,
262	}, func(res *StoreResult, err error) {
263		s.Wrap(func() {
264			if err != nil {
265				s.Fatalf("Set operation failed: %v", err)
266			}
267			if res.Cas == Cas(0) {
268				s.Fatalf("Invalid cas received")
269			}
270		})
271	}))
272	s.Wait(0)
273
274	retries := 0
275	keyExists := false
276	for {
277		s.PushOp(agent.GetOneReplica(GetOneReplicaOptions{
278			Key:            []byte("testDurableReplica"),
279			ReplicaIdx:     1,
280			CollectionName: suite.CollectionName,
281			ScopeName:      suite.ScopeName,
282		}, func(res *GetReplicaResult, err error) {
283			s.Wrap(func() {
284				keyNotFound := errors.Is(err, ErrDocumentNotFound)
285				if err == nil {
286					keyExists = true
287				} else if err != nil && !keyNotFound {
288					s.Fatalf("GetReplica specific returned error that was not document not found: %v", err)
289				}
290				if !keyNotFound && res.Cas == Cas(0) {
291					s.Fatalf("Invalid cas received")
292				}
293			})
294		}))
295		s.Wait(0)
296		if keyExists {
297			break
298		}
299		retries++
300		if retries >= 5 {
301			suite.T().Fatalf("GetReplica could not locate key")
302		}
303		time.Sleep(50 * time.Millisecond)
304	}
305}
306
307func (suite *StandardTestSuite) TestAddDurableWriteGetReplica() {
308	suite.EnsureSupportsFeature(TestFeatureReplicas)
309	suite.EnsureSupportsFeature(TestFeatureEnhancedDurability)
310	agent, s := suite.GetAgentAndHarness()
311
312	s.PushOp(agent.Add(AddOptions{
313		Key:                    []byte("testAddDurableReplica"),
314		Value:                  []byte("{}"),
315		CollectionName:         suite.CollectionName,
316		ScopeName:              suite.ScopeName,
317		DurabilityLevel:        memd.DurabilityLevelMajority,
318		DurabilityLevelTimeout: 10 * time.Second,
319	}, func(res *StoreResult, err error) {
320		s.Wrap(func() {
321			if err != nil {
322				s.Fatalf("Add operation failed: %v", err)
323			}
324			if res.Cas == Cas(0) {
325				s.Fatalf("Invalid cas received")
326			}
327		})
328	}))
329	s.Wait(0)
330
331	retries := 0
332	keyExists := false
333	for {
334		s.PushOp(agent.GetOneReplica(GetOneReplicaOptions{
335			Key:            []byte("testAddDurableReplica"),
336			ReplicaIdx:     1,
337			CollectionName: suite.CollectionName,
338			ScopeName:      suite.ScopeName,
339		}, func(res *GetReplicaResult, err error) {
340			s.Wrap(func() {
341				keyNotFound := errors.Is(err, ErrDocumentNotFound)
342				if err == nil {
343					keyExists = true
344				} else if err != nil && !keyNotFound {
345					s.Fatalf("GetReplica specific returned error that was not document not found: %v", err)
346				}
347				if !keyNotFound && res.Cas == Cas(0) {
348					s.Fatalf("Invalid cas received")
349				}
350			})
351		}))
352		s.Wait(0)
353		if keyExists {
354			break
355		}
356		retries++
357		if retries >= 5 {
358			suite.T().Fatalf("GetReplica could not locate key")
359		}
360		time.Sleep(50 * time.Millisecond)
361	}
362}
363
364func (suite *StandardTestSuite) TestReplaceDurableWriteGetReplica() {
365	suite.EnsureSupportsFeature(TestFeatureReplicas)
366	suite.EnsureSupportsFeature(TestFeatureEnhancedDurability)
367	agent, s := suite.GetAgentAndHarness()
368
369	s.PushOp(agent.Set(SetOptions{
370		Key:                    []byte("testReplaceDurableReplica"),
371		Value:                  []byte("{}"),
372		CollectionName:         suite.CollectionName,
373		ScopeName:              suite.ScopeName,
374		DurabilityLevel:        memd.DurabilityLevelMajority,
375		DurabilityLevelTimeout: 10 * time.Second,
376	}, func(res *StoreResult, err error) {
377		s.Wrap(func() {
378			if err != nil {
379				s.Fatalf("Set operation failed: %v", err)
380			}
381			if res.Cas == Cas(0) {
382				s.Fatalf("Invalid cas received")
383			}
384		})
385	}))
386	s.Wait(0)
387
388	s.PushOp(agent.Replace(ReplaceOptions{
389		Key:                    []byte("testReplaceDurableReplica"),
390		Value:                  []byte("{}"),
391		CollectionName:         suite.CollectionName,
392		ScopeName:              suite.ScopeName,
393		DurabilityLevel:        memd.DurabilityLevelMajority,
394		DurabilityLevelTimeout: 10 * time.Second,
395	}, func(res *StoreResult, err error) {
396		s.Wrap(func() {
397			if err != nil {
398				s.Fatalf("Replace operation failed: %v", err)
399			}
400			if res.Cas == Cas(0) {
401				s.Fatalf("Invalid cas received")
402			}
403		})
404	}))
405	s.Wait(0)
406
407	retries := 0
408	keyExists := false
409	for {
410		s.PushOp(agent.GetOneReplica(GetOneReplicaOptions{
411			Key:            []byte("testReplaceDurableReplica"),
412			ReplicaIdx:     1,
413			CollectionName: suite.CollectionName,
414			ScopeName:      suite.ScopeName,
415		}, func(res *GetReplicaResult, err error) {
416			s.Wrap(func() {
417				keyNotFound := errors.Is(err, ErrDocumentNotFound)
418				if err == nil {
419					keyExists = true
420				} else if err != nil && !keyNotFound {
421					s.Fatalf("GetReplica specific returned error that was not document not found: %v", err)
422				}
423				if !keyNotFound && res.Cas == Cas(0) {
424					s.Fatalf("Invalid cas received")
425				}
426			})
427		}))
428		s.Wait(0)
429		if keyExists {
430			break
431		}
432		retries++
433		if retries >= 5 {
434			suite.T().Fatalf("GetReplica could not locate key")
435		}
436		time.Sleep(50 * time.Millisecond)
437	}
438}
439
440func (suite *StandardTestSuite) TestDeleteDurableWriteGetReplica() {
441	suite.EnsureSupportsFeature(TestFeatureReplicas)
442	suite.EnsureSupportsFeature(TestFeatureEnhancedDurability)
443	agent, s := suite.GetAgentAndHarness()
444
445	s.PushOp(agent.Set(SetOptions{
446		Key:                    []byte("testDeleteDurableReplica"),
447		Value:                  []byte("{}"),
448		CollectionName:         suite.CollectionName,
449		ScopeName:              suite.ScopeName,
450		DurabilityLevel:        memd.DurabilityLevelMajority,
451		DurabilityLevelTimeout: 10 * time.Second,
452	}, func(res *StoreResult, err error) {
453		s.Wrap(func() {
454			if err != nil {
455				s.Fatalf("Set operation failed: %v", err)
456			}
457			if res.Cas == Cas(0) {
458				s.Fatalf("Invalid cas received")
459			}
460		})
461	}))
462	s.Wait(0)
463
464	s.PushOp(agent.Delete(DeleteOptions{
465		Key:                    []byte("testDeleteDurableReplica"),
466		CollectionName:         suite.CollectionName,
467		ScopeName:              suite.ScopeName,
468		DurabilityLevel:        memd.DurabilityLevelMajority,
469		DurabilityLevelTimeout: 10 * time.Second,
470	}, func(res *DeleteResult, err error) {
471		s.Wrap(func() {
472			if err != nil {
473				s.Fatalf("Delete operation failed: %v", err)
474			}
475			if res.Cas == Cas(0) {
476				s.Fatalf("Invalid cas received")
477			}
478		})
479	}))
480	s.Wait(0)
481
482	retries := 0
483	keyNotFound := false
484	for {
485		s.PushOp(agent.GetOneReplica(GetOneReplicaOptions{
486			Key:            []byte("testDeleteDurableReplica"),
487			ReplicaIdx:     1,
488			CollectionName: suite.CollectionName,
489			ScopeName:      suite.ScopeName,
490		}, func(res *GetReplicaResult, err error) {
491			s.Wrap(func() {
492				if errors.Is(err, ErrDocumentNotFound) {
493					keyNotFound = true
494				} else if err != nil {
495					s.Fatalf("GetReplica specific returned error that was not document not found: %v", err)
496				}
497				if !keyNotFound && res.Cas == Cas(0) {
498					s.Fatalf("Invalid cas received")
499				}
500			})
501		}))
502		s.Wait(0)
503		if keyNotFound {
504			break
505		}
506		retries++
507		if retries >= 5 {
508			suite.T().Fatalf("GetReplica could always locate key")
509		}
510		time.Sleep(50 * time.Millisecond)
511	}
512}
513
514func (suite *StandardTestSuite) TestBasicReplace() {
515	agent, s := suite.GetAgentAndHarness()
516
517	oldCas := Cas(0)
518	s.PushOp(agent.Set(SetOptions{
519		Key:            []byte("testx"),
520		Value:          []byte("{}"),
521		CollectionName: suite.CollectionName,
522		ScopeName:      suite.ScopeName,
523	}, func(res *StoreResult, err error) {
524		oldCas = res.Cas
525		s.Continue()
526	}))
527	s.Wait(0)
528
529	s.PushOp(agent.Replace(ReplaceOptions{
530		Key:            []byte("testx"),
531		Value:          []byte("[]"),
532		Cas:            oldCas,
533		CollectionName: suite.CollectionName,
534		ScopeName:      suite.ScopeName,
535	}, func(res *StoreResult, err error) {
536		s.Wrap(func() {
537			if err != nil {
538				s.Fatalf("Replace operation failed: %v", err)
539			}
540			if res.Cas == Cas(0) {
541				s.Fatalf("Invalid cas received")
542			}
543		})
544	}))
545	s.Wait(0)
546}
547
548func (suite *StandardTestSuite) TestBasicRemove() {
549	agent, s := suite.GetAgentAndHarness()
550
551	s.PushOp(agent.Set(SetOptions{
552		Key:            []byte("testy"),
553		Value:          []byte("{}"),
554		CollectionName: suite.CollectionName,
555		ScopeName:      suite.ScopeName,
556	}, func(res *StoreResult, err error) {
557		s.Continue()
558	}))
559	s.Wait(0)
560
561	s.PushOp(agent.Delete(DeleteOptions{
562		Key:            []byte("testy"),
563		CollectionName: suite.CollectionName,
564		ScopeName:      suite.ScopeName,
565	}, func(res *DeleteResult, err error) {
566		s.Wrap(func() {
567			if err != nil {
568				s.Fatalf("Remove operation failed: %v", err)
569			}
570		})
571	}))
572	s.Wait(0)
573}
574
575func (suite *StandardTestSuite) TestBasicInsert() {
576	agent, s := suite.GetAgentAndHarness()
577
578	s.PushOp(agent.Delete(DeleteOptions{
579		Key:            []byte("testz"),
580		CollectionName: suite.CollectionName,
581		ScopeName:      suite.ScopeName,
582	}, func(res *DeleteResult, err error) {
583		s.Continue()
584	}))
585	s.Wait(0)
586
587	s.PushOp(agent.Add(AddOptions{
588		Key:            []byte("testz"),
589		Value:          []byte("[]"),
590		CollectionName: suite.CollectionName,
591		ScopeName:      suite.ScopeName,
592	}, func(res *StoreResult, err error) {
593		s.Wrap(func() {
594			if err != nil {
595				s.Fatalf("Add operation failed: %v", err)
596			}
597			if res.Cas == Cas(0) {
598				s.Fatalf("Invalid cas received")
599			}
600		})
601	}))
602	s.Wait(0)
603}
604
605func (suite *StandardTestSuite) TestBasicCounters() {
606	agent, s := suite.GetAgentAndHarness()
607
608	// Counters
609	s.PushOp(agent.Delete(DeleteOptions{
610		Key:            []byte("testCounters"),
611		CollectionName: suite.CollectionName,
612		ScopeName:      suite.ScopeName,
613	}, func(res *DeleteResult, err error) {
614		s.Continue()
615	}))
616	s.Wait(0)
617
618	s.PushOp(agent.Increment(CounterOptions{
619		Key:            []byte("testCounters"),
620		Delta:          5,
621		Initial:        11,
622		CollectionName: suite.CollectionName,
623		ScopeName:      suite.ScopeName,
624	}, func(res *CounterResult, err error) {
625		s.Wrap(func() {
626			if err != nil {
627				s.Fatalf("Increment operation failed: %v", err)
628			}
629			if res.Cas == Cas(0) {
630				s.Fatalf("Invalid cas received")
631			}
632			if res.Value != 11 {
633				s.Fatalf("Increment did not operate properly")
634			}
635		})
636	}))
637	s.Wait(0)
638
639	s.PushOp(agent.Increment(CounterOptions{
640		Key:            []byte("testCounters"),
641		Delta:          5,
642		Initial:        22,
643		CollectionName: suite.CollectionName,
644		ScopeName:      suite.ScopeName,
645	}, func(res *CounterResult, err error) {
646		s.Wrap(func() {
647			if err != nil {
648				s.Fatalf("Increment operation failed: %v", err)
649			}
650			if res.Cas == Cas(0) {
651				s.Fatalf("Invalid cas received")
652			}
653			if res.Value != 16 {
654				s.Fatalf("Increment did not operate properly")
655			}
656		})
657	}))
658	s.Wait(0)
659
660	s.PushOp(agent.Decrement(CounterOptions{
661		Key:            []byte("testCounters"),
662		Delta:          3,
663		Initial:        65,
664		CollectionName: suite.CollectionName,
665		ScopeName:      suite.ScopeName,
666	}, func(res *CounterResult, err error) {
667		s.Wrap(func() {
668			if err != nil {
669				s.Fatalf("Increment operation failed: %v", err)
670			}
671			if res.Cas == Cas(0) {
672				s.Fatalf("Invalid cas received")
673			}
674			if res.Value != 13 {
675				s.Fatalf("Increment did not operate properly")
676			}
677		})
678	}))
679	s.Wait(0)
680}
681
682func (suite *StandardTestSuite) TestBasicAdjoins() {
683	suite.EnsureSupportsFeature(TestFeatureAdjoin)
684
685	agent, s := suite.GetAgentAndHarness()
686
687	s.PushOp(agent.Set(SetOptions{
688		Key:            []byte("testAdjoins"),
689		Value:          []byte("there"),
690		CollectionName: suite.CollectionName,
691		ScopeName:      suite.ScopeName,
692	}, func(res *StoreResult, err error) {
693		s.Continue()
694	}))
695	s.Wait(0)
696
697	s.PushOp(agent.Append(AdjoinOptions{
698		Key:            []byte("testAdjoins"),
699		Value:          []byte(" Frank!"),
700		CollectionName: suite.CollectionName,
701		ScopeName:      suite.ScopeName,
702	}, func(res *AdjoinResult, err error) {
703		s.Wrap(func() {
704			if err != nil {
705				s.Fatalf("Append operation failed: %v", err)
706			}
707			if res.Cas == Cas(0) {
708				s.Fatalf("Invalid cas received")
709			}
710		})
711	}))
712	s.Wait(0)
713
714	s.PushOp(agent.Prepend(AdjoinOptions{
715		Key:            []byte("testAdjoins"),
716		Value:          []byte("Hello "),
717		CollectionName: suite.CollectionName,
718		ScopeName:      suite.ScopeName,
719	}, func(res *AdjoinResult, err error) {
720		s.Wrap(func() {
721			if err != nil {
722				s.Fatalf("Prepend operation failed: %v", err)
723			}
724			if res.Cas == Cas(0) {
725				s.Fatalf("Invalid cas received")
726			}
727		})
728	}))
729	s.Wait(0)
730
731	s.PushOp(agent.Get(GetOptions{
732		Key:            []byte("testAdjoins"),
733		CollectionName: suite.CollectionName,
734		ScopeName:      suite.ScopeName,
735	}, func(res *GetResult, err error) {
736		s.Wrap(func() {
737			if err != nil {
738				s.Fatalf("Get operation failed: %v", err)
739			}
740			if res.Cas == Cas(0) {
741				s.Fatalf("Invalid cas received")
742			}
743
744			if string(res.Value) != "Hello there Frank!" {
745				s.Fatalf("Adjoin operations did not behave")
746			}
747		})
748	}))
749	s.Wait(0)
750}
751
752func (suite *StandardTestSuite) TestExpiry() {
753	agent, s := suite.GetAgentAndHarness()
754
755	s.PushOp(agent.Set(SetOptions{
756		Key:            []byte("testExpiry"),
757		Value:          []byte("{}"),
758		Expiry:         1,
759		CollectionName: suite.CollectionName,
760		ScopeName:      suite.ScopeName,
761	}, func(res *StoreResult, err error) {
762		s.Wrap(func() {
763			if err != nil {
764				s.Fatalf("Set operation failed: %v", err)
765			}
766		})
767	}))
768	s.Wait(0)
769
770	suite.TimeTravel(2000 * time.Millisecond)
771
772	s.PushOp(agent.Get(GetOptions{
773		Key:            []byte("testExpiry"),
774		CollectionName: suite.CollectionName,
775		ScopeName:      suite.ScopeName,
776		RetryStrategy:  NewBestEffortRetryStrategy(nil),
777	}, func(res *GetResult, err error) {
778		s.Wrap(func() {
779			if !errors.Is(err, ErrDocumentNotFound) {
780				s.Fatalf("Get should have returned document not found")
781			}
782		})
783	}))
784	s.Wait(0)
785}
786
787func (suite *StandardTestSuite) TestTouch() {
788	agent, s := suite.GetAgentAndHarness()
789
790	s.PushOp(agent.Set(SetOptions{
791		Key:            []byte("testTouch"),
792		Value:          []byte("{}"),
793		Expiry:         1,
794		CollectionName: suite.CollectionName,
795		ScopeName:      suite.ScopeName,
796	}, func(res *StoreResult, err error) {
797		s.Wrap(func() {
798			if err != nil {
799				s.Fatalf("Set operation failed: %v", err)
800			}
801		})
802	}))
803	s.Wait(0)
804
805	s.PushOp(agent.Touch(TouchOptions{
806		Key:            []byte("testTouch"),
807		Expiry:         3,
808		CollectionName: suite.CollectionName,
809		ScopeName:      suite.ScopeName,
810	}, func(res *TouchResult, err error) {
811		s.Wrap(func() {
812			if err != nil {
813				s.Fatalf("Touch operation failed: %v", err)
814			}
815		})
816	}))
817	s.Wait(0)
818
819	suite.TimeTravel(1500 * time.Millisecond)
820
821	s.PushOp(agent.Get(GetOptions{
822		Key:            []byte("testTouch"),
823		CollectionName: suite.CollectionName,
824		ScopeName:      suite.ScopeName,
825	}, func(res *GetResult, err error) {
826		s.Wrap(func() {
827			if err != nil {
828				s.Fatalf("Get should have been successful")
829			}
830		})
831	}))
832	s.Wait(0)
833
834	suite.TimeTravel(2500 * time.Millisecond)
835
836	s.PushOp(agent.Get(GetOptions{
837		Key:            []byte("testTouch"),
838		CollectionName: suite.CollectionName,
839		ScopeName:      suite.ScopeName,
840	}, func(res *GetResult, err error) {
841		s.Wrap(func() {
842			if !errors.Is(err, ErrDocumentNotFound) {
843				s.Fatalf("Get should have returned document not found")
844			}
845		})
846	}))
847	s.Wait(0)
848}
849
850func (suite *StandardTestSuite) TestGetAndTouch() {
851	agent, s := suite.GetAgentAndHarness()
852
853	s.PushOp(agent.Set(SetOptions{
854		Key:            []byte("testGetAndTouch"),
855		Value:          []byte("{}"),
856		Expiry:         1,
857		CollectionName: suite.CollectionName,
858		ScopeName:      suite.ScopeName,
859	}, func(res *StoreResult, err error) {
860		s.Wrap(func() {
861			if err != nil {
862				s.Fatalf("Set operation failed: %v", err)
863			}
864		})
865	}))
866	s.Wait(0)
867
868	s.PushOp(agent.GetAndTouch(GetAndTouchOptions{
869		Key:            []byte("testGetAndTouch"),
870		Expiry:         3,
871		CollectionName: suite.CollectionName,
872		ScopeName:      suite.ScopeName,
873	}, func(res *GetAndTouchResult, err error) {
874		s.Wrap(func() {
875			if err != nil {
876				s.Fatalf("Touch operation failed: %v", err)
877			}
878		})
879	}))
880	s.Wait(0)
881
882	suite.TimeTravel(1500 * time.Millisecond)
883
884	s.PushOp(agent.Get(GetOptions{
885		Key:            []byte("testGetAndTouch"),
886		CollectionName: suite.CollectionName,
887		ScopeName:      suite.ScopeName,
888	}, func(res *GetResult, err error) {
889		s.Wrap(func() {
890			if err != nil {
891				s.Fatalf("Get should have been successful")
892			}
893		})
894	}))
895	s.Wait(0)
896
897	suite.TimeTravel(3000 * time.Millisecond)
898
899	s.PushOp(agent.Get(GetOptions{
900		Key:            []byte("testGetAndTouch"),
901		CollectionName: suite.CollectionName,
902		ScopeName:      suite.ScopeName,
903	}, func(res *GetResult, err error) {
904		s.Wrap(func() {
905			if !errors.Is(err, ErrDocumentNotFound) {
906				s.Fatalf("Get should have returned document not found: %v", err)
907			}
908		})
909	}))
910	s.Wait(0)
911}
912
913// This test will lock the document for 1 second, it will then perform set requests for up to 2 seconds,
914// the operation should succeed within the 2 seconds.
915func (suite *StandardTestSuite) TestRetrySet() {
916	agent, s := suite.GetAgentAndHarness()
917
918	s.PushOp(agent.Set(SetOptions{
919		Key:            []byte("testRetrySet"),
920		Value:          []byte("{}"),
921		Expiry:         1,
922		CollectionName: suite.CollectionName,
923		ScopeName:      suite.ScopeName,
924	}, func(res *StoreResult, err error) {
925		s.Wrap(func() {
926			if err != nil {
927				s.Fatalf("Set operation failed: %v", err)
928			}
929		})
930	}))
931	s.Wait(0)
932
933	s.PushOp(agent.GetAndLock(GetAndLockOptions{
934		Key:            []byte("testRetrySet"),
935		LockTime:       1,
936		CollectionName: suite.CollectionName,
937		ScopeName:      suite.ScopeName,
938	}, func(res *GetAndLockResult, err error) {
939		s.Wrap(func() {
940			if err != nil {
941				s.Fatalf("GetAndLock operation failed: %v", err)
942			}
943		})
944	}))
945	s.Wait(0)
946
947	s.PushOp(agent.Set(SetOptions{
948		Key:            []byte("testRetrySet"),
949		Value:          []byte("{}"),
950		Expiry:         1,
951		CollectionName: suite.CollectionName,
952		ScopeName:      suite.ScopeName,
953		RetryStrategy:  NewBestEffortRetryStrategy(nil),
954	}, func(res *StoreResult, err error) {
955		s.Wrap(func() {
956			if err != nil {
957				s.Fatalf("Set operation failed: %v", err)
958			}
959		})
960	}))
961	s.Wait(0)
962}
963
964func (suite *StandardTestSuite) TestObserve() {
965	suite.EnsureSupportsFeature(TestFeatureReplicas)
966
967	agent, s := suite.GetAgentAndHarness()
968	if agent.HasCollectionsSupport() {
969		suite.T().Skip("Skipping test as observe does not support collections")
970	}
971
972	s.PushOp(agent.Set(SetOptions{
973		Key:            []byte("testObserve"),
974		Value:          []byte("there"),
975		CollectionName: suite.CollectionName,
976		ScopeName:      suite.ScopeName,
977	}, func(res *StoreResult, err error) {
978		s.Continue()
979	}))
980	s.Wait(0)
981
982	s.PushOp(agent.Observe(ObserveOptions{
983		Key:            []byte("testObserve"),
984		ReplicaIdx:     1,
985		CollectionName: suite.CollectionName,
986		ScopeName:      suite.ScopeName,
987	}, func(res *ObserveResult, err error) {
988		s.Wrap(func() {
989			if err != nil {
990				s.Fatalf("Observe operation failed: %v", err)
991			}
992		})
993	}))
994	s.Wait(0)
995}
996
997func (suite *StandardTestSuite) TestObserveSeqNo() {
998	suite.EnsureSupportsFeature(TestFeatureReplicas)
999
1000	agent, s := suite.GetAgentAndHarness()
1001
1002	origMt := MutationToken{}
1003	s.PushOp(agent.Set(SetOptions{
1004		Key:            []byte("testObserve"),
1005		Value:          []byte("there"),
1006		CollectionName: suite.CollectionName,
1007		ScopeName:      suite.ScopeName,
1008	}, func(res *StoreResult, err error) {
1009		s.Wrap(func() {
1010			if err != nil {
1011				s.Fatalf("Initial set operation failed: %v", err)
1012			}
1013
1014			mt := res.MutationToken
1015			if mt.VbUUID == 0 && mt.SeqNo == 0 {
1016				s.Skipf("ObserveSeqNo not supported by server")
1017			}
1018
1019			origMt = mt
1020		})
1021	}))
1022	s.Wait(0)
1023
1024	origCurSeqNo := SeqNo(0)
1025	vbID, err := agent.kvMux.KeyToVbucket([]byte("testObserve"))
1026	if err != nil {
1027		s.Fatalf("KeyToVbucket operation failed: %v", err)
1028	}
1029
1030	s.PushOp(agent.ObserveVb(ObserveVbOptions{
1031		VbID:       vbID,
1032		VbUUID:     origMt.VbUUID,
1033		ReplicaIdx: 1,
1034	}, func(res *ObserveVbResult, err error) {
1035		s.Wrap(func() {
1036			if err != nil {
1037				s.Fatalf("ObserveSeqNo operation failed: %v", err)
1038			}
1039
1040			origCurSeqNo = res.CurrentSeqNo
1041		})
1042	}))
1043	s.Wait(0)
1044
1045	newMt := MutationToken{}
1046	s.PushOp(agent.Set(SetOptions{
1047		Key:            []byte("testObserve"),
1048		Value:          []byte("there"),
1049		CollectionName: suite.CollectionName,
1050		ScopeName:      suite.ScopeName,
1051	}, func(res *StoreResult, err error) {
1052		s.Wrap(func() {
1053			if err != nil {
1054				s.Fatalf("Second set operation failed: %v", err)
1055			}
1056
1057			newMt = res.MutationToken
1058		})
1059	}))
1060	s.Wait(0)
1061
1062	vbID, err = agent.kvMux.KeyToVbucket([]byte("testObserve"))
1063	if err != nil {
1064		s.Fatalf("KeyToVbucket operation failed: %v", err)
1065	}
1066	s.PushOp(agent.ObserveVb(ObserveVbOptions{
1067		VbID:       vbID,
1068		VbUUID:     newMt.VbUUID,
1069		ReplicaIdx: 1,
1070	}, func(res *ObserveVbResult, err error) {
1071		s.Wrap(func() {
1072			if err != nil {
1073				s.Fatalf("ObserveSeqNo operation failed: %v", err)
1074			}
1075			if res.CurrentSeqNo < origCurSeqNo {
1076				s.Fatalf("SeqNo does not appear to be working")
1077			}
1078		})
1079	}))
1080	s.Wait(0)
1081}
1082
1083func (suite *StandardTestSuite) TestRandomGet() {
1084	agent, s := suite.GetAgentAndHarness()
1085
1086	distkeys, err := MakeDistKeys(agent, time.Now().Add(2*time.Second))
1087	suite.Require().Nil(err, err)
1088	for _, k := range distkeys {
1089		s.PushOp(agent.Set(SetOptions{
1090			Key:            []byte(k),
1091			Value:          []byte("Hello World!"),
1092			CollectionName: suite.CollectionName,
1093			ScopeName:      suite.ScopeName,
1094		}, func(res *StoreResult, err error) {
1095			s.Wrap(func() {
1096				if err != nil {
1097					s.Fatalf("Couldn't store some items: %v", err)
1098				}
1099			})
1100		}))
1101		s.Wait(0)
1102	}
1103
1104	s.PushOp(agent.GetRandom(GetRandomOptions{
1105		CollectionName: suite.CollectionName,
1106		ScopeName:      suite.ScopeName,
1107	}, func(res *GetRandomResult, err error) {
1108		s.Wrap(func() {
1109			if err != nil {
1110				s.Fatalf("Get operation failed: %v", err)
1111			}
1112			if res.Cas == Cas(0) {
1113				s.Fatalf("Invalid cas received")
1114			}
1115			if len(res.Key) == 0 {
1116				s.Fatalf("Invalid key returned")
1117			}
1118			if len(res.Value) == 0 {
1119				s.Fatalf("No value returned")
1120			}
1121		})
1122	}))
1123	s.Wait(0)
1124}
1125
1126func (suite *StandardTestSuite) TestStats() {
1127	agent, s := suite.GetAgentAndHarness()
1128
1129	snapshot, err := agent.ConfigSnapshot()
1130	if err != nil {
1131		suite.T().Fatalf("Failed to get config snapshot: %s", err)
1132	}
1133	numServers, err := snapshot.NumServers()
1134	if err != nil {
1135		suite.T().Fatalf("Failed to get num servers: %s", err)
1136	}
1137
1138	s.PushOp(agent.Stats(StatsOptions{
1139		Key: "",
1140	}, func(res *StatsResult, err error) {
1141		s.Wrap(func() {
1142			if len(res.Servers) != numServers {
1143				s.Fatalf("Didn't Get all stats!")
1144			}
1145			for srv, curStats := range res.Servers {
1146				if curStats.Error != nil {
1147					s.Fatalf("Got error %v in stats for %s", curStats.Error, srv)
1148				}
1149
1150				if curStats.Stats == nil || len(curStats.Stats) == 0 {
1151					s.Fatalf("Got no stats in stats for %s", srv)
1152				}
1153			}
1154		})
1155	}))
1156	s.Wait(0)
1157}
1158
1159func (suite *StandardTestSuite) TestGetHttpEps() {
1160	agent, _ := suite.GetAgentAndHarness()
1161
1162	// Relies on a 3.0.0+ server
1163	n1qlEpList := agent.N1qlEps()
1164	if len(n1qlEpList) == 0 {
1165		suite.T().Fatalf("Failed to retrieve N1QL endpoint list")
1166	}
1167
1168	mgmtEpList := agent.MgmtEps()
1169	if len(mgmtEpList) == 0 {
1170		suite.T().Fatalf("Failed to retrieve N1QL endpoint list")
1171	}
1172
1173	capiEpList := agent.CapiEps()
1174	if len(capiEpList) == 0 {
1175		suite.T().Fatalf("Failed to retrieve N1QL endpoint list")
1176	}
1177}
1178
1179func (suite *StandardTestSuite) TestMemcachedBucket() {
1180	suite.EnsureSupportsFeature(TestFeatureMemd)
1181
1182	s := suite.GetHarness()
1183	agent := suite.MemdAgent()
1184
1185	s.PushOp(agent.Set(SetOptions{
1186		Key:   []byte("key"),
1187		Value: []byte("value"),
1188	}, func(res *StoreResult, err error) {
1189		s.Wrap(func() {
1190			if err != nil {
1191				s.Fatalf("Got error for Set: %v", err)
1192			}
1193		})
1194	}))
1195	s.Wait(0)
1196
1197	s.PushOp(agent.Get(GetOptions{
1198		Key: []byte("key"),
1199	}, func(res *GetResult, err error) {
1200		s.Wrap(func() {
1201			if err != nil {
1202				s.Fatalf("Couldn't Get back key: %v", err)
1203			}
1204			if string(res.Value) != "value" {
1205				s.Fatalf("Got back wrong value!")
1206			}
1207		})
1208	}))
1209	s.Wait(0)
1210
1211	// Try to perform Observe: should fail since this isn't supported on Memcached buckets
1212	_, err := agent.Observe(ObserveOptions{
1213		Key: []byte("key"),
1214	}, func(res *ObserveResult, err error) {
1215		s.Wrap(func() {
1216			s.Fatalf("Scheduling should fail on memcached buckets!")
1217		})
1218	})
1219
1220	if !errors.Is(err, ErrFeatureNotAvailable) {
1221		suite.T().Fatalf("Expected observe error for memcached bucket!")
1222	}
1223}
1224
1225func (suite *StandardTestSuite) TestFlagsRoundTrip() {
1226	// Ensure flags are round-tripped with the server correctly.
1227	agent, s := suite.GetAgentAndHarness()
1228
1229	s.PushOp(agent.Set(SetOptions{
1230		Key:            []byte("flagskey"),
1231		Value:          []byte("{}"),
1232		Flags:          0x99889988,
1233		CollectionName: suite.CollectionName,
1234		ScopeName:      suite.ScopeName,
1235	}, func(res *StoreResult, err error) {
1236		s.Wrap(func() {
1237			if err != nil {
1238				s.Fatalf("Got error for Set: %v", err)
1239			}
1240		})
1241	}))
1242	s.Wait(0)
1243
1244	s.PushOp(agent.Get(GetOptions{
1245		Key:            []byte("flagskey"),
1246		CollectionName: suite.CollectionName,
1247		ScopeName:      suite.ScopeName,
1248	}, func(res *GetResult, err error) {
1249		s.Wrap(func() {
1250			if err != nil {
1251				s.Fatalf("Couldn't Get back key: %v", err)
1252			}
1253			if res.Flags != 0x99889988 {
1254				s.Fatalf("flags failed to round-trip")
1255			}
1256		})
1257	}))
1258	s.Wait(0)
1259}
1260
1261func (suite *StandardTestSuite) TestMetaOps() {
1262	suite.EnsureSupportsFeature(TestFeatureGetMeta)
1263
1264	agent, s := suite.GetAgentAndHarness()
1265
1266	var currentCas Cas
1267
1268	// Set
1269
1270	s.PushOp(agent.Set(SetOptions{
1271		Key:   []byte("test"),
1272		Value: []byte("{}"),
1273	}, func(res *StoreResult, err error) {
1274		s.Wrap(func() {
1275			if err != nil {
1276				s.Fatalf("Set operation failed")
1277			}
1278			if res.Cas == Cas(0) {
1279				s.Fatalf("Invalid cas received")
1280			}
1281
1282			currentCas = res.Cas
1283		})
1284	}))
1285	s.Wait(0)
1286
1287	// GetMeta
1288	s.PushOp(agent.GetMeta(GetMetaOptions{
1289		Key: []byte("test"),
1290	}, func(res *GetMetaResult, err error) {
1291		s.Wrap(func() {
1292			if err != nil {
1293				s.Fatalf("GetMeta operation failed")
1294			}
1295			if res.Expiry != 0 {
1296				s.Fatalf("Invalid expiry received")
1297			}
1298			if res.Deleted != 0 {
1299				s.Fatalf("Invalid deleted flag received")
1300			}
1301			if res.Cas != currentCas {
1302				s.Fatalf("Invalid cas received")
1303			}
1304		})
1305	}))
1306	s.Wait(0)
1307}
1308
1309func (suite *StandardTestSuite) TestPing() {
1310	agent, s := suite.GetAgentAndHarness()
1311
1312	s.PushOp(agent.Ping(PingOptions{}, func(res *PingResult, err error) {
1313		s.Wrap(func() {
1314			if len(res.Services) == 0 {
1315				s.Fatalf("Ping report contained no results")
1316			}
1317		})
1318	}))
1319	s.Wait(5)
1320}
1321
1322func (suite *StandardTestSuite) TestDiagnostics() {
1323	agent, _ := suite.GetAgentAndHarness()
1324
1325	report, err := agent.Diagnostics(DiagnosticsOptions{})
1326	if err != nil {
1327		suite.T().Fatalf("Failed to fetch diagnostics: %s", err)
1328	}
1329
1330	if len(report.MemdConns) == 0 {
1331		suite.T().Fatalf("Diagnostics report contained no results")
1332	}
1333
1334	for _, conn := range report.MemdConns {
1335		if conn.RemoteAddr == "" {
1336			suite.T().Fatalf("Diagnostic report contained invalid entry")
1337		}
1338	}
1339}
1340
1341type testAlternateAddressesRouteConfigMgr struct {
1342	cfg       *routeConfig
1343	cfgCalled bool
1344}
1345
1346func (taa *testAlternateAddressesRouteConfigMgr) OnNewRouteConfig(cfg *routeConfig) {
1347	taa.cfgCalled = true
1348	taa.cfg = cfg
1349}
1350
1351func (suite *StandardTestSuite) TestAlternateAddressesEmptyStringConfig() {
1352	cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json")
1353
1354	mgr := &testAlternateAddressesRouteConfigMgr{}
1355	cfgManager := newConfigManager(configManagerProperties{
1356		SrcMemdAddrs: []string{"192.168.132.234:32799"},
1357	})
1358
1359	cfgManager.AddConfigWatcher(mgr)
1360	cfgManager.OnNewConfig(cfgBk)
1361
1362	networkType := cfgManager.NetworkType()
1363	if networkType != "external" {
1364		suite.T().Fatalf("Expected agent networkType to be external, was %s", networkType)
1365	}
1366
1367	for i, server := range mgr.cfg.kvServerList {
1368		cfgBkNode := cfgBk.NodesExt[i]
1369		port := cfgBkNode.AltAddresses["external"].Ports.Kv
1370		cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.AltAddresses["external"].Hostname, port)
1371		if server != cfgBkServer {
1372			suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server)
1373		}
1374	}
1375}
1376
1377func (suite *StandardTestSuite) TestAlternateAddressesAutoConfig() {
1378	cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json")
1379
1380	mgr := &testAlternateAddressesRouteConfigMgr{}
1381	cfgManager := newConfigManager(configManagerProperties{
1382		NetworkType:  "auto",
1383		SrcMemdAddrs: []string{"192.168.132.234:32799"},
1384	})
1385	cfgManager.AddConfigWatcher(mgr)
1386	cfgManager.OnNewConfig(cfgBk)
1387
1388	networkType := cfgManager.NetworkType()
1389	if networkType != "external" {
1390		suite.T().Fatalf("Expected agent networkType to be external, was %s", networkType)
1391	}
1392
1393	for i, server := range mgr.cfg.kvServerList {
1394		cfgBkNode := cfgBk.NodesExt[i]
1395		port := cfgBkNode.AltAddresses["external"].Ports.Kv
1396		cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.AltAddresses["external"].Hostname, port)
1397		if server != cfgBkServer {
1398			suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server)
1399		}
1400	}
1401}
1402
1403func (suite *StandardTestSuite) TestAlternateAddressesAutoInternalConfig() {
1404	cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json")
1405
1406	mgr := &testAlternateAddressesRouteConfigMgr{}
1407	cfgManager := newConfigManager(configManagerProperties{
1408		NetworkType:  "auto",
1409		SrcMemdAddrs: []string{"172.17.0.4:11210"},
1410	})
1411
1412	cfgManager.AddConfigWatcher(mgr)
1413	cfgManager.OnNewConfig(cfgBk)
1414
1415	networkType := cfgManager.NetworkType()
1416	if networkType != "default" {
1417		suite.T().Fatalf("Expected agent networkType to be external, was %s", networkType)
1418	}
1419
1420	for i, server := range mgr.cfg.kvServerList {
1421		cfgBkNode := cfgBk.NodesExt[i]
1422		port := cfgBkNode.Services.Kv
1423		cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.Hostname, port)
1424		if server != cfgBkServer {
1425			suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server)
1426		}
1427	}
1428}
1429
1430func (suite *StandardTestSuite) TestAlternateAddressesDefaultConfig() {
1431	cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json")
1432
1433	mgr := &testAlternateAddressesRouteConfigMgr{}
1434	cfgManager := newConfigManager(configManagerProperties{
1435		NetworkType:  "default",
1436		SrcMemdAddrs: []string{"192.168.132.234:32799"},
1437	})
1438	cfgManager.AddConfigWatcher(mgr)
1439	cfgManager.OnNewConfig(cfgBk)
1440
1441	networkType := cfgManager.NetworkType()
1442	if networkType != "default" {
1443		suite.T().Fatalf("Expected agent networkType to be default, was %s", networkType)
1444	}
1445
1446	for i, server := range mgr.cfg.kvServerList {
1447		cfgBkNode := cfgBk.NodesExt[i]
1448		port := cfgBkNode.Services.Kv
1449		cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.Hostname, port)
1450		if server != cfgBkServer {
1451			suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server)
1452		}
1453	}
1454}
1455
1456func (suite *StandardTestSuite) TestAlternateAddressesExternalConfig() {
1457	cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json")
1458
1459	mgr := &testAlternateAddressesRouteConfigMgr{}
1460	cfgManager := newConfigManager(configManagerProperties{
1461		NetworkType:  "external",
1462		SrcMemdAddrs: []string{"192.168.132.234:32799"},
1463	})
1464	cfgManager.AddConfigWatcher(mgr)
1465	cfgManager.OnNewConfig(cfgBk)
1466
1467	networkType := cfgManager.NetworkType()
1468	if networkType != "external" {
1469		suite.T().Fatalf("Expected agent networkType to be external, was %s", networkType)
1470	}
1471
1472	for i, server := range mgr.cfg.kvServerList {
1473		cfgBkNode := cfgBk.NodesExt[i]
1474		port := cfgBkNode.AltAddresses["external"].Ports.Kv
1475		cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.AltAddresses["external"].Hostname, port)
1476		if server != cfgBkServer {
1477			suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server)
1478		}
1479	}
1480}
1481
1482func (suite *StandardTestSuite) TestAlternateAddressesExternalConfigNoPorts() {
1483	cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses_without_ports.json")
1484
1485	mgr := &testAlternateAddressesRouteConfigMgr{}
1486	cfgManager := newConfigManager(configManagerProperties{
1487		NetworkType:  "external",
1488		SrcMemdAddrs: []string{"192.168.132.234:32799"},
1489	})
1490	cfgManager.AddConfigWatcher(mgr)
1491	cfgManager.OnNewConfig(cfgBk)
1492
1493	networkType := cfgManager.NetworkType()
1494	if networkType != "external" {
1495		suite.T().Fatalf("Expected agent networkType to be external, was %s", networkType)
1496	}
1497
1498	for i, server := range mgr.cfg.kvServerList {
1499		cfgBkNode := cfgBk.NodesExt[i]
1500		port := cfgBkNode.Services.Kv
1501		cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.AltAddresses["external"].Hostname, port)
1502		if server != cfgBkServer {
1503			suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server)
1504		}
1505	}
1506}
1507
1508func (suite *StandardTestSuite) TestAlternateAddressesInvalidConfig() {
1509	cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json")
1510
1511	mgr := &testAlternateAddressesRouteConfigMgr{}
1512	cfgManager := newConfigManager(configManagerProperties{
1513		NetworkType:  "invalid",
1514		SrcMemdAddrs: []string{"192.168.132.234:32799"},
1515	})
1516
1517	cfgManager.AddConfigWatcher(mgr)
1518	cfgManager.OnNewConfig(cfgBk)
1519
1520	networkType := cfgManager.NetworkType()
1521	if networkType != "invalid" {
1522		suite.T().Fatalf("Expected agent networkType to be invalid, was %s", networkType)
1523	}
1524
1525	if mgr.cfgCalled {
1526		suite.T().Fatalf("Expected route config to not be propagated, was propagated")
1527	}
1528}
1529
1530func (suite *StandardTestSuite) TestAgentWaitUntilReadyGCCCP() {
1531	suite.EnsureSupportsFeature(TestFeatureGCCCP)
1532
1533	cfg := suite.makeAgentConfig(globalTestConfig)
1534	agent, err := CreateAgent(&cfg)
1535	suite.Require().Nil(err, err)
1536	defer agent.Close()
1537	s := suite.GetHarness()
1538
1539	s.PushOp(agent.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{}, func(result *WaitUntilReadyResult, err error) {
1540		s.Wrap(func() {
1541			if err != nil {
1542				s.Fatalf("WaitUntilReady failed with error: %v", err)
1543			}
1544		})
1545	}))
1546	s.Wait(6)
1547
1548	s.PushOp(agent.Ping(PingOptions{
1549		ServiceTypes: []ServiceType{N1qlService},
1550		N1QLDeadline: time.Now().Add(5 * time.Second),
1551	}, func(result *PingResult, err error) {
1552		s.Wrap(func() {
1553			if err != nil {
1554				s.Fatalf("Ping failed with error: %v", err)
1555			}
1556		})
1557	}))
1558	s.Wait(0)
1559}
1560
1561func (suite *StandardTestSuite) VerifyConnectedToBucket(agent *Agent, s *TestSubHarness, test string) {
1562	s.PushOp(agent.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{}, func(result *WaitUntilReadyResult, err error) {
1563		s.Wrap(func() {
1564			if err != nil {
1565				s.Fatalf("WaitUntilReady failed with error: %v", err)
1566			}
1567		})
1568	}))
1569	s.Wait(6)
1570
1571	s.PushOp(agent.Set(SetOptions{
1572		Key:            []byte(test),
1573		Value:          []byte("{}"),
1574		CollectionName: suite.CollectionName,
1575		ScopeName:      suite.ScopeName,
1576	}, func(res *StoreResult, err error) {
1577		s.Wrap(func() {
1578			if err != nil {
1579				s.Fatalf("Got error for Set: %v", err)
1580			}
1581		})
1582	}))
1583	s.Wait(0)
1584}
1585
1586func (suite *StandardTestSuite) TestAgentWaitUntilReadyBucket() {
1587	cfg := suite.makeAgentConfig(globalTestConfig)
1588	cfg.BucketName = globalTestConfig.BucketName
1589	agent, err := CreateAgent(&cfg)
1590	suite.Require().Nil(err, err)
1591	defer agent.Close()
1592	s := suite.GetHarness()
1593
1594	suite.VerifyConnectedToBucket(agent, s, "TestAgentWaitUntilReadyBucket")
1595}
1596
1597func (suite *StandardTestSuite) TestAgentGroupWaitUntilReadyGCCCP() {
1598	suite.EnsureSupportsFeature(TestFeatureGCCCP)
1599
1600	cfg := suite.makeAgentGroupConfig(globalTestConfig)
1601	ag, err := CreateAgentGroup(&cfg)
1602	suite.Require().Nil(err, err)
1603	defer ag.Close()
1604	s := suite.GetHarness()
1605
1606	s.PushOp(ag.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{}, func(result *WaitUntilReadyResult, err error) {
1607		s.Wrap(func() {
1608			if err != nil {
1609				s.Fatalf("WaitUntilReady failed with error: %v", err)
1610			}
1611		})
1612	}))
1613	s.Wait(6)
1614
1615	s.PushOp(ag.Ping(PingOptions{
1616		ServiceTypes: []ServiceType{N1qlService},
1617		N1QLDeadline: time.Now().Add(5 * time.Second),
1618	}, func(result *PingResult, err error) {
1619		s.Wrap(func() {
1620			if err != nil {
1621				s.Fatalf("Ping failed with error: %v", err)
1622			}
1623		})
1624	}))
1625	s.Wait(0)
1626}
1627
1628// This test cannot run against mock as the mock does not respond with 200 status code for all of the endpoints.
1629func (suite *StandardTestSuite) TestAgentGroupWaitUntilReadyBucket() {
1630	suite.EnsureSupportsFeature(TestFeaturePingServices)
1631
1632	cfg := suite.makeAgentGroupConfig(globalTestConfig)
1633	ag, err := CreateAgentGroup(&cfg)
1634	suite.Require().Nil(err, err)
1635	defer ag.Close()
1636	s := suite.GetHarness()
1637
1638	err = ag.OpenBucket(globalTestConfig.BucketName)
1639	suite.Require().Nil(err, err)
1640
1641	agent := ag.GetAgent("default")
1642	suite.Require().NotNil(agent)
1643
1644	suite.VerifyConnectedToBucket(agent, s, "TestAgentGroupWaitUntilReadyBucket")
1645}
1646
1647func (suite *StandardTestSuite) TestConnectHTTPOnlyDefaultPort() {
1648	cfg := suite.makeAgentConfig(globalTestConfig)
1649	if len(cfg.HTTPAddrs) == 0 {
1650		suite.T().Skip("Skipping test due to no HTTP addresses")
1651	}
1652
1653	addr1 := cfg.HTTPAddrs[0]
1654	port := strings.Split(addr1, ":")[1]
1655	if port != "8091" {
1656		suite.T().Skipf("Skipping test due to non default port %s", port)
1657	}
1658
1659	cfg.HTTPAddrs = []string{addr1}
1660	cfg.MemdAddrs = []string{}
1661	cfg.BucketName = globalTestConfig.BucketName
1662	agent, err := CreateAgent(&cfg)
1663	suite.Require().Nil(err, err)
1664	defer agent.Close()
1665	s := suite.GetHarness()
1666
1667	suite.VerifyConnectedToBucket(agent, s, "TestConnectHTTPOnlyDefaultPort")
1668}
1669
1670func (suite *StandardTestSuite) TestConnectHTTPOnlyDefaultPortSSL() {
1671	suite.EnsureSupportsFeature(TestFeatureSsl)
1672
1673	cfg := suite.makeAgentConfig(globalTestConfig)
1674	if len(cfg.HTTPAddrs) == 0 {
1675		suite.T().Skip("Skipping test due to no HTTP addresses")
1676	}
1677
1678	addr1 := cfg.HTTPAddrs[0]
1679	parts := strings.Split(addr1, ":")
1680	if parts[1] != "8091" {
1681		suite.T().Skipf("Skipping test due to non default port %s", parts[1])
1682	}
1683
1684	cfg.HTTPAddrs = []string{parts[0] + ":" + "18091"}
1685	cfg.MemdAddrs = []string{}
1686	cfg.UseTLS = true
1687	// SkipVerify
1688	cfg.TLSRootCAProvider = func() *x509.CertPool {
1689		return nil
1690	}
1691	cfg.BucketName = globalTestConfig.BucketName
1692	agent, err := CreateAgent(&cfg)
1693	suite.Require().Nil(err, err)
1694	defer agent.Close()
1695	s := suite.GetHarness()
1696
1697	suite.VerifyConnectedToBucket(agent, s, "TestConnectHTTPOnlyDefaultPortSSL")
1698}
1699
1700func (suite *StandardTestSuite) TestConnectHTTPOnlyDefaultPortFastFailInvalidBucket() {
1701	cfg := suite.makeAgentConfig(globalTestConfig)
1702	if len(cfg.HTTPAddrs) == 0 {
1703		suite.T().Skip("Skipping test due to no HTTP addresses")
1704	}
1705
1706	addr1 := cfg.HTTPAddrs[0]
1707	port := strings.Split(addr1, ":")[1]
1708	if port != "8091" {
1709		suite.T().Skipf("Skipping test due to non default port %s", port)
1710	}
1711
1712	cfg.HTTPAddrs = []string{addr1}
1713	cfg.MemdAddrs = []string{}
1714	cfg.BucketName = "idontexist"
1715	agent, err := CreateAgent(&cfg)
1716	suite.Require().Nil(err, err)
1717	defer agent.Close()
1718	s := suite.GetHarness()
1719
1720	start := time.Now()
1721	s.PushOp(agent.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{
1722		RetryStrategy: newFailFastRetryStrategy(),
1723	}, func(result *WaitUntilReadyResult, err error) {
1724		s.Wrap(func() {
1725			if err == nil {
1726				s.Fatalf("WaitUntilReady failed without error")
1727			}
1728			if !errors.Is(err, ErrAuthenticationFailure) {
1729				s.Fatalf("WaitUntilReady should have failed with auth error but was %v", err)
1730			}
1731			if time.Since(start) > 5*time.Second {
1732				s.Fatalf("WaitUntilReady should have failed before the timeout duration, was %s", time.Since(start))
1733			}
1734		})
1735	}))
1736	s.Wait(6)
1737}
1738
1739func (suite *StandardTestSuite) TestConnectHTTPOnlyNonDefaultPort() {
1740	cfg := suite.makeAgentConfig(globalTestConfig)
1741	if len(cfg.HTTPAddrs) == 0 {
1742		suite.T().Skip("Skipping test due to no HTTP addresses")
1743	}
1744
1745	addr1 := cfg.HTTPAddrs[0]
1746	port := strings.Split(addr1, ":")[1]
1747	if port == "8091" {
1748		suite.T().Skipf("Skipping test due to default port %s", port)
1749	}
1750
1751	cfg.HTTPAddrs = []string{addr1}
1752	cfg.MemdAddrs = []string{}
1753	cfg.BucketName = globalTestConfig.BucketName
1754	agent, err := CreateAgent(&cfg)
1755	suite.Require().Nil(err, err)
1756	defer agent.Close()
1757	s := suite.GetHarness()
1758
1759	suite.VerifyConnectedToBucket(agent, s, "TestConnectHTTPOnlyNonDefaultPort")
1760}
1761
1762func (suite *StandardTestSuite) TestConnectHTTPOnlyNonDefaultPortFastFailInvalidBucket() {
1763	cfg := suite.makeAgentConfig(globalTestConfig)
1764	if len(cfg.HTTPAddrs) == 0 {
1765		suite.T().Skip("Skipping test due to no HTTP addresses")
1766	}
1767
1768	addr1 := cfg.HTTPAddrs[0]
1769	port := strings.Split(addr1, ":")[1]
1770	if port == "8091" {
1771		suite.T().Skipf("Skipping test due to default port %s", port)
1772	}
1773
1774	cfg.HTTPAddrs = []string{addr1}
1775	cfg.MemdAddrs = []string{}
1776	cfg.BucketName = "idontexist"
1777	agent, err := CreateAgent(&cfg)
1778	suite.Require().Nil(err, err)
1779	defer agent.Close()
1780	s := suite.GetHarness()
1781
1782	start := time.Now()
1783	s.PushOp(agent.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{
1784		RetryStrategy: newFailFastRetryStrategy(),
1785	}, func(result *WaitUntilReadyResult, err error) {
1786		s.Wrap(func() {
1787			if err == nil {
1788				s.Fatalf("WaitUntilReady failed without error")
1789			}
1790			if !errors.Is(err, ErrAuthenticationFailure) {
1791				s.Fatalf("WaitUntilReady should have failed with auth error but was %v", err)
1792			}
1793			if time.Since(start) > 5*time.Second {
1794				s.Fatalf("WaitUntilReady should have failed before the timeout duration, was %s", time.Since(start))
1795			}
1796		})
1797	}))
1798	s.Wait(6)
1799}
1800
1801func (suite *StandardTestSuite) TestConnectMemdOnlyDefaultPort() {
1802	cfg := suite.makeAgentConfig(globalTestConfig)
1803	if len(cfg.MemdAddrs) == 0 {
1804		suite.T().Skip("Skipping test due to no Memd addresses")
1805	}
1806
1807	addr1 := cfg.MemdAddrs[0]
1808	port := strings.Split(addr1, ":")[1]
1809	if port != "11210" {
1810		suite.T().Skipf("Skipping test due to non default port %s", port)
1811	}
1812
1813	cfg.HTTPAddrs = []string{}
1814	cfg.MemdAddrs = []string{addr1}
1815	cfg.BucketName = globalTestConfig.BucketName
1816	agent, err := CreateAgent(&cfg)
1817	suite.Require().Nil(err, err)
1818	defer agent.Close()
1819	s := suite.GetHarness()
1820
1821	suite.VerifyConnectedToBucket(agent, s, "TestConnectMemdOnlyDefaultPort")
1822}
1823
1824func (suite *StandardTestSuite) TestConnectMemdOnlyDefaultPortSSL() {
1825	suite.EnsureSupportsFeature(TestFeatureSsl)
1826
1827	cfg := suite.makeAgentConfig(globalTestConfig)
1828	if len(cfg.MemdAddrs) == 0 {
1829		suite.T().Skip("Skipping test due to no memd addresses")
1830	}
1831
1832	addr1 := cfg.MemdAddrs[0]
1833	parts := strings.Split(addr1, ":")
1834	if parts[1] != "11210" {
1835		suite.T().Skipf("Skipping test due to non default port %s", parts[1])
1836	}
1837
1838	cfg.HTTPAddrs = []string{}
1839	cfg.MemdAddrs = []string{parts[0] + ":11207"}
1840	cfg.UseTLS = true
1841	// SkipVerify
1842	cfg.TLSRootCAProvider = func() *x509.CertPool {
1843		return nil
1844	}
1845	cfg.BucketName = globalTestConfig.BucketName
1846	agent, err := CreateAgent(&cfg)
1847	suite.Require().Nil(err, err)
1848	defer agent.Close()
1849	s := suite.GetHarness()
1850
1851	suite.VerifyConnectedToBucket(agent, s, "TestConnectMemdOnlyDefaultPortSSL")
1852}
1853
1854func (suite *StandardTestSuite) TestConnectMemdOnlyNonDefaultPort() {
1855	cfg := suite.makeAgentConfig(globalTestConfig)
1856	if len(cfg.MemdAddrs) == 0 {
1857		suite.T().Skip("Skipping test due to no memd addresses")
1858	}
1859
1860	addr1 := cfg.MemdAddrs[0]
1861	port := strings.Split(addr1, ":")[1]
1862	if port == "8091" {
1863		suite.T().Skipf("Skipping test due to default port %s", port)
1864	}
1865
1866	cfg.HTTPAddrs = []string{}
1867	cfg.MemdAddrs = []string{addr1}
1868	cfg.BucketName = globalTestConfig.BucketName
1869	agent, err := CreateAgent(&cfg)
1870	suite.Require().Nil(err, err)
1871	defer agent.Close()
1872	s := suite.GetHarness()
1873
1874	suite.VerifyConnectedToBucket(agent, s, "TestConnectMemdOnlyNonDefaultPort")
1875}
1876
1877func (suite *StandardTestSuite) TestConnectMemdOnlyDefaultPortFastFailInvalidBucket() {
1878	cfg := suite.makeAgentConfig(globalTestConfig)
1879	if len(cfg.MemdAddrs) == 0 {
1880		suite.T().Skip("Skipping test due to no memd addresses")
1881	}
1882
1883	addr1 := cfg.MemdAddrs[0]
1884	port := strings.Split(addr1, ":")[1]
1885	if port != "11210" {
1886		suite.T().Skipf("Skipping test due to non default port %s", port)
1887	}
1888
1889	cfg.HTTPAddrs = []string{}
1890	cfg.MemdAddrs = []string{addr1}
1891	cfg.BucketName = "idontexist"
1892	agent, err := CreateAgent(&cfg)
1893	suite.Require().Nil(err, err)
1894	defer agent.Close()
1895	s := suite.GetHarness()
1896
1897	start := time.Now()
1898	s.PushOp(agent.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{
1899		RetryStrategy: newFailFastRetryStrategy(),
1900	}, func(result *WaitUntilReadyResult, err error) {
1901		s.Wrap(func() {
1902			if err == nil {
1903				s.Fatalf("WaitUntilReady failed without error")
1904			}
1905			if !errors.Is(err, ErrAuthenticationFailure) {
1906				s.Fatalf("WaitUntilReady should have failed with auth error but was %v", err)
1907			}
1908			if time.Since(start) > 5*time.Second {
1909				s.Fatalf("WaitUntilReady should have failed before the timeout duration, was %s", time.Since(start))
1910			}
1911		})
1912	}))
1913	s.Wait(6)
1914}
1915
1916// These functions are likely temporary.
1917
1918type testManifestWithError struct {
1919	Manifest Manifest
1920	Err      error
1921}
1922
1923func testCreateScope(name, bucketName string, agent *Agent) (*Manifest, error) {
1924	data := url.Values{}
1925	data.Set("name", name)
1926
1927	req := &HTTPRequest{
1928		Service:  MgmtService,
1929		Path:     fmt.Sprintf("/pools/default/buckets/%s/collections", bucketName),
1930		Method:   "POST",
1931		Body:     []byte(data.Encode()),
1932		Headers:  make(map[string]string),
1933		Deadline: time.Now().Add(10 * time.Second),
1934	}
1935
1936	req.Headers["Content-Type"] = "application/x-www-form-urlencoded"
1937
1938	resCh := make(chan *HTTPResponse)
1939	errCh := make(chan error)
1940	_, err := agent.DoHTTPRequest(req, func(response *HTTPResponse, err error) {
1941		if err != nil {
1942			errCh <- err
1943			return
1944		}
1945		resCh <- response
1946	})
1947	if err != nil {
1948		return nil, err
1949	}
1950
1951	var resp *HTTPResponse
1952	select {
1953	case respErr := <-errCh:
1954		if respErr != nil {
1955			return nil, respErr
1956		}
1957	case res := <-resCh:
1958		resp = res
1959	}
1960
1961	if resp.StatusCode >= 300 {
1962		data, err := ioutil.ReadAll(resp.Body)
1963		if err != nil {
1964			return nil, fmt.Errorf("could not create scope, status code: %d", resp.StatusCode)
1965		}
1966		err = resp.Body.Close()
1967		if err != nil {
1968			logDebugf("Failed to close response body")
1969		}
1970		return nil, fmt.Errorf("could not create scope, %s", string(data))
1971	}
1972
1973	respBody := struct {
1974		UID string `json:"uid"`
1975	}{}
1976	jsonDec := json.NewDecoder(resp.Body)
1977	err = jsonDec.Decode(&respBody)
1978	if err != nil {
1979		return nil, err
1980	}
1981	err = resp.Body.Close()
1982	if err != nil {
1983		return nil, err
1984	}
1985
1986	uid, err := strconv.ParseInt(respBody.UID, 16, 64)
1987	if err != nil {
1988		return nil, err
1989	}
1990
1991	timer := time.NewTimer(20 * time.Second)
1992	waitCh := make(chan testManifestWithError, 1)
1993	go waitForManifest(agent, uint64(uid), waitCh)
1994
1995	for {
1996		select {
1997		case <-timer.C:
1998			return nil, errors.New("wait time for scope to become available expired")
1999		case manifest := <-waitCh:
2000			if manifest.Err != nil {
2001				return nil, manifest.Err
2002			}
2003
2004			return &manifest.Manifest, nil
2005		}
2006	}
2007}
2008
2009func testDeleteScope(name, bucketName string, agent *Agent, waitForDeletion bool) (*Manifest, error) {
2010	data := url.Values{}
2011	data.Set("name", name)
2012
2013	req := &HTTPRequest{
2014		Service:  MgmtService,
2015		Path:     fmt.Sprintf("/pools/default/buckets/%s/collections/%s", bucketName, name),
2016		Method:   "DELETE",
2017		Headers:  make(map[string]string),
2018		Deadline: time.Now().Add(10 * time.Second),
2019	}
2020
2021	resCh := make(chan *HTTPResponse)
2022	errCh := make(chan error)
2023	_, err := agent.DoHTTPRequest(req, func(response *HTTPResponse, err error) {
2024		if err != nil {
2025			errCh <- err
2026			return
2027		}
2028		resCh <- response
2029	})
2030	if err != nil {
2031		return nil, err
2032	}
2033
2034	var resp *HTTPResponse
2035	select {
2036	case respErr := <-errCh:
2037		if respErr != nil {
2038			return nil, respErr
2039		}
2040	case res := <-resCh:
2041		resp = res
2042	}
2043
2044	if err != nil {
2045		return nil, err
2046	}
2047	if resp.StatusCode >= 300 {
2048		data, err := ioutil.ReadAll(resp.Body)
2049		if err != nil {
2050			return nil, fmt.Errorf("could not delete scope, status code: %d", resp.StatusCode)
2051		}
2052		err = resp.Body.Close()
2053		if err != nil {
2054			logDebugf("Failed to close response body")
2055		}
2056		return nil, fmt.Errorf("could not delete scope, %s", string(data))
2057	}
2058
2059	respBody := struct {
2060		UID string `json:"uid"`
2061	}{}
2062	jsonDec := json.NewDecoder(resp.Body)
2063	err = jsonDec.Decode(&respBody)
2064	if err != nil {
2065		return nil, err
2066	}
2067	err = resp.Body.Close()
2068	if err != nil {
2069		return nil, err
2070	}
2071
2072	uid, err := strconv.ParseInt(respBody.UID, 16, 64)
2073	if err != nil {
2074		return nil, err
2075	}
2076
2077	timer := time.NewTimer(20 * time.Second)
2078	waitCh := make(chan testManifestWithError, 1)
2079	go waitForManifest(agent, uint64(uid), waitCh)
2080
2081	for {
2082		select {
2083		case <-timer.C:
2084			return nil, errors.New("wait time for scope to become deleted expired")
2085		case manifest := <-waitCh:
2086			if manifest.Err != nil {
2087				return nil, manifest.Err
2088			}
2089
2090			return &manifest.Manifest, nil
2091		}
2092	}
2093
2094}
2095
2096func testCreateCollection(name, scopeName, bucketName string, agent *Agent) (*Manifest, error) {
2097	if scopeName == "" {
2098		scopeName = "_default"
2099	}
2100	if name == "" {
2101		name = "_default"
2102	}
2103
2104	data := url.Values{}
2105	data.Set("name", name)
2106
2107	req := &HTTPRequest{
2108		Service:  MgmtService,
2109		Path:     fmt.Sprintf("/pools/default/buckets/%s/collections/%s/", bucketName, scopeName),
2110		Method:   "POST",
2111		Body:     []byte(data.Encode()),
2112		Headers:  make(map[string]string),
2113		Deadline: time.Now().Add(10 * time.Second),
2114	}
2115
2116	req.Headers["Content-Type"] = "application/x-www-form-urlencoded"
2117
2118	resCh := make(chan *HTTPResponse)
2119	errCh := make(chan error)
2120	_, err := agent.DoHTTPRequest(req, func(response *HTTPResponse, err error) {
2121		if err != nil {
2122			errCh <- err
2123			return
2124		}
2125		resCh <- response
2126	})
2127	if err != nil {
2128		return nil, err
2129	}
2130
2131	var resp *HTTPResponse
2132	select {
2133	case respErr := <-errCh:
2134		if respErr != nil {
2135			return nil, respErr
2136		}
2137	case res := <-resCh:
2138		resp = res
2139	}
2140
2141	if resp.StatusCode >= 300 {
2142		data, err := ioutil.ReadAll(resp.Body)
2143		if err != nil {
2144			return nil, fmt.Errorf("could not create collection, status code: %d", resp.StatusCode)
2145		}
2146		err = resp.Body.Close()
2147		if err != nil {
2148			logDebugf("Failed to close response body")
2149		}
2150		return nil, fmt.Errorf("could not create collection, %s", string(data))
2151	}
2152
2153	respBody := struct {
2154		UID string `json:"uid"`
2155	}{}
2156	jsonDec := json.NewDecoder(resp.Body)
2157	err = jsonDec.Decode(&respBody)
2158	if err != nil {
2159		return nil, err
2160	}
2161	err = resp.Body.Close()
2162	if err != nil {
2163		return nil, err
2164	}
2165
2166	uid, err := strconv.ParseInt(respBody.UID, 16, 64)
2167	if err != nil {
2168		return nil, err
2169	}
2170
2171	timer := time.NewTimer(20 * time.Second)
2172	waitCh := make(chan testManifestWithError, 1)
2173	go waitForManifest(agent, uint64(uid), waitCh)
2174
2175	for {
2176		select {
2177		case <-timer.C:
2178			return nil, errors.New("wait time for collection to become available expired")
2179		case manifest := <-waitCh:
2180			if manifest.Err != nil {
2181				return nil, manifest.Err
2182			}
2183
2184			return &manifest.Manifest, nil
2185		}
2186	}
2187}
2188
2189func testDeleteCollection(name, scopeName, bucketName string, agent *Agent, waitForDeletion bool) (*Manifest, error) {
2190	if scopeName == "" {
2191		scopeName = "_default"
2192	}
2193	if name == "" {
2194		name = "_default"
2195	}
2196
2197	data := url.Values{}
2198	data.Set("name", name)
2199
2200	req := &HTTPRequest{
2201		Service:  MgmtService,
2202		Path:     fmt.Sprintf("/pools/default/buckets/%s/collections/%s/%s", bucketName, scopeName, name),
2203		Method:   "DELETE",
2204		Headers:  make(map[string]string),
2205		Deadline: time.Now().Add(10 * time.Second),
2206	}
2207
2208	resCh := make(chan *HTTPResponse)
2209	errCh := make(chan error)
2210	_, err := agent.DoHTTPRequest(req, func(response *HTTPResponse, err error) {
2211		if err != nil {
2212			errCh <- err
2213			return
2214		}
2215		resCh <- response
2216	})
2217	if err != nil {
2218		return nil, err
2219	}
2220
2221	var resp *HTTPResponse
2222	select {
2223	case respErr := <-errCh:
2224		if respErr != nil {
2225			return nil, respErr
2226		}
2227	case res := <-resCh:
2228		resp = res
2229	}
2230
2231	if err != nil {
2232		return nil, err
2233	}
2234	if resp.StatusCode >= 300 {
2235		data, err := ioutil.ReadAll(resp.Body)
2236		if err != nil {
2237			return nil, fmt.Errorf("could not delete collection, status code: %d", resp.StatusCode)
2238		}
2239		err = resp.Body.Close()
2240		if err != nil {
2241			logDebugf("Failed to close response body")
2242		}
2243		return nil, fmt.Errorf("could not delete collection, %s", string(data))
2244	}
2245
2246	respBody := struct {
2247		UID string `json:"uid"`
2248	}{}
2249	jsonDec := json.NewDecoder(resp.Body)
2250	err = jsonDec.Decode(&respBody)
2251	if err != nil {
2252		return nil, err
2253	}
2254	err = resp.Body.Close()
2255	if err != nil {
2256		return nil, err
2257	}
2258
2259	uid, err := strconv.ParseInt(respBody.UID, 16, 64)
2260	if err != nil {
2261		return nil, err
2262	}
2263
2264	timer := time.NewTimer(20 * time.Second)
2265	waitCh := make(chan testManifestWithError, 1)
2266	go waitForManifest(agent, uint64(uid), waitCh)
2267
2268	for {
2269		select {
2270		case <-timer.C:
2271			return nil, errors.New("wait time for collection to become deleted expired")
2272		case manifest := <-waitCh:
2273			if manifest.Err != nil {
2274				return nil, manifest.Err
2275			}
2276
2277			return &manifest.Manifest, nil
2278		}
2279	}
2280
2281}
2282
2283func waitForManifest(agent *Agent, manifestID uint64, manifestCh chan testManifestWithError) {
2284	var manifest Manifest
2285	for manifest.UID != manifestID {
2286		setCh := make(chan struct{})
2287		agent.GetCollectionManifest(GetCollectionManifestOptions{}, func(result *GetCollectionManifestResult, err error) {
2288			if err != nil {
2289				log.Println(err.Error())
2290				close(setCh)
2291				manifestCh <- testManifestWithError{Err: err}
2292				return
2293			}
2294
2295			err = json.Unmarshal(result.Manifest, &manifest)
2296			if err != nil {
2297				log.Println(err.Error())
2298				close(setCh)
2299				manifestCh <- testManifestWithError{Err: err}
2300				return
2301			}
2302
2303			if manifest.UID == manifestID {
2304				close(setCh)
2305				manifestCh <- testManifestWithError{Manifest: manifest}
2306				return
2307			}
2308			setCh <- struct{}{}
2309		})
2310		<-setCh
2311		time.Sleep(500 * time.Millisecond)
2312	}
2313}
2314