1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "fmt" 22 "os" 23 "regexp" 24 "sync/atomic" 25 "time" 26 27 "cloud.google.com/go/internal/trace" 28 vkit "cloud.google.com/go/spanner/apiv1" 29 "google.golang.org/api/option" 30 sppb "google.golang.org/genproto/googleapis/spanner/v1" 31 "google.golang.org/grpc" 32 "google.golang.org/grpc/codes" 33 "google.golang.org/grpc/metadata" 34) 35 36const ( 37 endpoint = "spanner.googleapis.com:443" 38 39 // resourcePrefixHeader is the name of the metadata header used to indicate 40 // the resource being operated on. 41 resourcePrefixHeader = "google-cloud-resource-prefix" 42) 43 44const ( 45 // Scope is the scope for Cloud Spanner Data API. 46 Scope = "https://www.googleapis.com/auth/spanner.data" 47 48 // AdminScope is the scope for Cloud Spanner Admin APIs. 49 AdminScope = "https://www.googleapis.com/auth/spanner.admin" 50) 51 52var ( 53 validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$") 54) 55 56func validDatabaseName(db string) error { 57 if matched := validDBPattern.MatchString(db); !matched { 58 return fmt.Errorf("database name %q should conform to pattern %q", 59 db, validDBPattern.String()) 60 } 61 return nil 62} 63 64// Client is a client for reading and writing data to a Cloud Spanner database. 65// A client is safe to use concurrently, except for its Close method. 66type Client struct { 67 // rr must be accessed through atomic operations. 68 rr uint32 69 clients []*vkit.Client 70 71 database string 72 // Metadata to be sent with each request. 73 md metadata.MD 74 idleSessions *sessionPool 75 // sessionLabels for the sessions created by this client. 76 sessionLabels map[string]string 77} 78 79// ClientConfig has configurations for the client. 80type ClientConfig struct { 81 // NumChannels is the number of gRPC channels. 82 // If zero, a reasonable default is used based on the execution environment. 83 NumChannels int 84 85 // SessionPoolConfig is the configuration for session pool. 86 SessionPoolConfig 87 88 // SessionLabels for the sessions created by this client. 89 // See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session 90 // for more info. 91 SessionLabels map[string]string 92} 93 94// errDial returns error for dialing to Cloud Spanner. 95func errDial(ci int, err error) error { 96 e := toSpannerError(err).(*Error) 97 e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci)) 98 return e 99} 100 101func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { 102 existing, ok := metadata.FromOutgoingContext(ctx) 103 if ok { 104 md = metadata.Join(existing, md) 105 } 106 return metadata.NewOutgoingContext(ctx, md) 107} 108 109// NewClient creates a client to a database. A valid database name has the 110// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses 111// a default configuration. 112func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) { 113 return NewClientWithConfig(ctx, database, ClientConfig{}, opts...) 114} 115 116// NewClientWithConfig creates a client to a database. A valid database name has 117// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. 118func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) { 119 c = &Client{ 120 database: database, 121 md: metadata.Pairs(resourcePrefixHeader, database), 122 } 123 124 // Make a copy of labels. 125 c.sessionLabels = make(map[string]string) 126 for k, v := range config.SessionLabels { 127 c.sessionLabels[k] = v 128 } 129 130 // Prepare gRPC channels. 131 if config.NumChannels == 0 { 132 config.NumChannels = numChannels 133 } 134 135 // Default configs for session pool. 136 if config.MaxOpened == 0 { 137 config.MaxOpened = uint64(config.NumChannels * 100) 138 } 139 if config.MaxBurst == 0 { 140 config.MaxBurst = 10 141 } 142 143 // Validate database path. 144 if err := validDatabaseName(database); err != nil { 145 return nil, err 146 } 147 148 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient") 149 defer func() { trace.EndSpan(ctx, err) }() 150 151 // Append emulator options if SPANNER_EMULATOR_HOST has been set. 152 if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" { 153 emulatorOpts := []option.ClientOption{ 154 option.WithEndpoint(emulatorAddr), 155 option.WithGRPCDialOption(grpc.WithInsecure()), 156 option.WithoutAuthentication(), 157 } 158 opts = append(opts, emulatorOpts...) 159 } 160 161 // gRPC options. 162 allOpts := []option.ClientOption{ 163 option.WithEndpoint(endpoint), 164 option.WithScopes(Scope), 165 option.WithGRPCDialOption( 166 grpc.WithDefaultCallOptions( 167 grpc.MaxCallSendMsgSize(100<<20), 168 grpc.MaxCallRecvMsgSize(100<<20), 169 ), 170 ), 171 } 172 allOpts = append(allOpts, opts...) 173 174 // TODO(deklerk): This should be replaced with a balancer with 175 // config.NumChannels connections, instead of config.NumChannels 176 // clients. 177 for i := 0; i < config.NumChannels; i++ { 178 client, err := vkit.NewClient(ctx, allOpts...) 179 if err != nil { 180 return nil, errDial(i, err) 181 } 182 c.clients = append(c.clients, client) 183 } 184 185 // Prepare session pool. 186 // TODO: support more loadbalancing options. 187 config.SessionPoolConfig.getRPCClient = func() (*vkit.Client, error) { 188 return c.rrNext(), nil 189 } 190 config.SessionPoolConfig.sessionLabels = c.sessionLabels 191 sp, err := newSessionPool(database, config.SessionPoolConfig, c.md) 192 if err != nil { 193 c.Close() 194 return nil, err 195 } 196 c.idleSessions = sp 197 return c, nil 198} 199 200// rrNext returns the next available vkit Cloud Spanner RPC client in a 201// round-robin manner. 202func (c *Client) rrNext() *vkit.Client { 203 return c.clients[atomic.AddUint32(&c.rr, 1)%uint32(len(c.clients))] 204} 205 206// Close closes the client. 207func (c *Client) Close() { 208 if c.idleSessions != nil { 209 c.idleSessions.close() 210 } 211 for _, gpc := range c.clients { 212 gpc.Close() 213 } 214} 215 216// Single provides a read-only snapshot transaction optimized for the case 217// where only a single read or query is needed. This is more efficient than 218// using ReadOnlyTransaction() for a single read or query. 219// 220// Single will use a strong TimestampBound by default. Use 221// ReadOnlyTransaction.WithTimestampBound to specify a different 222// TimestampBound. A non-strong bound can be used to reduce latency, or 223// "time-travel" to prior versions of the database, see the documentation of 224// TimestampBound for details. 225func (c *Client) Single() *ReadOnlyTransaction { 226 t := &ReadOnlyTransaction{singleUse: true, sp: c.idleSessions} 227 t.txReadOnly.txReadEnv = t 228 return t 229} 230 231// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for 232// multiple reads from the database. You must call Close() when the 233// ReadOnlyTransaction is no longer needed to release resources on the server. 234// 235// ReadOnlyTransaction will use a strong TimestampBound by default. Use 236// ReadOnlyTransaction.WithTimestampBound to specify a different 237// TimestampBound. A non-strong bound can be used to reduce latency, or 238// "time-travel" to prior versions of the database, see the documentation of 239// TimestampBound for details. 240func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { 241 t := &ReadOnlyTransaction{ 242 singleUse: false, 243 sp: c.idleSessions, 244 txReadyOrClosed: make(chan struct{}), 245 } 246 t.txReadOnly.txReadEnv = t 247 return t 248} 249 250// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used 251// for partitioned reads or queries from a snapshot of the database. This is 252// useful in batch processing pipelines where one wants to divide the work of 253// reading from the database across multiple machines. 254// 255// Note: This transaction does not use the underlying session pool but creates a 256// new session each time, and the session is reused across clients. 257// 258// You should call Close() after the txn is no longer needed on local 259// client, and call Cleanup() when the txn is finished for all clients, to free 260// the session. 261func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) { 262 var ( 263 tx transactionID 264 rts time.Time 265 s *session 266 sh *sessionHandle 267 err error 268 ) 269 defer func() { 270 if err != nil && sh != nil { 271 s.delete(ctx) 272 } 273 }() 274 275 // Create session. 276 sc := c.rrNext() 277 s, err = createSession(ctx, sc, c.database, c.sessionLabels, c.md) 278 if err != nil { 279 return nil, err 280 } 281 sh = &sessionHandle{session: s} 282 283 // Begin transaction. 284 res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 285 Session: sh.getID(), 286 Options: &sppb.TransactionOptions{ 287 Mode: &sppb.TransactionOptions_ReadOnly_{ 288 ReadOnly: buildTransactionOptionsReadOnly(tb, true), 289 }, 290 }, 291 }) 292 if err != nil { 293 return nil, toSpannerError(err) 294 } 295 tx = res.Id 296 if res.ReadTimestamp != nil { 297 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 298 } 299 300 t := &BatchReadOnlyTransaction{ 301 ReadOnlyTransaction: ReadOnlyTransaction{ 302 tx: tx, 303 txReadyOrClosed: make(chan struct{}), 304 state: txActive, 305 sh: sh, 306 rts: rts, 307 }, 308 ID: BatchReadOnlyTransactionID{ 309 tid: tx, 310 sid: sh.getID(), 311 rts: rts, 312 }, 313 } 314 t.txReadOnly.txReadEnv = t 315 return t, nil 316} 317 318// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from 319// BatchReadOnlyTransactionID 320func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction { 321 sc := c.rrNext() 322 s := &session{valid: true, client: sc, id: tid.sid, createTime: time.Now(), md: c.md} 323 sh := &sessionHandle{session: s} 324 325 t := &BatchReadOnlyTransaction{ 326 ReadOnlyTransaction: ReadOnlyTransaction{ 327 tx: tid.tid, 328 txReadyOrClosed: make(chan struct{}), 329 state: txActive, 330 sh: sh, 331 rts: tid.rts, 332 }, 333 ID: tid, 334 } 335 t.txReadOnly.txReadEnv = t 336 return t 337} 338 339type transactionInProgressKey struct{} 340 341func checkNestedTxn(ctx context.Context) error { 342 if ctx.Value(transactionInProgressKey{}) != nil { 343 return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions") 344 } 345 return nil 346} 347 348// ReadWriteTransaction executes a read-write transaction, with retries as 349// necessary. 350// 351// The function f will be called one or more times. It must not maintain 352// any state between calls. 353// 354// If the transaction cannot be committed or if f returns an ABORTED error, 355// ReadWriteTransaction will call f again. It will continue to call f until the 356// transaction can be committed or the Context times out or is cancelled. If f 357// returns an error other than ABORTED, ReadWriteTransaction will abort the 358// transaction and return the error. 359// 360// To limit the number of retries, set a deadline on the Context rather than 361// using a fixed limit on the number of attempts. ReadWriteTransaction will 362// retry as needed until that deadline is met. 363// 364// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for 365// more details. 366func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) { 367 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction") 368 defer func() { trace.EndSpan(ctx, err) }() 369 if err := checkNestedTxn(ctx); err != nil { 370 return time.Time{}, err 371 } 372 var ( 373 ts time.Time 374 sh *sessionHandle 375 ) 376 err = runWithRetryOnAborted(ctx, func(ctx context.Context) error { 377 var ( 378 err error 379 t *ReadWriteTransaction 380 ) 381 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 382 // Session handle hasn't been allocated or has been destroyed. 383 sh, err = c.idleSessions.takeWriteSession(ctx) 384 if err != nil { 385 // If session retrieval fails, just fail the transaction. 386 return err 387 } 388 t = &ReadWriteTransaction{ 389 sh: sh, 390 tx: sh.getTransactionID(), 391 } 392 } else { 393 t = &ReadWriteTransaction{ 394 sh: sh, 395 } 396 } 397 t.txReadOnly.txReadEnv = t 398 trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())}, 399 "Starting transaction attempt") 400 if err = t.begin(ctx); err != nil { 401 return err 402 } 403 ts, err = t.runInTransaction(ctx, f) 404 return err 405 }) 406 if sh != nil { 407 sh.recycle() 408 } 409 return ts, err 410} 411 412// applyOption controls the behavior of Client.Apply. 413type applyOption struct { 414 // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud 415 // Spanner at least once. 416 atLeastOnce bool 417} 418 419// An ApplyOption is an optional argument to Apply. 420type ApplyOption func(*applyOption) 421 422// ApplyAtLeastOnce returns an ApplyOption that removes replay protection. 423// 424// With this option, Apply may attempt to apply mutations more than once; if 425// the mutations are not idempotent, this may lead to a failure being reported 426// when the mutation was applied more than once. For example, an insert may 427// fail with ALREADY_EXISTS even though the row did not exist before Apply was 428// called. For this reason, most users of the library will prefer not to use 429// this option. However, ApplyAtLeastOnce requires only a single RPC, whereas 430// Apply's default replay protection may require an additional RPC. So this 431// option may be appropriate for latency sensitive and/or high throughput blind 432// writing. 433func ApplyAtLeastOnce() ApplyOption { 434 return func(ao *applyOption) { 435 ao.atLeastOnce = true 436 } 437} 438 439// Apply applies a list of mutations atomically to the database. 440func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { 441 ao := &applyOption{} 442 for _, opt := range opts { 443 opt(ao) 444 } 445 if !ao.atLeastOnce { 446 return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { 447 return t.BufferWrite(ms) 448 }) 449 } 450 451 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply") 452 defer func() { trace.EndSpan(ctx, err) }() 453 t := &writeOnlyTransaction{c.idleSessions} 454 return t.applyAtLeastOnce(ctx, ms...) 455} 456