1package gocb 2 3import ( 4 "crypto/x509" 5 "fmt" 6 "strconv" 7 "time" 8 9 gocbcore "github.com/couchbase/gocbcore/v9" 10 gocbconnstr "github.com/couchbase/gocbcore/v9/connstr" 11 "github.com/pkg/errors" 12) 13 14// Cluster represents a connection to a specific Couchbase cluster. 15type Cluster struct { 16 cSpec gocbconnstr.ConnSpec 17 auth Authenticator 18 19 connectionManager connectionManager 20 21 useServerDurations bool 22 useMutationTokens bool 23 24 timeoutsConfig TimeoutsConfig 25 26 transcoder Transcoder 27 retryStrategyWrapper *retryStrategyWrapper 28 29 orphanLoggerEnabled bool 30 orphanLoggerInterval time.Duration 31 orphanLoggerSampleSize uint32 32 33 tracer requestTracer 34 35 circuitBreakerConfig CircuitBreakerConfig 36 securityConfig SecurityConfig 37 internalConfig InternalConfig 38} 39 40// IoConfig specifies IO related configuration options. 41type IoConfig struct { 42 DisableMutationTokens bool 43 DisableServerDurations bool 44} 45 46// TimeoutsConfig specifies options for various operation timeouts. 47type TimeoutsConfig struct { 48 ConnectTimeout time.Duration 49 KVTimeout time.Duration 50 // Volatile: This option is subject to change at any time. 51 KVDurableTimeout time.Duration 52 ViewTimeout time.Duration 53 QueryTimeout time.Duration 54 AnalyticsTimeout time.Duration 55 SearchTimeout time.Duration 56 ManagementTimeout time.Duration 57} 58 59// OrphanReporterConfig specifies options for controlling the orphan 60// reporter which records when the SDK receives responses for requests 61// that are no longer in the system (usually due to being timed out). 62type OrphanReporterConfig struct { 63 Disabled bool 64 ReportInterval time.Duration 65 SampleSize uint32 66} 67 68// SecurityConfig specifies options for controlling security related 69// items such as TLS root certificates and verification skipping. 70type SecurityConfig struct { 71 TLSRootCAs *x509.CertPool 72 TLSSkipVerify bool 73} 74 75// InternalConfig specifies options for controlling various internal 76// items. 77// Internal: This should never be used and is not supported. 78type InternalConfig struct { 79 TLSRootCAProvider func() *x509.CertPool 80} 81 82// ClusterOptions is the set of options available for creating a Cluster. 83type ClusterOptions struct { 84 // Authenticator specifies the authenticator to use with the cluster. 85 Authenticator Authenticator 86 87 // Username & Password specifies the cluster username and password to 88 // authenticate with. This is equivalent to passing PasswordAuthenticator 89 // as the Authenticator parameter with the same values. 90 Username string 91 Password string 92 93 // Timeouts specifies various operation timeouts. 94 TimeoutsConfig TimeoutsConfig 95 96 // Transcoder is used for trancoding data used in KV operations. 97 Transcoder Transcoder 98 99 // RetryStrategy is used to automatically retry operations if they fail. 100 RetryStrategy RetryStrategy 101 102 // Tracer specifies the tracer to use for requests. 103 // VOLATILE: This API is subject to change at any time. 104 Tracer requestTracer 105 106 // OrphanReporterConfig specifies options for the orphan reporter. 107 OrphanReporterConfig OrphanReporterConfig 108 109 // CircuitBreakerConfig specifies options for the circuit breakers. 110 CircuitBreakerConfig CircuitBreakerConfig 111 112 // IoConfig specifies IO related configuration options. 113 IoConfig IoConfig 114 115 // SecurityConfig specifies security related configuration options. 116 SecurityConfig SecurityConfig 117 118 // Internal: This should never be used and is not supported. 119 InternalConfig InternalConfig 120} 121 122// ClusterCloseOptions is the set of options available when 123// disconnecting from a Cluster. 124type ClusterCloseOptions struct { 125} 126 127func clusterFromOptions(opts ClusterOptions) *Cluster { 128 if opts.Authenticator == nil { 129 opts.Authenticator = PasswordAuthenticator{ 130 Username: opts.Username, 131 Password: opts.Password, 132 } 133 } 134 135 connectTimeout := 10000 * time.Millisecond 136 kvTimeout := 2500 * time.Millisecond 137 kvDurableTimeout := 10000 * time.Millisecond 138 viewTimeout := 75000 * time.Millisecond 139 queryTimeout := 75000 * time.Millisecond 140 analyticsTimeout := 75000 * time.Millisecond 141 searchTimeout := 75000 * time.Millisecond 142 managementTimeout := 75000 * time.Millisecond 143 if opts.TimeoutsConfig.ConnectTimeout > 0 { 144 connectTimeout = opts.TimeoutsConfig.ConnectTimeout 145 } 146 if opts.TimeoutsConfig.KVTimeout > 0 { 147 kvTimeout = opts.TimeoutsConfig.KVTimeout 148 } 149 if opts.TimeoutsConfig.KVDurableTimeout > 0 { 150 kvDurableTimeout = opts.TimeoutsConfig.KVDurableTimeout 151 } 152 if opts.TimeoutsConfig.ViewTimeout > 0 { 153 viewTimeout = opts.TimeoutsConfig.ViewTimeout 154 } 155 if opts.TimeoutsConfig.QueryTimeout > 0 { 156 queryTimeout = opts.TimeoutsConfig.QueryTimeout 157 } 158 if opts.TimeoutsConfig.AnalyticsTimeout > 0 { 159 analyticsTimeout = opts.TimeoutsConfig.AnalyticsTimeout 160 } 161 if opts.TimeoutsConfig.SearchTimeout > 0 { 162 searchTimeout = opts.TimeoutsConfig.SearchTimeout 163 } 164 if opts.TimeoutsConfig.ManagementTimeout > 0 { 165 managementTimeout = opts.TimeoutsConfig.ManagementTimeout 166 } 167 if opts.Transcoder == nil { 168 opts.Transcoder = NewJSONTranscoder() 169 } 170 if opts.RetryStrategy == nil { 171 opts.RetryStrategy = NewBestEffortRetryStrategy(nil) 172 } 173 174 useMutationTokens := true 175 useServerDurations := true 176 if opts.IoConfig.DisableMutationTokens { 177 useMutationTokens = false 178 } 179 if opts.IoConfig.DisableServerDurations { 180 useServerDurations = false 181 } 182 183 var initialTracer requestTracer 184 if opts.Tracer != nil { 185 initialTracer = opts.Tracer 186 } else { 187 initialTracer = newThresholdLoggingTracer(nil) 188 } 189 tracerAddRef(initialTracer) 190 191 return &Cluster{ 192 auth: opts.Authenticator, 193 timeoutsConfig: TimeoutsConfig{ 194 ConnectTimeout: connectTimeout, 195 QueryTimeout: queryTimeout, 196 AnalyticsTimeout: analyticsTimeout, 197 SearchTimeout: searchTimeout, 198 ViewTimeout: viewTimeout, 199 KVTimeout: kvTimeout, 200 KVDurableTimeout: kvDurableTimeout, 201 ManagementTimeout: managementTimeout, 202 }, 203 transcoder: opts.Transcoder, 204 useMutationTokens: useMutationTokens, 205 retryStrategyWrapper: newRetryStrategyWrapper(opts.RetryStrategy), 206 orphanLoggerEnabled: !opts.OrphanReporterConfig.Disabled, 207 orphanLoggerInterval: opts.OrphanReporterConfig.ReportInterval, 208 orphanLoggerSampleSize: opts.OrphanReporterConfig.SampleSize, 209 useServerDurations: useServerDurations, 210 tracer: initialTracer, 211 circuitBreakerConfig: opts.CircuitBreakerConfig, 212 securityConfig: opts.SecurityConfig, 213 internalConfig: opts.InternalConfig, 214 } 215} 216 217// Connect creates and returns a Cluster instance created using the 218// provided options and a connection string. 219func Connect(connStr string, opts ClusterOptions) (*Cluster, error) { 220 connSpec, err := gocbconnstr.Parse(connStr) 221 if err != nil { 222 return nil, err 223 } 224 225 if connSpec.Scheme == "http" { 226 return nil, errors.New("http scheme is not supported, use couchbase or couchbases instead") 227 } 228 229 cluster := clusterFromOptions(opts) 230 cluster.cSpec = connSpec 231 232 err = cluster.parseExtraConnStrOptions(connSpec) 233 if err != nil { 234 return nil, err 235 } 236 237 cli := newConnectionMgr() 238 err = cli.buildConfig(cluster) 239 if err != nil { 240 return nil, err 241 } 242 243 err = cli.connect() 244 if err != nil { 245 return nil, err 246 } 247 cluster.connectionManager = cli 248 249 return cluster, nil 250} 251 252func (c *Cluster) parseExtraConnStrOptions(spec gocbconnstr.ConnSpec) error { 253 fetchOption := func(name string) (string, bool) { 254 optValue := spec.Options[name] 255 if len(optValue) == 0 { 256 return "", false 257 } 258 return optValue[len(optValue)-1], true 259 } 260 261 if valStr, ok := fetchOption("query_timeout"); ok { 262 val, err := strconv.ParseInt(valStr, 10, 64) 263 if err != nil { 264 return fmt.Errorf("query_timeout option must be a number") 265 } 266 c.timeoutsConfig.QueryTimeout = time.Duration(val) * time.Millisecond 267 } 268 269 if valStr, ok := fetchOption("analytics_timeout"); ok { 270 val, err := strconv.ParseInt(valStr, 10, 64) 271 if err != nil { 272 return fmt.Errorf("analytics_timeout option must be a number") 273 } 274 c.timeoutsConfig.AnalyticsTimeout = time.Duration(val) * time.Millisecond 275 } 276 277 if valStr, ok := fetchOption("search_timeout"); ok { 278 val, err := strconv.ParseInt(valStr, 10, 64) 279 if err != nil { 280 return fmt.Errorf("search_timeout option must be a number") 281 } 282 c.timeoutsConfig.SearchTimeout = time.Duration(val) * time.Millisecond 283 } 284 285 if valStr, ok := fetchOption("view_timeout"); ok { 286 val, err := strconv.ParseInt(valStr, 10, 64) 287 if err != nil { 288 return fmt.Errorf("view_timeout option must be a number") 289 } 290 c.timeoutsConfig.ViewTimeout = time.Duration(val) * time.Millisecond 291 } 292 293 return nil 294} 295 296// Bucket connects the cluster to server(s) and returns a new Bucket instance. 297func (c *Cluster) Bucket(bucketName string) *Bucket { 298 b := newBucket(c, bucketName) 299 err := c.connectionManager.openBucket(bucketName) 300 if err != nil { 301 b.setBootstrapError(err) 302 } 303 304 return b 305} 306 307func (c *Cluster) authenticator() Authenticator { 308 return c.auth 309} 310 311func (c *Cluster) connSpec() gocbconnstr.ConnSpec { 312 return c.cSpec 313} 314 315// WaitUntilReadyOptions is the set of options available to the WaitUntilReady operations. 316type WaitUntilReadyOptions struct { 317 DesiredState ClusterState 318 ServiceTypes []ServiceType 319} 320 321// WaitUntilReady will wait for the cluster object to be ready for use. 322// At present this will wait until memd connections have been established with the server and are ready 323// to be used before performing a ping against the specified services which also 324// exist in the cluster map. 325// If no services are specified then ServiceTypeManagement, ServiceTypeQuery, ServiceTypeSearch, ServiceTypeAnalytics 326// will be pinged. 327// Valid service types are: ServiceTypeManagement, ServiceTypeQuery, ServiceTypeSearch, ServiceTypeAnalytics. 328func (c *Cluster) WaitUntilReady(timeout time.Duration, opts *WaitUntilReadyOptions) error { 329 if opts == nil { 330 opts = &WaitUntilReadyOptions{} 331 } 332 333 cli := c.connectionManager 334 if cli == nil { 335 return errors.New("cluster is not connected") 336 } 337 338 provider, err := cli.getWaitUntilReadyProvider("") 339 if err != nil { 340 return err 341 } 342 343 desiredState := opts.DesiredState 344 if desiredState == 0 { 345 desiredState = ClusterStateOnline 346 } 347 348 services := opts.ServiceTypes 349 gocbcoreServices := make([]gocbcore.ServiceType, len(services)) 350 for i, svc := range services { 351 gocbcoreServices[i] = gocbcore.ServiceType(svc) 352 } 353 354 err = provider.WaitUntilReady( 355 time.Now().Add(timeout), 356 gocbcore.WaitUntilReadyOptions{ 357 DesiredState: gocbcore.ClusterState(desiredState), 358 ServiceTypes: gocbcoreServices, 359 }, 360 ) 361 if err != nil { 362 return err 363 } 364 365 return nil 366} 367 368// Close shuts down all buckets in this cluster and invalidates any references this cluster has. 369func (c *Cluster) Close(opts *ClusterCloseOptions) error { 370 var overallErr error 371 372 if c.connectionManager != nil { 373 err := c.connectionManager.close() 374 if err != nil { 375 logWarnf("Failed to close cluster connectionManager in cluster close: %s", err) 376 overallErr = err 377 } 378 } 379 380 if c.tracer != nil { 381 tracerDecRef(c.tracer) 382 c.tracer = nil 383 } 384 385 return overallErr 386} 387 388func (c *Cluster) getDiagnosticsProvider() (diagnosticsProvider, error) { 389 provider, err := c.connectionManager.getDiagnosticsProvider("") 390 if err != nil { 391 return nil, err 392 } 393 394 return provider, nil 395} 396 397func (c *Cluster) getQueryProvider() (queryProvider, error) { 398 provider, err := c.connectionManager.getQueryProvider() 399 if err != nil { 400 return nil, err 401 } 402 403 return provider, nil 404} 405 406func (c *Cluster) getAnalyticsProvider() (analyticsProvider, error) { 407 provider, err := c.connectionManager.getAnalyticsProvider() 408 if err != nil { 409 return nil, err 410 } 411 412 return provider, nil 413} 414 415func (c *Cluster) getSearchProvider() (searchProvider, error) { 416 provider, err := c.connectionManager.getSearchProvider() 417 if err != nil { 418 return nil, err 419 } 420 421 return provider, nil 422} 423 424func (c *Cluster) getHTTPProvider() (httpProvider, error) { 425 provider, err := c.connectionManager.getHTTPProvider() 426 if err != nil { 427 return nil, err 428 } 429 430 return provider, nil 431} 432 433// Users returns a UserManager for managing users. 434func (c *Cluster) Users() *UserManager { 435 return &UserManager{ 436 provider: c, 437 tracer: c.tracer, 438 } 439} 440 441// Buckets returns a BucketManager for managing buckets. 442func (c *Cluster) Buckets() *BucketManager { 443 return &BucketManager{ 444 provider: c, 445 tracer: c.tracer, 446 } 447} 448 449// AnalyticsIndexes returns an AnalyticsIndexManager for managing analytics indexes. 450func (c *Cluster) AnalyticsIndexes() *AnalyticsIndexManager { 451 return &AnalyticsIndexManager{ 452 aProvider: c, 453 mgmtProvider: c, 454 globalTimeout: c.timeoutsConfig.ManagementTimeout, 455 tracer: c.tracer, 456 } 457} 458 459// QueryIndexes returns a QueryIndexManager for managing query indexes. 460func (c *Cluster) QueryIndexes() *QueryIndexManager { 461 return &QueryIndexManager{ 462 provider: c, 463 globalTimeout: c.timeoutsConfig.ManagementTimeout, 464 tracer: c.tracer, 465 } 466} 467 468// SearchIndexes returns a SearchIndexManager for managing search indexes. 469func (c *Cluster) SearchIndexes() *SearchIndexManager { 470 return &SearchIndexManager{ 471 mgmtProvider: c, 472 tracer: c.tracer, 473 } 474} 475