/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"context"
	"fmt"
	"log"
	"os"
	"regexp"
	"time"

	"cloud.google.com/go/internal/trace"
	"google.golang.org/api/option"
	gtransport "google.golang.org/api/transport/grpc"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
)

const (
	// endpoint is the default service address passed to option.WithEndpoint
	// by NewClientWithConfig. Caller-supplied options (and the emulator
	// options) are applied after it and can therefore override it.
	endpoint = "spanner.googleapis.com:443"

	// resourcePrefixHeader is the name of the metadata header used to indicate
	// the resource being operated on.
	resourcePrefixHeader = "google-cloud-resource-prefix"
)

const (
	// Scope is the scope for Cloud Spanner Data API.
	Scope = "https://www.googleapis.com/auth/spanner.data"

	// AdminScope is the scope for Cloud Spanner Admin APIs.
	AdminScope = "https://www.googleapis.com/auth/spanner.admin"
)

var (
	// validDBPattern matches a fully qualified database name of the form
	// projects/<project>/instances/<instance>/databases/<database>. The three
	// capture groups are, in order, the project, instance and database IDs;
	// none of them may be empty or contain a slash.
	validDBPattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)/databases/(?P<database>[^/]+)$")
)

// validDatabaseName reports whether db is a fully qualified database name. It
// returns nil when db matches validDBPattern and otherwise an error that
// quotes both the offending name and the expected pattern.
func validDatabaseName(db string) error {
	if matched := validDBPattern.MatchString(db); !matched {
		return fmt.Errorf("database name %q should conform to pattern %q",
			db, validDBPattern.String())
	}
	return nil
}

// parseDatabaseName splits a fully qualified database name into its project,
// instance and database IDs using the capture groups of validDBPattern. If db
// does not match the pattern, all three results are empty and err is non-nil.
//
// NOTE(review): the error string starts with a capital letter, which goes
// against the usual Go convention for error strings; it is left untouched
// here because callers or tests may compare against it.
func parseDatabaseName(db string) (project, instance, database string, err error) {
	matches := validDBPattern.FindStringSubmatch(db)
	if len(matches) == 0 {
		return "", "", "", fmt.Errorf("Failed to parse database name from %q according to pattern %q",
			db, validDBPattern.String())
	}
	// matches[0] is the whole match; the capture groups start at index 1.
	return matches[1], matches[2], matches[3], nil
}

// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
type Client struct {
	// sc creates and looks up sessions outside of the pool (see
	// BatchReadOnlyTransaction and BatchReadOnlyTransactionFromID).
	sc *sessionClient
	// idleSessions is the session pool that regular transactions draw from.
	idleSessions *sessionPool
	// logger is handed to logf; a nil logger means the standard logger.
	logger *log.Logger
	// qo holds the query options copied into every transaction this client
	// creates.
	qo QueryOptions
}

// ClientConfig has configurations for the client.
type ClientConfig struct {
	// NumChannels is the number of gRPC channels.
	// If zero, a reasonable default is used based on the execution environment.
	//
	// Deprecated: The Spanner client now uses a pool of gRPC connections. Use
	// option.WithGRPCConnectionPool(numConns) instead to specify the number of
	// connections the client should use. The client will default to a
	// reasonable default if this option is not specified.
	NumChannels int

	// SessionPoolConfig is the configuration for session pool.
	SessionPoolConfig

	// SessionLabels for the sessions created by this client.
	// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session
	// for more info.
	SessionLabels map[string]string

	// QueryOptions is the configuration for executing a sql query.
	QueryOptions QueryOptions

	// logger is the logger to use for this client. If it is nil, all logging
	// will be directed to the standard logger.
	logger *log.Logger
}

// errDial converts err into a Spanner *Error and decorates it with a message
// naming the index ci of the channel whose dial failed.
//
// NOTE(review): the type assertion is unchecked, so this panics if
// toSpannerError ever returns something other than a *Error (presumably
// including a nil result for a nil err) — confirm callers only pass non-nil
// errors.
func errDial(ci int, err error) error {
	e := toSpannerError(err).(*Error)
	e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci))
	return e
}

// contextWithOutgoingMetadata returns a copy of ctx whose outgoing gRPC
// metadata is md joined onto whatever outgoing metadata ctx already carries.
// If ctx has no outgoing metadata, md is used as is.
func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
	existing, ok := metadata.FromOutgoingContext(ctx)
	if ok {
		md = metadata.Join(existing, md)
	}
	return metadata.NewOutgoingContext(ctx, md)
}

// NewClient creates a client to a database. A valid database name has the
// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
// a default configuration, i.e. a ClientConfig whose only non-zero field is
// SessionPoolConfig, set to DefaultSessionPoolConfig.
func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
	return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...)
}

