1package gocb
2
3import (
4	"encoding/json"
5	"fmt"
6	"io/ioutil"
7	"net/url"
8	"strings"
9	"time"
10
11	"github.com/google/uuid"
12	"github.com/pkg/errors"
13)
14
// BucketType specifies the kind of bucket.
// The underlying string is the value exchanged with the cluster in the
// bucketType field.
type BucketType string

const (
	// CouchbaseBucketType indicates a Couchbase bucket type.
	// Note that its wire value is "membase", not "couchbase".
	CouchbaseBucketType BucketType = "membase"

	// MemcachedBucketType indicates a Memcached bucket type.
	MemcachedBucketType BucketType = "memcached"

	// EphemeralBucketType indicates an Ephemeral bucket type.
	EphemeralBucketType BucketType = "ephemeral"
)
28
// ConflictResolutionType specifies the kind of conflict resolution to use for a bucket.
// The underlying string is the value posted as conflictResolutionType when a
// bucket is created.
type ConflictResolutionType string

const (
	// ConflictResolutionTypeTimestamp specifies to use timestamp conflict resolution on the bucket.
	// Its wire value is "lww".
	ConflictResolutionTypeTimestamp ConflictResolutionType = "lww"

	// ConflictResolutionTypeSequenceNumber specifies to use sequence number conflict resolution on the bucket.
	// Its wire value is "seqno".
	ConflictResolutionTypeSequenceNumber ConflictResolutionType = "seqno"
)
39
// EvictionPolicyType specifies the kind of eviction policy to use for a bucket.
// The underlying string is the value exchanged with the cluster in the
// evictionPolicy field.
type EvictionPolicyType string

const (
	// EvictionPolicyTypeFull specifies to use full eviction for a couchbase bucket.
	EvictionPolicyTypeFull EvictionPolicyType = "fullEviction"

	// EvictionPolicyTypeValueOnly specifies to use value only eviction for a couchbase bucket.
	EvictionPolicyTypeValueOnly EvictionPolicyType = "valueOnly"

	// EvictionPolicyTypeNotRecentlyUsed specifies to use not recently used (nru) eviction for an ephemeral bucket.
	// UNCOMMITTED: This API may change in the future.
	EvictionPolicyTypeNotRecentlyUsed EvictionPolicyType = "nruEviction"

	// EvictionPolicyTypeNoEviction specifies to use no eviction for an ephemeral bucket.
	// UNCOMMITTED: This API may change in the future.
	EvictionPolicyTypeNoEviction EvictionPolicyType = "noEviction"
)
58
// CompressionMode specifies the kind of compression to use for a bucket.
// The underlying string is the value exchanged with the cluster in the
// compressionMode field.
type CompressionMode string

