1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "fmt" 22 "log" 23 "os" 24 "regexp" 25 "time" 26 27 "cloud.google.com/go/internal/trace" 28 vkit "cloud.google.com/go/spanner/apiv1" 29 "google.golang.org/api/option" 30 sppb "google.golang.org/genproto/googleapis/spanner/v1" 31 "google.golang.org/grpc" 32 "google.golang.org/grpc/codes" 33 "google.golang.org/grpc/metadata" 34) 35 36const ( 37 endpoint = "spanner.googleapis.com:443" 38 39 // resourcePrefixHeader is the name of the metadata header used to indicate 40 // the resource being operated on. 41 resourcePrefixHeader = "google-cloud-resource-prefix" 42) 43 44const ( 45 // Scope is the scope for Cloud Spanner Data API. 46 Scope = "https://www.googleapis.com/auth/spanner.data" 47 48 // AdminScope is the scope for Cloud Spanner Admin APIs. 49 AdminScope = "https://www.googleapis.com/auth/spanner.admin" 50) 51 52var ( 53 validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$") 54) 55 56func validDatabaseName(db string) error { 57 if matched := validDBPattern.MatchString(db); !matched { 58 return fmt.Errorf("database name %q should conform to pattern %q", 59 db, validDBPattern.String()) 60 } 61 return nil 62} 63 64// Client is a client for reading and writing data to a Cloud Spanner database. 65// A client is safe to use concurrently, except for its Close method. 66type Client struct { 67 sc *sessionClient 68 idleSessions *sessionPool 69 logger *log.Logger 70} 71 72// ClientConfig has configurations for the client. 73type ClientConfig struct { 74 // NumChannels is the number of gRPC channels. 75 // If zero, a reasonable default is used based on the execution environment. 76 NumChannels int 77 78 // SessionPoolConfig is the configuration for session pool. 79 SessionPoolConfig 80 81 // SessionLabels for the sessions created by this client. 82 // See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session 83 // for more info. 84 SessionLabels map[string]string 85 86 // logger is the logger to use for this client. If it is nil, all logging 87 // will be directed to the standard logger. 88 logger *log.Logger 89} 90 91// errDial returns error for dialing to Cloud Spanner. 92func errDial(ci int, err error) error { 93 e := toSpannerError(err).(*Error) 94 e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci)) 95 return e 96} 97 98func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { 99 existing, ok := metadata.FromOutgoingContext(ctx) 100 if ok { 101 md = metadata.Join(existing, md) 102 } 103 return metadata.NewOutgoingContext(ctx, md) 104} 105 106// NewClient creates a client to a database. A valid database name has the 107// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses 108// a default configuration. 109func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) { 110 return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...) 111} 112 113// NewClientWithConfig creates a client to a database. A valid database name has 114// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. 115func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) { 116 // Prepare gRPC channels. 117 if config.NumChannels == 0 { 118 config.NumChannels = numChannels 119 } 120 121 // Default configs for session pool. 122 if config.MaxOpened == 0 { 123 config.MaxOpened = uint64(config.NumChannels * 100) 124 } 125 if config.MaxBurst == 0 { 126 config.MaxBurst = DefaultSessionPoolConfig.MaxBurst 127 } 128 129 // Validate database path. 130 if err := validDatabaseName(database); err != nil { 131 return nil, err 132 } 133 134 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient") 135 defer func() { trace.EndSpan(ctx, err) }() 136 137 // Append emulator options if SPANNER_EMULATOR_HOST has been set. 138 if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" { 139 emulatorOpts := []option.ClientOption{ 140 option.WithEndpoint(emulatorAddr), 141 option.WithGRPCDialOption(grpc.WithInsecure()), 142 option.WithoutAuthentication(), 143 } 144 opts = append(opts, emulatorOpts...) 145 } 146 147 // gRPC options. 148 allOpts := []option.ClientOption{ 149 option.WithEndpoint(endpoint), 150 option.WithScopes(Scope), 151 option.WithGRPCDialOption( 152 grpc.WithDefaultCallOptions( 153 grpc.MaxCallSendMsgSize(100<<20), 154 grpc.MaxCallRecvMsgSize(100<<20), 155 ), 156 ), 157 } 158 allOpts = append(allOpts, opts...) 159 160 // TODO(deklerk): This should be replaced with a balancer with 161 // config.NumChannels connections, instead of config.NumChannels 162 // clients. 163 var clients []*vkit.Client 164 for i := 0; i < config.NumChannels; i++ { 165 client, err := vkit.NewClient(ctx, allOpts...) 166 if err != nil { 167 return nil, errDial(i, err) 168 } 169 clients = append(clients, client) 170 } 171 172 // TODO(loite): Remove as the original map cannot be changed by the user 173 // anyways, and the client library is also not changing it. 174 // Make a copy of labels. 175 sessionLabels := make(map[string]string) 176 for k, v := range config.SessionLabels { 177 sessionLabels[k] = v 178 } 179 // Create a session client. 180 sc := newSessionClient(clients, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger) 181 // Create a session pool. 182 config.SessionPoolConfig.sessionLabels = sessionLabels 183 sp, err := newSessionPool(sc, config.SessionPoolConfig) 184 if err != nil { 185 sc.close() 186 return nil, err 187 } 188 c = &Client{ 189 sc: sc, 190 idleSessions: sp, 191 logger: config.logger, 192 } 193 return c, nil 194} 195 196// Close closes the client. 197func (c *Client) Close() { 198 if c.idleSessions != nil { 199 c.idleSessions.close() 200 } 201 c.sc.close() 202} 203 204// Single provides a read-only snapshot transaction optimized for the case 205// where only a single read or query is needed. This is more efficient than 206// using ReadOnlyTransaction() for a single read or query. 207// 208// Single will use a strong TimestampBound by default. Use 209// ReadOnlyTransaction.WithTimestampBound to specify a different 210// TimestampBound. A non-strong bound can be used to reduce latency, or 211// "time-travel" to prior versions of the database, see the documentation of 212// TimestampBound for details. 213func (c *Client) Single() *ReadOnlyTransaction { 214 t := &ReadOnlyTransaction{singleUse: true, sp: c.idleSessions} 215 t.txReadOnly.txReadEnv = t 216 return t 217} 218 219// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for 220// multiple reads from the database. You must call Close() when the 221// ReadOnlyTransaction is no longer needed to release resources on the server. 222// 223// ReadOnlyTransaction will use a strong TimestampBound by default. Use 224// ReadOnlyTransaction.WithTimestampBound to specify a different 225// TimestampBound. A non-strong bound can be used to reduce latency, or 226// "time-travel" to prior versions of the database, see the documentation of 227// TimestampBound for details. 228func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { 229 t := &ReadOnlyTransaction{ 230 singleUse: false, 231 sp: c.idleSessions, 232 txReadyOrClosed: make(chan struct{}), 233 } 234 t.txReadOnly.txReadEnv = t 235 return t 236} 237 238// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used 239// for partitioned reads or queries from a snapshot of the database. This is 240// useful in batch processing pipelines where one wants to divide the work of 241// reading from the database across multiple machines. 242// 243// Note: This transaction does not use the underlying session pool but creates a 244// new session each time, and the session is reused across clients. 245// 246// You should call Close() after the txn is no longer needed on local 247// client, and call Cleanup() when the txn is finished for all clients, to free 248// the session. 249func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) { 250 var ( 251 tx transactionID 252 rts time.Time 253 s *session 254 sh *sessionHandle 255 err error 256 ) 257 defer func() { 258 if err != nil && sh != nil { 259 s.delete(ctx) 260 } 261 }() 262 263 // Create session. 264 s, err = c.sc.createSession(ctx) 265 if err != nil { 266 return nil, err 267 } 268 sh = &sessionHandle{session: s} 269 270 // Begin transaction. 271 res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 272 Session: sh.getID(), 273 Options: &sppb.TransactionOptions{ 274 Mode: &sppb.TransactionOptions_ReadOnly_{ 275 ReadOnly: buildTransactionOptionsReadOnly(tb, true), 276 }, 277 }, 278 }) 279 if err != nil { 280 return nil, toSpannerError(err) 281 } 282 tx = res.Id 283 if res.ReadTimestamp != nil { 284 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 285 } 286 287 t := &BatchReadOnlyTransaction{ 288 ReadOnlyTransaction: ReadOnlyTransaction{ 289 tx: tx, 290 txReadyOrClosed: make(chan struct{}), 291 state: txActive, 292 sh: sh, 293 rts: rts, 294 }, 295 ID: BatchReadOnlyTransactionID{ 296 tid: tx, 297 sid: sh.getID(), 298 rts: rts, 299 }, 300 } 301 t.txReadOnly.txReadEnv = t 302 return t, nil 303} 304 305// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from 306// BatchReadOnlyTransactionID 307func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction { 308 s := c.sc.sessionWithID(tid.sid) 309 sh := &sessionHandle{session: s} 310 311 t := &BatchReadOnlyTransaction{ 312 ReadOnlyTransaction: ReadOnlyTransaction{ 313 tx: tid.tid, 314 txReadyOrClosed: make(chan struct{}), 315 state: txActive, 316 sh: sh, 317 rts: tid.rts, 318 }, 319 ID: tid, 320 } 321 t.txReadOnly.txReadEnv = t 322 return t 323} 324 325type transactionInProgressKey struct{} 326 327func checkNestedTxn(ctx context.Context) error { 328 if ctx.Value(transactionInProgressKey{}) != nil { 329 return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions") 330 } 331 return nil 332} 333 334// ReadWriteTransaction executes a read-write transaction, with retries as 335// necessary. 336// 337// The function f will be called one or more times. It must not maintain 338// any state between calls. 339// 340// If the transaction cannot be committed or if f returns an ABORTED error, 341// ReadWriteTransaction will call f again. It will continue to call f until the 342// transaction can be committed or the Context times out or is cancelled. If f 343// returns an error other than ABORTED, ReadWriteTransaction will abort the 344// transaction and return the error. 345// 346// To limit the number of retries, set a deadline on the Context rather than 347// using a fixed limit on the number of attempts. ReadWriteTransaction will 348// retry as needed until that deadline is met. 349// 350// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for 351// more details. 352func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) { 353 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction") 354 defer func() { trace.EndSpan(ctx, err) }() 355 if err := checkNestedTxn(ctx); err != nil { 356 return time.Time{}, err 357 } 358 var ( 359 ts time.Time 360 sh *sessionHandle 361 ) 362 err = runWithRetryOnAborted(ctx, func(ctx context.Context) error { 363 var ( 364 err error 365 t *ReadWriteTransaction 366 ) 367 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 368 // Session handle hasn't been allocated or has been destroyed. 369 sh, err = c.idleSessions.takeWriteSession(ctx) 370 if err != nil { 371 // If session retrieval fails, just fail the transaction. 372 return err 373 } 374 t = &ReadWriteTransaction{ 375 sh: sh, 376 tx: sh.getTransactionID(), 377 } 378 } else { 379 t = &ReadWriteTransaction{ 380 sh: sh, 381 } 382 } 383 t.txReadOnly.txReadEnv = t 384 trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())}, 385 "Starting transaction attempt") 386 if err = t.begin(ctx); err != nil { 387 return err 388 } 389 ts, err = t.runInTransaction(ctx, f) 390 return err 391 }) 392 if sh != nil { 393 sh.recycle() 394 } 395 return ts, err 396} 397 398// applyOption controls the behavior of Client.Apply. 399type applyOption struct { 400 // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud 401 // Spanner at least once. 402 atLeastOnce bool 403} 404 405// An ApplyOption is an optional argument to Apply. 406type ApplyOption func(*applyOption) 407 408// ApplyAtLeastOnce returns an ApplyOption that removes replay protection. 409// 410// With this option, Apply may attempt to apply mutations more than once; if 411// the mutations are not idempotent, this may lead to a failure being reported 412// when the mutation was applied more than once. For example, an insert may 413// fail with ALREADY_EXISTS even though the row did not exist before Apply was 414// called. For this reason, most users of the library will prefer not to use 415// this option. However, ApplyAtLeastOnce requires only a single RPC, whereas 416// Apply's default replay protection may require an additional RPC. So this 417// option may be appropriate for latency sensitive and/or high throughput blind 418// writing. 419func ApplyAtLeastOnce() ApplyOption { 420 return func(ao *applyOption) { 421 ao.atLeastOnce = true 422 } 423} 424 425// Apply applies a list of mutations atomically to the database. 426func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { 427 ao := &applyOption{} 428 for _, opt := range opts { 429 opt(ao) 430 } 431 if !ao.atLeastOnce { 432 return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { 433 return t.BufferWrite(ms) 434 }) 435 } 436 437 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply") 438 defer func() { trace.EndSpan(ctx, err) }() 439 t := &writeOnlyTransaction{c.idleSessions} 440 return t.applyAtLeastOnce(ctx, ms...) 441} 442 443// logf logs the given message to the given logger, or the standard logger if 444// the given logger is nil. 445func logf(logger *log.Logger, format string, v ...interface{}) { 446 if logger == nil { 447 log.Printf(format, v...) 448 } else { 449 logger.Printf(format, v...) 450 } 451} 452