1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "fmt" 22 "log" 23 "os" 24 "regexp" 25 "time" 26 27 "cloud.google.com/go/internal/trace" 28 vkit "cloud.google.com/go/spanner/apiv1" 29 "google.golang.org/api/option" 30 "google.golang.org/api/option/internaloption" 31 gtransport "google.golang.org/api/transport/grpc" 32 sppb "google.golang.org/genproto/googleapis/spanner/v1" 33 "google.golang.org/grpc" 34 "google.golang.org/grpc/codes" 35 "google.golang.org/grpc/metadata" 36) 37 38const ( 39 endpoint = "spanner.googleapis.com:443" 40 41 // resourcePrefixHeader is the name of the metadata header used to indicate 42 // the resource being operated on. 43 resourcePrefixHeader = "google-cloud-resource-prefix" 44 45 // numChannels is the default value for NumChannels of client. 46 numChannels = 4 47) 48 49const ( 50 // Scope is the scope for Cloud Spanner Data API. 51 Scope = "https://www.googleapis.com/auth/spanner.data" 52 53 // AdminScope is the scope for Cloud Spanner Admin APIs. 54 AdminScope = "https://www.googleapis.com/auth/spanner.admin" 55) 56 57var ( 58 validDBPattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)/databases/(?P<database>[^/]+)$") 59) 60 61func validDatabaseName(db string) error { 62 if matched := validDBPattern.MatchString(db); !matched { 63 return fmt.Errorf("database name %q should conform to pattern %q", 64 db, validDBPattern.String()) 65 } 66 return nil 67} 68 69func parseDatabaseName(db string) (project, instance, database string, err error) { 70 matches := validDBPattern.FindStringSubmatch(db) 71 if len(matches) == 0 { 72 return "", "", "", fmt.Errorf("Failed to parse database name from %q according to pattern %q", 73 db, validDBPattern.String()) 74 } 75 return matches[1], matches[2], matches[3], nil 76} 77 78// Client is a client for reading and writing data to a Cloud Spanner database. 79// A client is safe to use concurrently, except for its Close method. 80type Client struct { 81 sc *sessionClient 82 idleSessions *sessionPool 83 logger *log.Logger 84 qo QueryOptions 85} 86 87// ClientConfig has configurations for the client. 88type ClientConfig struct { 89 // NumChannels is the number of gRPC channels. 90 // If zero, a reasonable default is used based on the execution environment. 91 // 92 // Deprecated: The Spanner client now uses a pool of gRPC connections. Use 93 // option.WithGRPCConnectionPool(numConns) instead to specify the number of 94 // connections the client should use. The client will default to a 95 // reasonable default if this option is not specified. 96 NumChannels int 97 98 // SessionPoolConfig is the configuration for session pool. 99 SessionPoolConfig 100 101 // SessionLabels for the sessions created by this client. 102 // See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session 103 // for more info. 104 SessionLabels map[string]string 105 106 // QueryOptions is the configuration for executing a sql query. 107 QueryOptions QueryOptions 108 109 // CallOptions is the configuration for providing custom retry settings that 110 // override the default values. 111 CallOptions *vkit.CallOptions 112 113 // logger is the logger to use for this client. If it is nil, all logging 114 // will be directed to the standard logger. 115 logger *log.Logger 116} 117 118// errDial returns error for dialing to Cloud Spanner. 119func errDial(ci int, err error) error { 120 e := toSpannerError(err).(*Error) 121 e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci)) 122 return e 123} 124 125func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { 126 existing, ok := metadata.FromOutgoingContext(ctx) 127 if ok { 128 md = metadata.Join(existing, md) 129 } 130 return metadata.NewOutgoingContext(ctx, md) 131} 132 133// NewClient creates a client to a database. A valid database name has the 134// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses 135// a default configuration. 136func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) { 137 return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...) 138} 139 140// NewClientWithConfig creates a client to a database. A valid database name has 141// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. 142func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) { 143 // Validate database path. 144 if err := validDatabaseName(database); err != nil { 145 return nil, err 146 } 147 148 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient") 149 defer func() { trace.EndSpan(ctx, err) }() 150 151 // Append emulator options if SPANNER_EMULATOR_HOST has been set. 152 if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" { 153 emulatorOpts := []option.ClientOption{ 154 option.WithEndpoint(emulatorAddr), 155 option.WithGRPCDialOption(grpc.WithInsecure()), 156 option.WithoutAuthentication(), 157 internaloption.SkipDialSettingsValidation(), 158 } 159 opts = append(emulatorOpts, opts...) 160 } 161 162 // Prepare gRPC channels. 163 hasNumChannelsConfig := config.NumChannels > 0 164 if config.NumChannels == 0 { 165 config.NumChannels = numChannels 166 } 167 // gRPC options. 168 allOpts := []option.ClientOption{ 169 option.WithEndpoint(endpoint), 170 option.WithScopes(Scope), 171 option.WithGRPCDialOption( 172 grpc.WithDefaultCallOptions( 173 grpc.MaxCallSendMsgSize(100<<20), 174 grpc.MaxCallRecvMsgSize(100<<20), 175 ), 176 ), 177 option.WithGRPCConnectionPool(config.NumChannels), 178 } 179 // opts will take precedence above allOpts, as the values in opts will be 180 // applied after the values in allOpts. 181 allOpts = append(allOpts, opts...) 182 pool, err := gtransport.DialPool(ctx, allOpts...) 183 if err != nil { 184 return nil, err 185 } 186 if hasNumChannelsConfig && pool.Num() != config.NumChannels { 187 pool.Close() 188 return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value.", config.NumChannels, pool.Num()) 189 } 190 191 // TODO(loite): Remove as the original map cannot be changed by the user 192 // anyways, and the client library is also not changing it. 193 // Make a copy of labels. 194 sessionLabels := make(map[string]string) 195 for k, v := range config.SessionLabels { 196 sessionLabels[k] = v 197 } 198 199 // Default configs for session pool. 200 if config.MaxOpened == 0 { 201 config.MaxOpened = uint64(pool.Num() * 100) 202 } 203 if config.MaxBurst == 0 { 204 config.MaxBurst = DefaultSessionPoolConfig.MaxBurst 205 } 206 if config.incStep == 0 { 207 config.incStep = DefaultSessionPoolConfig.incStep 208 } 209 // Create a session client. 210 sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger, config.CallOptions) 211 // Create a session pool. 212 config.SessionPoolConfig.sessionLabels = sessionLabels 213 sp, err := newSessionPool(sc, config.SessionPoolConfig) 214 if err != nil { 215 sc.close() 216 return nil, err 217 } 218 c = &Client{ 219 sc: sc, 220 idleSessions: sp, 221 logger: config.logger, 222 qo: getQueryOptions(config.QueryOptions), 223 } 224 return c, nil 225} 226 227// getQueryOptions returns the query options overwritten by the environment 228// variables if exist. The input parameter is the query options set by users 229// via application-level configuration. If the environment variables are set, 230// this will return the overwritten query options. 231func getQueryOptions(opts QueryOptions) QueryOptions { 232 opv := os.Getenv("SPANNER_OPTIMIZER_VERSION") 233 if opv != "" { 234 if opts.Options == nil { 235 opts.Options = &sppb.ExecuteSqlRequest_QueryOptions{} 236 } 237 opts.Options.OptimizerVersion = opv 238 } 239 return opts 240} 241 242// Close closes the client. 243func (c *Client) Close() { 244 if c.idleSessions != nil { 245 c.idleSessions.close() 246 } 247 c.sc.close() 248} 249 250// Single provides a read-only snapshot transaction optimized for the case 251// where only a single read or query is needed. This is more efficient than 252// using ReadOnlyTransaction() for a single read or query. 253// 254// Single will use a strong TimestampBound by default. Use 255// ReadOnlyTransaction.WithTimestampBound to specify a different 256// TimestampBound. A non-strong bound can be used to reduce latency, or 257// "time-travel" to prior versions of the database, see the documentation of 258// TimestampBound for details. 259func (c *Client) Single() *ReadOnlyTransaction { 260 t := &ReadOnlyTransaction{singleUse: true} 261 t.txReadOnly.sp = c.idleSessions 262 t.txReadOnly.txReadEnv = t 263 t.txReadOnly.qo = c.qo 264 t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error { 265 if t.sh == nil { 266 return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction") 267 } 268 // Remove the session that returned 'Session not found' from the pool. 269 t.sh.destroy() 270 // Reset the transaction, acquire a new session and retry. 271 t.state = txNew 272 sh, _, err := t.acquire(ctx) 273 if err != nil { 274 return err 275 } 276 t.sh = sh 277 return nil 278 } 279 return t 280} 281 282// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for 283// multiple reads from the database. You must call Close() when the 284// ReadOnlyTransaction is no longer needed to release resources on the server. 285// 286// ReadOnlyTransaction will use a strong TimestampBound by default. Use 287// ReadOnlyTransaction.WithTimestampBound to specify a different 288// TimestampBound. A non-strong bound can be used to reduce latency, or 289// "time-travel" to prior versions of the database, see the documentation of 290// TimestampBound for details. 291func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { 292 t := &ReadOnlyTransaction{ 293 singleUse: false, 294 txReadyOrClosed: make(chan struct{}), 295 } 296 t.txReadOnly.sp = c.idleSessions 297 t.txReadOnly.txReadEnv = t 298 t.txReadOnly.qo = c.qo 299 return t 300} 301 302// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used 303// for partitioned reads or queries from a snapshot of the database. This is 304// useful in batch processing pipelines where one wants to divide the work of 305// reading from the database across multiple machines. 306// 307// Note: This transaction does not use the underlying session pool but creates a 308// new session each time, and the session is reused across clients. 309// 310// You should call Close() after the txn is no longer needed on local 311// client, and call Cleanup() when the txn is finished for all clients, to free 312// the session. 313func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) { 314 var ( 315 tx transactionID 316 rts time.Time 317 s *session 318 sh *sessionHandle 319 err error 320 ) 321 defer func() { 322 if err != nil && sh != nil { 323 s.delete(ctx) 324 } 325 }() 326 327 // Create session. 328 s, err = c.sc.createSession(ctx) 329 if err != nil { 330 return nil, err 331 } 332 sh = &sessionHandle{session: s} 333 334 // Begin transaction. 335 res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 336 Session: sh.getID(), 337 Options: &sppb.TransactionOptions{ 338 Mode: &sppb.TransactionOptions_ReadOnly_{ 339 ReadOnly: buildTransactionOptionsReadOnly(tb, true), 340 }, 341 }, 342 }) 343 if err != nil { 344 return nil, toSpannerError(err) 345 } 346 tx = res.Id 347 if res.ReadTimestamp != nil { 348 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 349 } 350 351 t := &BatchReadOnlyTransaction{ 352 ReadOnlyTransaction: ReadOnlyTransaction{ 353 tx: tx, 354 txReadyOrClosed: make(chan struct{}), 355 state: txActive, 356 rts: rts, 357 }, 358 ID: BatchReadOnlyTransactionID{ 359 tid: tx, 360 sid: sh.getID(), 361 rts: rts, 362 }, 363 } 364 t.txReadOnly.sh = sh 365 t.txReadOnly.txReadEnv = t 366 t.txReadOnly.qo = c.qo 367 return t, nil 368} 369 370// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from 371// BatchReadOnlyTransactionID 372func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction { 373 s, err := c.sc.sessionWithID(tid.sid) 374 if err != nil { 375 logf(c.logger, "unexpected error: %v\nThis is an indication of an internal error in the Spanner client library.", err) 376 // Use an invalid session. Preferably, this method should just return 377 // the error instead of this, but that would mean an API change. 378 s = &session{} 379 } 380 sh := &sessionHandle{session: s} 381 382 t := &BatchReadOnlyTransaction{ 383 ReadOnlyTransaction: ReadOnlyTransaction{ 384 tx: tid.tid, 385 txReadyOrClosed: make(chan struct{}), 386 state: txActive, 387 rts: tid.rts, 388 }, 389 ID: tid, 390 } 391 t.txReadOnly.sh = sh 392 t.txReadOnly.txReadEnv = t 393 t.txReadOnly.qo = c.qo 394 return t 395} 396 397type transactionInProgressKey struct{} 398 399func checkNestedTxn(ctx context.Context) error { 400 if ctx.Value(transactionInProgressKey{}) != nil { 401 return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions") 402 } 403 return nil 404} 405 406// ReadWriteTransaction executes a read-write transaction, with retries as 407// necessary. 408// 409// The function f will be called one or more times. It must not maintain 410// any state between calls. 411// 412// If the transaction cannot be committed or if f returns an ABORTED error, 413// ReadWriteTransaction will call f again. It will continue to call f until the 414// transaction can be committed or the Context times out or is cancelled. If f 415// returns an error other than ABORTED, ReadWriteTransaction will abort the 416// transaction and return the error. 417// 418// To limit the number of retries, set a deadline on the Context rather than 419// using a fixed limit on the number of attempts. ReadWriteTransaction will 420// retry as needed until that deadline is met. 421// 422// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for 423// more details. 424func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) { 425 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction") 426 defer func() { trace.EndSpan(ctx, err) }() 427 if err := checkNestedTxn(ctx); err != nil { 428 return time.Time{}, err 429 } 430 var ( 431 ts time.Time 432 sh *sessionHandle 433 ) 434 err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error { 435 var ( 436 err error 437 t *ReadWriteTransaction 438 ) 439 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 440 // Session handle hasn't been allocated or has been destroyed. 441 sh, err = c.idleSessions.takeWriteSession(ctx) 442 if err != nil { 443 // If session retrieval fails, just fail the transaction. 444 return err 445 } 446 t = &ReadWriteTransaction{ 447 tx: sh.getTransactionID(), 448 } 449 } else { 450 t = &ReadWriteTransaction{} 451 } 452 t.txReadOnly.sh = sh 453 t.txReadOnly.txReadEnv = t 454 t.txReadOnly.qo = c.qo 455 trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())}, 456 "Starting transaction attempt") 457 if err = t.begin(ctx); err != nil { 458 return err 459 } 460 ts, err = t.runInTransaction(ctx, f) 461 return err 462 }) 463 if sh != nil { 464 sh.recycle() 465 } 466 return ts, err 467} 468 469// applyOption controls the behavior of Client.Apply. 470type applyOption struct { 471 // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud 472 // Spanner at least once. 473 atLeastOnce bool 474} 475 476// An ApplyOption is an optional argument to Apply. 477type ApplyOption func(*applyOption) 478 479// ApplyAtLeastOnce returns an ApplyOption that removes replay protection. 480// 481// With this option, Apply may attempt to apply mutations more than once; if 482// the mutations are not idempotent, this may lead to a failure being reported 483// when the mutation was applied more than once. For example, an insert may 484// fail with ALREADY_EXISTS even though the row did not exist before Apply was 485// called. For this reason, most users of the library will prefer not to use 486// this option. However, ApplyAtLeastOnce requires only a single RPC, whereas 487// Apply's default replay protection may require an additional RPC. So this 488// option may be appropriate for latency sensitive and/or high throughput blind 489// writing. 490func ApplyAtLeastOnce() ApplyOption { 491 return func(ao *applyOption) { 492 ao.atLeastOnce = true 493 } 494} 495 496// Apply applies a list of mutations atomically to the database. 497func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { 498 ao := &applyOption{} 499 for _, opt := range opts { 500 opt(ao) 501 } 502 503 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply") 504 defer func() { trace.EndSpan(ctx, err) }() 505 506 if !ao.atLeastOnce { 507 return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { 508 return t.BufferWrite(ms) 509 }) 510 } 511 t := &writeOnlyTransaction{c.idleSessions} 512 return t.applyAtLeastOnce(ctx, ms...) 513} 514 515// logf logs the given message to the given logger, or the standard logger if 516// the given logger is nil. 517func logf(logger *log.Logger, format string, v ...interface{}) { 518 if logger == nil { 519 log.Printf(format, v...) 520 } else { 521 logger.Printf(format, v...) 522 } 523} 524