1package gocb 2 3import ( 4 "encoding/json" 5 "fmt" 6 "io/ioutil" 7 "net/url" 8 "strings" 9 "time" 10 11 "github.com/google/uuid" 12 "github.com/pkg/errors" 13) 14 15// BucketType specifies the kind of bucket. 16type BucketType string 17 18const ( 19 // CouchbaseBucketType indicates a Couchbase bucket type. 20 CouchbaseBucketType BucketType = "membase" 21 22 // MemcachedBucketType indicates a Memcached bucket type. 23 MemcachedBucketType BucketType = "memcached" 24 25 // EphemeralBucketType indicates an Ephemeral bucket type. 26 EphemeralBucketType BucketType = "ephemeral" 27) 28 29// ConflictResolutionType specifies the kind of conflict resolution to use for a bucket. 30type ConflictResolutionType string 31 32const ( 33 // ConflictResolutionTypeTimestamp specifies to use timestamp conflict resolution on the bucket. 34 ConflictResolutionTypeTimestamp ConflictResolutionType = "lww" 35 36 // ConflictResolutionTypeSequenceNumber specifies to use sequence number conflict resolution on the bucket. 37 ConflictResolutionTypeSequenceNumber ConflictResolutionType = "seqno" 38) 39 40// EvictionPolicyType specifies the kind of eviction policy to use for a bucket. 41type EvictionPolicyType string 42 43const ( 44 // EvictionPolicyTypeFull specifies to use full eviction for a couchbase bucket. 45 EvictionPolicyTypeFull EvictionPolicyType = "fullEviction" 46 47 // EvictionPolicyTypeValueOnly specifies to use value only eviction for a couchbase bucket. 48 EvictionPolicyTypeValueOnly EvictionPolicyType = "valueOnly" 49 50 // EvictionPolicyTypeNotRecentlyUsed specifies to use not recently used (nru) eviction for an ephemeral bucket. 51 // UNCOMMITTED: This API may change in the future. 52 EvictionPolicyTypeNotRecentlyUsed EvictionPolicyType = "nruEviction" 53 54 // EvictionPolicyTypeNRU specifies to use no eviction for an ephemeral bucket. 55 // UNCOMMITTED: This API may change in the future. 56 EvictionPolicyTypeNoEviction EvictionPolicyType = "noEviction" 57) 58 59// CompressionMode specifies the kind of compression to use for a bucket. 60type CompressionMode string 61 62const ( 63 // CompressionModeOff specifies to use no compression for a bucket. 64 CompressionModeOff CompressionMode = "off" 65 66 // CompressionModePassive specifies to use passive compression for a bucket. 67 CompressionModePassive CompressionMode = "passive" 68 69 // CompressionModeActive specifies to use active compression for a bucket. 70 CompressionModeActive CompressionMode = "active" 71) 72 73type jsonBucketSettings struct { 74 Name string `json:"name"` 75 Controllers struct { 76 Flush string `json:"flush"` 77 } `json:"controllers"` 78 ReplicaIndex bool `json:"replicaIndex"` 79 Quota struct { 80 RAM uint64 `json:"ram"` 81 RawRAM uint64 `json:"rawRAM"` 82 } `json:"quota"` 83 ReplicaNumber uint32 `json:"replicaNumber"` 84 BucketType string `json:"bucketType"` 85 ConflictResolutionType string `json:"conflictResolutionType"` 86 EvictionPolicy string `json:"evictionPolicy"` 87 MaxTTL uint32 `json:"maxTTL"` 88 CompressionMode string `json:"compressionMode"` 89} 90 91// BucketSettings holds information about the settings for a bucket. 92type BucketSettings struct { 93 Name string 94 FlushEnabled bool 95 ReplicaIndexDisabled bool // inverted so that zero value matches server default. 96 RAMQuotaMB uint64 97 NumReplicas uint32 // NOTE: If not set this will set 0 replicas. 98 BucketType BucketType // Defaults to CouchbaseBucketType. 99 EvictionPolicy EvictionPolicyType 100 MaxTTL time.Duration 101 CompressionMode CompressionMode 102} 103 104func (bs *BucketSettings) fromData(data jsonBucketSettings) error { 105 bs.Name = data.Name 106 bs.FlushEnabled = data.Controllers.Flush != "" 107 bs.ReplicaIndexDisabled = !data.ReplicaIndex 108 bs.RAMQuotaMB = data.Quota.RawRAM / 1024 / 1024 109 bs.NumReplicas = data.ReplicaNumber 110 bs.EvictionPolicy = EvictionPolicyType(data.EvictionPolicy) 111 bs.MaxTTL = time.Duration(data.MaxTTL) * time.Second 112 bs.CompressionMode = CompressionMode(data.CompressionMode) 113 114 switch data.BucketType { 115 case "membase": 116 bs.BucketType = CouchbaseBucketType 117 case "memcached": 118 bs.BucketType = MemcachedBucketType 119 case "ephemeral": 120 bs.BucketType = EphemeralBucketType 121 default: 122 return errors.New("unrecognized bucket type string") 123 } 124 125 return nil 126} 127 128type bucketMgrErrorResp struct { 129 Errors map[string]string `json:"errors"` 130} 131 132func (bm *BucketManager) tryParseErrorMessage(req *mgmtRequest, resp *mgmtResponse) error { 133 b, err := ioutil.ReadAll(resp.Body) 134 if err != nil { 135 logDebugf("Failed to read bucket manager response body: %s", err) 136 return nil 137 } 138 139 if resp.StatusCode == 404 { 140 // If it was a 404 then there's no chance of the response body containing any structure 141 if strings.Contains(strings.ToLower(string(b)), "resource not found") { 142 return makeGenericMgmtError(ErrBucketNotFound, req, resp) 143 } 144 145 return makeGenericMgmtError(errors.New(string(b)), req, resp) 146 } 147 148 var mgrErr bucketMgrErrorResp 149 err = json.Unmarshal(b, &mgrErr) 150 if err != nil { 151 logDebugf("Failed to unmarshal error body: %s", err) 152 return makeGenericMgmtError(errors.New(string(b)), req, resp) 153 } 154 155 var bodyErr error 156 var firstErr string 157 for _, err := range mgrErr.Errors { 158 firstErr = strings.ToLower(err) 159 break 160 } 161 162 if strings.Contains(firstErr, "bucket with given name already exists") { 163 bodyErr = ErrBucketExists 164 } else { 165 bodyErr = errors.New(firstErr) 166 } 167 168 return makeGenericMgmtError(bodyErr, req, resp) 169} 170 171// Flush doesn't use the same body format as anything else... 172func (bm *BucketManager) tryParseFlushErrorMessage(req *mgmtRequest, resp *mgmtResponse) error { 173 b, err := ioutil.ReadAll(resp.Body) 174 if err != nil { 175 logDebugf("Failed to read bucket manager response body: %s", err) 176 return makeMgmtBadStatusError("failed to flush bucket", req, resp) 177 } 178 179 var bodyErrMsgs map[string]string 180 err = json.Unmarshal(b, &bodyErrMsgs) 181 if err != nil { 182 return errors.New(string(b)) 183 } 184 185 if errMsg, ok := bodyErrMsgs["_"]; ok { 186 if strings.Contains(strings.ToLower(errMsg), "flush is disabled") { 187 return ErrBucketNotFlushable 188 } 189 } 190 191 return errors.New(string(b)) 192} 193 194// BucketManager provides methods for performing bucket management operations. 195// See BucketManager for methods that allow creating and removing buckets themselves. 196type BucketManager struct { 197 provider mgmtProvider 198 tracer requestTracer 199} 200 201// GetBucketOptions is the set of options available to the bucket manager GetBucket operation. 202type GetBucketOptions struct { 203 Timeout time.Duration 204 RetryStrategy RetryStrategy 205} 206 207// GetBucket returns settings for a bucket on the cluster. 208func (bm *BucketManager) GetBucket(bucketName string, opts *GetBucketOptions) (*BucketSettings, error) { 209 if opts == nil { 210 opts = &GetBucketOptions{} 211 } 212 213 span := bm.tracer.StartSpan("GetBucket", nil). 214 SetTag("couchbase.service", "mgmt") 215 defer span.Finish() 216 217 return bm.get(span.Context(), bucketName, opts.RetryStrategy, opts.Timeout) 218} 219 220func (bm *BucketManager) get(tracectx requestSpanContext, bucketName string, 221 strategy RetryStrategy, timeout time.Duration) (*BucketSettings, error) { 222 223 req := mgmtRequest{ 224 Service: ServiceTypeManagement, 225 Path: fmt.Sprintf("/pools/default/buckets/%s", bucketName), 226 Method: "GET", 227 IsIdempotent: true, 228 RetryStrategy: strategy, 229 UniqueID: uuid.New().String(), 230 Timeout: timeout, 231 parentSpan: tracectx, 232 } 233 234 resp, err := bm.provider.executeMgmtRequest(req) 235 if err != nil { 236 return nil, makeGenericMgmtError(err, &req, resp) 237 } 238 defer ensureBodyClosed(resp.Body) 239 240 if resp.StatusCode != 200 { 241 bktErr := bm.tryParseErrorMessage(&req, resp) 242 if bktErr != nil { 243 return nil, bktErr 244 } 245 246 return nil, makeMgmtBadStatusError("failed to get bucket", &req, resp) 247 } 248 249 var bucketData jsonBucketSettings 250 jsonDec := json.NewDecoder(resp.Body) 251 err = jsonDec.Decode(&bucketData) 252 if err != nil { 253 return nil, err 254 } 255 256 var settings BucketSettings 257 err = settings.fromData(bucketData) 258 if err != nil { 259 return nil, err 260 } 261 262 return &settings, nil 263} 264 265// GetAllBucketsOptions is the set of options available to the bucket manager GetAll operation. 266type GetAllBucketsOptions struct { 267 Timeout time.Duration 268 RetryStrategy RetryStrategy 269} 270 271// GetAllBuckets returns a list of all active buckets on the cluster. 272func (bm *BucketManager) GetAllBuckets(opts *GetAllBucketsOptions) (map[string]BucketSettings, error) { 273 if opts == nil { 274 opts = &GetAllBucketsOptions{} 275 } 276 277 span := bm.tracer.StartSpan("GetAllBuckets", nil). 278 SetTag("couchbase.service", "mgmt") 279 defer span.Finish() 280 281 req := mgmtRequest{ 282 Service: ServiceTypeManagement, 283 Path: "/pools/default/buckets", 284 Method: "GET", 285 IsIdempotent: true, 286 RetryStrategy: opts.RetryStrategy, 287 UniqueID: uuid.New().String(), 288 Timeout: opts.Timeout, 289 parentSpan: span.Context(), 290 } 291 292 resp, err := bm.provider.executeMgmtRequest(req) 293 if err != nil { 294 return nil, makeGenericMgmtError(err, &req, resp) 295 } 296 defer ensureBodyClosed(resp.Body) 297 298 if resp.StatusCode != 200 { 299 bktErr := bm.tryParseErrorMessage(&req, resp) 300 if bktErr != nil { 301 return nil, bktErr 302 } 303 304 return nil, makeMgmtBadStatusError("failed to get all buckets", &req, resp) 305 } 306 307 var bucketsData []*jsonBucketSettings 308 jsonDec := json.NewDecoder(resp.Body) 309 err = jsonDec.Decode(&bucketsData) 310 if err != nil { 311 return nil, err 312 } 313 314 buckets := make(map[string]BucketSettings, len(bucketsData)) 315 for _, bucketData := range bucketsData { 316 var bucket BucketSettings 317 err := bucket.fromData(*bucketData) 318 if err != nil { 319 return nil, err 320 } 321 322 buckets[bucket.Name] = bucket 323 } 324 325 return buckets, nil 326} 327 328// CreateBucketSettings are the settings available when creating a bucket. 329type CreateBucketSettings struct { 330 BucketSettings 331 ConflictResolutionType ConflictResolutionType 332} 333 334// CreateBucketOptions is the set of options available to the bucket manager CreateBucket operation. 335type CreateBucketOptions struct { 336 Timeout time.Duration 337 RetryStrategy RetryStrategy 338} 339 340// CreateBucket creates a bucket on the cluster. 341func (bm *BucketManager) CreateBucket(settings CreateBucketSettings, opts *CreateBucketOptions) error { 342 if opts == nil { 343 opts = &CreateBucketOptions{} 344 } 345 346 span := bm.tracer.StartSpan("CreateBucket", nil). 347 SetTag("couchbase.service", "mgmt") 348 defer span.Finish() 349 350 posts, err := bm.settingsToPostData(&settings.BucketSettings) 351 if err != nil { 352 return err 353 } 354 355 if settings.ConflictResolutionType != "" { 356 posts.Add("conflictResolutionType", string(settings.ConflictResolutionType)) 357 } 358 359 req := mgmtRequest{ 360 Service: ServiceTypeManagement, 361 Path: "/pools/default/buckets", 362 Method: "POST", 363 Body: []byte(posts.Encode()), 364 ContentType: "application/x-www-form-urlencoded", 365 RetryStrategy: opts.RetryStrategy, 366 UniqueID: uuid.New().String(), 367 Timeout: opts.Timeout, 368 parentSpan: span.Context(), 369 } 370 371 resp, err := bm.provider.executeMgmtRequest(req) 372 if err != nil { 373 return makeGenericMgmtError(err, &req, resp) 374 } 375 defer ensureBodyClosed(resp.Body) 376 377 if resp.StatusCode != 202 { 378 bktErr := bm.tryParseErrorMessage(&req, resp) 379 if bktErr != nil { 380 return bktErr 381 } 382 383 return makeMgmtBadStatusError("failed to create bucket", &req, resp) 384 } 385 386 return nil 387} 388 389// UpdateBucketOptions is the set of options available to the bucket manager UpdateBucket operation. 390type UpdateBucketOptions struct { 391 Timeout time.Duration 392 RetryStrategy RetryStrategy 393} 394 395// UpdateBucket updates a bucket on the cluster. 396func (bm *BucketManager) UpdateBucket(settings BucketSettings, opts *UpdateBucketOptions) error { 397 if opts == nil { 398 opts = &UpdateBucketOptions{} 399 } 400 401 span := bm.tracer.StartSpan("UpdateBucket", nil). 402 SetTag("couchbase.service", "mgmt") 403 defer span.Finish() 404 405 posts, err := bm.settingsToPostData(&settings) 406 if err != nil { 407 return err 408 } 409 410 req := mgmtRequest{ 411 Service: ServiceTypeManagement, 412 Path: fmt.Sprintf("/pools/default/buckets/%s", settings.Name), 413 Method: "POST", 414 Body: []byte(posts.Encode()), 415 ContentType: "application/x-www-form-urlencoded", 416 RetryStrategy: opts.RetryStrategy, 417 UniqueID: uuid.New().String(), 418 Timeout: opts.Timeout, 419 parentSpan: span.Context(), 420 } 421 422 resp, err := bm.provider.executeMgmtRequest(req) 423 if err != nil { 424 return makeGenericMgmtError(err, &req, resp) 425 } 426 defer ensureBodyClosed(resp.Body) 427 428 if resp.StatusCode != 200 { 429 bktErr := bm.tryParseErrorMessage(&req, resp) 430 if bktErr != nil { 431 return bktErr 432 } 433 434 return makeMgmtBadStatusError("failed to update bucket", &req, resp) 435 } 436 437 return nil 438} 439 440// DropBucketOptions is the set of options available to the bucket manager DropBucket operation. 441type DropBucketOptions struct { 442 Timeout time.Duration 443 RetryStrategy RetryStrategy 444} 445 446// DropBucket will delete a bucket from the cluster by name. 447func (bm *BucketManager) DropBucket(name string, opts *DropBucketOptions) error { 448 if opts == nil { 449 opts = &DropBucketOptions{} 450 } 451 452 span := bm.tracer.StartSpan("DropBucket", nil). 453 SetTag("couchbase.service", "mgmt") 454 defer span.Finish() 455 456 req := mgmtRequest{ 457 Service: ServiceTypeManagement, 458 Path: fmt.Sprintf("/pools/default/buckets/%s", name), 459 Method: "DELETE", 460 RetryStrategy: opts.RetryStrategy, 461 UniqueID: uuid.New().String(), 462 Timeout: opts.Timeout, 463 parentSpan: span.Context(), 464 } 465 466 resp, err := bm.provider.executeMgmtRequest(req) 467 if err != nil { 468 return makeGenericMgmtError(err, &req, resp) 469 } 470 defer ensureBodyClosed(resp.Body) 471 472 if resp.StatusCode != 200 { 473 bktErr := bm.tryParseErrorMessage(&req, resp) 474 if bktErr != nil { 475 return bktErr 476 } 477 478 return makeMgmtBadStatusError("failed to drop bucket", &req, resp) 479 } 480 481 return nil 482} 483 484// FlushBucketOptions is the set of options available to the bucket manager FlushBucket operation. 485type FlushBucketOptions struct { 486 Timeout time.Duration 487 RetryStrategy RetryStrategy 488} 489 490// FlushBucket will delete all the of the data from a bucket. 491// Keep in mind that you must have flushing enabled in the buckets configuration. 492func (bm *BucketManager) FlushBucket(name string, opts *FlushBucketOptions) error { 493 if opts == nil { 494 opts = &FlushBucketOptions{} 495 } 496 497 span := bm.tracer.StartSpan("FlushBucket", nil). 498 SetTag("couchbase.service", "mgmt") 499 defer span.Finish() 500 501 req := mgmtRequest{ 502 Service: ServiceTypeManagement, 503 Path: fmt.Sprintf("/pools/default/buckets/%s/controller/doFlush", name), 504 Method: "POST", 505 RetryStrategy: opts.RetryStrategy, 506 UniqueID: uuid.New().String(), 507 Timeout: opts.Timeout, 508 parentSpan: span.Context(), 509 } 510 511 resp, err := bm.provider.executeMgmtRequest(req) 512 if err != nil { 513 return makeGenericMgmtError(err, &req, resp) 514 } 515 defer ensureBodyClosed(resp.Body) 516 517 if resp.StatusCode != 200 { 518 return bm.tryParseFlushErrorMessage(&req, resp) 519 } 520 521 return nil 522} 523 524func (bm *BucketManager) settingsToPostData(settings *BucketSettings) (url.Values, error) { 525 posts := url.Values{} 526 527 if settings.Name == "" { 528 return nil, makeInvalidArgumentsError("Name invalid, must be set.") 529 } 530 531 if settings.RAMQuotaMB < 100 { 532 return nil, makeInvalidArgumentsError("Memory quota invalid, must be greater than 100MB") 533 } 534 535 if settings.MaxTTL > 0 && settings.BucketType == MemcachedBucketType { 536 return nil, makeInvalidArgumentsError("maxTTL is not supported for memcached buckets") 537 } 538 539 posts.Add("name", settings.Name) 540 // posts.Add("saslPassword", settings.Password) 541 542 if settings.FlushEnabled { 543 posts.Add("flushEnabled", "1") 544 } else { 545 posts.Add("flushEnabled", "0") 546 } 547 548 // replicaIndex can't be set at all on ephemeral buckets. 549 if settings.BucketType != EphemeralBucketType { 550 if settings.ReplicaIndexDisabled { 551 posts.Add("replicaIndex", "0") 552 } else { 553 posts.Add("replicaIndex", "1") 554 } 555 } 556 557 switch settings.BucketType { 558 case CouchbaseBucketType: 559 posts.Add("bucketType", string(settings.BucketType)) 560 posts.Add("replicaNumber", fmt.Sprintf("%d", settings.NumReplicas)) 561 case MemcachedBucketType: 562 posts.Add("bucketType", string(settings.BucketType)) 563 if settings.NumReplicas > 0 { 564 return nil, makeInvalidArgumentsError("replicas cannot be used with memcached buckets") 565 } 566 case EphemeralBucketType: 567 posts.Add("bucketType", string(settings.BucketType)) 568 posts.Add("replicaNumber", fmt.Sprintf("%d", settings.NumReplicas)) 569 default: 570 return nil, makeInvalidArgumentsError("Unrecognized bucket type") 571 } 572 573 posts.Add("ramQuotaMB", fmt.Sprintf("%d", settings.RAMQuotaMB)) 574 575 if settings.EvictionPolicy != "" { 576 switch settings.BucketType { 577 case MemcachedBucketType: 578 return nil, makeInvalidArgumentsError("eviction policy is not valid for memcached buckets") 579 case CouchbaseBucketType: 580 if settings.EvictionPolicy == EvictionPolicyTypeNoEviction || settings.EvictionPolicy == EvictionPolicyTypeNotRecentlyUsed { 581 return nil, makeInvalidArgumentsError("eviction policy is not valid for couchbase buckets") 582 } 583 case EphemeralBucketType: 584 if settings.EvictionPolicy == EvictionPolicyTypeFull || settings.EvictionPolicy == EvictionPolicyTypeValueOnly { 585 return nil, makeInvalidArgumentsError("eviction policy is not valid for ephemeral buckets") 586 } 587 } 588 posts.Add("evictionPolicy", string(settings.EvictionPolicy)) 589 } 590 591 if settings.MaxTTL > 0 { 592 posts.Add("maxTTL", fmt.Sprintf("%d", settings.MaxTTL/time.Second)) 593 } 594 595 if settings.CompressionMode != "" { 596 posts.Add("compressionMode", string(settings.CompressionMode)) 597 } 598 599 return posts, nil 600} 601