1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "fmt" 22 "log" 23 "os" 24 "regexp" 25 "time" 26 27 "cloud.google.com/go/internal/trace" 28 instance "cloud.google.com/go/spanner/admin/instance/apiv1" 29 vkit "cloud.google.com/go/spanner/apiv1" 30 "google.golang.org/api/option" 31 instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" 32 sppb "google.golang.org/genproto/googleapis/spanner/v1" 33 field_mask "google.golang.org/genproto/protobuf/field_mask" 34 "google.golang.org/grpc" 35 "google.golang.org/grpc/codes" 36 "google.golang.org/grpc/metadata" 37 "google.golang.org/grpc/status" 38) 39 40const ( 41 endpoint = "spanner.googleapis.com:443" 42 43 // resourcePrefixHeader is the name of the metadata header used to indicate 44 // the resource being operated on. 45 resourcePrefixHeader = "google-cloud-resource-prefix" 46) 47 48const ( 49 // Scope is the scope for Cloud Spanner Data API. 50 Scope = "https://www.googleapis.com/auth/spanner.data" 51 52 // AdminScope is the scope for Cloud Spanner Admin APIs. 53 AdminScope = "https://www.googleapis.com/auth/spanner.admin" 54) 55 56var ( 57 validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$") 58 validInstancePattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+") 59) 60 61func validDatabaseName(db string) error { 62 if matched := validDBPattern.MatchString(db); !matched { 63 return fmt.Errorf("database name %q should conform to pattern %q", 64 db, validDBPattern.String()) 65 } 66 return nil 67} 68 69func getInstanceName(db string) (string, error) { 70 matches := validInstancePattern.FindStringSubmatch(db) 71 if len(matches) == 0 { 72 return "", fmt.Errorf("Failed to retrieve instance name from %q according to pattern %q", 73 db, validInstancePattern.String()) 74 } 75 return matches[0], nil 76} 77 78// Client is a client for reading and writing data to a Cloud Spanner database. 79// A client is safe to use concurrently, except for its Close method. 80type Client struct { 81 sc *sessionClient 82 idleSessions *sessionPool 83 logger *log.Logger 84} 85 86// ClientConfig has configurations for the client. 87type ClientConfig struct { 88 // NumChannels is the number of gRPC channels. 89 // If zero, a reasonable default is used based on the execution environment. 90 NumChannels int 91 92 // SessionPoolConfig is the configuration for session pool. 93 SessionPoolConfig 94 95 // SessionLabels for the sessions created by this client. 96 // See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session 97 // for more info. 98 SessionLabels map[string]string 99 100 // logger is the logger to use for this client. If it is nil, all logging 101 // will be directed to the standard logger. 102 logger *log.Logger 103} 104 105// errDial returns error for dialing to Cloud Spanner. 106func errDial(ci int, err error) error { 107 e := toSpannerError(err).(*Error) 108 e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci)) 109 return e 110} 111 112func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { 113 existing, ok := metadata.FromOutgoingContext(ctx) 114 if ok { 115 md = metadata.Join(existing, md) 116 } 117 return metadata.NewOutgoingContext(ctx, md) 118} 119 120// getInstanceEndpoint returns an instance-specific endpoint if one exists. If 121// multiple endpoints exist, it returns the first one. 122func getInstanceEndpoint(ctx context.Context, database string, opts ...option.ClientOption) (string, error) { 123 instanceName, err := getInstanceName(database) 124 if err != nil { 125 return "", fmt.Errorf("Failed to resolve endpoint: %v", err) 126 } 127 128 c, err := instance.NewInstanceAdminClient(ctx, opts...) 129 if err != nil { 130 return "", err 131 } 132 defer c.Close() 133 134 req := &instancepb.GetInstanceRequest{ 135 Name: instanceName, 136 FieldMask: &field_mask.FieldMask{ 137 Paths: []string{"endpoint_uris"}, 138 }, 139 } 140 141 resp, err := c.GetInstance(ctx, req) 142 if err != nil { 143 return "", err 144 } 145 146 endpointURIs := resp.GetEndpointUris() 147 148 if len(endpointURIs) > 0 { 149 return endpointURIs[0], nil 150 } 151 152 // Return empty string when no endpoints exist. 153 return "", nil 154} 155 156// NewClient creates a client to a database. A valid database name has the 157// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses 158// a default configuration. 159func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) { 160 return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...) 161} 162 163// NewClientWithConfig creates a client to a database. A valid database name has 164// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. 165func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) { 166 // Prepare gRPC channels. 167 if config.NumChannels == 0 { 168 config.NumChannels = numChannels 169 } 170 171 // Default configs for session pool. 172 if config.MaxOpened == 0 { 173 config.MaxOpened = uint64(config.NumChannels * 100) 174 } 175 if config.MaxBurst == 0 { 176 config.MaxBurst = DefaultSessionPoolConfig.MaxBurst 177 } 178 179 // Validate database path. 180 if err := validDatabaseName(database); err != nil { 181 return nil, err 182 } 183 184 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient") 185 defer func() { trace.EndSpan(ctx, err) }() 186 187 // Append emulator options if SPANNER_EMULATOR_HOST has been set. 188 if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" { 189 emulatorOpts := []option.ClientOption{ 190 option.WithEndpoint(emulatorAddr), 191 option.WithGRPCDialOption(grpc.WithInsecure()), 192 option.WithoutAuthentication(), 193 } 194 opts = append(opts, emulatorOpts...) 195 } else if os.Getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING") == "true" { 196 // Fetch the instance-specific endpoint. 197 reqOpts := []option.ClientOption{option.WithEndpoint(endpoint)} 198 reqOpts = append(reqOpts, opts...) 199 instanceEndpoint, err := getInstanceEndpoint(ctx, database, reqOpts...) 200 201 if err != nil { 202 // If there is a PermissionDenied error, fall back to use the global endpoint 203 // or the user-specified endpoint. 204 if status.Code(err) == codes.PermissionDenied { 205 logf(config.logger, ` 206Warning: The client library attempted to connect to an endpoint closer to your 207Cloud Spanner data but was unable to do so. The client library will fall back 208and route requests to the endpoint given in the client options, which may 209result in increased latency. We recommend including the scope 210https://www.googleapis.com/auth/spanner.admin so that the client library can 211get an instance-specific endpoint and efficiently route requests. 212`) 213 } else { 214 return nil, err 215 } 216 } 217 218 if instanceEndpoint != "" { 219 opts = append(opts, option.WithEndpoint(instanceEndpoint)) 220 } 221 } 222 223 // gRPC options. 224 allOpts := []option.ClientOption{ 225 option.WithEndpoint(endpoint), 226 option.WithScopes(Scope), 227 option.WithGRPCDialOption( 228 grpc.WithDefaultCallOptions( 229 grpc.MaxCallSendMsgSize(100<<20), 230 grpc.MaxCallRecvMsgSize(100<<20), 231 ), 232 ), 233 } 234 allOpts = append(allOpts, opts...) 235 236 // TODO(deklerk): This should be replaced with a balancer with 237 // config.NumChannels connections, instead of config.NumChannels 238 // clients. 239 var clients []*vkit.Client 240 for i := 0; i < config.NumChannels; i++ { 241 client, err := vkit.NewClient(ctx, allOpts...) 242 if err != nil { 243 return nil, errDial(i, err) 244 } 245 clients = append(clients, client) 246 } 247 248 // TODO(loite): Remove as the original map cannot be changed by the user 249 // anyways, and the client library is also not changing it. 250 // Make a copy of labels. 251 sessionLabels := make(map[string]string) 252 for k, v := range config.SessionLabels { 253 sessionLabels[k] = v 254 } 255 // Create a session client. 256 sc := newSessionClient(clients, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger) 257 // Create a session pool. 258 config.SessionPoolConfig.sessionLabels = sessionLabels 259 sp, err := newSessionPool(sc, config.SessionPoolConfig) 260 if err != nil { 261 sc.close() 262 return nil, err 263 } 264 c = &Client{ 265 sc: sc, 266 idleSessions: sp, 267 logger: config.logger, 268 } 269 return c, nil 270} 271 272// Close closes the client. 273func (c *Client) Close() { 274 if c.idleSessions != nil { 275 c.idleSessions.close() 276 } 277 c.sc.close() 278} 279 280// Single provides a read-only snapshot transaction optimized for the case 281// where only a single read or query is needed. This is more efficient than 282// using ReadOnlyTransaction() for a single read or query. 283// 284// Single will use a strong TimestampBound by default. Use 285// ReadOnlyTransaction.WithTimestampBound to specify a different 286// TimestampBound. A non-strong bound can be used to reduce latency, or 287// "time-travel" to prior versions of the database, see the documentation of 288// TimestampBound for details. 289func (c *Client) Single() *ReadOnlyTransaction { 290 t := &ReadOnlyTransaction{singleUse: true} 291 t.txReadOnly.sp = c.idleSessions 292 t.txReadOnly.txReadEnv = t 293 t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error { 294 if t.sh == nil { 295 return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction") 296 } 297 // Remove the session that returned 'Session not found' from the pool. 298 t.sh.destroy() 299 // Reset the transaction, acquire a new session and retry. 300 t.state = txNew 301 sh, _, err := t.acquire(ctx) 302 if err != nil { 303 return err 304 } 305 t.sh = sh 306 return nil 307 } 308 return t 309} 310 311// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for 312// multiple reads from the database. You must call Close() when the 313// ReadOnlyTransaction is no longer needed to release resources on the server. 314// 315// ReadOnlyTransaction will use a strong TimestampBound by default. Use 316// ReadOnlyTransaction.WithTimestampBound to specify a different 317// TimestampBound. A non-strong bound can be used to reduce latency, or 318// "time-travel" to prior versions of the database, see the documentation of 319// TimestampBound for details. 320func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { 321 t := &ReadOnlyTransaction{ 322 singleUse: false, 323 txReadyOrClosed: make(chan struct{}), 324 } 325 t.txReadOnly.sp = c.idleSessions 326 t.txReadOnly.txReadEnv = t 327 return t 328} 329 330// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used 331// for partitioned reads or queries from a snapshot of the database. This is 332// useful in batch processing pipelines where one wants to divide the work of 333// reading from the database across multiple machines. 334// 335// Note: This transaction does not use the underlying session pool but creates a 336// new session each time, and the session is reused across clients. 337// 338// You should call Close() after the txn is no longer needed on local 339// client, and call Cleanup() when the txn is finished for all clients, to free 340// the session. 341func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) { 342 var ( 343 tx transactionID 344 rts time.Time 345 s *session 346 sh *sessionHandle 347 err error 348 ) 349 defer func() { 350 if err != nil && sh != nil { 351 s.delete(ctx) 352 } 353 }() 354 355 // Create session. 356 s, err = c.sc.createSession(ctx) 357 if err != nil { 358 return nil, err 359 } 360 sh = &sessionHandle{session: s} 361 362 // Begin transaction. 363 res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 364 Session: sh.getID(), 365 Options: &sppb.TransactionOptions{ 366 Mode: &sppb.TransactionOptions_ReadOnly_{ 367 ReadOnly: buildTransactionOptionsReadOnly(tb, true), 368 }, 369 }, 370 }) 371 if err != nil { 372 return nil, toSpannerError(err) 373 } 374 tx = res.Id 375 if res.ReadTimestamp != nil { 376 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 377 } 378 379 t := &BatchReadOnlyTransaction{ 380 ReadOnlyTransaction: ReadOnlyTransaction{ 381 tx: tx, 382 txReadyOrClosed: make(chan struct{}), 383 state: txActive, 384 rts: rts, 385 }, 386 ID: BatchReadOnlyTransactionID{ 387 tid: tx, 388 sid: sh.getID(), 389 rts: rts, 390 }, 391 } 392 t.txReadOnly.sh = sh 393 t.txReadOnly.txReadEnv = t 394 return t, nil 395} 396 397// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from 398// BatchReadOnlyTransactionID 399func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction { 400 s := c.sc.sessionWithID(tid.sid) 401 sh := &sessionHandle{session: s} 402 403 t := &BatchReadOnlyTransaction{ 404 ReadOnlyTransaction: ReadOnlyTransaction{ 405 tx: tid.tid, 406 txReadyOrClosed: make(chan struct{}), 407 state: txActive, 408 rts: tid.rts, 409 }, 410 ID: tid, 411 } 412 t.txReadOnly.sh = sh 413 t.txReadOnly.txReadEnv = t 414 return t 415} 416 417type transactionInProgressKey struct{} 418 419func checkNestedTxn(ctx context.Context) error { 420 if ctx.Value(transactionInProgressKey{}) != nil { 421 return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions") 422 } 423 return nil 424} 425 426// ReadWriteTransaction executes a read-write transaction, with retries as 427// necessary. 428// 429// The function f will be called one or more times. It must not maintain 430// any state between calls. 431// 432// If the transaction cannot be committed or if f returns an ABORTED error, 433// ReadWriteTransaction will call f again. It will continue to call f until the 434// transaction can be committed or the Context times out or is cancelled. If f 435// returns an error other than ABORTED, ReadWriteTransaction will abort the 436// transaction and return the error. 437// 438// To limit the number of retries, set a deadline on the Context rather than 439// using a fixed limit on the number of attempts. ReadWriteTransaction will 440// retry as needed until that deadline is met. 441// 442// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for 443// more details. 444func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) { 445 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction") 446 defer func() { trace.EndSpan(ctx, err) }() 447 if err := checkNestedTxn(ctx); err != nil { 448 return time.Time{}, err 449 } 450 var ( 451 ts time.Time 452 sh *sessionHandle 453 ) 454 err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error { 455 var ( 456 err error 457 t *ReadWriteTransaction 458 ) 459 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 460 // Session handle hasn't been allocated or has been destroyed. 461 sh, err = c.idleSessions.takeWriteSession(ctx) 462 if err != nil { 463 // If session retrieval fails, just fail the transaction. 464 return err 465 } 466 t = &ReadWriteTransaction{ 467 tx: sh.getTransactionID(), 468 } 469 } else { 470 t = &ReadWriteTransaction{} 471 } 472 t.txReadOnly.sh = sh 473 t.txReadOnly.txReadEnv = t 474 trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())}, 475 "Starting transaction attempt") 476 if err = t.begin(ctx); err != nil { 477 return err 478 } 479 ts, err = t.runInTransaction(ctx, f) 480 return err 481 }) 482 if sh != nil { 483 sh.recycle() 484 } 485 return ts, err 486} 487 488// applyOption controls the behavior of Client.Apply. 489type applyOption struct { 490 // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud 491 // Spanner at least once. 492 atLeastOnce bool 493} 494 495// An ApplyOption is an optional argument to Apply. 496type ApplyOption func(*applyOption) 497 498// ApplyAtLeastOnce returns an ApplyOption that removes replay protection. 499// 500// With this option, Apply may attempt to apply mutations more than once; if 501// the mutations are not idempotent, this may lead to a failure being reported 502// when the mutation was applied more than once. For example, an insert may 503// fail with ALREADY_EXISTS even though the row did not exist before Apply was 504// called. For this reason, most users of the library will prefer not to use 505// this option. However, ApplyAtLeastOnce requires only a single RPC, whereas 506// Apply's default replay protection may require an additional RPC. So this 507// option may be appropriate for latency sensitive and/or high throughput blind 508// writing. 509func ApplyAtLeastOnce() ApplyOption { 510 return func(ao *applyOption) { 511 ao.atLeastOnce = true 512 } 513} 514 515// Apply applies a list of mutations atomically to the database. 516func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { 517 ao := &applyOption{} 518 for _, opt := range opts { 519 opt(ao) 520 } 521 522 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply") 523 defer func() { trace.EndSpan(ctx, err) }() 524 525 if !ao.atLeastOnce { 526 return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { 527 return t.BufferWrite(ms) 528 }) 529 } 530 t := &writeOnlyTransaction{c.idleSessions} 531 return t.applyAtLeastOnce(ctx, ms...) 532} 533 534// logf logs the given message to the given logger, or the standard logger if 535// the given logger is nil. 536func logf(logger *log.Logger, format string, v ...interface{}) { 537 if logger == nil { 538 log.Printf(format, v...) 539 } else { 540 logger.Printf(format, v...) 541 } 542} 543