// NewClientWithConfig creates a client to a database. A valid database name has
// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
//
// The function validates the database name, dials a pool of gRPC connections,
// fills in session pool defaults that were left at zero, and then creates the
// session client and session pool backing the returned Client. If the
// SPANNER_EMULATOR_HOST environment variable is set, the client connects to
// that address without transport security or authentication.
//
// It returns an error if the database name is malformed, if dialing fails, if
// config.NumChannels is set to a value that differs from the size of the
// dialed connection pool, or if the session pool cannot be created.
func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
	// Validate the database path first. This happens before the trace span is
	// started, so a malformed name is never recorded in a span.
	if err := validDatabaseName(database); err != nil {
		return nil, err
	}

	// The deferred closure reads the named result err, so the span is ended
	// with whatever error this function eventually returns.
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
	defer func() { trace.EndSpan(ctx, err) }()

	// Append emulator options if SPANNER_EMULATOR_HOST has been set. They are
	// placed in front of the caller's options so that explicitly supplied
	// options are still applied last.
	if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
		emulatorOpts := []option.ClientOption{
			option.WithEndpoint(emulatorAddr),
			option.WithGRPCDialOption(grpc.WithInsecure()),
			option.WithoutAuthentication(),
		}
		opts = append(emulatorOpts, opts...)
	}

	// Prepare gRPC channels. Remember what the caller actually asked for
	// before substituting the default, so that the mismatch check below only
	// fires when NumChannels was set explicitly.
	configuredNumChannels := config.NumChannels
	if config.NumChannels == 0 {
		config.NumChannels = numChannels
	}
	// Default gRPC options: service endpoint, data scope, 100 MiB send and
	// receive message limits, and the connection pool size.
	allOpts := []option.ClientOption{
		option.WithEndpoint(endpoint),
		option.WithScopes(Scope),
		option.WithGRPCDialOption(
			grpc.WithDefaultCallOptions(
				grpc.MaxCallSendMsgSize(100<<20),
				grpc.MaxCallRecvMsgSize(100<<20),
			),
		),
		option.WithGRPCConnectionPool(config.NumChannels),
	}
	// opts will take precedence over allOpts, as the values in opts will be
	// applied after the values in allOpts.
	allOpts = append(allOpts, opts...)
	// pool is new here, but err is the named result declared in the
	// signature, so this := assigns to it rather than shadowing it.
	pool, err := gtransport.DialPool(ctx, allOpts...)
	if err != nil {
		return nil, err
	}
	// A caller may set the pool size both through the deprecated NumChannels
	// field and through option.WithGRPCConnectionPool; reject conflicting
	// values and release the connections that were just dialed.
	if configuredNumChannels > 0 && pool.Num() != config.NumChannels {
		pool.Close()
		return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value.", config.NumChannels, pool.Num())
	}

	// TODO(loite): Remove as the original map cannot be changed by the user
	// anyways, and the client library is also not changing it.
	// Make a copy of labels.
	sessionLabels := make(map[string]string)
	for k, v := range config.SessionLabels {
		sessionLabels[k] = v
	}

	// Default configs for session pool. MaxOpened scales with the number of
	// connections actually dialed (100 sessions per connection); the other
	// two fall back to the values in DefaultSessionPoolConfig.
	if config.MaxOpened == 0 {
		config.MaxOpened = uint64(pool.Num() * 100)
	}
	if config.MaxBurst == 0 {
		config.MaxBurst = DefaultSessionPoolConfig.MaxBurst
	}
	if config.incStep == 0 {
		config.incStep = DefaultSessionPoolConfig.incStep
	}
	// Create a session client. Every request carries the database name in the
	// resource prefix header.
	sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger)
	// Create a session pool.
	config.SessionPoolConfig.sessionLabels = sessionLabels
	sp, err := newSessionPool(sc, config.SessionPoolConfig)
	if err != nil {
		// NOTE(review): presumably sc.close also releases the connection
		// pool handed to newSessionClient above — confirm.
		sc.close()
		return nil, err
	}
	c = &Client{
		sc:           sc,
		idleSessions: sp,
		logger:       config.logger,
		qo:           getQueryOptions(config.QueryOptions),
	}
	return c, nil
}

// getQueryOptions returns the query options overwritten by the environment
// variables if exist. The input parameter is the query options set by users
// via application-level configuration. If the environment variables are set,
// this will return the overwritten query options.
//
// The only variable consulted is SPANNER_OPTIMIZER_VERSION, which overrides
// Options.OptimizerVersion. Note that opts is passed by value but Options is a
// pointer: when the caller supplied a non-nil Options message, the override is
// written into that same message.
func getQueryOptions(opts QueryOptions) QueryOptions {
	opv := os.Getenv("SPANNER_OPTIMIZER_VERSION")
	if opv != "" {
		if opts.Options == nil {
			opts.Options = &sppb.ExecuteSqlRequest_QueryOptions{}
		}
		opts.Options.OptimizerVersion = opv
	}
	return opts
}

