1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "fmt" 22 "log" 23 "os" 24 "regexp" 25 "time" 26 27 "cloud.google.com/go/internal/trace" 28 vkit "cloud.google.com/go/spanner/apiv1" 29 "google.golang.org/api/option" 30 "google.golang.org/api/option/internaloption" 31 gtransport "google.golang.org/api/transport/grpc" 32 sppb "google.golang.org/genproto/googleapis/spanner/v1" 33 "google.golang.org/grpc" 34 "google.golang.org/grpc/codes" 35 "google.golang.org/grpc/metadata" 36) 37 38const ( 39 // resourcePrefixHeader is the name of the metadata header used to indicate 40 // the resource being operated on. 41 resourcePrefixHeader = "google-cloud-resource-prefix" 42 43 // numChannels is the default value for NumChannels of client. 44 numChannels = 4 45) 46 47const ( 48 // Scope is the scope for Cloud Spanner Data API. 49 Scope = "https://www.googleapis.com/auth/spanner.data" 50 51 // AdminScope is the scope for Cloud Spanner Admin APIs. 52 AdminScope = "https://www.googleapis.com/auth/spanner.admin" 53) 54 55var ( 56 validDBPattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)/databases/(?P<database>[^/]+)$") 57) 58 59func validDatabaseName(db string) error { 60 if matched := validDBPattern.MatchString(db); !matched { 61 return fmt.Errorf("database name %q should conform to pattern %q", 62 db, validDBPattern.String()) 63 } 64 return nil 65} 66 67func parseDatabaseName(db string) (project, instance, database string, err error) { 68 matches := validDBPattern.FindStringSubmatch(db) 69 if len(matches) == 0 { 70 return "", "", "", fmt.Errorf("Failed to parse database name from %q according to pattern %q", 71 db, validDBPattern.String()) 72 } 73 return matches[1], matches[2], matches[3], nil 74} 75 76// Client is a client for reading and writing data to a Cloud Spanner database. 77// A client is safe to use concurrently, except for its Close method. 78type Client struct { 79 sc *sessionClient 80 idleSessions *sessionPool 81 logger *log.Logger 82 qo QueryOptions 83} 84 85// DatabaseName returns the full name of a database, e.g., 86// "projects/spanner-cloud-test/instances/foo/databases/foodb". 87func (c *Client) DatabaseName() string { 88 return c.sc.database 89} 90 91// ClientConfig has configurations for the client. 92type ClientConfig struct { 93 // NumChannels is the number of gRPC channels. 94 // If zero, a reasonable default is used based on the execution environment. 95 // 96 // Deprecated: The Spanner client now uses a pool of gRPC connections. Use 97 // option.WithGRPCConnectionPool(numConns) instead to specify the number of 98 // connections the client should use. The client will default to a 99 // reasonable default if this option is not specified. 100 NumChannels int 101 102 // SessionPoolConfig is the configuration for session pool. 103 SessionPoolConfig 104 105 // SessionLabels for the sessions created by this client. 106 // See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session 107 // for more info. 108 SessionLabels map[string]string 109 110 // QueryOptions is the configuration for executing a sql query. 111 QueryOptions QueryOptions 112 113 // CallOptions is the configuration for providing custom retry settings that 114 // override the default values. 115 CallOptions *vkit.CallOptions 116 117 // logger is the logger to use for this client. If it is nil, all logging 118 // will be directed to the standard logger. 119 logger *log.Logger 120} 121 122func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { 123 existing, ok := metadata.FromOutgoingContext(ctx) 124 if ok { 125 md = metadata.Join(existing, md) 126 } 127 return metadata.NewOutgoingContext(ctx, md) 128} 129 130// NewClient creates a client to a database. A valid database name has the 131// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses 132// a default configuration. 133func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) { 134 return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...) 135} 136 137// NewClientWithConfig creates a client to a database. A valid database name has 138// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. 139func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) { 140 // Validate database path. 141 if err := validDatabaseName(database); err != nil { 142 return nil, err 143 } 144 145 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient") 146 defer func() { trace.EndSpan(ctx, err) }() 147 148 // Append emulator options if SPANNER_EMULATOR_HOST has been set. 149 if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" { 150 emulatorOpts := []option.ClientOption{ 151 option.WithEndpoint(emulatorAddr), 152 option.WithGRPCDialOption(grpc.WithInsecure()), 153 option.WithoutAuthentication(), 154 internaloption.SkipDialSettingsValidation(), 155 } 156 opts = append(emulatorOpts, opts...) 157 } 158 159 // Prepare gRPC channels. 160 hasNumChannelsConfig := config.NumChannels > 0 161 if config.NumChannels == 0 { 162 config.NumChannels = numChannels 163 } 164 // gRPC options. 165 allOpts := allClientOpts(config.NumChannels, opts...) 166 pool, err := gtransport.DialPool(ctx, allOpts...) 167 if err != nil { 168 return nil, err 169 } 170 if hasNumChannelsConfig && pool.Num() != config.NumChannels { 171 pool.Close() 172 return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value.", config.NumChannels, pool.Num()) 173 } 174 175 // TODO(loite): Remove as the original map cannot be changed by the user 176 // anyways, and the client library is also not changing it. 177 // Make a copy of labels. 178 sessionLabels := make(map[string]string) 179 for k, v := range config.SessionLabels { 180 sessionLabels[k] = v 181 } 182 183 // Default configs for session pool. 184 if config.MaxOpened == 0 { 185 config.MaxOpened = uint64(pool.Num() * 100) 186 } 187 if config.MaxBurst == 0 { 188 config.MaxBurst = DefaultSessionPoolConfig.MaxBurst 189 } 190 if config.incStep == 0 { 191 config.incStep = DefaultSessionPoolConfig.incStep 192 } 193 // Create a session client. 194 sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger, config.CallOptions) 195 // Create a session pool. 196 config.SessionPoolConfig.sessionLabels = sessionLabels 197 sp, err := newSessionPool(sc, config.SessionPoolConfig) 198 if err != nil { 199 sc.close() 200 return nil, err 201 } 202 c = &Client{ 203 sc: sc, 204 idleSessions: sp, 205 logger: config.logger, 206 qo: getQueryOptions(config.QueryOptions), 207 } 208 return c, nil 209} 210 211// Combines the default options from the generated client, the default options 212// of the hand-written client and the user options to one list of options. 213// Precedence: userOpts > clientDefaultOpts > generatedDefaultOpts 214func allClientOpts(numChannels int, userOpts ...option.ClientOption) []option.ClientOption { 215 generatedDefaultOpts := vkit.DefaultClientOptions() 216 clientDefaultOpts := []option.ClientOption{ 217 option.WithGRPCConnectionPool(numChannels), 218 option.WithUserAgent(clientUserAgent), 219 internaloption.EnableDirectPath(true), 220 } 221 allDefaultOpts := append(generatedDefaultOpts, clientDefaultOpts...) 222 return append(allDefaultOpts, userOpts...) 223} 224 225// getQueryOptions returns the query options overwritten by the environment 226// variables if exist. The input parameter is the query options set by users 227// via application-level configuration. If the environment variables are set, 228// this will return the overwritten query options. 229func getQueryOptions(opts QueryOptions) QueryOptions { 230 if opts.Options == nil { 231 opts.Options = &sppb.ExecuteSqlRequest_QueryOptions{} 232 } 233 opv := os.Getenv("SPANNER_OPTIMIZER_VERSION") 234 if opv != "" { 235 opts.Options.OptimizerVersion = opv 236 } 237 opsp := os.Getenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE") 238 if opsp != "" { 239 opts.Options.OptimizerStatisticsPackage = opsp 240 } 241 return opts 242} 243 244// Close closes the client. 245func (c *Client) Close() { 246 if c.idleSessions != nil { 247 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 248 defer cancel() 249 c.idleSessions.close(ctx) 250 } 251 c.sc.close() 252} 253 254// Single provides a read-only snapshot transaction optimized for the case 255// where only a single read or query is needed. This is more efficient than 256// using ReadOnlyTransaction() for a single read or query. 257// 258// Single will use a strong TimestampBound by default. Use 259// ReadOnlyTransaction.WithTimestampBound to specify a different 260// TimestampBound. A non-strong bound can be used to reduce latency, or 261// "time-travel" to prior versions of the database, see the documentation of 262// TimestampBound for details. 263func (c *Client) Single() *ReadOnlyTransaction { 264 t := &ReadOnlyTransaction{singleUse: true} 265 t.txReadOnly.sp = c.idleSessions 266 t.txReadOnly.txReadEnv = t 267 t.txReadOnly.qo = c.qo 268 t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error { 269 if t.sh == nil { 270 return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction") 271 } 272 // Remove the session that returned 'Session not found' from the pool. 273 t.sh.destroy() 274 // Reset the transaction, acquire a new session and retry. 275 t.state = txNew 276 sh, _, err := t.acquire(ctx) 277 if err != nil { 278 return err 279 } 280 t.sh = sh 281 return nil 282 } 283 return t 284} 285 286// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for 287// multiple reads from the database. You must call Close() when the 288// ReadOnlyTransaction is no longer needed to release resources on the server. 289// 290// ReadOnlyTransaction will use a strong TimestampBound by default. Use 291// ReadOnlyTransaction.WithTimestampBound to specify a different 292// TimestampBound. A non-strong bound can be used to reduce latency, or 293// "time-travel" to prior versions of the database, see the documentation of 294// TimestampBound for details. 295func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { 296 t := &ReadOnlyTransaction{ 297 singleUse: false, 298 txReadyOrClosed: make(chan struct{}), 299 } 300 t.txReadOnly.sp = c.idleSessions 301 t.txReadOnly.txReadEnv = t 302 t.txReadOnly.qo = c.qo 303 return t 304} 305 306// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used 307// for partitioned reads or queries from a snapshot of the database. This is 308// useful in batch processing pipelines where one wants to divide the work of 309// reading from the database across multiple machines. 310// 311// Note: This transaction does not use the underlying session pool but creates a 312// new session each time, and the session is reused across clients. 313// 314// You should call Close() after the txn is no longer needed on local 315// client, and call Cleanup() when the txn is finished for all clients, to free 316// the session. 317func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) { 318 var ( 319 tx transactionID 320 rts time.Time 321 s *session 322 sh *sessionHandle 323 err error 324 ) 325 defer func() { 326 if err != nil && sh != nil { 327 s.delete(ctx) 328 } 329 }() 330 331 // Create session. 332 s, err = c.sc.createSession(ctx) 333 if err != nil { 334 return nil, err 335 } 336 sh = &sessionHandle{session: s} 337 338 // Begin transaction. 339 res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 340 Session: sh.getID(), 341 Options: &sppb.TransactionOptions{ 342 Mode: &sppb.TransactionOptions_ReadOnly_{ 343 ReadOnly: buildTransactionOptionsReadOnly(tb, true), 344 }, 345 }, 346 }) 347 if err != nil { 348 return nil, ToSpannerError(err) 349 } 350 tx = res.Id 351 if res.ReadTimestamp != nil { 352 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 353 } 354 355 t := &BatchReadOnlyTransaction{ 356 ReadOnlyTransaction: ReadOnlyTransaction{ 357 tx: tx, 358 txReadyOrClosed: make(chan struct{}), 359 state: txActive, 360 rts: rts, 361 }, 362 ID: BatchReadOnlyTransactionID{ 363 tid: tx, 364 sid: sh.getID(), 365 rts: rts, 366 }, 367 } 368 t.txReadOnly.sh = sh 369 t.txReadOnly.txReadEnv = t 370 t.txReadOnly.qo = c.qo 371 return t, nil 372} 373 374// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from 375// BatchReadOnlyTransactionID 376func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction { 377 s, err := c.sc.sessionWithID(tid.sid) 378 if err != nil { 379 logf(c.logger, "unexpected error: %v\nThis is an indication of an internal error in the Spanner client library.", err) 380 // Use an invalid session. Preferably, this method should just return 381 // the error instead of this, but that would mean an API change. 382 s = &session{} 383 } 384 sh := &sessionHandle{session: s} 385 386 t := &BatchReadOnlyTransaction{ 387 ReadOnlyTransaction: ReadOnlyTransaction{ 388 tx: tid.tid, 389 txReadyOrClosed: make(chan struct{}), 390 state: txActive, 391 rts: tid.rts, 392 }, 393 ID: tid, 394 } 395 t.txReadOnly.sh = sh 396 t.txReadOnly.txReadEnv = t 397 t.txReadOnly.qo = c.qo 398 return t 399} 400 401type transactionInProgressKey struct{} 402 403func checkNestedTxn(ctx context.Context) error { 404 if ctx.Value(transactionInProgressKey{}) != nil { 405 return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions") 406 } 407 return nil 408} 409 410// ReadWriteTransaction executes a read-write transaction, with retries as 411// necessary. 412// 413// The function f will be called one or more times. It must not maintain 414// any state between calls. 415// 416// If the transaction cannot be committed or if f returns an ABORTED error, 417// ReadWriteTransaction will call f again. It will continue to call f until the 418// transaction can be committed or the Context times out or is cancelled. If f 419// returns an error other than ABORTED, ReadWriteTransaction will abort the 420// transaction and return the error. 421// 422// To limit the number of retries, set a deadline on the Context rather than 423// using a fixed limit on the number of attempts. ReadWriteTransaction will 424// retry as needed until that deadline is met. 425// 426// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for 427// more details. 428func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) { 429 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction") 430 defer func() { trace.EndSpan(ctx, err) }() 431 resp, err := c.rwTransaction(ctx, f, TransactionOptions{}) 432 return resp.CommitTs, err 433} 434 435// ReadWriteTransactionWithOptions executes a read-write transaction with 436// configurable options, with retries as necessary. 437// 438// ReadWriteTransactionWithOptions is a configurable ReadWriteTransaction. 439// 440// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for 441// more details. 442func (c *Client) ReadWriteTransactionWithOptions(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) { 443 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransactionWithOptions") 444 defer func() { trace.EndSpan(ctx, err) }() 445 resp, err = c.rwTransaction(ctx, f, options) 446 return resp, err 447} 448 449func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) { 450 if err := checkNestedTxn(ctx); err != nil { 451 return resp, err 452 } 453 var ( 454 sh *sessionHandle 455 ) 456 defer func() { 457 if sh != nil { 458 sh.recycle() 459 } 460 }() 461 err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error { 462 var ( 463 err error 464 t *ReadWriteTransaction 465 ) 466 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 467 // Session handle hasn't been allocated or has been destroyed. 468 sh, err = c.idleSessions.takeWriteSession(ctx) 469 if err != nil { 470 // If session retrieval fails, just fail the transaction. 471 return err 472 } 473 t = &ReadWriteTransaction{ 474 tx: sh.getTransactionID(), 475 } 476 } else { 477 t = &ReadWriteTransaction{} 478 } 479 t.txReadOnly.sh = sh 480 t.txReadOnly.txReadEnv = t 481 t.txReadOnly.qo = c.qo 482 t.txOpts = options 483 484 trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())}, 485 "Starting transaction attempt") 486 if err = t.begin(ctx); err != nil { 487 return err 488 } 489 resp, err = t.runInTransaction(ctx, f) 490 return err 491 }) 492 return resp, err 493} 494 495// applyOption controls the behavior of Client.Apply. 496type applyOption struct { 497 // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud 498 // Spanner at least once. 499 atLeastOnce bool 500 // transactionTag will be included with the CommitRequest. 501 transactionTag string 502 // priority is the RPC priority that is used for the commit operation. 503 priority sppb.RequestOptions_Priority 504} 505 506// An ApplyOption is an optional argument to Apply. 507type ApplyOption func(*applyOption) 508 509// ApplyAtLeastOnce returns an ApplyOption that removes replay protection. 510// 511// With this option, Apply may attempt to apply mutations more than once; if 512// the mutations are not idempotent, this may lead to a failure being reported 513// when the mutation was applied more than once. For example, an insert may 514// fail with ALREADY_EXISTS even though the row did not exist before Apply was 515// called. For this reason, most users of the library will prefer not to use 516// this option. However, ApplyAtLeastOnce requires only a single RPC, whereas 517// Apply's default replay protection may require an additional RPC. So this 518// option may be appropriate for latency sensitive and/or high throughput blind 519// writing. 520func ApplyAtLeastOnce() ApplyOption { 521 return func(ao *applyOption) { 522 ao.atLeastOnce = true 523 } 524} 525 526// TransactionTag returns an ApplyOption that will include the given tag as a 527// transaction tag for a write-only transaction. 528func TransactionTag(tag string) ApplyOption { 529 return func(ao *applyOption) { 530 ao.transactionTag = tag 531 } 532} 533 534// Priority returns an ApplyOptions that sets the RPC priority to use for the 535// commit operation. 536func Priority(priority sppb.RequestOptions_Priority) ApplyOption { 537 return func(ao *applyOption) { 538 ao.priority = priority 539 } 540} 541 542// Apply applies a list of mutations atomically to the database. 543func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { 544 ao := &applyOption{} 545 for _, opt := range opts { 546 opt(ao) 547 } 548 549 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply") 550 defer func() { trace.EndSpan(ctx, err) }() 551 552 if !ao.atLeastOnce { 553 resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { 554 return t.BufferWrite(ms) 555 }, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag}) 556 return resp.CommitTs, err 557 } 558 t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag} 559 return t.applyAtLeastOnce(ctx, ms...) 560} 561 562// logf logs the given message to the given logger, or the standard logger if 563// the given logger is nil. 564func logf(logger *log.Logger, format string, v ...interface{}) { 565 if logger == nil { 566 log.Printf(format, v...) 567 } else { 568 logger.Printf(format, v...) 569 } 570} 571