const (
	// CompressionModeOff specifies to use no compression for a bucket.
	CompressionModeOff CompressionMode = "off"

	// CompressionModePassive specifies to use passive compression for a bucket.
	CompressionModePassive CompressionMode = "passive"

	// CompressionModeActive specifies to use active compression for a bucket.
	CompressionModeActive CompressionMode = "active"
)
72
// jsonBucketSettings is the JSON shape of a single bucket as decoded from the
// management service's bucket responses. It is converted into the public
// BucketSettings type by BucketSettings.fromData, which interprets the fields
// as follows:
//   - Controllers.Flush: any non-empty value means flushing is enabled.
//   - ReplicaIndex: stored inverted as ReplicaIndexDisabled.
//   - Quota.RawRAM: divided by 1024*1024 to obtain the quota in MB
//     (presumably a byte count; confirm against the server documentation).
//   - MaxTTL: a number of seconds.
//
// Quota.RAM and ConflictResolutionType are decoded but not copied into
// BucketSettings.
type jsonBucketSettings struct {
	Name        string `json:"name"`
	Controllers struct {
		Flush string `json:"flush"`
	} `json:"controllers"`
	ReplicaIndex bool `json:"replicaIndex"`
	Quota        struct {
		RAM    uint64 `json:"ram"`
		RawRAM uint64 `json:"rawRAM"`
	} `json:"quota"`
	ReplicaNumber          uint32 `json:"replicaNumber"`
	BucketType             string `json:"bucketType"`
	ConflictResolutionType string `json:"conflictResolutionType"`
	EvictionPolicy         string `json:"evictionPolicy"`
	MaxTTL                 uint32 `json:"maxTTL"`
	CompressionMode        string `json:"compressionMode"`
}
90
// BucketSettings holds information about the settings for a bucket.
//
// The same type is returned when reading buckets and supplied when creating or
// updating them. When supplied, Name must be non-empty and RAMQuotaMB must be
// at least 100. MaxTTL is sent to the server in whole seconds and is not
// supported for memcached buckets, which also accept neither replicas nor an
// eviction policy.
type BucketSettings struct {
	Name                 string
	FlushEnabled         bool
	ReplicaIndexDisabled bool // inverted so that zero value matches server default.
	RAMQuotaMB           uint64
	NumReplicas          uint32     // NOTE: If not set this will set 0 replicas.
	BucketType           BucketType // Defaults to CouchbaseBucketType.
	EvictionPolicy       EvictionPolicyType
	MaxTTL               time.Duration
	CompressionMode      CompressionMode
}
103
104func (bs *BucketSettings) fromData(data jsonBucketSettings) error {
105	bs.Name = data.Name
106	bs.FlushEnabled = data.Controllers.Flush != ""
107	bs.ReplicaIndexDisabled = !data.ReplicaIndex
108	bs.RAMQuotaMB = data.Quota.RawRAM / 1024 / 1024
109	bs.NumReplicas = data.ReplicaNumber
110	bs.EvictionPolicy = EvictionPolicyType(data.EvictionPolicy)
111	bs.MaxTTL = time.Duration(data.MaxTTL) * time.Second
112	bs.CompressionMode = CompressionMode(data.CompressionMode)
113
114	switch data.BucketType {
115	case "membase":
116		bs.BucketType = CouchbaseBucketType
117	case "memcached":
118		bs.BucketType = MemcachedBucketType
119	case "ephemeral":
120		bs.BucketType = EphemeralBucketType
121	default:
122		return errors.New("unrecognized bucket type string")
123	}
124
125	return nil
126}
127
// bucketMgrErrorResp is the JSON error body that tryParseErrorMessage attempts
// to decode from a failed bucket management response: an object whose "errors"
// member maps a key to a human-readable message.
type bucketMgrErrorResp struct {
	Errors map[string]string `json:"errors"`
}
131
132func (bm *BucketManager) tryParseErrorMessage(req *mgmtRequest, resp *mgmtResponse) error {
133	b, err := ioutil.ReadAll(resp.Body)
134	if err != nil {
135		logDebugf("Failed to read bucket manager response body: %s", err)
136		return nil
137	}
138
139	if resp.StatusCode == 404 {
140		// If it was a 404 then there's no chance of the response body containing any structure
141		if strings.Contains(strings.ToLower(string(b)), "resource not found") {
142			return makeGenericMgmtError(ErrBucketNotFound, req, resp)
143		}
144
145		return makeGenericMgmtError(errors.New(string(b)), req, resp)
146	}
147
148	var mgrErr bucketMgrErrorResp
149	err = json.Unmarshal(b, &mgrErr)
150	if err != nil {
151		logDebugf("Failed to unmarshal error body: %s", err)
152		return makeGenericMgmtError(errors.New(string(b)), req, resp)
153	}
154
155	var bodyErr error
156	var firstErr string
157	for _, err := range mgrErr.Errors {
158		firstErr = strings.ToLower(err)
159		break
160	}
161
162	if strings.Contains(firstErr, "bucket with given name already exists") {
163		bodyErr = ErrBucketExists
164	} else {
165		bodyErr = errors.New(firstErr)
166	}
167
168	return makeGenericMgmtError(bodyErr, req, resp)
169}
170
171// Flush doesn't use the same body format as anything else...
172func (bm *BucketManager) tryParseFlushErrorMessage(req *mgmtRequest, resp *mgmtResponse) error {
173	b, err := ioutil.ReadAll(resp.Body)
174	if err != nil {
175		logDebugf("Failed to read bucket manager response body: %s", err)
176		return makeMgmtBadStatusError("failed to flush bucket", req, resp)
177	}
178
179	var bodyErrMsgs map[string]string
180	err = json.Unmarshal(b, &bodyErrMsgs)
181	if err != nil {
182		return errors.New(string(b))
183	}
184
185	if errMsg, ok := bodyErrMsgs["_"]; ok {
186		if strings.Contains(strings.ToLower(errMsg), "flush is disabled") {
187			return ErrBucketNotFlushable
188		}
189	}
190
191	return errors.New(string(b))
192}
193
// BucketManager provides methods for performing bucket management operations:
// reading bucket settings (GetBucket, GetAllBuckets) and creating, updating,
// dropping and flushing buckets (CreateBucket, UpdateBucket, DropBucket,
// FlushBucket).
//
// Requests are sent through provider, and each operation starts a span on
// tracer.
type BucketManager struct {
	provider mgmtProvider
	tracer   requestTracer
}
200
// GetBucketOptions is the set of options available to the bucket manager GetBucket operation.
// Timeout and RetryStrategy are forwarded unchanged to the underlying
// management request; passing nil options is equivalent to the zero value.
type GetBucketOptions struct {
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
206
207// GetBucket returns settings for a bucket on the cluster.
208func (bm *BucketManager) GetBucket(bucketName string, opts *GetBucketOptions) (*BucketSettings, error) {
209	if opts == nil {
210		opts = &GetBucketOptions{}
211	}
212
213	span := bm.tracer.StartSpan("GetBucket", nil).
214		SetTag("couchbase.service", "mgmt")
215	defer span.Finish()
216
217	return bm.get(span.Context(), bucketName, opts.RetryStrategy, opts.Timeout)
218}
219
220func (bm *BucketManager) get(tracectx requestSpanContext, bucketName string,
221	strategy RetryStrategy, timeout time.Duration) (*BucketSettings, error) {
222
223	req := mgmtRequest{
224		Service:       ServiceTypeManagement,
225		Path:          fmt.Sprintf("/pools/default/buckets/%s", bucketName),
226		Method:        "GET",
227		IsIdempotent:  true,
228		RetryStrategy: strategy,
229		UniqueID:      uuid.New().String(),
230		Timeout:       timeout,
231		parentSpan:    tracectx,
232	}
233
234	resp, err := bm.provider.executeMgmtRequest(req)
235	if err != nil {
236		return nil, makeGenericMgmtError(err, &req, resp)
237	}
238	defer ensureBodyClosed(resp.Body)
239
240	if resp.StatusCode != 200 {
241		bktErr := bm.tryParseErrorMessage(&req, resp)
242		if bktErr != nil {
243			return nil, bktErr
244		}
245
246		return nil, makeMgmtBadStatusError("failed to get bucket", &req, resp)
247	}
248
249	var bucketData jsonBucketSettings
250	jsonDec := json.NewDecoder(resp.Body)
251	err = jsonDec.Decode(&bucketData)
252	if err != nil {
253		return nil, err
254	}
255
256	var settings BucketSettings
257	err = settings.fromData(bucketData)
258	if err != nil {
259		return nil, err
260	}
261
262	return &settings, nil
263}
264
// GetAllBucketsOptions is the set of options available to the bucket manager GetAllBuckets operation.
// Timeout and RetryStrategy are forwarded unchanged to the underlying
// management request; passing nil options is equivalent to the zero value.
type GetAllBucketsOptions struct {
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
270
271// GetAllBuckets returns a list of all active buckets on the cluster.
272func (bm *BucketManager) GetAllBuckets(opts *GetAllBucketsOptions) (map[string]BucketSettings, error) {
273	if opts == nil {
274		opts = &GetAllBucketsOptions{}
275	}
276
277	span := bm.tracer.StartSpan("GetAllBuckets", nil).
278		SetTag("couchbase.service", "mgmt")
279	defer span.Finish()
280
281	req := mgmtRequest{
282		Service:       ServiceTypeManagement,
283		Path:          "/pools/default/buckets",
284		Method:        "GET",
285		IsIdempotent:  true,
286		RetryStrategy: opts.RetryStrategy,
287		UniqueID:      uuid.New().String(),
288		Timeout:       opts.Timeout,
289		parentSpan:    span.Context(),
290	}
291
292	resp, err := bm.provider.executeMgmtRequest(req)
293	if err != nil {
294		return nil, makeGenericMgmtError(err, &req, resp)
295	}
296	defer ensureBodyClosed(resp.Body)
297
298	if resp.StatusCode != 200 {
299		bktErr := bm.tryParseErrorMessage(&req, resp)
300		if bktErr != nil {
301			return nil, bktErr
302		}
303
304		return nil, makeMgmtBadStatusError("failed to get all buckets", &req, resp)
305	}
306
307	var bucketsData []*jsonBucketSettings
308	jsonDec := json.NewDecoder(resp.Body)
309	err = jsonDec.Decode(&bucketsData)
310	if err != nil {
311		return nil, err
312	}
313
314	buckets := make(map[string]BucketSettings, len(bucketsData))
315	for _, bucketData := range bucketsData {
316		var bucket BucketSettings
317		err := bucket.fromData(*bucketData)
318		if err != nil {
319			return nil, err
320		}
321
322		buckets[bucket.Name] = bucket
323	}
324
325	return buckets, nil
326}
327
// CreateBucketSettings are the settings available when creating a bucket.
// It extends BucketSettings with ConflictResolutionType, which CreateBucket
// posts as conflictResolutionType only when it is non-empty.
type CreateBucketSettings struct {
	BucketSettings
	ConflictResolutionType ConflictResolutionType
}
333
// CreateBucketOptions is the set of options available to the bucket manager CreateBucket operation.
// Timeout and RetryStrategy are forwarded unchanged to the underlying
// management request; passing nil options is equivalent to the zero value.
type CreateBucketOptions struct {
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
339
340// CreateBucket creates a bucket on the cluster.
341func (bm *BucketManager) CreateBucket(settings CreateBucketSettings, opts *CreateBucketOptions) error {
342	if opts == nil {
343		opts = &CreateBucketOptions{}
344	}
345
346	span := bm.tracer.StartSpan("CreateBucket", nil).
347		SetTag("couchbase.service", "mgmt")
348	defer span.Finish()
349
350	posts, err := bm.settingsToPostData(&settings.BucketSettings)
351	if err != nil {
352		return err
353	}
354
355	if settings.ConflictResolutionType != "" {
356		posts.Add("conflictResolutionType", string(settings.ConflictResolutionType))
357	}
358
359	req := mgmtRequest{
360		Service:       ServiceTypeManagement,
361		Path:          "/pools/default/buckets",
362		Method:        "POST",
363		Body:          []byte(posts.Encode()),
364		ContentType:   "application/x-www-form-urlencoded",
365		RetryStrategy: opts.RetryStrategy,
366		UniqueID:      uuid.New().String(),
367		Timeout:       opts.Timeout,
368		parentSpan:    span.Context(),
369	}
370
371	resp, err := bm.provider.executeMgmtRequest(req)
372	if err != nil {
373		return makeGenericMgmtError(err, &req, resp)
374	}
375	defer ensureBodyClosed(resp.Body)
376
377	if resp.StatusCode != 202 {
378		bktErr := bm.tryParseErrorMessage(&req, resp)
379		if bktErr != nil {
380			return bktErr
381		}
382
383		return makeMgmtBadStatusError("failed to create bucket", &req, resp)
384	}
385
386	return nil
387}
388
// UpdateBucketOptions is the set of options available to the bucket manager UpdateBucket operation.
// Timeout and RetryStrategy are forwarded unchanged to the underlying
// management request; passing nil options is equivalent to the zero value.
type UpdateBucketOptions struct {
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
394
395// UpdateBucket updates a bucket on the cluster.
396func (bm *BucketManager) UpdateBucket(settings BucketSettings, opts *UpdateBucketOptions) error {
397	if opts == nil {
398		opts = &UpdateBucketOptions{}
399	}
400
401	span := bm.tracer.StartSpan("UpdateBucket", nil).
402		SetTag("couchbase.service", "mgmt")
403	defer span.Finish()
404
405	posts, err := bm.settingsToPostData(&settings)
406	if err != nil {
407		return err
408	}
409
410	req := mgmtRequest{
411		Service:       ServiceTypeManagement,
412		Path:          fmt.Sprintf("/pools/default/buckets/%s", settings.Name),
413		Method:        "POST",
414		Body:          []byte(posts.Encode()),
415		ContentType:   "application/x-www-form-urlencoded",
416		RetryStrategy: opts.RetryStrategy,
417		UniqueID:      uuid.New().String(),
418		Timeout:       opts.Timeout,
419		parentSpan:    span.Context(),
420	}
421
422	resp, err := bm.provider.executeMgmtRequest(req)
423	if err != nil {
424		return makeGenericMgmtError(err, &req, resp)
425	}
426	defer ensureBodyClosed(resp.Body)
427
428	if resp.StatusCode != 200 {
429		bktErr := bm.tryParseErrorMessage(&req, resp)
430		if bktErr != nil {
431			return bktErr
432		}
433
434		return makeMgmtBadStatusError("failed to update bucket", &req, resp)
435	}
436
437	return nil
438}
439
// DropBucketOptions is the set of options available to the bucket manager DropBucket operation.
// Timeout and RetryStrategy are forwarded unchanged to the underlying
// management request; passing nil options is equivalent to the zero value.
type DropBucketOptions struct {
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
445
446// DropBucket will delete a bucket from the cluster by name.
447func (bm *BucketManager) DropBucket(name string, opts *DropBucketOptions) error {
448	if opts == nil {
449		opts = &DropBucketOptions{}
450	}
451
452	span := bm.tracer.StartSpan("DropBucket", nil).
453		SetTag("couchbase.service", "mgmt")
454	defer span.Finish()
455
456	req := mgmtRequest{
457		Service:       ServiceTypeManagement,
458		Path:          fmt.Sprintf("/pools/default/buckets/%s", name),
459		Method:        "DELETE",
460		RetryStrategy: opts.RetryStrategy,
461		UniqueID:      uuid.New().String(),
462		Timeout:       opts.Timeout,
463		parentSpan:    span.Context(),
464	}
465
466	resp, err := bm.provider.executeMgmtRequest(req)
467	if err != nil {
468		return makeGenericMgmtError(err, &req, resp)
469	}
470	defer ensureBodyClosed(resp.Body)
471
472	if resp.StatusCode != 200 {
473		bktErr := bm.tryParseErrorMessage(&req, resp)
474		if bktErr != nil {
475			return bktErr
476		}
477
478		return makeMgmtBadStatusError("failed to drop bucket", &req, resp)
479	}
480
481	return nil
482}
483
// FlushBucketOptions is the set of options available to the bucket manager FlushBucket operation.
// Timeout and RetryStrategy are forwarded unchanged to the underlying
// management request; passing nil options is equivalent to the zero value.
type FlushBucketOptions struct {
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}
489
490// FlushBucket will delete all the of the data from a bucket.
491// Keep in mind that you must have flushing enabled in the buckets configuration.
492func (bm *BucketManager) FlushBucket(name string, opts *FlushBucketOptions) error {
493	if opts == nil {
494		opts = &FlushBucketOptions{}
495	}
496
497	span := bm.tracer.StartSpan("FlushBucket", nil).
498		SetTag("couchbase.service", "mgmt")
499	defer span.Finish()
500
501	req := mgmtRequest{
502		Service:       ServiceTypeManagement,
503		Path:          fmt.Sprintf("/pools/default/buckets/%s/controller/doFlush", name),
504		Method:        "POST",
505		RetryStrategy: opts.RetryStrategy,
506		UniqueID:      uuid.New().String(),
507		Timeout:       opts.Timeout,
508		parentSpan:    span.Context(),
509	}
510
511	resp, err := bm.provider.executeMgmtRequest(req)
512	if err != nil {
513		return makeGenericMgmtError(err, &req, resp)
514	}
515	defer ensureBodyClosed(resp.Body)
516
517	if resp.StatusCode != 200 {
518		return bm.tryParseFlushErrorMessage(&req, resp)
519	}
520
521	return nil
522}
523
524func (bm *BucketManager) settingsToPostData(settings *BucketSettings) (url.Values, error) {
525	posts := url.Values{}
526
527	if settings.Name == "" {
528		return nil, makeInvalidArgumentsError("Name invalid, must be set.")
529	}
530
531	if settings.RAMQuotaMB < 100 {
532		return nil, makeInvalidArgumentsError("Memory quota invalid, must be greater than 100MB")
533	}
534
535	if settings.MaxTTL > 0 && settings.BucketType == MemcachedBucketType {
536		return nil, makeInvalidArgumentsError("maxTTL is not supported for memcached buckets")
537	}
538
539	posts.Add("name", settings.Name)
540	// posts.Add("saslPassword", settings.Password)
541
542	if settings.FlushEnabled {
543		posts.Add("flushEnabled", "1")
544	} else {
545		posts.Add("flushEnabled", "0")
546	}
547
548	// replicaIndex can't be set at all on ephemeral buckets.
549	if settings.BucketType != EphemeralBucketType {
550		if settings.ReplicaIndexDisabled {
551			posts.Add("replicaIndex", "0")
552		} else {
553			posts.Add("replicaIndex", "1")
554		}
555	}
556
557	switch settings.BucketType {
558	case CouchbaseBucketType:
559		posts.Add("bucketType", string(settings.BucketType))
560		posts.Add("replicaNumber", fmt.Sprintf("%d", settings.NumReplicas))
561	case MemcachedBucketType:
562		posts.Add("bucketType", string(settings.BucketType))
563		if settings.NumReplicas > 0 {
564			return nil, makeInvalidArgumentsError("replicas cannot be used with memcached buckets")
565		}
566	case EphemeralBucketType:
567		posts.Add("bucketType", string(settings.BucketType))
568		posts.Add("replicaNumber", fmt.Sprintf("%d", settings.NumReplicas))
569	default:
570		return nil, makeInvalidArgumentsError("Unrecognized bucket type")
571	}
572
573	posts.Add("ramQuotaMB", fmt.Sprintf("%d", settings.RAMQuotaMB))
574
575	if settings.EvictionPolicy != "" {
576		switch settings.BucketType {
577		case MemcachedBucketType:
578			return nil, makeInvalidArgumentsError("eviction policy is not valid for memcached buckets")
579		case CouchbaseBucketType:
580			if settings.EvictionPolicy == EvictionPolicyTypeNoEviction || settings.EvictionPolicy == EvictionPolicyTypeNotRecentlyUsed {
581				return nil, makeInvalidArgumentsError("eviction policy is not valid for couchbase buckets")
582			}
583		case EphemeralBucketType:
584			if settings.EvictionPolicy == EvictionPolicyTypeFull || settings.EvictionPolicy == EvictionPolicyTypeValueOnly {
585				return nil, makeInvalidArgumentsError("eviction policy is not valid for ephemeral buckets")
586			}
587		}
588		posts.Add("evictionPolicy", string(settings.EvictionPolicy))
589	}
590
591	if settings.MaxTTL > 0 {
592		posts.Add("maxTTL", fmt.Sprintf("%d", settings.MaxTTL/time.Second))
593	}
594
595	if settings.CompressionMode != "" {
596		posts.Add("compressionMode", string(settings.CompressionMode))
597	}
598
599	return posts, nil
600}
601