1package gocb
2
3import (
4	"errors"
5	"sync"
6	"time"
7
8	gocbcore "github.com/couchbase/gocbcore/v9"
9)
10
// kvProvider is the set of key-value operations that Collection issues
// against the provider returned by getKvProvider.
//
// Every operation method has the same asynchronous shape: it takes the
// gocbcore options struct for that operation plus a completion callback, and
// returns a gocbcore.PendingOp handle or an error. ConfigSnapshot is the one
// exception; it takes no arguments and returns its result directly.
type kvProvider interface {
	Add(opts gocbcore.AddOptions, cb gocbcore.StoreCallback) (gocbcore.PendingOp, error)
	Set(opts gocbcore.SetOptions, cb gocbcore.StoreCallback) (gocbcore.PendingOp, error)
	Replace(opts gocbcore.ReplaceOptions, cb gocbcore.StoreCallback) (gocbcore.PendingOp, error)
	Get(opts gocbcore.GetOptions, cb gocbcore.GetCallback) (gocbcore.PendingOp, error)
	GetOneReplica(opts gocbcore.GetOneReplicaOptions, cb gocbcore.GetReplicaCallback) (gocbcore.PendingOp, error)
	Observe(opts gocbcore.ObserveOptions, cb gocbcore.ObserveCallback) (gocbcore.PendingOp, error)
	ObserveVb(opts gocbcore.ObserveVbOptions, cb gocbcore.ObserveVbCallback) (gocbcore.PendingOp, error)
	GetMeta(opts gocbcore.GetMetaOptions, cb gocbcore.GetMetaCallback) (gocbcore.PendingOp, error)
	Delete(opts gocbcore.DeleteOptions, cb gocbcore.DeleteCallback) (gocbcore.PendingOp, error)
	LookupIn(opts gocbcore.LookupInOptions, cb gocbcore.LookupInCallback) (gocbcore.PendingOp, error)
	MutateIn(opts gocbcore.MutateInOptions, cb gocbcore.MutateInCallback) (gocbcore.PendingOp, error)
	GetAndTouch(opts gocbcore.GetAndTouchOptions, cb gocbcore.GetAndTouchCallback) (gocbcore.PendingOp, error)
	GetAndLock(opts gocbcore.GetAndLockOptions, cb gocbcore.GetAndLockCallback) (gocbcore.PendingOp, error)
	Unlock(opts gocbcore.UnlockOptions, cb gocbcore.UnlockCallback) (gocbcore.PendingOp, error)
	Touch(opts gocbcore.TouchOptions, cb gocbcore.TouchCallback) (gocbcore.PendingOp, error)
	Increment(opts gocbcore.CounterOptions, cb gocbcore.CounterCallback) (gocbcore.PendingOp, error)
	Decrement(opts gocbcore.CounterOptions, cb gocbcore.CounterCallback) (gocbcore.PendingOp, error)
	Append(opts gocbcore.AdjoinOptions, cb gocbcore.AdjoinCallback) (gocbcore.PendingOp, error)
	Prepend(opts gocbcore.AdjoinOptions, cb gocbcore.AdjoinCallback) (gocbcore.PendingOp, error)
	ConfigSnapshot() (*gocbcore.ConfigSnapshot, error)
}
33
// Cas represents the specific state of a document on the cluster.
//
// It is a distinct named type whose underlying type is gocbcore.Cas, so
// values are converted explicitly (Cas(x) / gocbcore.Cas(x)) where this
// package hands data to, or receives data from, gocbcore.
type Cas gocbcore.Cas
36
// InsertOptions are options that can be applied to an Insert operation.
// A nil *InsertOptions passed to Insert is treated as the zero value.
//
// How Insert consumes each field:
//   - Expiry is converted with durationToExpiry and sent as the store expiry.
//   - PersistTo, ReplicateTo and DurabilityLevel are passed together to the
//     op manager's SetDuraOptions.
//   - Transcoder is passed to the op manager's SetTranscoder.
//   - Timeout is passed to the op manager's SetTimeout.
//   - RetryStrategy is passed to the op manager's SetRetryStrategy.
type InsertOptions struct {
	Expiry          time.Duration
	PersistTo       uint
	ReplicateTo     uint
	DurabilityLevel DurabilityLevel
	Transcoder      Transcoder
	Timeout         time.Duration
	RetryStrategy   RetryStrategy
}
47
48// Insert creates a new document in the Collection.
49func (c *Collection) Insert(id string, val interface{}, opts *InsertOptions) (mutOut *MutationResult, errOut error) {
50	if opts == nil {
51		opts = &InsertOptions{}
52	}
53
54	opm := c.newKvOpManager("Insert", nil)
55	defer opm.Finish()
56
57	opm.SetDocumentID(id)
58	opm.SetTranscoder(opts.Transcoder)
59	opm.SetValue(val)
60	opm.SetDuraOptions(opts.PersistTo, opts.ReplicateTo, opts.DurabilityLevel)
61	opm.SetRetryStrategy(opts.RetryStrategy)
62	opm.SetTimeout(opts.Timeout)
63
64	if err := opm.CheckReadyForOp(); err != nil {
65		return nil, err
66	}
67
68	agent, err := c.getKvProvider()
69	if err != nil {
70		return nil, err
71	}
72	err = opm.Wait(agent.Add(gocbcore.AddOptions{
73		Key:                    opm.DocumentID(),
74		Value:                  opm.ValueBytes(),
75		Flags:                  opm.ValueFlags(),
76		Expiry:                 durationToExpiry(opts.Expiry),
77		CollectionName:         opm.CollectionName(),
78		ScopeName:              opm.ScopeName(),
79		DurabilityLevel:        opm.DurabilityLevel(),
80		DurabilityLevelTimeout: opm.DurabilityTimeout(),
81		RetryStrategy:          opm.RetryStrategy(),
82		TraceContext:           opm.TraceSpan(),
83		Deadline:               opm.Deadline(),
84	}, func(res *gocbcore.StoreResult, err error) {
85		if err != nil {
86			errOut = opm.EnhanceErr(err)
87			opm.Reject()
88			return
89		}
90
91		mutOut = &MutationResult{}
92		mutOut.cas = Cas(res.Cas)
93		mutOut.mt = opm.EnhanceMt(res.MutationToken)
94
95		opm.Resolve(mutOut.mt)
96	}))
97	if err != nil {
98		errOut = err
99	}
100	return
101}
102
// UpsertOptions are options that can be applied to an Upsert operation.
// A nil *UpsertOptions passed to Upsert is treated as the zero value.
//
// How Upsert consumes each field:
//   - Expiry is converted with durationToExpiry and sent as the store expiry.
//   - PersistTo, ReplicateTo and DurabilityLevel are passed together to the
//     op manager's SetDuraOptions.
//   - Transcoder is passed to the op manager's SetTranscoder.
//   - Timeout is passed to the op manager's SetTimeout.
//   - RetryStrategy is passed to the op manager's SetRetryStrategy.
type UpsertOptions struct {
	Expiry          time.Duration
	PersistTo       uint
	ReplicateTo     uint
	DurabilityLevel DurabilityLevel
	Transcoder      Transcoder
	Timeout         time.Duration
	RetryStrategy   RetryStrategy
}
113
114// Upsert creates a new document in the Collection if it does not exist, if it does exist then it updates it.
115func (c *Collection) Upsert(id string, val interface{}, opts *UpsertOptions) (mutOut *MutationResult, errOut error) {
116	if opts == nil {
117		opts = &UpsertOptions{}
118	}
119
120	opm := c.newKvOpManager("Upsert", nil)
121	defer opm.Finish()
122
123	opm.SetDocumentID(id)
124	opm.SetTranscoder(opts.Transcoder)
125	opm.SetValue(val)
126	opm.SetDuraOptions(opts.PersistTo, opts.ReplicateTo, opts.DurabilityLevel)
127	opm.SetRetryStrategy(opts.RetryStrategy)
128	opm.SetTimeout(opts.Timeout)
129
130	if err := opm.CheckReadyForOp(); err != nil {
131		return nil, err
132	}
133
134	agent, err := c.getKvProvider()
135	if err != nil {
136		return nil, err
137	}
138	err = opm.Wait(agent.Set(gocbcore.SetOptions{
139		Key:                    opm.DocumentID(),
140		Value:                  opm.ValueBytes(),
141		Flags:                  opm.ValueFlags(),
142		Expiry:                 durationToExpiry(opts.Expiry),
143		CollectionName:         opm.CollectionName(),
144		ScopeName:              opm.ScopeName(),
145		DurabilityLevel:        opm.DurabilityLevel(),
146		DurabilityLevelTimeout: opm.DurabilityTimeout(),
147		RetryStrategy:          opm.RetryStrategy(),
148		TraceContext:           opm.TraceSpan(),
149		Deadline:               opm.Deadline(),
150	}, func(res *gocbcore.StoreResult, err error) {
151		if err != nil {
152			errOut = opm.EnhanceErr(err)
153			opm.Reject()
154			return
155		}
156
157		mutOut = &MutationResult{}
158		mutOut.cas = Cas(res.Cas)
159		mutOut.mt = opm.EnhanceMt(res.MutationToken)
160
161		opm.Resolve(mutOut.mt)
162	}))
163	if err != nil {
164		errOut = err
165	}
166	return
167}
168
// ReplaceOptions are the options available to a Replace operation.
// A nil *ReplaceOptions passed to Replace is treated as the zero value.
//
// How Replace consumes each field:
//   - Expiry is converted with durationToExpiry and sent as the store expiry.
//   - Cas is converted to gocbcore.Cas and sent with the request.
//   - PersistTo, ReplicateTo and DurabilityLevel are passed together to the
//     op manager's SetDuraOptions.
//   - Transcoder is passed to the op manager's SetTranscoder.
//   - Timeout is passed to the op manager's SetTimeout.
//   - RetryStrategy is passed to the op manager's SetRetryStrategy.
type ReplaceOptions struct {
	Expiry          time.Duration
	Cas             Cas
	PersistTo       uint
	ReplicateTo     uint
	DurabilityLevel DurabilityLevel
	Transcoder      Transcoder
	Timeout         time.Duration
	RetryStrategy   RetryStrategy
}
180
181// Replace updates a document in the collection.
182func (c *Collection) Replace(id string, val interface{}, opts *ReplaceOptions) (mutOut *MutationResult, errOut error) {
183	if opts == nil {
184		opts = &ReplaceOptions{}
185	}
186
187	opm := c.newKvOpManager("Replace", nil)
188	defer opm.Finish()
189
190	opm.SetDocumentID(id)
191	opm.SetTranscoder(opts.Transcoder)
192	opm.SetValue(val)
193	opm.SetDuraOptions(opts.PersistTo, opts.ReplicateTo, opts.DurabilityLevel)
194	opm.SetRetryStrategy(opts.RetryStrategy)
195	opm.SetTimeout(opts.Timeout)
196
197	if err := opm.CheckReadyForOp(); err != nil {
198		return nil, err
199	}
200
201	agent, err := c.getKvProvider()
202	if err != nil {
203		return nil, err
204	}
205	err = opm.Wait(agent.Replace(gocbcore.ReplaceOptions{
206		Key:                    opm.DocumentID(),
207		Value:                  opm.ValueBytes(),
208		Flags:                  opm.ValueFlags(),
209		Expiry:                 durationToExpiry(opts.Expiry),
210		Cas:                    gocbcore.Cas(opts.Cas),
211		CollectionName:         opm.CollectionName(),
212		ScopeName:              opm.ScopeName(),
213		DurabilityLevel:        opm.DurabilityLevel(),
214		DurabilityLevelTimeout: opm.DurabilityTimeout(),
215		RetryStrategy:          opm.RetryStrategy(),
216		TraceContext:           opm.TraceSpan(),
217		Deadline:               opm.Deadline(),
218	}, func(res *gocbcore.StoreResult, err error) {
219		if err != nil {
220			errOut = opm.EnhanceErr(err)
221			opm.Reject()
222			return
223		}
224
225		mutOut = &MutationResult{}
226		mutOut.cas = Cas(res.Cas)
227		mutOut.mt = opm.EnhanceMt(res.MutationToken)
228
229		opm.Resolve(mutOut.mt)
230	}))
231	if err != nil {
232		errOut = err
233	}
234	return
235}
236
// GetOptions are the options available to a Get operation.
// A nil *GetOptions passed to Get is treated as the zero value.
//
// WithExpiry or a non-empty Project makes Get take the subdocument path
// (getProjected) instead of a plain fetch. On that path a non-nil Transcoder
// is rejected with an error. Timeout and RetryStrategy are passed to the op
// manager's SetTimeout and SetRetryStrategy.
type GetOptions struct {
	WithExpiry bool
	// Project causes the Get operation to only fetch the fields indicated
	// by the paths. The result of the operation is then treated as a
	// standard GetResult.
	Project       []string
	Transcoder    Transcoder
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
248
249// Get performs a fetch operation against the collection. This can take 3 paths, a standard full document
250// fetch, a subdocument full document fetch also fetching document expiry (when WithExpiry is set),
251// or a subdocument fetch (when Project is used).
252func (c *Collection) Get(id string, opts *GetOptions) (docOut *GetResult, errOut error) {
253	if opts == nil {
254		opts = &GetOptions{}
255	}
256
257	if len(opts.Project) == 0 && !opts.WithExpiry {
258		return c.getDirect(id, opts)
259	}
260
261	return c.getProjected(id, opts)
262}
263
264func (c *Collection) getDirect(id string, opts *GetOptions) (docOut *GetResult, errOut error) {
265	if opts == nil {
266		opts = &GetOptions{}
267	}
268
269	opm := c.newKvOpManager("Get", nil)
270	defer opm.Finish()
271
272	opm.SetDocumentID(id)
273	opm.SetTranscoder(opts.Transcoder)
274	opm.SetRetryStrategy(opts.RetryStrategy)
275	opm.SetTimeout(opts.Timeout)
276
277	if err := opm.CheckReadyForOp(); err != nil {
278		return nil, err
279	}
280
281	agent, err := c.getKvProvider()
282	if err != nil {
283		return nil, err
284	}
285	err = opm.Wait(agent.Get(gocbcore.GetOptions{
286		Key:            opm.DocumentID(),
287		CollectionName: opm.CollectionName(),
288		ScopeName:      opm.ScopeName(),
289		RetryStrategy:  opm.RetryStrategy(),
290		TraceContext:   opm.TraceSpan(),
291		Deadline:       opm.Deadline(),
292	}, func(res *gocbcore.GetResult, err error) {
293		if err != nil {
294			errOut = opm.EnhanceErr(err)
295			opm.Reject()
296			return
297		}
298
299		doc := &GetResult{
300			Result: Result{
301				cas: Cas(res.Cas),
302			},
303			transcoder: opm.Transcoder(),
304			contents:   res.Value,
305			flags:      res.Flags,
306		}
307
308		docOut = doc
309
310		opm.Resolve(nil)
311	}))
312	if err != nil {
313		errOut = err
314	}
315	return
316}
317
318func (c *Collection) getProjected(id string, opts *GetOptions) (docOut *GetResult, errOut error) {
319	if opts == nil {
320		opts = &GetOptions{}
321	}
322
323	opm := c.newKvOpManager("Get", nil)
324	defer opm.Finish()
325
326	opm.SetDocumentID(id)
327	opm.SetTranscoder(opts.Transcoder)
328	opm.SetRetryStrategy(opts.RetryStrategy)
329	opm.SetTimeout(opts.Timeout)
330
331	if opts.Transcoder != nil {
332		return nil, errors.New("Cannot specify custom transcoder for projected gets")
333	}
334
335	if err := opm.CheckReadyForOp(); err != nil {
336		return nil, err
337	}
338
339	numProjects := len(opts.Project)
340	if opts.WithExpiry {
341		numProjects = 1 + numProjects
342	}
343
344	projections := opts.Project
345	if numProjects > 16 {
346		projections = nil
347	}
348
349	var ops []LookupInSpec
350
351	if opts.WithExpiry {
352		ops = append(ops, GetSpec("$document.exptime", &GetSpecOptions{IsXattr: true}))
353	}
354
355	if len(projections) == 0 {
356		ops = append(ops, GetSpec("", nil))
357	} else {
358		for _, path := range projections {
359			ops = append(ops, GetSpec(path, nil))
360		}
361	}
362
363	result, err := c.internalLookupIn(opm, ops, false)
364	if err != nil {
365		return nil, err
366	}
367
368	doc := &GetResult{}
369	if opts.WithExpiry {
370		// if expiration was requested then extract and remove it from the results
371		err = result.ContentAt(0, &doc.expiry)
372		if err != nil {
373			return nil, err
374		}
375		ops = ops[1:]
376		result.contents = result.contents[1:]
377	}
378
379	doc.transcoder = opm.Transcoder()
380	doc.cas = result.cas
381	if projections == nil {
382		err = doc.fromFullProjection(ops, result, opts.Project)
383		if err != nil {
384			return nil, err
385		}
386	} else {
387		err = doc.fromSubDoc(ops, result)
388		if err != nil {
389			return nil, err
390		}
391	}
392
393	return doc, nil
394}
395
// ExistsOptions are the options available to the Exists command.
// A nil *ExistsOptions passed to Exists is treated as the zero value.
// Timeout and RetryStrategy are passed to the op manager's SetTimeout and
// SetRetryStrategy.
type ExistsOptions struct {
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
401
402// Exists checks if a document exists for the given id.
403func (c *Collection) Exists(id string, opts *ExistsOptions) (docOut *ExistsResult, errOut error) {
404	if opts == nil {
405		opts = &ExistsOptions{}
406	}
407
408	opm := c.newKvOpManager("Exists", nil)
409	defer opm.Finish()
410
411	opm.SetDocumentID(id)
412	opm.SetRetryStrategy(opts.RetryStrategy)
413	opm.SetTimeout(opts.Timeout)
414
415	if err := opm.CheckReadyForOp(); err != nil {
416		return nil, err
417	}
418
419	agent, err := c.getKvProvider()
420	if err != nil {
421		return nil, err
422	}
423	err = opm.Wait(agent.GetMeta(gocbcore.GetMetaOptions{
424		Key:            opm.DocumentID(),
425		CollectionName: opm.CollectionName(),
426		ScopeName:      opm.ScopeName(),
427		RetryStrategy:  opm.RetryStrategy(),
428		TraceContext:   opm.TraceSpan,
429		Deadline:       opm.Deadline(),
430	}, func(res *gocbcore.GetMetaResult, err error) {
431		if errors.Is(err, ErrDocumentNotFound) {
432			docOut = &ExistsResult{
433				Result: Result{
434					cas: Cas(0),
435				},
436				docExists: false,
437			}
438			opm.Resolve(nil)
439			return
440		}
441
442		if err != nil {
443			errOut = opm.EnhanceErr(err)
444			opm.Reject()
445			return
446		}
447
448		if res != nil {
449			docOut = &ExistsResult{
450				Result: Result{
451					cas: Cas(res.Cas),
452				},
453				docExists: res.Deleted == 0,
454			}
455		}
456
457		opm.Resolve(nil)
458	}))
459	if err != nil {
460		errOut = err
461	}
462	return
463}
464
465func (c *Collection) getOneReplica(
466	span requestSpanContext,
467	id string,
468	replicaIdx int,
469	transcoder Transcoder,
470	retryStrategy RetryStrategy,
471	cancelCh chan struct{},
472	timeout time.Duration,
473) (docOut *GetReplicaResult, errOut error) {
474	opm := c.newKvOpManager("getOneReplica", span)
475	defer opm.Finish()
476
477	opm.SetDocumentID(id)
478	opm.SetTranscoder(transcoder)
479	opm.SetRetryStrategy(retryStrategy)
480	opm.SetTimeout(timeout)
481	opm.SetCancelCh(cancelCh)
482
483	agent, err := c.getKvProvider()
484	if err != nil {
485		return nil, err
486	}
487	if replicaIdx == 0 {
488		err = opm.Wait(agent.Get(gocbcore.GetOptions{
489			Key:            opm.DocumentID(),
490			CollectionName: opm.CollectionName(),
491			ScopeName:      opm.ScopeName(),
492			RetryStrategy:  opm.RetryStrategy(),
493			TraceContext:   opm.TraceSpan(),
494			Deadline:       opm.Deadline(),
495		}, func(res *gocbcore.GetResult, err error) {
496			if err != nil {
497				errOut = opm.EnhanceErr(err)
498				opm.Reject()
499				return
500			}
501
502			docOut = &GetReplicaResult{}
503			docOut.cas = Cas(res.Cas)
504			docOut.transcoder = opm.Transcoder()
505			docOut.contents = res.Value
506			docOut.flags = res.Flags
507			docOut.isReplica = false
508
509			opm.Resolve(nil)
510		}))
511		if err != nil {
512			errOut = err
513		}
514		return
515	}
516
517	err = opm.Wait(agent.GetOneReplica(gocbcore.GetOneReplicaOptions{
518		Key:            opm.DocumentID(),
519		ReplicaIdx:     replicaIdx,
520		CollectionName: opm.CollectionName(),
521		ScopeName:      opm.ScopeName(),
522		RetryStrategy:  opm.RetryStrategy(),
523		TraceContext:   opm.TraceSpan(),
524		Deadline:       opm.Deadline(),
525	}, func(res *gocbcore.GetReplicaResult, err error) {
526		if err != nil {
527			errOut = opm.EnhanceErr(err)
528			opm.Reject()
529			return
530		}
531
532		docOut = &GetReplicaResult{}
533		docOut.cas = Cas(res.Cas)
534		docOut.transcoder = opm.Transcoder()
535		docOut.contents = res.Value
536		docOut.flags = res.Flags
537		docOut.isReplica = true
538
539		opm.Resolve(nil)
540	}))
541	if err != nil {
542		errOut = err
543	}
544	return
545}
546
// GetAllReplicaOptions are the options available to the GetAllReplicas command.
// A nil *GetAllReplicaOptions passed to GetAllReplicas is treated as the zero
// value.
//
// Transcoder and RetryStrategy are forwarded unchanged to every per-replica
// fetch. Timeout bounds the whole operation; when it is zero, GetAllReplicas
// uses the collection's configured KVTimeout instead.
type GetAllReplicaOptions struct {
	Transcoder    Transcoder
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
553
// GetAllReplicasResult represents the results of a GetAllReplicas operation.
//
// Fields:
//   - lock serialises addResult and Close, so that sends on resCh never race
//     with the channels being closed.
//   - totalRequests is the number of fetches that were started (one per
//     server).
//   - totalResults counts results handed to addResult; Close adds a large
//     constant to it to mark the set as closed.
//   - resCh delivers results to Next and is closed when the set is complete
//     or closed.
//   - cancelCh is closed at the same time as resCh; it is also handed to each
//     per-replica fetch and watched by the deadline goroutine.
type GetAllReplicasResult struct {
	lock          sync.Mutex
	totalRequests uint32
	totalResults  uint32
	resCh         chan *GetReplicaResult
	cancelCh      chan struct{}
}
562
// addResult records one successful replica fetch and publishes it on resCh.
//
// The result is only sent while the running count is within totalRequests;
// once Close has inflated totalResults the count exceeds totalRequests and
// late results are dropped instead of being sent on a closed channel. When
// the count reaches totalRequests exactly, every request has reported and
// both cancelCh and resCh are closed.
func (r *GetAllReplicasResult) addResult(res *GetReplicaResult) {
	// We use a lock here because the alternative means that there is a race
	// between the channel writes from multiple results and the channels being
	// closed.  IE: T1-Incr, T2-Incr, T2-Send, T2-Close, T1-Send[PANIC]
	r.lock.Lock()

	r.totalResults++
	resultCount := r.totalResults

	// Skip the send once the set has been closed (count pushed past the limit).
	if resultCount <= r.totalRequests {
		r.resCh <- res
	}

	// Last expected result: signal completion to readers and to the
	// deadline goroutine.
	if resultCount == r.totalRequests {
		close(r.cancelCh)
		close(r.resCh)
	}

	r.lock.Unlock()
}
583
584// Next fetches the next replica result.
585func (r *GetAllReplicasResult) Next() *GetReplicaResult {
586	return <-r.resCh
587}
588
// Close cancels all remaining get replica requests.
//
// It marks the result set as closed by pushing totalResults far past
// totalRequests, which makes any later addResult call drop its result, and
// then closes cancelCh and resCh unless addResult already closed them
// because every request had completed. The returned error is always nil.
func (r *GetAllReplicasResult) Close() error {
	// See addResult discussion on lock usage.
	r.lock.Lock()

	// Note that this number increment must be high enough to be clear that
	// the result set was closed, but low enough that it won't overflow if
	// additional result objects are processed after the close.
	prevResultCount := r.totalResults
	r.totalResults += 100000

	// We only have to close everything if the addResult method didn't already
	// close them due to already having completed every request. After a
	// previous Close the count is already inflated, so the channels are not
	// closed a second time (assuming totalRequests stays below the increment).
	if prevResultCount < r.totalRequests {
		close(r.cancelCh)
		close(r.resCh)
	}

	r.lock.Unlock()

	return nil
}
611
612// GetAllReplicas returns the value of a particular document from all replica servers. This will return an iterable
613// which streams results one at a time.
614func (c *Collection) GetAllReplicas(id string, opts *GetAllReplicaOptions) (docOut *GetAllReplicasResult, errOut error) {
615	if opts == nil {
616		opts = &GetAllReplicaOptions{}
617	}
618
619	span := c.startKvOpTrace("GetAllReplicas", nil)
620	defer span.Finish()
621
622	// Timeout needs to be adjusted here, since we use it at the bottom of this
623	// function, but the remaining options are all passed downwards and get handled
624	// by those functions rather than us.
625	timeout := opts.Timeout
626	if timeout == 0 {
627		timeout = c.timeoutsConfig.KVTimeout
628	}
629
630	deadline := time.Now().Add(timeout)
631	transcoder := opts.Transcoder
632	retryStrategy := opts.RetryStrategy
633
634	agent, err := c.getKvProvider()
635	if err != nil {
636		return nil, err
637	}
638
639	snapshot, err := agent.ConfigSnapshot()
640	if err != nil {
641		return nil, err
642	}
643
644	numReplicas, err := snapshot.NumReplicas()
645	if err != nil {
646		return nil, err
647	}
648
649	numServers := numReplicas + 1
650	outCh := make(chan *GetReplicaResult, numServers)
651	cancelCh := make(chan struct{})
652
653	repRes := &GetAllReplicasResult{
654		totalRequests: uint32(numServers),
655		resCh:         outCh,
656		cancelCh:      cancelCh,
657	}
658
659	// Loop all the servers and populate the result object
660	for replicaIdx := 0; replicaIdx < numServers; replicaIdx++ {
661		go func(replicaIdx int) {
662			// This timeout value will cause the getOneReplica operation to timeout after our deadline has expired,
663			// as the deadline has already begun. getOneReplica timing out before our deadline would cause inconsistent
664			// behaviour.
665			res, err := c.getOneReplica(span, id, replicaIdx, transcoder, retryStrategy, cancelCh, timeout)
666			if err != nil {
667				logDebugf("Failed to fetch replica from replica %d: %s", replicaIdx, err)
668			} else {
669				repRes.addResult(res)
670			}
671		}(replicaIdx)
672	}
673
674	// Start a timer to close it after the deadline
675	go func() {
676		select {
677		case <-time.After(time.Until(deadline)):
678			// If we timeout, we should close the result
679			err := repRes.Close()
680			if err != nil {
681				logDebugf("failed to close GetAllReplicas response: %s", err)
682			}
683			return
684		case <-cancelCh:
685			// If the cancel channel closes, we are done
686			return
687		}
688	}()
689
690	return repRes, nil
691}
692
// GetAnyReplicaOptions are the options available to the GetAnyReplica command.
// A nil *GetAnyReplicaOptions passed to GetAnyReplica is treated as the zero
// value. All three fields are copied unchanged into the GetAllReplicaOptions
// that GetAnyReplica passes to GetAllReplicas.
type GetAnyReplicaOptions struct {
	Transcoder    Transcoder
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
699
700// GetAnyReplica returns the value of a particular document from a replica server.
701func (c *Collection) GetAnyReplica(id string, opts *GetAnyReplicaOptions) (docOut *GetReplicaResult, errOut error) {
702	if opts == nil {
703		opts = &GetAnyReplicaOptions{}
704	}
705
706	span := c.startKvOpTrace("GetAnyReplica", nil)
707	defer span.Finish()
708
709	repRes, err := c.GetAllReplicas(id, &GetAllReplicaOptions{
710		Timeout:       opts.Timeout,
711		Transcoder:    opts.Transcoder,
712		RetryStrategy: opts.RetryStrategy,
713	})
714	if err != nil {
715		return nil, err
716	}
717
718	// Try to fetch at least one result
719	res := repRes.Next()
720	if res == nil {
721		return nil, &KeyValueError{
722			InnerError:     ErrDocumentUnretrievable,
723			BucketName:     c.bucketName(),
724			ScopeName:      c.scope,
725			CollectionName: c.collectionName,
726		}
727	}
728
729	// Close the results channel since we don't care about any of the
730	// remaining result objects at this point.
731	err = repRes.Close()
732	if err != nil {
733		logDebugf("failed to close GetAnyReplica response: %s", err)
734	}
735
736	return res, nil
737}
738
// RemoveOptions are the options available to the Remove command.
// A nil *RemoveOptions passed to Remove is treated as the zero value.
//
// How Remove consumes each field:
//   - Cas is converted to gocbcore.Cas and sent with the delete request.
//   - PersistTo, ReplicateTo and DurabilityLevel are passed together to the
//     op manager's SetDuraOptions.
//   - Timeout is passed to the op manager's SetTimeout.
//   - RetryStrategy is passed to the op manager's SetRetryStrategy.
type RemoveOptions struct {
	Cas             Cas
	PersistTo       uint
	ReplicateTo     uint
	DurabilityLevel DurabilityLevel
	Timeout         time.Duration
	RetryStrategy   RetryStrategy
}
748
749// Remove removes a document from the collection.
750func (c *Collection) Remove(id string, opts *RemoveOptions) (mutOut *MutationResult, errOut error) {
751	if opts == nil {
752		opts = &RemoveOptions{}
753	}
754
755	opm := c.newKvOpManager("Remove", nil)
756	defer opm.Finish()
757
758	opm.SetDocumentID(id)
759	opm.SetDuraOptions(opts.PersistTo, opts.ReplicateTo, opts.DurabilityLevel)
760	opm.SetRetryStrategy(opts.RetryStrategy)
761	opm.SetTimeout(opts.Timeout)
762
763	if err := opm.CheckReadyForOp(); err != nil {
764		return nil, err
765	}
766
767	agent, err := c.getKvProvider()
768	if err != nil {
769		return nil, err
770	}
771	err = opm.Wait(agent.Delete(gocbcore.DeleteOptions{
772		Key:                    opm.DocumentID(),
773		Cas:                    gocbcore.Cas(opts.Cas),
774		CollectionName:         opm.CollectionName(),
775		ScopeName:              opm.ScopeName(),
776		DurabilityLevel:        opm.DurabilityLevel(),
777		DurabilityLevelTimeout: opm.DurabilityTimeout(),
778		RetryStrategy:          opm.RetryStrategy(),
779		TraceContext:           opm.TraceSpan(),
780		Deadline:               opm.Deadline(),
781	}, func(res *gocbcore.DeleteResult, err error) {
782		if err != nil {
783			errOut = opm.EnhanceErr(err)
784			opm.Reject()
785			return
786		}
787
788		mutOut = &MutationResult{}
789		mutOut.cas = Cas(res.Cas)
790		mutOut.mt = opm.EnhanceMt(res.MutationToken)
791
792		opm.Resolve(mutOut.mt)
793	}))
794	if err != nil {
795		errOut = err
796	}
797	return
798}
799
// GetAndTouchOptions are the options available to the GetAndTouch operation.
// A nil *GetAndTouchOptions passed to GetAndTouch is treated as the zero
// value. Transcoder, Timeout and RetryStrategy are passed to the op manager's
// SetTranscoder, SetTimeout and SetRetryStrategy respectively.
type GetAndTouchOptions struct {
	Transcoder    Transcoder
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
806
807// GetAndTouch retrieves a document and simultaneously updates its expiry time.
808func (c *Collection) GetAndTouch(id string, expiry time.Duration, opts *GetAndTouchOptions) (docOut *GetResult, errOut error) {
809	if opts == nil {
810		opts = &GetAndTouchOptions{}
811	}
812
813	opm := c.newKvOpManager("GetAndTouch", nil)
814	defer opm.Finish()
815
816	opm.SetDocumentID(id)
817	opm.SetTranscoder(opts.Transcoder)
818	opm.SetRetryStrategy(opts.RetryStrategy)
819	opm.SetTimeout(opts.Timeout)
820
821	if err := opm.CheckReadyForOp(); err != nil {
822		return nil, err
823	}
824
825	agent, err := c.getKvProvider()
826	if err != nil {
827		return nil, err
828	}
829	err = opm.Wait(agent.GetAndTouch(gocbcore.GetAndTouchOptions{
830		Key:            opm.DocumentID(),
831		Expiry:         durationToExpiry(expiry),
832		CollectionName: opm.CollectionName(),
833		ScopeName:      opm.ScopeName(),
834		RetryStrategy:  opm.RetryStrategy(),
835		TraceContext:   opm.TraceSpan(),
836		Deadline:       opm.Deadline(),
837	}, func(res *gocbcore.GetAndTouchResult, err error) {
838		if err != nil {
839			errOut = opm.EnhanceErr(err)
840			opm.Reject()
841			return
842		}
843
844		if res != nil {
845			doc := &GetResult{
846				Result: Result{
847					cas: Cas(res.Cas),
848				},
849				transcoder: opm.Transcoder(),
850				contents:   res.Value,
851				flags:      res.Flags,
852			}
853
854			docOut = doc
855		}
856
857		opm.Resolve(nil)
858	}))
859	if err != nil {
860		errOut = err
861	}
862	return
863}
864
// GetAndLockOptions are the options available to the GetAndLock operation.
// A nil *GetAndLockOptions passed to GetAndLock is treated as the zero value.
// Transcoder, Timeout and RetryStrategy are passed to the op manager's
// SetTranscoder, SetTimeout and SetRetryStrategy respectively.
type GetAndLockOptions struct {
	Transcoder    Transcoder
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
871
872// GetAndLock locks a document for a period of time, providing exclusive RW access to it.
873// A lockTime value of over 30 seconds will be treated as 30 seconds. The resolution used to send this value to
874// the server is seconds and is calculated using uint32(lockTime/time.Second).
875func (c *Collection) GetAndLock(id string, lockTime time.Duration, opts *GetAndLockOptions) (docOut *GetResult, errOut error) {
876	if opts == nil {
877		opts = &GetAndLockOptions{}
878	}
879
880	opm := c.newKvOpManager("GetAndLock", nil)
881	defer opm.Finish()
882
883	opm.SetDocumentID(id)
884	opm.SetTranscoder(opts.Transcoder)
885	opm.SetRetryStrategy(opts.RetryStrategy)
886	opm.SetTimeout(opts.Timeout)
887
888	if err := opm.CheckReadyForOp(); err != nil {
889		return nil, err
890	}
891
892	agent, err := c.getKvProvider()
893	if err != nil {
894		return nil, err
895	}
896	err = opm.Wait(agent.GetAndLock(gocbcore.GetAndLockOptions{
897		Key:            opm.DocumentID(),
898		LockTime:       uint32(lockTime / time.Second),
899		CollectionName: opm.CollectionName(),
900		ScopeName:      opm.ScopeName(),
901		RetryStrategy:  opm.RetryStrategy(),
902		TraceContext:   opm.TraceSpan(),
903		Deadline:       opm.Deadline(),
904	}, func(res *gocbcore.GetAndLockResult, err error) {
905		if err != nil {
906			errOut = opm.EnhanceErr(err)
907			opm.Reject()
908			return
909		}
910
911		if res != nil {
912			doc := &GetResult{
913				Result: Result{
914					cas: Cas(res.Cas),
915				},
916				transcoder: opm.Transcoder(),
917				contents:   res.Value,
918				flags:      res.Flags,
919			}
920
921			docOut = doc
922		}
923
924		opm.Resolve(nil)
925	}))
926	if err != nil {
927		errOut = err
928	}
929	return
930}
931
// UnlockOptions are the options available to the Unlock operation.
// A nil *UnlockOptions passed to Unlock is treated as the zero value.
// Timeout and RetryStrategy are passed to the op manager's SetTimeout and
// SetRetryStrategy.
type UnlockOptions struct {
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
937
938// Unlock unlocks a document which was locked with GetAndLock.
939func (c *Collection) Unlock(id string, cas Cas, opts *UnlockOptions) (errOut error) {
940	if opts == nil {
941		opts = &UnlockOptions{}
942	}
943
944	opm := c.newKvOpManager("Unlock", nil)
945	defer opm.Finish()
946
947	opm.SetDocumentID(id)
948	opm.SetRetryStrategy(opts.RetryStrategy)
949	opm.SetTimeout(opts.Timeout)
950
951	if err := opm.CheckReadyForOp(); err != nil {
952		return err
953	}
954
955	agent, err := c.getKvProvider()
956	if err != nil {
957		return err
958	}
959	err = opm.Wait(agent.Unlock(gocbcore.UnlockOptions{
960		Key:            opm.DocumentID(),
961		Cas:            gocbcore.Cas(cas),
962		CollectionName: opm.CollectionName(),
963		ScopeName:      opm.ScopeName(),
964		RetryStrategy:  opm.RetryStrategy(),
965		TraceContext:   opm.TraceSpan(),
966		Deadline:       opm.Deadline(),
967	}, func(res *gocbcore.UnlockResult, err error) {
968		if err != nil {
969			errOut = opm.EnhanceErr(err)
970			opm.Reject()
971			return
972		}
973
974		mt := opm.EnhanceMt(res.MutationToken)
975		opm.Resolve(mt)
976	}))
977	if err != nil {
978		errOut = err
979	}
980	return
981}
982
// TouchOptions are the options available to the Touch operation.
// A nil *TouchOptions passed to Touch is treated as the zero value.
// Timeout and RetryStrategy are passed to the op manager's SetTimeout and
// SetRetryStrategy.
type TouchOptions struct {
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
988
989// Touch touches a document, specifying a new expiry time for it.
990func (c *Collection) Touch(id string, expiry time.Duration, opts *TouchOptions) (mutOut *MutationResult, errOut error) {
991	if opts == nil {
992		opts = &TouchOptions{}
993	}
994
995	opm := c.newKvOpManager("Touch", nil)
996	defer opm.Finish()
997
998	opm.SetDocumentID(id)
999	opm.SetRetryStrategy(opts.RetryStrategy)
1000	opm.SetTimeout(opts.Timeout)
1001
1002	if err := opm.CheckReadyForOp(); err != nil {
1003		return nil, err
1004	}
1005
1006	agent, err := c.getKvProvider()
1007	if err != nil {
1008		return nil, err
1009	}
1010	err = opm.Wait(agent.Touch(gocbcore.TouchOptions{
1011		Key:            opm.DocumentID(),
1012		Expiry:         durationToExpiry(expiry),
1013		CollectionName: opm.CollectionName(),
1014		ScopeName:      opm.ScopeName(),
1015		RetryStrategy:  opm.RetryStrategy(),
1016		TraceContext:   opm.TraceSpan(),
1017		Deadline:       opm.Deadline(),
1018	}, func(res *gocbcore.TouchResult, err error) {
1019		if err != nil {
1020			errOut = opm.EnhanceErr(err)
1021			opm.Reject()
1022			return
1023		}
1024
1025		mutOut = &MutationResult{}
1026		mutOut.cas = Cas(res.Cas)
1027		mutOut.mt = opm.EnhanceMt(res.MutationToken)
1028
1029		opm.Resolve(mutOut.mt)
1030	}))
1031	if err != nil {
1032		errOut = err
1033	}
1034	return
1035}
1036
1037// Binary creates and returns a BinaryCollection object.
1038func (c *Collection) Binary() *BinaryCollection {
1039	return &BinaryCollection{collection: c}
1040}
1041