1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "fmt" 22 "regexp" 23 "sync/atomic" 24 "time" 25 26 "cloud.google.com/go/internal/trace" 27 "cloud.google.com/go/internal/version" 28 "google.golang.org/api/option" 29 gtransport "google.golang.org/api/transport/grpc" 30 sppb "google.golang.org/genproto/googleapis/spanner/v1" 31 "google.golang.org/grpc" 32 "google.golang.org/grpc/codes" 33 "google.golang.org/grpc/metadata" 34) 35 36const ( 37 endpoint = "spanner.googleapis.com:443" 38 39 // resourcePrefixHeader is the name of the metadata header used to indicate 40 // the resource being operated on. 41 resourcePrefixHeader = "google-cloud-resource-prefix" 42 43 // xGoogHeaderKey is the name of the metadata header used to indicate client 44 // information. 45 xGoogHeaderKey = "x-goog-api-client" 46) 47 48const ( 49 // Scope is the scope for Cloud Spanner Data API. 50 Scope = "https://www.googleapis.com/auth/spanner.data" 51 52 // AdminScope is the scope for Cloud Spanner Admin APIs. 53 AdminScope = "https://www.googleapis.com/auth/spanner.admin" 54) 55 56var ( 57 validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$") 58 xGoogHeaderVal = fmt.Sprintf("gl-go/%s gccl/%s grpc/%s", version.Go(), version.Repo, grpc.Version) 59) 60 61func validDatabaseName(db string) error { 62 if matched := validDBPattern.MatchString(db); !matched { 63 return fmt.Errorf("database name %q should conform to pattern %q", 64 db, validDBPattern.String()) 65 } 66 return nil 67} 68 69// Client is a client for reading and writing data to a Cloud Spanner database. 70// A client is safe to use concurrently, except for its Close method. 71type Client struct { 72 // rr must be accessed through atomic operations. 73 rr uint32 74 conns []*grpc.ClientConn 75 clients []sppb.SpannerClient 76 77 database string 78 // Metadata to be sent with each request. 79 md metadata.MD 80 idleSessions *sessionPool 81 // sessionLabels for the sessions created by this client. 82 sessionLabels map[string]string 83} 84 85// ClientConfig has configurations for the client. 86type ClientConfig struct { 87 // NumChannels is the number of gRPC channels. 88 // If zero, a reasonable default is used based on the execution environment. 89 NumChannels int 90 91 // SessionPoolConfig is the configuration for session pool. 92 SessionPoolConfig 93 94 // SessionLabels for the sessions created by this client. 95 // See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session 96 // for more info. 97 SessionLabels map[string]string 98} 99 100// errDial returns error for dialing to Cloud Spanner. 101func errDial(ci int, err error) error { 102 e := toSpannerError(err).(*Error) 103 e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci)) 104 return e 105} 106 107func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { 108 existing, ok := metadata.FromOutgoingContext(ctx) 109 if ok { 110 md = metadata.Join(existing, md) 111 } 112 return metadata.NewOutgoingContext(ctx, md) 113} 114 115// NewClient creates a client to a database. A valid database name has the 116// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses 117// a default configuration. 118func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) { 119 return NewClientWithConfig(ctx, database, ClientConfig{}, opts...) 120} 121 122// NewClientWithConfig creates a client to a database. A valid database name has 123// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. 124func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) { 125 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient") 126 defer func() { trace.EndSpan(ctx, err) }() 127 128 // Validate database path. 129 if err := validDatabaseName(database); err != nil { 130 return nil, err 131 } 132 c = &Client{ 133 database: database, 134 md: metadata.Pairs( 135 resourcePrefixHeader, database, 136 xGoogHeaderKey, xGoogHeaderVal), 137 } 138 139 // Make a copy of labels. 140 c.sessionLabels = make(map[string]string) 141 for k, v := range config.SessionLabels { 142 c.sessionLabels[k] = v 143 } 144 145 // gRPC options. 146 allOpts := []option.ClientOption{ 147 option.WithEndpoint(endpoint), 148 option.WithScopes(Scope), 149 option.WithGRPCDialOption( 150 grpc.WithDefaultCallOptions( 151 grpc.MaxCallSendMsgSize(100<<20), 152 grpc.MaxCallRecvMsgSize(100<<20), 153 ), 154 ), 155 } 156 allOpts = append(allOpts, opts...) 157 158 // Prepare gRPC channels. 159 if config.NumChannels == 0 { 160 config.NumChannels = numChannels 161 } 162 163 // Default configs for session pool. 164 if config.MaxOpened == 0 { 165 config.MaxOpened = uint64(config.NumChannels * 100) 166 } 167 if config.MaxBurst == 0 { 168 config.MaxBurst = 10 169 } 170 171 // TODO(deklerk): This should be replaced with a balancer with 172 // config.NumChannels connections, instead of config.NumChannels 173 // clientconns. 174 for i := 0; i < config.NumChannels; i++ { 175 conn, err := gtransport.Dial(ctx, allOpts...) 176 if err != nil { 177 return nil, errDial(i, err) 178 } 179 c.conns = append(c.conns, conn) 180 c.clients = append(c.clients, sppb.NewSpannerClient(conn)) 181 } 182 183 // Prepare session pool. 184 config.SessionPoolConfig.getRPCClient = func() (sppb.SpannerClient, error) { 185 // TODO: support more loadbalancing options. 186 return c.rrNext(), nil 187 } 188 config.SessionPoolConfig.sessionLabels = c.sessionLabels 189 sp, err := newSessionPool(database, config.SessionPoolConfig, c.md) 190 if err != nil { 191 c.Close() 192 return nil, err 193 } 194 c.idleSessions = sp 195 return c, nil 196} 197 198// rrNext returns the next available Cloud Spanner RPC client in a round-robin 199// manner. 200func (c *Client) rrNext() sppb.SpannerClient { 201 return c.clients[atomic.AddUint32(&c.rr, 1)%uint32(len(c.clients))] 202} 203 204// Close closes the client. 205func (c *Client) Close() { 206 if c.idleSessions != nil { 207 c.idleSessions.close() 208 } 209 for _, conn := range c.conns { 210 conn.Close() 211 } 212} 213 214// Single provides a read-only snapshot transaction optimized for the case 215// where only a single read or query is needed. This is more efficient than 216// using ReadOnlyTransaction() for a single read or query. 217// 218// Single will use a strong TimestampBound by default. Use 219// ReadOnlyTransaction.WithTimestampBound to specify a different 220// TimestampBound. A non-strong bound can be used to reduce latency, or 221// "time-travel" to prior versions of the database, see the documentation of 222// TimestampBound for details. 223func (c *Client) Single() *ReadOnlyTransaction { 224 t := &ReadOnlyTransaction{singleUse: true, sp: c.idleSessions} 225 t.txReadOnly.txReadEnv = t 226 return t 227} 228 229// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for 230// multiple reads from the database. You must call Close() when the 231// ReadOnlyTransaction is no longer needed to release resources on the server. 232// 233// ReadOnlyTransaction will use a strong TimestampBound by default. Use 234// ReadOnlyTransaction.WithTimestampBound to specify a different 235// TimestampBound. A non-strong bound can be used to reduce latency, or 236// "time-travel" to prior versions of the database, see the documentation of 237// TimestampBound for details. 238func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { 239 t := &ReadOnlyTransaction{ 240 singleUse: false, 241 sp: c.idleSessions, 242 txReadyOrClosed: make(chan struct{}), 243 } 244 t.txReadOnly.txReadEnv = t 245 return t 246} 247 248// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used 249// for partitioned reads or queries from a snapshot of the database. This is 250// useful in batch processing pipelines where one wants to divide the work of 251// reading from the database across multiple machines. 252// 253// Note: This transaction does not use the underlying session pool but creates a 254// new session each time, and the session is reused across clients. 255// 256// You should call Close() after the txn is no longer needed on local 257// client, and call Cleanup() when the txn is finished for all clients, to free 258// the session. 259func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) { 260 var ( 261 tx transactionID 262 rts time.Time 263 s *session 264 sh *sessionHandle 265 err error 266 ) 267 defer func() { 268 if err != nil && sh != nil { 269 s.delete(ctx) 270 } 271 }() 272 273 // Create session. 274 sc := c.rrNext() 275 s, err = createSession(ctx, sc, c.database, c.sessionLabels, c.md) 276 if err != nil { 277 return nil, err 278 } 279 sh = &sessionHandle{session: s} 280 281 // Begin transaction. 282 err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error { 283 res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 284 Session: sh.getID(), 285 Options: &sppb.TransactionOptions{ 286 Mode: &sppb.TransactionOptions_ReadOnly_{ 287 ReadOnly: buildTransactionOptionsReadOnly(tb, true), 288 }, 289 }, 290 }) 291 if e != nil { 292 return e 293 } 294 tx = res.Id 295 if res.ReadTimestamp != nil { 296 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 297 } 298 return nil 299 }) 300 if err != nil { 301 return nil, err 302 } 303 304 t := &BatchReadOnlyTransaction{ 305 ReadOnlyTransaction: ReadOnlyTransaction{ 306 tx: tx, 307 txReadyOrClosed: make(chan struct{}), 308 state: txActive, 309 sh: sh, 310 rts: rts, 311 }, 312 ID: BatchReadOnlyTransactionID{ 313 tid: tx, 314 sid: sh.getID(), 315 rts: rts, 316 }, 317 } 318 t.txReadOnly.txReadEnv = t 319 return t, nil 320} 321 322// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from 323// BatchReadOnlyTransactionID 324func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction { 325 sc := c.rrNext() 326 s := &session{valid: true, client: sc, id: tid.sid, createTime: time.Now(), md: c.md} 327 sh := &sessionHandle{session: s} 328 329 t := &BatchReadOnlyTransaction{ 330 ReadOnlyTransaction: ReadOnlyTransaction{ 331 tx: tid.tid, 332 txReadyOrClosed: make(chan struct{}), 333 state: txActive, 334 sh: sh, 335 rts: tid.rts, 336 }, 337 ID: tid, 338 } 339 t.txReadOnly.txReadEnv = t 340 return t 341} 342 343type transactionInProgressKey struct{} 344 345func checkNestedTxn(ctx context.Context) error { 346 if ctx.Value(transactionInProgressKey{}) != nil { 347 return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions") 348 } 349 return nil 350} 351 352// ReadWriteTransaction executes a read-write transaction, with retries as 353// necessary. 354// 355// The function f will be called one or more times. It must not maintain 356// any state between calls. 357// 358// If the transaction cannot be committed or if f returns an ABORTED error, 359// ReadWriteTransaction will call f again. It will continue to call f until the 360// transaction can be committed or the Context times out or is cancelled. If f 361// returns an error other than ABORTED, ReadWriteTransaction will abort the 362// transaction and return the error. 363// 364// To limit the number of retries, set a deadline on the Context rather than 365// using a fixed limit on the number of attempts. ReadWriteTransaction will 366// retry as needed until that deadline is met. 367// 368// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for 369// more details. 370func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) { 371 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction") 372 defer func() { trace.EndSpan(ctx, err) }() 373 if err := checkNestedTxn(ctx); err != nil { 374 return time.Time{}, err 375 } 376 var ( 377 ts time.Time 378 sh *sessionHandle 379 ) 380 err = runRetryableNoWrap(ctx, func(ctx context.Context) error { 381 var ( 382 err error 383 t *ReadWriteTransaction 384 ) 385 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 386 // Session handle hasn't been allocated or has been destroyed. 387 sh, err = c.idleSessions.takeWriteSession(ctx) 388 if err != nil { 389 // If session retrieval fails, just fail the transaction. 390 return err 391 } 392 t = &ReadWriteTransaction{ 393 sh: sh, 394 tx: sh.getTransactionID(), 395 } 396 } else { 397 t = &ReadWriteTransaction{ 398 sh: sh, 399 } 400 } 401 t.txReadOnly.txReadEnv = t 402 trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())}, 403 "Starting transaction attempt") 404 if err = t.begin(ctx); err != nil { 405 // Mask error from begin operation as retryable error. 406 return errRetry(err) 407 } 408 ts, err = t.runInTransaction(ctx, f) 409 return err 410 }) 411 if sh != nil { 412 sh.recycle() 413 } 414 return ts, err 415} 416 417// applyOption controls the behavior of Client.Apply. 418type applyOption struct { 419 // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud 420 // Spanner at least once. 421 atLeastOnce bool 422} 423 424// An ApplyOption is an optional argument to Apply. 425type ApplyOption func(*applyOption) 426 427// ApplyAtLeastOnce returns an ApplyOption that removes replay protection. 428// 429// With this option, Apply may attempt to apply mutations more than once; if 430// the mutations are not idempotent, this may lead to a failure being reported 431// when the mutation was applied more than once. For example, an insert may 432// fail with ALREADY_EXISTS even though the row did not exist before Apply was 433// called. For this reason, most users of the library will prefer not to use 434// this option. However, ApplyAtLeastOnce requires only a single RPC, whereas 435// Apply's default replay protection may require an additional RPC. So this 436// option may be appropriate for latency sensitive and/or high throughput blind 437// writing. 438func ApplyAtLeastOnce() ApplyOption { 439 return func(ao *applyOption) { 440 ao.atLeastOnce = true 441 } 442} 443 444// Apply applies a list of mutations atomically to the database. 445func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { 446 ao := &applyOption{} 447 for _, opt := range opts { 448 opt(ao) 449 } 450 if !ao.atLeastOnce { 451 return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { 452 return t.BufferWrite(ms) 453 }) 454 } 455 456 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply") 457 defer func() { trace.EndSpan(ctx, err) }() 458 t := &writeOnlyTransaction{c.idleSessions} 459 return t.applyAtLeastOnce(ctx, ms...) 460} 461