1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "fmt" 22 "log" 23 "os" 24 "regexp" 25 "time" 26 27 "cloud.google.com/go/internal/trace" 28 instance "cloud.google.com/go/spanner/admin/instance/apiv1" 29 "google.golang.org/api/option" 30 gtransport "google.golang.org/api/transport/grpc" 31 instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" 32 sppb "google.golang.org/genproto/googleapis/spanner/v1" 33 field_mask "google.golang.org/genproto/protobuf/field_mask" 34 "google.golang.org/grpc" 35 "google.golang.org/grpc/codes" 36 "google.golang.org/grpc/metadata" 37 "google.golang.org/grpc/status" 38) 39 40const ( 41 endpoint = "spanner.googleapis.com:443" 42 43 // resourcePrefixHeader is the name of the metadata header used to indicate 44 // the resource being operated on. 45 resourcePrefixHeader = "google-cloud-resource-prefix" 46) 47 48const ( 49 // Scope is the scope for Cloud Spanner Data API. 50 Scope = "https://www.googleapis.com/auth/spanner.data" 51 52 // AdminScope is the scope for Cloud Spanner Admin APIs. 53 AdminScope = "https://www.googleapis.com/auth/spanner.admin" 54) 55 56var ( 57 validDBPattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)/databases/(?P<database>[^/]+)$") 58 validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)") 59) 60 61func validDatabaseName(db string) error { 62 if matched := validDBPattern.MatchString(db); !matched { 63 return fmt.Errorf("database name %q should conform to pattern %q", 64 db, validDBPattern.String()) 65 } 66 return nil 67} 68 69func getInstanceName(db string) (string, error) { 70 matches := validInstancePattern.FindStringSubmatch(db) 71 if len(matches) == 0 { 72 return "", fmt.Errorf("Failed to retrieve instance name from %q according to pattern %q", 73 db, validInstancePattern.String()) 74 } 75 return matches[0], nil 76} 77 78func parseDatabaseName(db string) (project, instance, database string, err error) { 79 matches := validDBPattern.FindStringSubmatch(db) 80 if len(matches) == 0 { 81 return "", "", "", fmt.Errorf("Failed to parse database name from %q according to pattern %q", 82 db, validDBPattern.String()) 83 } 84 return matches[1], matches[2], matches[3], nil 85} 86 87// Client is a client for reading and writing data to a Cloud Spanner database. 88// A client is safe to use concurrently, except for its Close method. 89type Client struct { 90 sc *sessionClient 91 idleSessions *sessionPool 92 logger *log.Logger 93 qo QueryOptions 94} 95 96// ClientConfig has configurations for the client. 97type ClientConfig struct { 98 // NumChannels is the number of gRPC channels. 99 // If zero, a reasonable default is used based on the execution environment. 100 // 101 // Deprecated: The Spanner client now uses a pool of gRPC connections. Use 102 // option.WithGRPCConnectionPool(numConns) instead to specify the number of 103 // connections the client should use. The client will default to a 104 // reasonable default if this option is not specified. 105 NumChannels int 106 107 // SessionPoolConfig is the configuration for session pool. 108 SessionPoolConfig 109 110 // SessionLabels for the sessions created by this client. 111 // See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session 112 // for more info. 113 SessionLabels map[string]string 114 115 // QueryOptions is the configuration for executing a sql query. 116 QueryOptions QueryOptions 117 118 // logger is the logger to use for this client. If it is nil, all logging 119 // will be directed to the standard logger. 120 logger *log.Logger 121} 122 123// errDial returns error for dialing to Cloud Spanner. 124func errDial(ci int, err error) error { 125 e := toSpannerError(err).(*Error) 126 e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci)) 127 return e 128} 129 130func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { 131 existing, ok := metadata.FromOutgoingContext(ctx) 132 if ok { 133 md = metadata.Join(existing, md) 134 } 135 return metadata.NewOutgoingContext(ctx, md) 136} 137 138// getInstanceEndpoint returns an instance-specific endpoint if one exists. If 139// multiple endpoints exist, it returns the first one. 140func getInstanceEndpoint(ctx context.Context, database string, opts ...option.ClientOption) (string, error) { 141 instanceName, err := getInstanceName(database) 142 if err != nil { 143 return "", fmt.Errorf("Failed to resolve endpoint: %v", err) 144 } 145 146 c, err := instance.NewInstanceAdminClient(ctx, opts...) 147 if err != nil { 148 return "", err 149 } 150 defer c.Close() 151 152 req := &instancepb.GetInstanceRequest{ 153 Name: instanceName, 154 FieldMask: &field_mask.FieldMask{ 155 Paths: []string{"endpoint_uris"}, 156 }, 157 } 158 159 resp, err := c.GetInstance(ctx, req) 160 if err != nil { 161 return "", err 162 } 163 164 endpointURIs := resp.GetEndpointUris() 165 166 if len(endpointURIs) > 0 { 167 return endpointURIs[0], nil 168 } 169 170 // Return empty string when no endpoints exist. 171 return "", nil 172} 173 174// NewClient creates a client to a database. A valid database name has the 175// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses 176// a default configuration. 177func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) { 178 return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...) 179} 180 181// NewClientWithConfig creates a client to a database. A valid database name has 182// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. 183func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) { 184 // Validate database path. 185 if err := validDatabaseName(database); err != nil { 186 return nil, err 187 } 188 189 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient") 190 defer func() { trace.EndSpan(ctx, err) }() 191 192 // Append emulator options if SPANNER_EMULATOR_HOST has been set. 193 if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" { 194 emulatorOpts := []option.ClientOption{ 195 option.WithEndpoint(emulatorAddr), 196 option.WithGRPCDialOption(grpc.WithInsecure()), 197 option.WithoutAuthentication(), 198 } 199 opts = append(opts, emulatorOpts...) 200 } else if os.Getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING") == "true" { 201 // Fetch the instance-specific endpoint. 202 reqOpts := []option.ClientOption{option.WithEndpoint(endpoint)} 203 reqOpts = append(reqOpts, opts...) 204 instanceEndpoint, err := getInstanceEndpoint(ctx, database, reqOpts...) 205 206 if err != nil { 207 // If there is a PermissionDenied error, fall back to use the global endpoint 208 // or the user-specified endpoint. 209 if status.Code(err) == codes.PermissionDenied { 210 logf(config.logger, ` 211Warning: The client library attempted to connect to an endpoint closer to your 212Cloud Spanner data but was unable to do so. The client library will fall back 213and route requests to the endpoint given in the client options, which may 214result in increased latency. We recommend including the scope 215https://www.googleapis.com/auth/spanner.admin so that the client library can 216get an instance-specific endpoint and efficiently route requests. 217`) 218 } else { 219 return nil, err 220 } 221 } 222 223 if instanceEndpoint != "" { 224 opts = append(opts, option.WithEndpoint(instanceEndpoint)) 225 } 226 } 227 228 // Prepare gRPC channels. 229 configuredNumChannels := config.NumChannels 230 if config.NumChannels == 0 { 231 config.NumChannels = numChannels 232 } 233 // gRPC options. 234 allOpts := []option.ClientOption{ 235 option.WithEndpoint(endpoint), 236 option.WithScopes(Scope), 237 option.WithGRPCDialOption( 238 grpc.WithDefaultCallOptions( 239 grpc.MaxCallSendMsgSize(100<<20), 240 grpc.MaxCallRecvMsgSize(100<<20), 241 ), 242 ), 243 option.WithGRPCConnectionPool(config.NumChannels), 244 } 245 // opts will take precedence above allOpts, as the values in opts will be 246 // applied after the values in allOpts. 247 allOpts = append(allOpts, opts...) 248 pool, err := gtransport.DialPool(ctx, allOpts...) 249 if configuredNumChannels > 0 && pool.Num() != config.NumChannels { 250 pool.Close() 251 return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value.", config.NumChannels, pool.Num()) 252 } 253 254 // TODO(loite): Remove as the original map cannot be changed by the user 255 // anyways, and the client library is also not changing it. 256 // Make a copy of labels. 257 sessionLabels := make(map[string]string) 258 for k, v := range config.SessionLabels { 259 sessionLabels[k] = v 260 } 261 262 // Default configs for session pool. 263 if config.MaxOpened == 0 { 264 config.MaxOpened = uint64(pool.Num() * 100) 265 } 266 if config.MaxBurst == 0 { 267 config.MaxBurst = DefaultSessionPoolConfig.MaxBurst 268 } 269 // Create a session client. 270 sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger) 271 // Create a session pool. 272 config.SessionPoolConfig.sessionLabels = sessionLabels 273 sp, err := newSessionPool(sc, config.SessionPoolConfig) 274 if err != nil { 275 sc.close() 276 return nil, err 277 } 278 c = &Client{ 279 sc: sc, 280 idleSessions: sp, 281 logger: config.logger, 282 qo: getQueryOptions(config.QueryOptions), 283 } 284 return c, nil 285} 286 287// getQueryOptions returns the query options overwritten by the environment 288// variables if exist. The input parameter is the query options set by users 289// via application-level configuration. If the environment variables are set, 290// this will return the overwritten query options. 291func getQueryOptions(opts QueryOptions) QueryOptions { 292 opv := os.Getenv("SPANNER_OPTIMIZER_VERSION") 293 if opv != "" { 294 if opts.Options == nil { 295 opts.Options = &sppb.ExecuteSqlRequest_QueryOptions{} 296 } 297 opts.Options.OptimizerVersion = opv 298 } 299 return opts 300} 301 302// Close closes the client. 303func (c *Client) Close() { 304 if c.idleSessions != nil { 305 c.idleSessions.close() 306 } 307 c.sc.close() 308} 309 310// Single provides a read-only snapshot transaction optimized for the case 311// where only a single read or query is needed. This is more efficient than 312// using ReadOnlyTransaction() for a single read or query. 313// 314// Single will use a strong TimestampBound by default. Use 315// ReadOnlyTransaction.WithTimestampBound to specify a different 316// TimestampBound. A non-strong bound can be used to reduce latency, or 317// "time-travel" to prior versions of the database, see the documentation of 318// TimestampBound for details. 319func (c *Client) Single() *ReadOnlyTransaction { 320 t := &ReadOnlyTransaction{singleUse: true} 321 t.txReadOnly.sp = c.idleSessions 322 t.txReadOnly.txReadEnv = t 323 t.txReadOnly.qo = c.qo 324 t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error { 325 if t.sh == nil { 326 return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction") 327 } 328 // Remove the session that returned 'Session not found' from the pool. 329 t.sh.destroy() 330 // Reset the transaction, acquire a new session and retry. 331 t.state = txNew 332 sh, _, err := t.acquire(ctx) 333 if err != nil { 334 return err 335 } 336 t.sh = sh 337 return nil 338 } 339 return t 340} 341 342// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for 343// multiple reads from the database. You must call Close() when the 344// ReadOnlyTransaction is no longer needed to release resources on the server. 345// 346// ReadOnlyTransaction will use a strong TimestampBound by default. Use 347// ReadOnlyTransaction.WithTimestampBound to specify a different 348// TimestampBound. A non-strong bound can be used to reduce latency, or 349// "time-travel" to prior versions of the database, see the documentation of 350// TimestampBound for details. 351func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { 352 t := &ReadOnlyTransaction{ 353 singleUse: false, 354 txReadyOrClosed: make(chan struct{}), 355 } 356 t.txReadOnly.sp = c.idleSessions 357 t.txReadOnly.txReadEnv = t 358 t.txReadOnly.qo = c.qo 359 return t 360} 361 362// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used 363// for partitioned reads or queries from a snapshot of the database. This is 364// useful in batch processing pipelines where one wants to divide the work of 365// reading from the database across multiple machines. 366// 367// Note: This transaction does not use the underlying session pool but creates a 368// new session each time, and the session is reused across clients. 369// 370// You should call Close() after the txn is no longer needed on local 371// client, and call Cleanup() when the txn is finished for all clients, to free 372// the session. 373func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) { 374 var ( 375 tx transactionID 376 rts time.Time 377 s *session 378 sh *sessionHandle 379 err error 380 ) 381 defer func() { 382 if err != nil && sh != nil { 383 s.delete(ctx) 384 } 385 }() 386 387 // Create session. 388 s, err = c.sc.createSession(ctx) 389 if err != nil { 390 return nil, err 391 } 392 sh = &sessionHandle{session: s} 393 394 // Begin transaction. 395 res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 396 Session: sh.getID(), 397 Options: &sppb.TransactionOptions{ 398 Mode: &sppb.TransactionOptions_ReadOnly_{ 399 ReadOnly: buildTransactionOptionsReadOnly(tb, true), 400 }, 401 }, 402 }) 403 if err != nil { 404 return nil, toSpannerError(err) 405 } 406 tx = res.Id 407 if res.ReadTimestamp != nil { 408 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 409 } 410 411 t := &BatchReadOnlyTransaction{ 412 ReadOnlyTransaction: ReadOnlyTransaction{ 413 tx: tx, 414 txReadyOrClosed: make(chan struct{}), 415 state: txActive, 416 rts: rts, 417 }, 418 ID: BatchReadOnlyTransactionID{ 419 tid: tx, 420 sid: sh.getID(), 421 rts: rts, 422 }, 423 } 424 t.txReadOnly.sh = sh 425 t.txReadOnly.txReadEnv = t 426 t.txReadOnly.qo = c.qo 427 return t, nil 428} 429 430// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from 431// BatchReadOnlyTransactionID 432func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction { 433 s, err := c.sc.sessionWithID(tid.sid) 434 if err != nil { 435 logf(c.logger, "unexpected error: %v\nThis is an indication of an internal error in the Spanner client library.", err) 436 // Use an invalid session. Preferably, this method should just return 437 // the error instead of this, but that would mean an API change. 438 s = &session{} 439 } 440 sh := &sessionHandle{session: s} 441 442 t := &BatchReadOnlyTransaction{ 443 ReadOnlyTransaction: ReadOnlyTransaction{ 444 tx: tid.tid, 445 txReadyOrClosed: make(chan struct{}), 446 state: txActive, 447 rts: tid.rts, 448 }, 449 ID: tid, 450 } 451 t.txReadOnly.sh = sh 452 t.txReadOnly.txReadEnv = t 453 t.txReadOnly.qo = c.qo 454 return t 455} 456 457type transactionInProgressKey struct{} 458 459func checkNestedTxn(ctx context.Context) error { 460 if ctx.Value(transactionInProgressKey{}) != nil { 461 return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions") 462 } 463 return nil 464} 465 466// ReadWriteTransaction executes a read-write transaction, with retries as 467// necessary. 468// 469// The function f will be called one or more times. It must not maintain 470// any state between calls. 471// 472// If the transaction cannot be committed or if f returns an ABORTED error, 473// ReadWriteTransaction will call f again. It will continue to call f until the 474// transaction can be committed or the Context times out or is cancelled. If f 475// returns an error other than ABORTED, ReadWriteTransaction will abort the 476// transaction and return the error. 477// 478// To limit the number of retries, set a deadline on the Context rather than 479// using a fixed limit on the number of attempts. ReadWriteTransaction will 480// retry as needed until that deadline is met. 481// 482// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for 483// more details. 484func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) { 485 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction") 486 defer func() { trace.EndSpan(ctx, err) }() 487 if err := checkNestedTxn(ctx); err != nil { 488 return time.Time{}, err 489 } 490 var ( 491 ts time.Time 492 sh *sessionHandle 493 ) 494 err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error { 495 var ( 496 err error 497 t *ReadWriteTransaction 498 ) 499 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 500 // Session handle hasn't been allocated or has been destroyed. 501 sh, err = c.idleSessions.takeWriteSession(ctx) 502 if err != nil { 503 // If session retrieval fails, just fail the transaction. 504 return err 505 } 506 t = &ReadWriteTransaction{ 507 tx: sh.getTransactionID(), 508 } 509 } else { 510 t = &ReadWriteTransaction{} 511 } 512 t.txReadOnly.sh = sh 513 t.txReadOnly.txReadEnv = t 514 t.txReadOnly.qo = c.qo 515 trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())}, 516 "Starting transaction attempt") 517 if err = t.begin(ctx); err != nil { 518 return err 519 } 520 ts, err = t.runInTransaction(ctx, f) 521 return err 522 }) 523 if sh != nil { 524 sh.recycle() 525 } 526 return ts, err 527} 528 529// applyOption controls the behavior of Client.Apply. 530type applyOption struct { 531 // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud 532 // Spanner at least once. 533 atLeastOnce bool 534} 535 536// An ApplyOption is an optional argument to Apply. 537type ApplyOption func(*applyOption) 538 539// ApplyAtLeastOnce returns an ApplyOption that removes replay protection. 540// 541// With this option, Apply may attempt to apply mutations more than once; if 542// the mutations are not idempotent, this may lead to a failure being reported 543// when the mutation was applied more than once. For example, an insert may 544// fail with ALREADY_EXISTS even though the row did not exist before Apply was 545// called. For this reason, most users of the library will prefer not to use 546// this option. However, ApplyAtLeastOnce requires only a single RPC, whereas 547// Apply's default replay protection may require an additional RPC. So this 548// option may be appropriate for latency sensitive and/or high throughput blind 549// writing. 550func ApplyAtLeastOnce() ApplyOption { 551 return func(ao *applyOption) { 552 ao.atLeastOnce = true 553 } 554} 555 556// Apply applies a list of mutations atomically to the database. 557func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { 558 ao := &applyOption{} 559 for _, opt := range opts { 560 opt(ao) 561 } 562 563 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply") 564 defer func() { trace.EndSpan(ctx, err) }() 565 566 if !ao.atLeastOnce { 567 return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { 568 return t.BufferWrite(ms) 569 }) 570 } 571 t := &writeOnlyTransaction{c.idleSessions} 572 return t.applyAtLeastOnce(ctx, ms...) 573} 574 575// logf logs the given message to the given logger, or the standard logger if 576// the given logger is nil. 577func logf(logger *log.Logger, format string, v ...interface{}) { 578 if logger == nil { 579 log.Printf(format, v...) 580 } else { 581 logger.Printf(format, v...) 582 } 583} 584