1package gocb
2
3import (
4	"crypto/x509"
5	"fmt"
6	"strconv"
7	"time"
8
9	gocbcore "github.com/couchbase/gocbcore/v9"
10	gocbconnstr "github.com/couchbase/gocbcore/v9/connstr"
11	"github.com/pkg/errors"
12)
13
// Cluster represents a connection to a specific Couchbase cluster.
// Instances are assembled by clusterFromOptions and completed by Connect,
// which fills in the connection spec and the connection manager.
type Cluster struct {
	// Parsed connection string (stored by Connect) and the authenticator
	// taken from ClusterOptions, or a PasswordAuthenticator built from the
	// Username/Password options when no authenticator was supplied.
	cSpec gocbconnstr.ConnSpec
	auth  Authenticator

	// Set by Connect once the manager has connected successfully; it is the
	// source of every provider handed out by the get*Provider methods.
	connectionManager connectionManager

	// Both default to true and are switched off through the matching
	// Disable* fields of IoConfig.
	useServerDurations bool
	useMutationTokens  bool

	// Effective timeouts: ClusterOptions values with defaults filled in, then
	// overridden by any *_timeout options found in the connection string.
	timeoutsConfig TimeoutsConfig

	// transcoder defaults to NewJSONTranscoder(); the wrapper is built around
	// the configured RetryStrategy, which defaults to
	// NewBestEffortRetryStrategy(nil).
	transcoder           Transcoder
	retryStrategyWrapper *retryStrategyWrapper

	// Copied from OrphanReporterConfig (enabled is the inverse of Disabled).
	orphanLoggerEnabled    bool
	orphanLoggerInterval   time.Duration
	orphanLoggerSampleSize uint32

	// A reference is taken with tracerAddRef when the cluster is built and
	// released with tracerDecRef in Close, which also resets this to nil.
	tracer requestTracer

	// Copied verbatim from ClusterOptions.
	circuitBreakerConfig CircuitBreakerConfig
	securityConfig       SecurityConfig
	internalConfig       InternalConfig
}
39
// IoConfig specifies IO related configuration options.
// Both fields are opt-outs: mutation tokens and server durations are enabled
// on the resulting Cluster unless the corresponding field is set to true.
type IoConfig struct {
	DisableMutationTokens  bool
	DisableServerDurations bool
}
45
// TimeoutsConfig specifies options for various operation timeouts.
// When building a Cluster, any field that is not strictly positive is
// replaced by a default: 10s for ConnectTimeout and KVDurableTimeout, 2.5s for
// KVTimeout, and 75s for the view, query, analytics, search and management
// timeouts.
type TimeoutsConfig struct {
	ConnectTimeout time.Duration
	KVTimeout      time.Duration
	// Volatile: This option is subject to change at any time.
	KVDurableTimeout  time.Duration
	ViewTimeout       time.Duration
	QueryTimeout      time.Duration
	AnalyticsTimeout  time.Duration
	SearchTimeout     time.Duration
	ManagementTimeout time.Duration
}
58
// OrphanReporterConfig specifies options for controlling the orphan
// reporter which records when the SDK receives responses for requests
// that are no longer in the system (usually due to being timed out).
// The reporter is enabled unless Disabled is set; ReportInterval and
// SampleSize are stored on the Cluster as given, with no defaults applied
// in this file.
type OrphanReporterConfig struct {
	Disabled       bool
	ReportInterval time.Duration
	SampleSize     uint32
}
67
// SecurityConfig specifies options for controlling security related
// items such as TLS root certificates and verification skipping.
// The value is copied onto the Cluster unchanged; how the fields are applied
// to connections is decided outside this file.
type SecurityConfig struct {
	// TLSRootCAs is the certificate pool to use as TLS roots.
	// NOTE(review): behaviour when nil is not visible here - confirm.
	TLSRootCAs *x509.CertPool
	// TLSSkipVerify presumably disables TLS certificate verification - confirm
	// against the connection manager before relying on it.
	TLSSkipVerify bool
}
74
// InternalConfig specifies options for controlling various internal
// items.
// Internal: This should never be used and is not supported.
type InternalConfig struct {
	// TLSRootCAProvider is a callback that returns a certificate pool.
	// NOTE(review): when it is invoked, and how it interacts with
	// SecurityConfig.TLSRootCAs, is not visible in this file.
	TLSRootCAProvider func() *x509.CertPool
}
81
// ClusterOptions is the set of options available for creating a Cluster.
type ClusterOptions struct {
	// Authenticator specifies the authenticator to use with the cluster.
	// When nil, a PasswordAuthenticator is built from Username and Password.
	Authenticator Authenticator

	// Username & Password specifies the cluster username and password to
	// authenticate with.  This is equivalent to passing PasswordAuthenticator
	// as the Authenticator parameter with the same values.
	// They are only consulted when Authenticator is nil.
	Username string
	Password string

	// TimeoutsConfig specifies various operation timeouts.  Fields that are
	// not strictly positive fall back to built-in defaults.
	TimeoutsConfig TimeoutsConfig

	// Transcoder is used for transcoding data used in KV operations.
	// Defaults to NewJSONTranscoder() when nil.
	Transcoder Transcoder

	// RetryStrategy is used to automatically retry operations if they fail.
	// Defaults to NewBestEffortRetryStrategy(nil) when nil.
	RetryStrategy RetryStrategy

	// Tracer specifies the tracer to use for requests.  When nil, a threshold
	// logging tracer is created.
	// VOLATILE: This API is subject to change at any time.
	Tracer requestTracer

	// OrphanReporterConfig specifies options for the orphan reporter.
	OrphanReporterConfig OrphanReporterConfig

	// CircuitBreakerConfig specifies options for the circuit breakers.
	CircuitBreakerConfig CircuitBreakerConfig

	// IoConfig specifies IO related configuration options.
	IoConfig IoConfig

	// SecurityConfig specifies security related configuration options.
	SecurityConfig SecurityConfig

	// Internal: This should never be used and is not supported.
	InternalConfig InternalConfig
}
121
// ClusterCloseOptions is the set of options available when
// disconnecting from a Cluster.
// It currently declares no fields.
type ClusterCloseOptions struct {
}
126
127func clusterFromOptions(opts ClusterOptions) *Cluster {
128	if opts.Authenticator == nil {
129		opts.Authenticator = PasswordAuthenticator{
130			Username: opts.Username,
131			Password: opts.Password,
132		}
133	}
134
135	connectTimeout := 10000 * time.Millisecond
136	kvTimeout := 2500 * time.Millisecond
137	kvDurableTimeout := 10000 * time.Millisecond
138	viewTimeout := 75000 * time.Millisecond
139	queryTimeout := 75000 * time.Millisecond
140	analyticsTimeout := 75000 * time.Millisecond
141	searchTimeout := 75000 * time.Millisecond
142	managementTimeout := 75000 * time.Millisecond
143	if opts.TimeoutsConfig.ConnectTimeout > 0 {
144		connectTimeout = opts.TimeoutsConfig.ConnectTimeout
145	}
146	if opts.TimeoutsConfig.KVTimeout > 0 {
147		kvTimeout = opts.TimeoutsConfig.KVTimeout
148	}
149	if opts.TimeoutsConfig.KVDurableTimeout > 0 {
150		kvDurableTimeout = opts.TimeoutsConfig.KVDurableTimeout
151	}
152	if opts.TimeoutsConfig.ViewTimeout > 0 {
153		viewTimeout = opts.TimeoutsConfig.ViewTimeout
154	}
155	if opts.TimeoutsConfig.QueryTimeout > 0 {
156		queryTimeout = opts.TimeoutsConfig.QueryTimeout
157	}
158	if opts.TimeoutsConfig.AnalyticsTimeout > 0 {
159		analyticsTimeout = opts.TimeoutsConfig.AnalyticsTimeout
160	}
161	if opts.TimeoutsConfig.SearchTimeout > 0 {
162		searchTimeout = opts.TimeoutsConfig.SearchTimeout
163	}
164	if opts.TimeoutsConfig.ManagementTimeout > 0 {
165		managementTimeout = opts.TimeoutsConfig.ManagementTimeout
166	}
167	if opts.Transcoder == nil {
168		opts.Transcoder = NewJSONTranscoder()
169	}
170	if opts.RetryStrategy == nil {
171		opts.RetryStrategy = NewBestEffortRetryStrategy(nil)
172	}
173
174	useMutationTokens := true
175	useServerDurations := true
176	if opts.IoConfig.DisableMutationTokens {
177		useMutationTokens = false
178	}
179	if opts.IoConfig.DisableServerDurations {
180		useServerDurations = false
181	}
182
183	var initialTracer requestTracer
184	if opts.Tracer != nil {
185		initialTracer = opts.Tracer
186	} else {
187		initialTracer = newThresholdLoggingTracer(nil)
188	}
189	tracerAddRef(initialTracer)
190
191	return &Cluster{
192		auth: opts.Authenticator,
193		timeoutsConfig: TimeoutsConfig{
194			ConnectTimeout:    connectTimeout,
195			QueryTimeout:      queryTimeout,
196			AnalyticsTimeout:  analyticsTimeout,
197			SearchTimeout:     searchTimeout,
198			ViewTimeout:       viewTimeout,
199			KVTimeout:         kvTimeout,
200			KVDurableTimeout:  kvDurableTimeout,
201			ManagementTimeout: managementTimeout,
202		},
203		transcoder:             opts.Transcoder,
204		useMutationTokens:      useMutationTokens,
205		retryStrategyWrapper:   newRetryStrategyWrapper(opts.RetryStrategy),
206		orphanLoggerEnabled:    !opts.OrphanReporterConfig.Disabled,
207		orphanLoggerInterval:   opts.OrphanReporterConfig.ReportInterval,
208		orphanLoggerSampleSize: opts.OrphanReporterConfig.SampleSize,
209		useServerDurations:     useServerDurations,
210		tracer:                 initialTracer,
211		circuitBreakerConfig:   opts.CircuitBreakerConfig,
212		securityConfig:         opts.SecurityConfig,
213		internalConfig:         opts.InternalConfig,
214	}
215}
216
217// Connect creates and returns a Cluster instance created using the
218// provided options and a connection string.
219func Connect(connStr string, opts ClusterOptions) (*Cluster, error) {
220	connSpec, err := gocbconnstr.Parse(connStr)
221	if err != nil {
222		return nil, err
223	}
224
225	if connSpec.Scheme == "http" {
226		return nil, errors.New("http scheme is not supported, use couchbase or couchbases instead")
227	}
228
229	cluster := clusterFromOptions(opts)
230	cluster.cSpec = connSpec
231
232	err = cluster.parseExtraConnStrOptions(connSpec)
233	if err != nil {
234		return nil, err
235	}
236
237	cli := newConnectionMgr()
238	err = cli.buildConfig(cluster)
239	if err != nil {
240		return nil, err
241	}
242
243	err = cli.connect()
244	if err != nil {
245		return nil, err
246	}
247	cluster.connectionManager = cli
248
249	return cluster, nil
250}
251
252func (c *Cluster) parseExtraConnStrOptions(spec gocbconnstr.ConnSpec) error {
253	fetchOption := func(name string) (string, bool) {
254		optValue := spec.Options[name]
255		if len(optValue) == 0 {
256			return "", false
257		}
258		return optValue[len(optValue)-1], true
259	}
260
261	if valStr, ok := fetchOption("query_timeout"); ok {
262		val, err := strconv.ParseInt(valStr, 10, 64)
263		if err != nil {
264			return fmt.Errorf("query_timeout option must be a number")
265		}
266		c.timeoutsConfig.QueryTimeout = time.Duration(val) * time.Millisecond
267	}
268
269	if valStr, ok := fetchOption("analytics_timeout"); ok {
270		val, err := strconv.ParseInt(valStr, 10, 64)
271		if err != nil {
272			return fmt.Errorf("analytics_timeout option must be a number")
273		}
274		c.timeoutsConfig.AnalyticsTimeout = time.Duration(val) * time.Millisecond
275	}
276
277	if valStr, ok := fetchOption("search_timeout"); ok {
278		val, err := strconv.ParseInt(valStr, 10, 64)
279		if err != nil {
280			return fmt.Errorf("search_timeout option must be a number")
281		}
282		c.timeoutsConfig.SearchTimeout = time.Duration(val) * time.Millisecond
283	}
284
285	if valStr, ok := fetchOption("view_timeout"); ok {
286		val, err := strconv.ParseInt(valStr, 10, 64)
287		if err != nil {
288			return fmt.Errorf("view_timeout option must be a number")
289		}
290		c.timeoutsConfig.ViewTimeout = time.Duration(val) * time.Millisecond
291	}
292
293	return nil
294}
295
296// Bucket connects the cluster to server(s) and returns a new Bucket instance.
297func (c *Cluster) Bucket(bucketName string) *Bucket {
298	b := newBucket(c, bucketName)
299	err := c.connectionManager.openBucket(bucketName)
300	if err != nil {
301		b.setBootstrapError(err)
302	}
303
304	return b
305}
306
// authenticator returns the Authenticator this cluster was configured with.
func (c *Cluster) authenticator() Authenticator {
	return c.auth
}
310
// connSpec returns the parsed connection string that Connect stored on this
// cluster.
func (c *Cluster) connSpec() gocbconnstr.ConnSpec {
	return c.cSpec
}
314
// WaitUntilReadyOptions is the set of options available to the WaitUntilReady operations.
type WaitUntilReadyOptions struct {
	// DesiredState is the cluster state to wait for.  The zero value is
	// treated as ClusterStateOnline by Cluster.WaitUntilReady.
	DesiredState ClusterState
	// ServiceTypes lists the services to check; each entry is converted to
	// its gocbcore service type before being passed on.
	ServiceTypes []ServiceType
}
320
321// WaitUntilReady will wait for the cluster object to be ready for use.
322// At present this will wait until memd connections have been established with the server and are ready
323// to be used before performing a ping against the specified services which also
324// exist in the cluster map.
325// If no services are specified then ServiceTypeManagement, ServiceTypeQuery, ServiceTypeSearch, ServiceTypeAnalytics
326// will be pinged.
327// Valid service types are: ServiceTypeManagement, ServiceTypeQuery, ServiceTypeSearch, ServiceTypeAnalytics.
328func (c *Cluster) WaitUntilReady(timeout time.Duration, opts *WaitUntilReadyOptions) error {
329	if opts == nil {
330		opts = &WaitUntilReadyOptions{}
331	}
332
333	cli := c.connectionManager
334	if cli == nil {
335		return errors.New("cluster is not connected")
336	}
337
338	provider, err := cli.getWaitUntilReadyProvider("")
339	if err != nil {
340		return err
341	}
342
343	desiredState := opts.DesiredState
344	if desiredState == 0 {
345		desiredState = ClusterStateOnline
346	}
347
348	services := opts.ServiceTypes
349	gocbcoreServices := make([]gocbcore.ServiceType, len(services))
350	for i, svc := range services {
351		gocbcoreServices[i] = gocbcore.ServiceType(svc)
352	}
353
354	err = provider.WaitUntilReady(
355		time.Now().Add(timeout),
356		gocbcore.WaitUntilReadyOptions{
357			DesiredState: gocbcore.ClusterState(desiredState),
358			ServiceTypes: gocbcoreServices,
359		},
360	)
361	if err != nil {
362		return err
363	}
364
365	return nil
366}
367
368// Close shuts down all buckets in this cluster and invalidates any references this cluster has.
369func (c *Cluster) Close(opts *ClusterCloseOptions) error {
370	var overallErr error
371
372	if c.connectionManager != nil {
373		err := c.connectionManager.close()
374		if err != nil {
375			logWarnf("Failed to close cluster connectionManager in cluster close: %s", err)
376			overallErr = err
377		}
378	}
379
380	if c.tracer != nil {
381		tracerDecRef(c.tracer)
382		c.tracer = nil
383	}
384
385	return overallErr
386}
387
388func (c *Cluster) getDiagnosticsProvider() (diagnosticsProvider, error) {
389	provider, err := c.connectionManager.getDiagnosticsProvider("")
390	if err != nil {
391		return nil, err
392	}
393
394	return provider, nil
395}
396
397func (c *Cluster) getQueryProvider() (queryProvider, error) {
398	provider, err := c.connectionManager.getQueryProvider()
399	if err != nil {
400		return nil, err
401	}
402
403	return provider, nil
404}
405
406func (c *Cluster) getAnalyticsProvider() (analyticsProvider, error) {
407	provider, err := c.connectionManager.getAnalyticsProvider()
408	if err != nil {
409		return nil, err
410	}
411
412	return provider, nil
413}
414
415func (c *Cluster) getSearchProvider() (searchProvider, error) {
416	provider, err := c.connectionManager.getSearchProvider()
417	if err != nil {
418		return nil, err
419	}
420
421	return provider, nil
422}
423
424func (c *Cluster) getHTTPProvider() (httpProvider, error) {
425	provider, err := c.connectionManager.getHTTPProvider()
426	if err != nil {
427		return nil, err
428	}
429
430	return provider, nil
431}
432
433// Users returns a UserManager for managing users.
434func (c *Cluster) Users() *UserManager {
435	return &UserManager{
436		provider: c,
437		tracer:   c.tracer,
438	}
439}
440
441// Buckets returns a BucketManager for managing buckets.
442func (c *Cluster) Buckets() *BucketManager {
443	return &BucketManager{
444		provider: c,
445		tracer:   c.tracer,
446	}
447}
448
449// AnalyticsIndexes returns an AnalyticsIndexManager for managing analytics indexes.
450func (c *Cluster) AnalyticsIndexes() *AnalyticsIndexManager {
451	return &AnalyticsIndexManager{
452		aProvider:     c,
453		mgmtProvider:  c,
454		globalTimeout: c.timeoutsConfig.ManagementTimeout,
455		tracer:        c.tracer,
456	}
457}
458
459// QueryIndexes returns a QueryIndexManager for managing query indexes.
460func (c *Cluster) QueryIndexes() *QueryIndexManager {
461	return &QueryIndexManager{
462		provider:      c,
463		globalTimeout: c.timeoutsConfig.ManagementTimeout,
464		tracer:        c.tracer,
465	}
466}
467
468// SearchIndexes returns a SearchIndexManager for managing search indexes.
469func (c *Cluster) SearchIndexes() *SearchIndexManager {
470	return &SearchIndexManager{
471		mgmtProvider: c,
472		tracer:       c.tracer,
473	}
474}
475