// Close closes the client. It closes the session pool, if there is one, and
// then the session client.
func (c *Client) Close() {
	if c.idleSessions != nil {
		c.idleSessions.close()
	}
	c.sc.close()
}

// Single provides a read-only snapshot transaction optimized for the case
// where only a single read or query is needed. This is more efficient than
// using ReadOnlyTransaction() for a single read or query.
//
// Single will use a strong TimestampBound by default. Use
// ReadOnlyTransaction.WithTimestampBound to specify a different
// TimestampBound.
A non-strong bound can be used to reduce latency, or 247// "time-travel" to prior versions of the database, see the documentation of 248// TimestampBound for details. 249func (c *Client) Single() *ReadOnlyTransaction { 250 t := &ReadOnlyTransaction{singleUse: true} 251 t.txReadOnly.sp = c.idleSessions 252 t.txReadOnly.txReadEnv = t 253 t.txReadOnly.qo = c.qo 254 t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error { 255 if t.sh == nil { 256 return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction") 257 } 258 // Remove the session that returned 'Session not found' from the pool. 259 t.sh.destroy() 260 // Reset the transaction, acquire a new session and retry. 261 t.state = txNew 262 sh, _, err := t.acquire(ctx) 263 if err != nil { 264 return err 265 } 266 t.sh = sh 267 return nil 268 } 269 return t 270} 271 272// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for 273// multiple reads from the database. You must call Close() when the 274// ReadOnlyTransaction is no longer needed to release resources on the server. 275// 276// ReadOnlyTransaction will use a strong TimestampBound by default. Use 277// ReadOnlyTransaction.WithTimestampBound to specify a different 278// TimestampBound. A non-strong bound can be used to reduce latency, or 279// "time-travel" to prior versions of the database, see the documentation of 280// TimestampBound for details. 281func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { 282 t := &ReadOnlyTransaction{ 283 singleUse: false, 284 txReadyOrClosed: make(chan struct{}), 285 } 286 t.txReadOnly.sp = c.idleSessions 287 t.txReadOnly.txReadEnv = t 288 t.txReadOnly.qo = c.qo 289 return t 290} 291 292// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used 293// for partitioned reads or queries from a snapshot of the database. 
This is 294// useful in batch processing pipelines where one wants to divide the work of 295// reading from the database across multiple machines. 296// 297// Note: This transaction does not use the underlying session pool but creates a 298// new session each time, and the session is reused across clients. 299// 300// You should call Close() after the txn is no longer needed on local 301// client, and call Cleanup() when the txn is finished for all clients, to free 302// the session. 303func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) { 304 var ( 305 tx transactionID 306 rts time.Time 307 s *session 308 sh *sessionHandle 309 err error 310 ) 311 defer func() { 312 if err != nil && sh != nil { 313 s.delete(ctx) 314 } 315 }() 316 317 // Create session. 318 s, err = c.sc.createSession(ctx) 319 if err != nil { 320 return nil, err 321 } 322 sh = &sessionHandle{session: s} 323 324 // Begin transaction. 325 res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 326 Session: sh.getID(), 327 Options: &sppb.TransactionOptions{ 328 Mode: &sppb.TransactionOptions_ReadOnly_{ 329 ReadOnly: buildTransactionOptionsReadOnly(tb, true), 330 }, 331 }, 332 }) 333 if err != nil { 334 return nil, toSpannerError(err) 335 } 336 tx = res.Id 337 if res.ReadTimestamp != nil { 338 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 339 } 340 341 t := &BatchReadOnlyTransaction{ 342 ReadOnlyTransaction: ReadOnlyTransaction{ 343 tx: tx, 344 txReadyOrClosed: make(chan struct{}), 345 state: txActive, 346 rts: rts, 347 }, 348 ID: BatchReadOnlyTransactionID{ 349 tid: tx, 350 sid: sh.getID(), 351 rts: rts, 352 }, 353 } 354 t.txReadOnly.sh = sh 355 t.txReadOnly.txReadEnv = t 356 t.txReadOnly.qo = c.qo 357 return t, nil 358} 359 360// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from 361// BatchReadOnlyTransactionID 
362func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction { 363 s, err := c.sc.sessionWithID(tid.sid) 364 if err != nil { 365 logf(c.logger, "unexpected error: %v\nThis is an indication of an internal error in the Spanner client library.", err) 366 // Use an invalid session. Preferably, this method should just return 367 // the error instead of this, but that would mean an API change. 368 s = &session{} 369 } 370 sh := &sessionHandle{session: s} 371 372 t := &BatchReadOnlyTransaction{ 373 ReadOnlyTransaction: ReadOnlyTransaction{ 374 tx: tid.tid, 375 txReadyOrClosed: make(chan struct{}), 376 state: txActive, 377 rts: tid.rts, 378 }, 379 ID: tid, 380 } 381 t.txReadOnly.sh = sh 382 t.txReadOnly.txReadEnv = t 383 t.txReadOnly.qo = c.qo 384 return t 385} 386 387type transactionInProgressKey struct{} 388 389func checkNestedTxn(ctx context.Context) error { 390 if ctx.Value(transactionInProgressKey{}) != nil { 391 return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions") 392 } 393 return nil 394} 395 396// ReadWriteTransaction executes a read-write transaction, with retries as 397// necessary. 398// 399// The function f will be called one or more times. It must not maintain 400// any state between calls. 401// 402// If the transaction cannot be committed or if f returns an ABORTED error, 403// ReadWriteTransaction will call f again. It will continue to call f until the 404// transaction can be committed or the Context times out or is cancelled. If f 405// returns an error other than ABORTED, ReadWriteTransaction will abort the 406// transaction and return the error. 407// 408// To limit the number of retries, set a deadline on the Context rather than 409// using a fixed limit on the number of attempts. ReadWriteTransaction will 410// retry as needed until that deadline is met. 
411// 412// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for 413// more details. 414func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) { 415 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction") 416 defer func() { trace.EndSpan(ctx, err) }() 417 if err := checkNestedTxn(ctx); err != nil { 418 return time.Time{}, err 419 } 420 var ( 421 ts time.Time 422 sh *sessionHandle 423 ) 424 err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error { 425 var ( 426 err error 427 t *ReadWriteTransaction 428 ) 429 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 430 // Session handle hasn't been allocated or has been destroyed. 431 sh, err = c.idleSessions.takeWriteSession(ctx) 432 if err != nil { 433 // If session retrieval fails, just fail the transaction. 434 return err 435 } 436 t = &ReadWriteTransaction{ 437 tx: sh.getTransactionID(), 438 } 439 } else { 440 t = &ReadWriteTransaction{} 441 } 442 t.txReadOnly.sh = sh 443 t.txReadOnly.txReadEnv = t 444 t.txReadOnly.qo = c.qo 445 trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())}, 446 "Starting transaction attempt") 447 if err = t.begin(ctx); err != nil { 448 return err 449 } 450 ts, err = t.runInTransaction(ctx, f) 451 return err 452 }) 453 if sh != nil { 454 sh.recycle() 455 } 456 return ts, err 457} 458 459// applyOption controls the behavior of Client.Apply. 460type applyOption struct { 461 // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud 462 // Spanner at least once. 463 atLeastOnce bool 464} 465 466// An ApplyOption is an optional argument to Apply. 467type ApplyOption func(*applyOption) 468 469// ApplyAtLeastOnce returns an ApplyOption that removes replay protection. 
470// 471// With this option, Apply may attempt to apply mutations more than once; if 472// the mutations are not idempotent, this may lead to a failure being reported 473// when the mutation was applied more than once. For example, an insert may 474// fail with ALREADY_EXISTS even though the row did not exist before Apply was 475// called. For this reason, most users of the library will prefer not to use 476// this option. However, ApplyAtLeastOnce requires only a single RPC, whereas 477// Apply's default replay protection may require an additional RPC. So this 478// option may be appropriate for latency sensitive and/or high throughput blind 479// writing. 480func ApplyAtLeastOnce() ApplyOption { 481 return func(ao *applyOption) { 482 ao.atLeastOnce = true 483 } 484} 485 486// Apply applies a list of mutations atomically to the database. 487func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { 488 ao := &applyOption{} 489 for _, opt := range opts { 490 opt(ao) 491 } 492 493 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply") 494 defer func() { trace.EndSpan(ctx, err) }() 495 496 if !ao.atLeastOnce { 497 return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { 498 return t.BufferWrite(ms) 499 }) 500 } 501 t := &writeOnlyTransaction{c.idleSessions} 502 return t.applyAtLeastOnce(ctx, ms...) 503} 504 505// logf logs the given message to the given logger, or the standard logger if 506// the given logger is nil. 507func logf(logger *log.Logger, format string, v ...interface{}) { 508 if logger == nil { 509 log.Printf(format, v...) 510 } else { 511 logger.Printf(format, v...) 512 } 513} 514