1package gocb 2 3import ( 4 "errors" 5 "sync" 6 "time" 7 8 gocbcore "github.com/couchbase/gocbcore/v9" 9) 10 11type kvProvider interface { 12 Add(opts gocbcore.AddOptions, cb gocbcore.StoreCallback) (gocbcore.PendingOp, error) 13 Set(opts gocbcore.SetOptions, cb gocbcore.StoreCallback) (gocbcore.PendingOp, error) 14 Replace(opts gocbcore.ReplaceOptions, cb gocbcore.StoreCallback) (gocbcore.PendingOp, error) 15 Get(opts gocbcore.GetOptions, cb gocbcore.GetCallback) (gocbcore.PendingOp, error) 16 GetOneReplica(opts gocbcore.GetOneReplicaOptions, cb gocbcore.GetReplicaCallback) (gocbcore.PendingOp, error) 17 Observe(opts gocbcore.ObserveOptions, cb gocbcore.ObserveCallback) (gocbcore.PendingOp, error) 18 ObserveVb(opts gocbcore.ObserveVbOptions, cb gocbcore.ObserveVbCallback) (gocbcore.PendingOp, error) 19 GetMeta(opts gocbcore.GetMetaOptions, cb gocbcore.GetMetaCallback) (gocbcore.PendingOp, error) 20 Delete(opts gocbcore.DeleteOptions, cb gocbcore.DeleteCallback) (gocbcore.PendingOp, error) 21 LookupIn(opts gocbcore.LookupInOptions, cb gocbcore.LookupInCallback) (gocbcore.PendingOp, error) 22 MutateIn(opts gocbcore.MutateInOptions, cb gocbcore.MutateInCallback) (gocbcore.PendingOp, error) 23 GetAndTouch(opts gocbcore.GetAndTouchOptions, cb gocbcore.GetAndTouchCallback) (gocbcore.PendingOp, error) 24 GetAndLock(opts gocbcore.GetAndLockOptions, cb gocbcore.GetAndLockCallback) (gocbcore.PendingOp, error) 25 Unlock(opts gocbcore.UnlockOptions, cb gocbcore.UnlockCallback) (gocbcore.PendingOp, error) 26 Touch(opts gocbcore.TouchOptions, cb gocbcore.TouchCallback) (gocbcore.PendingOp, error) 27 Increment(opts gocbcore.CounterOptions, cb gocbcore.CounterCallback) (gocbcore.PendingOp, error) 28 Decrement(opts gocbcore.CounterOptions, cb gocbcore.CounterCallback) (gocbcore.PendingOp, error) 29 Append(opts gocbcore.AdjoinOptions, cb gocbcore.AdjoinCallback) (gocbcore.PendingOp, error) 30 Prepend(opts gocbcore.AdjoinOptions, cb gocbcore.AdjoinCallback) (gocbcore.PendingOp, error) 31 ConfigSnapshot() (*gocbcore.ConfigSnapshot, error) 32} 33 34// Cas represents the specific state of a document on the cluster. 35type Cas gocbcore.Cas 36 37// InsertOptions are options that can be applied to an Insert operation. 38type InsertOptions struct { 39 Expiry time.Duration 40 PersistTo uint 41 ReplicateTo uint 42 DurabilityLevel DurabilityLevel 43 Transcoder Transcoder 44 Timeout time.Duration 45 RetryStrategy RetryStrategy 46} 47 48// Insert creates a new document in the Collection. 49func (c *Collection) Insert(id string, val interface{}, opts *InsertOptions) (mutOut *MutationResult, errOut error) { 50 if opts == nil { 51 opts = &InsertOptions{} 52 } 53 54 opm := c.newKvOpManager("Insert", nil) 55 defer opm.Finish() 56 57 opm.SetDocumentID(id) 58 opm.SetTranscoder(opts.Transcoder) 59 opm.SetValue(val) 60 opm.SetDuraOptions(opts.PersistTo, opts.ReplicateTo, opts.DurabilityLevel) 61 opm.SetRetryStrategy(opts.RetryStrategy) 62 opm.SetTimeout(opts.Timeout) 63 64 if err := opm.CheckReadyForOp(); err != nil { 65 return nil, err 66 } 67 68 agent, err := c.getKvProvider() 69 if err != nil { 70 return nil, err 71 } 72 err = opm.Wait(agent.Add(gocbcore.AddOptions{ 73 Key: opm.DocumentID(), 74 Value: opm.ValueBytes(), 75 Flags: opm.ValueFlags(), 76 Expiry: durationToExpiry(opts.Expiry), 77 CollectionName: opm.CollectionName(), 78 ScopeName: opm.ScopeName(), 79 DurabilityLevel: opm.DurabilityLevel(), 80 DurabilityLevelTimeout: opm.DurabilityTimeout(), 81 RetryStrategy: opm.RetryStrategy(), 82 TraceContext: opm.TraceSpan(), 83 Deadline: opm.Deadline(), 84 }, func(res *gocbcore.StoreResult, err error) { 85 if err != nil { 86 errOut = opm.EnhanceErr(err) 87 opm.Reject() 88 return 89 } 90 91 mutOut = &MutationResult{} 92 mutOut.cas = Cas(res.Cas) 93 mutOut.mt = opm.EnhanceMt(res.MutationToken) 94 95 opm.Resolve(mutOut.mt) 96 })) 97 if err != nil { 98 errOut = err 99 } 100 return 101} 102 103// UpsertOptions are options that can be applied to an Upsert operation. 104type UpsertOptions struct { 105 Expiry time.Duration 106 PersistTo uint 107 ReplicateTo uint 108 DurabilityLevel DurabilityLevel 109 Transcoder Transcoder 110 Timeout time.Duration 111 RetryStrategy RetryStrategy 112} 113 114// Upsert creates a new document in the Collection if it does not exist, if it does exist then it updates it. 115func (c *Collection) Upsert(id string, val interface{}, opts *UpsertOptions) (mutOut *MutationResult, errOut error) { 116 if opts == nil { 117 opts = &UpsertOptions{} 118 } 119 120 opm := c.newKvOpManager("Upsert", nil) 121 defer opm.Finish() 122 123 opm.SetDocumentID(id) 124 opm.SetTranscoder(opts.Transcoder) 125 opm.SetValue(val) 126 opm.SetDuraOptions(opts.PersistTo, opts.ReplicateTo, opts.DurabilityLevel) 127 opm.SetRetryStrategy(opts.RetryStrategy) 128 opm.SetTimeout(opts.Timeout) 129 130 if err := opm.CheckReadyForOp(); err != nil { 131 return nil, err 132 } 133 134 agent, err := c.getKvProvider() 135 if err != nil { 136 return nil, err 137 } 138 err = opm.Wait(agent.Set(gocbcore.SetOptions{ 139 Key: opm.DocumentID(), 140 Value: opm.ValueBytes(), 141 Flags: opm.ValueFlags(), 142 Expiry: durationToExpiry(opts.Expiry), 143 CollectionName: opm.CollectionName(), 144 ScopeName: opm.ScopeName(), 145 DurabilityLevel: opm.DurabilityLevel(), 146 DurabilityLevelTimeout: opm.DurabilityTimeout(), 147 RetryStrategy: opm.RetryStrategy(), 148 TraceContext: opm.TraceSpan(), 149 Deadline: opm.Deadline(), 150 }, func(res *gocbcore.StoreResult, err error) { 151 if err != nil { 152 errOut = opm.EnhanceErr(err) 153 opm.Reject() 154 return 155 } 156 157 mutOut = &MutationResult{} 158 mutOut.cas = Cas(res.Cas) 159 mutOut.mt = opm.EnhanceMt(res.MutationToken) 160 161 opm.Resolve(mutOut.mt) 162 })) 163 if err != nil { 164 errOut = err 165 } 166 return 167} 168 169// ReplaceOptions are the options available to a Replace operation. 170type ReplaceOptions struct { 171 Expiry time.Duration 172 Cas Cas 173 PersistTo uint 174 ReplicateTo uint 175 DurabilityLevel DurabilityLevel 176 Transcoder Transcoder 177 Timeout time.Duration 178 RetryStrategy RetryStrategy 179} 180 181// Replace updates a document in the collection. 182func (c *Collection) Replace(id string, val interface{}, opts *ReplaceOptions) (mutOut *MutationResult, errOut error) { 183 if opts == nil { 184 opts = &ReplaceOptions{} 185 } 186 187 opm := c.newKvOpManager("Replace", nil) 188 defer opm.Finish() 189 190 opm.SetDocumentID(id) 191 opm.SetTranscoder(opts.Transcoder) 192 opm.SetValue(val) 193 opm.SetDuraOptions(opts.PersistTo, opts.ReplicateTo, opts.DurabilityLevel) 194 opm.SetRetryStrategy(opts.RetryStrategy) 195 opm.SetTimeout(opts.Timeout) 196 197 if err := opm.CheckReadyForOp(); err != nil { 198 return nil, err 199 } 200 201 agent, err := c.getKvProvider() 202 if err != nil { 203 return nil, err 204 } 205 err = opm.Wait(agent.Replace(gocbcore.ReplaceOptions{ 206 Key: opm.DocumentID(), 207 Value: opm.ValueBytes(), 208 Flags: opm.ValueFlags(), 209 Expiry: durationToExpiry(opts.Expiry), 210 Cas: gocbcore.Cas(opts.Cas), 211 CollectionName: opm.CollectionName(), 212 ScopeName: opm.ScopeName(), 213 DurabilityLevel: opm.DurabilityLevel(), 214 DurabilityLevelTimeout: opm.DurabilityTimeout(), 215 RetryStrategy: opm.RetryStrategy(), 216 TraceContext: opm.TraceSpan(), 217 Deadline: opm.Deadline(), 218 }, func(res *gocbcore.StoreResult, err error) { 219 if err != nil { 220 errOut = opm.EnhanceErr(err) 221 opm.Reject() 222 return 223 } 224 225 mutOut = &MutationResult{} 226 mutOut.cas = Cas(res.Cas) 227 mutOut.mt = opm.EnhanceMt(res.MutationToken) 228 229 opm.Resolve(mutOut.mt) 230 })) 231 if err != nil { 232 errOut = err 233 } 234 return 235} 236 237// GetOptions are the options available to a Get operation. 238type GetOptions struct { 239 WithExpiry bool 240 // Project causes the Get operation to only fetch the fields indicated 241 // by the paths. The result of the operation is then treated as a 242 // standard GetResult. 243 Project []string 244 Transcoder Transcoder 245 Timeout time.Duration 246 RetryStrategy RetryStrategy 247} 248 249// Get performs a fetch operation against the collection. This can take 3 paths, a standard full document 250// fetch, a subdocument full document fetch also fetching document expiry (when WithExpiry is set), 251// or a subdocument fetch (when Project is used). 252func (c *Collection) Get(id string, opts *GetOptions) (docOut *GetResult, errOut error) { 253 if opts == nil { 254 opts = &GetOptions{} 255 } 256 257 if len(opts.Project) == 0 && !opts.WithExpiry { 258 return c.getDirect(id, opts) 259 } 260 261 return c.getProjected(id, opts) 262} 263 264func (c *Collection) getDirect(id string, opts *GetOptions) (docOut *GetResult, errOut error) { 265 if opts == nil { 266 opts = &GetOptions{} 267 } 268 269 opm := c.newKvOpManager("Get", nil) 270 defer opm.Finish() 271 272 opm.SetDocumentID(id) 273 opm.SetTranscoder(opts.Transcoder) 274 opm.SetRetryStrategy(opts.RetryStrategy) 275 opm.SetTimeout(opts.Timeout) 276 277 if err := opm.CheckReadyForOp(); err != nil { 278 return nil, err 279 } 280 281 agent, err := c.getKvProvider() 282 if err != nil { 283 return nil, err 284 } 285 err = opm.Wait(agent.Get(gocbcore.GetOptions{ 286 Key: opm.DocumentID(), 287 CollectionName: opm.CollectionName(), 288 ScopeName: opm.ScopeName(), 289 RetryStrategy: opm.RetryStrategy(), 290 TraceContext: opm.TraceSpan(), 291 Deadline: opm.Deadline(), 292 }, func(res *gocbcore.GetResult, err error) { 293 if err != nil { 294 errOut = opm.EnhanceErr(err) 295 opm.Reject() 296 return 297 } 298 299 doc := &GetResult{ 300 Result: Result{ 301 cas: Cas(res.Cas), 302 }, 303 transcoder: opm.Transcoder(), 304 contents: res.Value, 305 flags: res.Flags, 306 } 307 308 docOut = doc 309 310 opm.Resolve(nil) 311 })) 312 if err != nil { 313 errOut = err 314 } 315 return 316} 317 318func (c *Collection) getProjected(id string, opts *GetOptions) (docOut *GetResult, errOut error) { 319 if opts == nil { 320 opts = &GetOptions{} 321 } 322 323 opm := c.newKvOpManager("Get", nil) 324 defer opm.Finish() 325 326 opm.SetDocumentID(id) 327 opm.SetTranscoder(opts.Transcoder) 328 opm.SetRetryStrategy(opts.RetryStrategy) 329 opm.SetTimeout(opts.Timeout) 330 331 if opts.Transcoder != nil { 332 return nil, errors.New("Cannot specify custom transcoder for projected gets") 333 } 334 335 if err := opm.CheckReadyForOp(); err != nil { 336 return nil, err 337 } 338 339 numProjects := len(opts.Project) 340 if opts.WithExpiry { 341 numProjects = 1 + numProjects 342 } 343 344 projections := opts.Project 345 if numProjects > 16 { 346 projections = nil 347 } 348 349 var ops []LookupInSpec 350 351 if opts.WithExpiry { 352 ops = append(ops, GetSpec("$document.exptime", &GetSpecOptions{IsXattr: true})) 353 } 354 355 if len(projections) == 0 { 356 ops = append(ops, GetSpec("", nil)) 357 } else { 358 for _, path := range projections { 359 ops = append(ops, GetSpec(path, nil)) 360 } 361 } 362 363 result, err := c.internalLookupIn(opm, ops, false) 364 if err != nil { 365 return nil, err 366 } 367 368 doc := &GetResult{} 369 if opts.WithExpiry { 370 // if expiration was requested then extract and remove it from the results 371 err = result.ContentAt(0, &doc.expiry) 372 if err != nil { 373 return nil, err 374 } 375 ops = ops[1:] 376 result.contents = result.contents[1:] 377 } 378 379 doc.transcoder = opm.Transcoder() 380 doc.cas = result.cas 381 if projections == nil { 382 err = doc.fromFullProjection(ops, result, opts.Project) 383 if err != nil { 384 return nil, err 385 } 386 } else { 387 err = doc.fromSubDoc(ops, result) 388 if err != nil { 389 return nil, err 390 } 391 } 392 393 return doc, nil 394} 395 396// ExistsOptions are the options available to the Exists command. 397type ExistsOptions struct { 398 Timeout time.Duration 399 RetryStrategy RetryStrategy 400} 401 402// Exists checks if a document exists for the given id. 403func (c *Collection) Exists(id string, opts *ExistsOptions) (docOut *ExistsResult, errOut error) { 404 if opts == nil { 405 opts = &ExistsOptions{} 406 } 407 408 opm := c.newKvOpManager("Exists", nil) 409 defer opm.Finish() 410 411 opm.SetDocumentID(id) 412 opm.SetRetryStrategy(opts.RetryStrategy) 413 opm.SetTimeout(opts.Timeout) 414 415 if err := opm.CheckReadyForOp(); err != nil { 416 return nil, err 417 } 418 419 agent, err := c.getKvProvider() 420 if err != nil { 421 return nil, err 422 } 423 err = opm.Wait(agent.GetMeta(gocbcore.GetMetaOptions{ 424 Key: opm.DocumentID(), 425 CollectionName: opm.CollectionName(), 426 ScopeName: opm.ScopeName(), 427 RetryStrategy: opm.RetryStrategy(), 428 TraceContext: opm.TraceSpan, 429 Deadline: opm.Deadline(), 430 }, func(res *gocbcore.GetMetaResult, err error) { 431 if errors.Is(err, ErrDocumentNotFound) { 432 docOut = &ExistsResult{ 433 Result: Result{ 434 cas: Cas(0), 435 }, 436 docExists: false, 437 } 438 opm.Resolve(nil) 439 return 440 } 441 442 if err != nil { 443 errOut = opm.EnhanceErr(err) 444 opm.Reject() 445 return 446 } 447 448 if res != nil { 449 docOut = &ExistsResult{ 450 Result: Result{ 451 cas: Cas(res.Cas), 452 }, 453 docExists: res.Deleted == 0, 454 } 455 } 456 457 opm.Resolve(nil) 458 })) 459 if err != nil { 460 errOut = err 461 } 462 return 463} 464 465func (c *Collection) getOneReplica( 466 span requestSpanContext, 467 id string, 468 replicaIdx int, 469 transcoder Transcoder, 470 retryStrategy RetryStrategy, 471 cancelCh chan struct{}, 472 timeout time.Duration, 473) (docOut *GetReplicaResult, errOut error) { 474 opm := c.newKvOpManager("getOneReplica", span) 475 defer opm.Finish() 476 477 opm.SetDocumentID(id) 478 opm.SetTranscoder(transcoder) 479 opm.SetRetryStrategy(retryStrategy) 480 opm.SetTimeout(timeout) 481 opm.SetCancelCh(cancelCh) 482 483 agent, err := c.getKvProvider() 484 if err != nil { 485 return nil, err 486 } 487 if replicaIdx == 0 { 488 err = opm.Wait(agent.Get(gocbcore.GetOptions{ 489 Key: opm.DocumentID(), 490 CollectionName: opm.CollectionName(), 491 ScopeName: opm.ScopeName(), 492 RetryStrategy: opm.RetryStrategy(), 493 TraceContext: opm.TraceSpan(), 494 Deadline: opm.Deadline(), 495 }, func(res *gocbcore.GetResult, err error) { 496 if err != nil { 497 errOut = opm.EnhanceErr(err) 498 opm.Reject() 499 return 500 } 501 502 docOut = &GetReplicaResult{} 503 docOut.cas = Cas(res.Cas) 504 docOut.transcoder = opm.Transcoder() 505 docOut.contents = res.Value 506 docOut.flags = res.Flags 507 docOut.isReplica = false 508 509 opm.Resolve(nil) 510 })) 511 if err != nil { 512 errOut = err 513 } 514 return 515 } 516 517 err = opm.Wait(agent.GetOneReplica(gocbcore.GetOneReplicaOptions{ 518 Key: opm.DocumentID(), 519 ReplicaIdx: replicaIdx, 520 CollectionName: opm.CollectionName(), 521 ScopeName: opm.ScopeName(), 522 RetryStrategy: opm.RetryStrategy(), 523 TraceContext: opm.TraceSpan(), 524 Deadline: opm.Deadline(), 525 }, func(res *gocbcore.GetReplicaResult, err error) { 526 if err != nil { 527 errOut = opm.EnhanceErr(err) 528 opm.Reject() 529 return 530 } 531 532 docOut = &GetReplicaResult{} 533 docOut.cas = Cas(res.Cas) 534 docOut.transcoder = opm.Transcoder() 535 docOut.contents = res.Value 536 docOut.flags = res.Flags 537 docOut.isReplica = true 538 539 opm.Resolve(nil) 540 })) 541 if err != nil { 542 errOut = err 543 } 544 return 545} 546 547// GetAllReplicaOptions are the options available to the GetAllReplicas command. 548type GetAllReplicaOptions struct { 549 Transcoder Transcoder 550 Timeout time.Duration 551 RetryStrategy RetryStrategy 552} 553 554// GetAllReplicasResult represents the results of a GetAllReplicas operation. 555type GetAllReplicasResult struct { 556 lock sync.Mutex 557 totalRequests uint32 558 totalResults uint32 559 resCh chan *GetReplicaResult 560 cancelCh chan struct{} 561} 562 563func (r *GetAllReplicasResult) addResult(res *GetReplicaResult) { 564 // We use a lock here because the alternative means that there is a race 565 // between the channel writes from multiple results and the channels being 566 // closed. IE: T1-Incr, T2-Incr, T2-Send, T2-Close, T1-Send[PANIC] 567 r.lock.Lock() 568 569 r.totalResults++ 570 resultCount := r.totalResults 571 572 if resultCount <= r.totalRequests { 573 r.resCh <- res 574 } 575 576 if resultCount == r.totalRequests { 577 close(r.cancelCh) 578 close(r.resCh) 579 } 580 581 r.lock.Unlock() 582} 583 584// Next fetches the next replica result. 585func (r *GetAllReplicasResult) Next() *GetReplicaResult { 586 return <-r.resCh 587} 588 589// Close cancels all remaining get replica requests. 590func (r *GetAllReplicasResult) Close() error { 591 // See addResult discussion on lock usage. 592 r.lock.Lock() 593 594 // Note that this number increment must be high enough to be clear that 595 // the result set was closed, but low enough that it won't overflow if 596 // additional result objects are processed after the close. 597 prevResultCount := r.totalResults 598 r.totalResults += 100000 599 600 // We only have to close everything if the addResult method didn't already 601 // close them due to already having completed every request 602 if prevResultCount < r.totalRequests { 603 close(r.cancelCh) 604 close(r.resCh) 605 } 606 607 r.lock.Unlock() 608 609 return nil 610} 611 612// GetAllReplicas returns the value of a particular document from all replica servers. This will return an iterable 613// which streams results one at a time. 614func (c *Collection) GetAllReplicas(id string, opts *GetAllReplicaOptions) (docOut *GetAllReplicasResult, errOut error) { 615 if opts == nil { 616 opts = &GetAllReplicaOptions{} 617 } 618 619 span := c.startKvOpTrace("GetAllReplicas", nil) 620 defer span.Finish() 621 622 // Timeout needs to be adjusted here, since we use it at the bottom of this 623 // function, but the remaining options are all passed downwards and get handled 624 // by those functions rather than us. 625 timeout := opts.Timeout 626 if timeout == 0 { 627 timeout = c.timeoutsConfig.KVTimeout 628 } 629 630 deadline := time.Now().Add(timeout) 631 transcoder := opts.Transcoder 632 retryStrategy := opts.RetryStrategy 633 634 agent, err := c.getKvProvider() 635 if err != nil { 636 return nil, err 637 } 638 639 snapshot, err := agent.ConfigSnapshot() 640 if err != nil { 641 return nil, err 642 } 643 644 numReplicas, err := snapshot.NumReplicas() 645 if err != nil { 646 return nil, err 647 } 648 649 numServers := numReplicas + 1 650 outCh := make(chan *GetReplicaResult, numServers) 651 cancelCh := make(chan struct{}) 652 653 repRes := &GetAllReplicasResult{ 654 totalRequests: uint32(numServers), 655 resCh: outCh, 656 cancelCh: cancelCh, 657 } 658 659 // Loop all the servers and populate the result object 660 for replicaIdx := 0; replicaIdx < numServers; replicaIdx++ { 661 go func(replicaIdx int) { 662 // This timeout value will cause the getOneReplica operation to timeout after our deadline has expired, 663 // as the deadline has already begun. getOneReplica timing out before our deadline would cause inconsistent 664 // behaviour. 665 res, err := c.getOneReplica(span, id, replicaIdx, transcoder, retryStrategy, cancelCh, timeout) 666 if err != nil { 667 logDebugf("Failed to fetch replica from replica %d: %s", replicaIdx, err) 668 } else { 669 repRes.addResult(res) 670 } 671 }(replicaIdx) 672 } 673 674 // Start a timer to close it after the deadline 675 go func() { 676 select { 677 case <-time.After(time.Until(deadline)): 678 // If we timeout, we should close the result 679 err := repRes.Close() 680 if err != nil { 681 logDebugf("failed to close GetAllReplicas response: %s", err) 682 } 683 return 684 case <-cancelCh: 685 // If the cancel channel closes, we are done 686 return 687 } 688 }() 689 690 return repRes, nil 691} 692 693// GetAnyReplicaOptions are the options available to the GetAnyReplica command. 694type GetAnyReplicaOptions struct { 695 Transcoder Transcoder 696 Timeout time.Duration 697 RetryStrategy RetryStrategy 698} 699 700// GetAnyReplica returns the value of a particular document from a replica server. 701func (c *Collection) GetAnyReplica(id string, opts *GetAnyReplicaOptions) (docOut *GetReplicaResult, errOut error) { 702 if opts == nil { 703 opts = &GetAnyReplicaOptions{} 704 } 705 706 span := c.startKvOpTrace("GetAnyReplica", nil) 707 defer span.Finish() 708 709 repRes, err := c.GetAllReplicas(id, &GetAllReplicaOptions{ 710 Timeout: opts.Timeout, 711 Transcoder: opts.Transcoder, 712 RetryStrategy: opts.RetryStrategy, 713 }) 714 if err != nil { 715 return nil, err 716 } 717 718 // Try to fetch at least one result 719 res := repRes.Next() 720 if res == nil { 721 return nil, &KeyValueError{ 722 InnerError: ErrDocumentUnretrievable, 723 BucketName: c.bucketName(), 724 ScopeName: c.scope, 725 CollectionName: c.collectionName, 726 } 727 } 728 729 // Close the results channel since we don't care about any of the 730 // remaining result objects at this point. 731 err = repRes.Close() 732 if err != nil { 733 logDebugf("failed to close GetAnyReplica response: %s", err) 734 } 735 736 return res, nil 737} 738 739// RemoveOptions are the options available to the Remove command. 740type RemoveOptions struct { 741 Cas Cas 742 PersistTo uint 743 ReplicateTo uint 744 DurabilityLevel DurabilityLevel 745 Timeout time.Duration 746 RetryStrategy RetryStrategy 747} 748 749// Remove removes a document from the collection. 750func (c *Collection) Remove(id string, opts *RemoveOptions) (mutOut *MutationResult, errOut error) { 751 if opts == nil { 752 opts = &RemoveOptions{} 753 } 754 755 opm := c.newKvOpManager("Remove", nil) 756 defer opm.Finish() 757 758 opm.SetDocumentID(id) 759 opm.SetDuraOptions(opts.PersistTo, opts.ReplicateTo, opts.DurabilityLevel) 760 opm.SetRetryStrategy(opts.RetryStrategy) 761 opm.SetTimeout(opts.Timeout) 762 763 if err := opm.CheckReadyForOp(); err != nil { 764 return nil, err 765 } 766 767 agent, err := c.getKvProvider() 768 if err != nil { 769 return nil, err 770 } 771 err = opm.Wait(agent.Delete(gocbcore.DeleteOptions{ 772 Key: opm.DocumentID(), 773 Cas: gocbcore.Cas(opts.Cas), 774 CollectionName: opm.CollectionName(), 775 ScopeName: opm.ScopeName(), 776 DurabilityLevel: opm.DurabilityLevel(), 777 DurabilityLevelTimeout: opm.DurabilityTimeout(), 778 RetryStrategy: opm.RetryStrategy(), 779 TraceContext: opm.TraceSpan(), 780 Deadline: opm.Deadline(), 781 }, func(res *gocbcore.DeleteResult, err error) { 782 if err != nil { 783 errOut = opm.EnhanceErr(err) 784 opm.Reject() 785 return 786 } 787 788 mutOut = &MutationResult{} 789 mutOut.cas = Cas(res.Cas) 790 mutOut.mt = opm.EnhanceMt(res.MutationToken) 791 792 opm.Resolve(mutOut.mt) 793 })) 794 if err != nil { 795 errOut = err 796 } 797 return 798} 799 800// GetAndTouchOptions are the options available to the GetAndTouch operation. 801type GetAndTouchOptions struct { 802 Transcoder Transcoder 803 Timeout time.Duration 804 RetryStrategy RetryStrategy 805} 806 807// GetAndTouch retrieves a document and simultaneously updates its expiry time. 808func (c *Collection) GetAndTouch(id string, expiry time.Duration, opts *GetAndTouchOptions) (docOut *GetResult, errOut error) { 809 if opts == nil { 810 opts = &GetAndTouchOptions{} 811 } 812 813 opm := c.newKvOpManager("GetAndTouch", nil) 814 defer opm.Finish() 815 816 opm.SetDocumentID(id) 817 opm.SetTranscoder(opts.Transcoder) 818 opm.SetRetryStrategy(opts.RetryStrategy) 819 opm.SetTimeout(opts.Timeout) 820 821 if err := opm.CheckReadyForOp(); err != nil { 822 return nil, err 823 } 824 825 agent, err := c.getKvProvider() 826 if err != nil { 827 return nil, err 828 } 829 err = opm.Wait(agent.GetAndTouch(gocbcore.GetAndTouchOptions{ 830 Key: opm.DocumentID(), 831 Expiry: durationToExpiry(expiry), 832 CollectionName: opm.CollectionName(), 833 ScopeName: opm.ScopeName(), 834 RetryStrategy: opm.RetryStrategy(), 835 TraceContext: opm.TraceSpan(), 836 Deadline: opm.Deadline(), 837 }, func(res *gocbcore.GetAndTouchResult, err error) { 838 if err != nil { 839 errOut = opm.EnhanceErr(err) 840 opm.Reject() 841 return 842 } 843 844 if res != nil { 845 doc := &GetResult{ 846 Result: Result{ 847 cas: Cas(res.Cas), 848 }, 849 transcoder: opm.Transcoder(), 850 contents: res.Value, 851 flags: res.Flags, 852 } 853 854 docOut = doc 855 } 856 857 opm.Resolve(nil) 858 })) 859 if err != nil { 860 errOut = err 861 } 862 return 863} 864 865// GetAndLockOptions are the options available to the GetAndLock operation. 866type GetAndLockOptions struct { 867 Transcoder Transcoder 868 Timeout time.Duration 869 RetryStrategy RetryStrategy 870} 871 872// GetAndLock locks a document for a period of time, providing exclusive RW access to it. 873// A lockTime value of over 30 seconds will be treated as 30 seconds. The resolution used to send this value to 874// the server is seconds and is calculated using uint32(lockTime/time.Second). 875func (c *Collection) GetAndLock(id string, lockTime time.Duration, opts *GetAndLockOptions) (docOut *GetResult, errOut error) { 876 if opts == nil { 877 opts = &GetAndLockOptions{} 878 } 879 880 opm := c.newKvOpManager("GetAndLock", nil) 881 defer opm.Finish() 882 883 opm.SetDocumentID(id) 884 opm.SetTranscoder(opts.Transcoder) 885 opm.SetRetryStrategy(opts.RetryStrategy) 886 opm.SetTimeout(opts.Timeout) 887 888 if err := opm.CheckReadyForOp(); err != nil { 889 return nil, err 890 } 891 892 agent, err := c.getKvProvider() 893 if err != nil { 894 return nil, err 895 } 896 err = opm.Wait(agent.GetAndLock(gocbcore.GetAndLockOptions{ 897 Key: opm.DocumentID(), 898 LockTime: uint32(lockTime / time.Second), 899 CollectionName: opm.CollectionName(), 900 ScopeName: opm.ScopeName(), 901 RetryStrategy: opm.RetryStrategy(), 902 TraceContext: opm.TraceSpan(), 903 Deadline: opm.Deadline(), 904 }, func(res *gocbcore.GetAndLockResult, err error) { 905 if err != nil { 906 errOut = opm.EnhanceErr(err) 907 opm.Reject() 908 return 909 } 910 911 if res != nil { 912 doc := &GetResult{ 913 Result: Result{ 914 cas: Cas(res.Cas), 915 }, 916 transcoder: opm.Transcoder(), 917 contents: res.Value, 918 flags: res.Flags, 919 } 920 921 docOut = doc 922 } 923 924 opm.Resolve(nil) 925 })) 926 if err != nil { 927 errOut = err 928 } 929 return 930} 931 932// UnlockOptions are the options available to the GetAndLock operation. 933type UnlockOptions struct { 934 Timeout time.Duration 935 RetryStrategy RetryStrategy 936} 937 938// Unlock unlocks a document which was locked with GetAndLock. 939func (c *Collection) Unlock(id string, cas Cas, opts *UnlockOptions) (errOut error) { 940 if opts == nil { 941 opts = &UnlockOptions{} 942 } 943 944 opm := c.newKvOpManager("Unlock", nil) 945 defer opm.Finish() 946 947 opm.SetDocumentID(id) 948 opm.SetRetryStrategy(opts.RetryStrategy) 949 opm.SetTimeout(opts.Timeout) 950 951 if err := opm.CheckReadyForOp(); err != nil { 952 return err 953 } 954 955 agent, err := c.getKvProvider() 956 if err != nil { 957 return err 958 } 959 err = opm.Wait(agent.Unlock(gocbcore.UnlockOptions{ 960 Key: opm.DocumentID(), 961 Cas: gocbcore.Cas(cas), 962 CollectionName: opm.CollectionName(), 963 ScopeName: opm.ScopeName(), 964 RetryStrategy: opm.RetryStrategy(), 965 TraceContext: opm.TraceSpan(), 966 Deadline: opm.Deadline(), 967 }, func(res *gocbcore.UnlockResult, err error) { 968 if err != nil { 969 errOut = opm.EnhanceErr(err) 970 opm.Reject() 971 return 972 } 973 974 mt := opm.EnhanceMt(res.MutationToken) 975 opm.Resolve(mt) 976 })) 977 if err != nil { 978 errOut = err 979 } 980 return 981} 982 983// TouchOptions are the options available to the Touch operation. 984type TouchOptions struct { 985 Timeout time.Duration 986 RetryStrategy RetryStrategy 987} 988 989// Touch touches a document, specifying a new expiry time for it. 990func (c *Collection) Touch(id string, expiry time.Duration, opts *TouchOptions) (mutOut *MutationResult, errOut error) { 991 if opts == nil { 992 opts = &TouchOptions{} 993 } 994 995 opm := c.newKvOpManager("Touch", nil) 996 defer opm.Finish() 997 998 opm.SetDocumentID(id) 999 opm.SetRetryStrategy(opts.RetryStrategy) 1000 opm.SetTimeout(opts.Timeout) 1001 1002 if err := opm.CheckReadyForOp(); err != nil { 1003 return nil, err 1004 } 1005 1006 agent, err := c.getKvProvider() 1007 if err != nil { 1008 return nil, err 1009 } 1010 err = opm.Wait(agent.Touch(gocbcore.TouchOptions{ 1011 Key: opm.DocumentID(), 1012 Expiry: durationToExpiry(expiry), 1013 CollectionName: opm.CollectionName(), 1014 ScopeName: opm.ScopeName(), 1015 RetryStrategy: opm.RetryStrategy(), 1016 TraceContext: opm.TraceSpan(), 1017 Deadline: opm.Deadline(), 1018 }, func(res *gocbcore.TouchResult, err error) { 1019 if err != nil { 1020 errOut = opm.EnhanceErr(err) 1021 opm.Reject() 1022 return 1023 } 1024 1025 mutOut = &MutationResult{} 1026 mutOut.cas = Cas(res.Cas) 1027 mutOut.mt = opm.EnhanceMt(res.MutationToken) 1028 1029 opm.Resolve(mutOut.mt) 1030 })) 1031 if err != nil { 1032 errOut = err 1033 } 1034 return 1035} 1036 1037// Binary creates and returns a BinaryCollection object. 1038func (c *Collection) Binary() *BinaryCollection { 1039 return &BinaryCollection{collection: c} 1040} 1041