1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"fmt"
22	"log"
23	"os"
24	"regexp"
25	"time"
26
27	"cloud.google.com/go/internal/trace"
28	"google.golang.org/api/option"
29	gtransport "google.golang.org/api/transport/grpc"
30	sppb "google.golang.org/genproto/googleapis/spanner/v1"
31	"google.golang.org/grpc"
32	"google.golang.org/grpc/codes"
33	"google.golang.org/grpc/metadata"
34)
35
const (
	// resourcePrefixHeader is the gRPC metadata header that names the
	// resource a request operates on; the client sets it to the database
	// name.
	resourcePrefixHeader = "google-cloud-resource-prefix"

	// endpoint is the default host:port of the Cloud Spanner API. Options
	// passed to NewClientWithConfig are applied after it and may replace it.
	endpoint = "spanner.googleapis.com:443"
)
43
// OAuth2 scopes used to authorize Cloud Spanner requests.
const (
	// Scope is the OAuth2 scope for the Cloud Spanner Data API. Clients
	// created by NewClientWithConfig request it by default.
	Scope = "https://www.googleapis.com/auth/spanner.data"

	// AdminScope is the OAuth2 scope for the Cloud Spanner Admin APIs.
	AdminScope = "https://www.googleapis.com/auth/spanner.admin"
)
51
// validDBPattern matches fully qualified database names of the form
// projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. Sub-matches
// 1, 2 and 3 capture the project, instance and database IDs respectively.
var validDBPattern = regexp.MustCompile(`^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)/databases/(?P<database>[^/]+)$`)

// validDatabaseName returns a non-nil error if db is not a fully qualified
// database name matching validDBPattern, and nil otherwise.
func validDatabaseName(db string) error {
	if !validDBPattern.MatchString(db) {
		return fmt.Errorf("database name %q should conform to pattern %q",
			db, validDBPattern.String())
	}
	return nil
}

// parseDatabaseName splits a fully qualified database name into its project,
// instance and database IDs. It returns an error, and empty IDs, if db does
// not match validDBPattern.
func parseDatabaseName(db string) (project, instance, database string, err error) {
	matches := validDBPattern.FindStringSubmatch(db)
	if len(matches) == 0 {
		// Go error strings start lower-case so they compose when wrapped.
		return "", "", "", fmt.Errorf("failed to parse database name from %q according to pattern %q",
			db, validDBPattern.String())
	}
	return matches[1], matches[2], matches[3], nil
}
72
// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
type Client struct {
	sc           *sessionClient // creates sessions and looks them up by ID
	idleSessions *sessionPool   // pool from which the client's transactions take sessions; may be nil (checked by Close)
	logger       *log.Logger    // log destination; nil means the standard logger (see logf)
	qo           QueryOptions   // query options copied into the client's read-only and read-write transactions
}
81
// ClientConfig has configurations for the client.
type ClientConfig struct {
	// NumChannels is the number of gRPC channels.
	// If zero, a reasonable default is used based on the execution environment.
	// If non-zero, it must equal the size of the connection pool that is
	// actually dialed (which an option.WithGRPCConnectionPool option can
	// change); otherwise NewClientWithConfig fails with InvalidArgument.
	//
	// Deprecated: The Spanner client now uses a pool of gRPC connections. Use
	// option.WithGRPCConnectionPool(numConns) instead to specify the number of
	// connections the client should use. If neither is specified, the client
	// uses a reasonable default.
	NumChannels int

	// SessionPoolConfig is the configuration for session pool. Zero values for
	// MaxOpened and MaxBurst are replaced with defaults by
	// NewClientWithConfig; MaxOpened defaults to 100 sessions per gRPC
	// connection.
	SessionPoolConfig

	// SessionLabels for the sessions created by this client. The map is
	// copied when the client is created.
	// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session
	// for more info.
	SessionLabels map[string]string

	// QueryOptions is the configuration for executing a SQL query. If the
	// SPANNER_OPTIMIZER_VERSION environment variable is set, it overrides the
	// optimizer version configured here.
	QueryOptions QueryOptions

	// logger is the logger to use for this client. If it is nil, all logging
	// will be directed to the standard logger.
	logger *log.Logger
}
108
109// errDial returns error for dialing to Cloud Spanner.
110func errDial(ci int, err error) error {
111	e := toSpannerError(err).(*Error)
112	e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci))
113	return e
114}
115
116func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
117	existing, ok := metadata.FromOutgoingContext(ctx)
118	if ok {
119		md = metadata.Join(existing, md)
120	}
121	return metadata.NewOutgoingContext(ctx, md)
122}
123
124// NewClient creates a client to a database. A valid database name has the
125// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
126// a default configuration.
127func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
128	return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...)
129}
130
// NewClientWithConfig creates a client to a database. A valid database name has
// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
//
// Zero-valued config.NumChannels, MaxOpened, MaxBurst and incStep are replaced
// with defaults. If the SPANNER_EMULATOR_HOST environment variable is set, the
// client connects to that address without TLS and without authentication; the
// caller's opts are applied after those emulator settings. If the
// SPANNER_OPTIMIZER_VERSION environment variable is set, it overrides the
// optimizer version in config.QueryOptions.
//
// An InvalidArgument error is returned when config.NumChannels is non-zero and
// differs from the size of the connection pool that was actually dialed (for
// example because opts contains a different option.WithGRPCConnectionPool).
func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
	// Validate database path.
	if err := validDatabaseName(database); err != nil {
		return nil, err
	}

	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
	// The deferred call reads the named result err, so the span records the
	// error actually returned to the caller.
	defer func() { trace.EndSpan(ctx, err) }()

	// Append emulator options if SPANNER_EMULATOR_HOST has been set.
	// They are placed in front of the caller's opts, so caller-supplied
	// options are applied after them.
	if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
		emulatorOpts := []option.ClientOption{
			option.WithEndpoint(emulatorAddr),
			option.WithGRPCDialOption(grpc.WithInsecure()),
			option.WithoutAuthentication(),
		}
		opts = append(emulatorOpts, opts...)
	}

	// Prepare gRPC channels.
	// Remember whether the caller set NumChannels explicitly before the
	// package default (numChannels) is substituted; only an explicit value is
	// checked against the dialed pool size below.
	configuredNumChannels := config.NumChannels
	if config.NumChannels == 0 {
		config.NumChannels = numChannels
	}
	// gRPC options.
	// Defaults: the public Spanner endpoint, the data-API OAuth2 scope,
	// 100 MiB (100<<20 bytes) send/receive message limits, and a connection
	// pool of config.NumChannels connections.
	allOpts := []option.ClientOption{
		option.WithEndpoint(endpoint),
		option.WithScopes(Scope),
		option.WithGRPCDialOption(
			grpc.WithDefaultCallOptions(
				grpc.MaxCallSendMsgSize(100<<20),
				grpc.MaxCallRecvMsgSize(100<<20),
			),
		),
		option.WithGRPCConnectionPool(config.NumChannels),
	}
	// opts will take precedence above allOpts, as the values in opts will be
	// applied after the values in allOpts.
	allOpts = append(allOpts, opts...)
	pool, err := gtransport.DialPool(ctx, allOpts...)
	if err != nil {
		return nil, err
	}
	// A WithGRPCConnectionPool option in opts can change the pool size, so an
	// explicit NumChannels must agree with what was actually dialed.
	if configuredNumChannels > 0 && pool.Num() != config.NumChannels {
		// Close the pool that was just dialed so it is not leaked.
		pool.Close()
		return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value.", config.NumChannels, pool.Num())
	}

	// TODO(loite): Remove as the original map cannot be changed by the user
	// anyways, and the client library is also not changing it.
	// Make a copy of labels.
	sessionLabels := make(map[string]string)
	for k, v := range config.SessionLabels {
		sessionLabels[k] = v
	}

	// Default configs for session pool.
	// MaxOpened defaults to 100 sessions per dialed gRPC connection; the other
	// zero-valued settings fall back to DefaultSessionPoolConfig.
	if config.MaxOpened == 0 {
		config.MaxOpened = uint64(pool.Num() * 100)
	}
	if config.MaxBurst == 0 {
		config.MaxBurst = DefaultSessionPoolConfig.MaxBurst
	}
	if config.incStep == 0 {
		config.incStep = DefaultSessionPoolConfig.incStep
	}
	// Create a session client.
	// The database name is also passed as resourcePrefixHeader metadata,
	// presumably attached to the session client's outgoing requests.
	sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger)
	// Create a session pool.
	config.SessionPoolConfig.sessionLabels = sessionLabels
	sp, err := newSessionPool(sc, config.SessionPoolConfig)
	if err != nil {
		// Release the session client before failing (presumably this also
		// closes the connection pool it was given - TODO confirm).
		sc.close()
		return nil, err
	}
	// getQueryOptions applies environment-variable overrides to the
	// configured query options.
	c = &Client{
		sc:           sc,
		idleSessions: sp,
		logger:       config.logger,
		qo:           getQueryOptions(config.QueryOptions),
	}
	return c, nil
}
216
217// getQueryOptions returns the query options overwritten by the environment
218// variables if exist. The input parameter is the query options set by users
219// via application-level configuration. If the environment variables are set,
220// this will return the overwritten query options.
221func getQueryOptions(opts QueryOptions) QueryOptions {
222	opv := os.Getenv("SPANNER_OPTIMIZER_VERSION")
223	if opv != "" {
224		if opts.Options == nil {
225			opts.Options = &sppb.ExecuteSqlRequest_QueryOptions{}
226		}
227		opts.Options.OptimizerVersion = opv
228	}
229	return opts
230}
231
232// Close closes the client.
233func (c *Client) Close() {
234	if c.idleSessions != nil {
235		c.idleSessions.close()
236	}
237	c.sc.close()
238}
239
240// Single provides a read-only snapshot transaction optimized for the case
241// where only a single read or query is needed.  This is more efficient than
242// using ReadOnlyTransaction() for a single read or query.
243//
244// Single will use a strong TimestampBound by default. Use
245// ReadOnlyTransaction.WithTimestampBound to specify a different
246// TimestampBound. A non-strong bound can be used to reduce latency, or
247// "time-travel" to prior versions of the database, see the documentation of
248// TimestampBound for details.
249func (c *Client) Single() *ReadOnlyTransaction {
250	t := &ReadOnlyTransaction{singleUse: true}
251	t.txReadOnly.sp = c.idleSessions
252	t.txReadOnly.txReadEnv = t
253	t.txReadOnly.qo = c.qo
254	t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
255		if t.sh == nil {
256			return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
257		}
258		// Remove the session that returned 'Session not found' from the pool.
259		t.sh.destroy()
260		// Reset the transaction, acquire a new session and retry.
261		t.state = txNew
262		sh, _, err := t.acquire(ctx)
263		if err != nil {
264			return err
265		}
266		t.sh = sh
267		return nil
268	}
269	return t
270}
271
272// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for
273// multiple reads from the database.  You must call Close() when the
274// ReadOnlyTransaction is no longer needed to release resources on the server.
275//
276// ReadOnlyTransaction will use a strong TimestampBound by default.  Use
277// ReadOnlyTransaction.WithTimestampBound to specify a different
278// TimestampBound.  A non-strong bound can be used to reduce latency, or
279// "time-travel" to prior versions of the database, see the documentation of
280// TimestampBound for details.
281func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
282	t := &ReadOnlyTransaction{
283		singleUse:       false,
284		txReadyOrClosed: make(chan struct{}),
285	}
286	t.txReadOnly.sp = c.idleSessions
287	t.txReadOnly.txReadEnv = t
288	t.txReadOnly.qo = c.qo
289	return t
290}
291
292// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used
293// for partitioned reads or queries from a snapshot of the database. This is
294// useful in batch processing pipelines where one wants to divide the work of
295// reading from the database across multiple machines.
296//
297// Note: This transaction does not use the underlying session pool but creates a
298// new session each time, and the session is reused across clients.
299//
300// You should call Close() after the txn is no longer needed on local
301// client, and call Cleanup() when the txn is finished for all clients, to free
302// the session.
303func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) {
304	var (
305		tx  transactionID
306		rts time.Time
307		s   *session
308		sh  *sessionHandle
309		err error
310	)
311	defer func() {
312		if err != nil && sh != nil {
313			s.delete(ctx)
314		}
315	}()
316
317	// Create session.
318	s, err = c.sc.createSession(ctx)
319	if err != nil {
320		return nil, err
321	}
322	sh = &sessionHandle{session: s}
323
324	// Begin transaction.
325	res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
326		Session: sh.getID(),
327		Options: &sppb.TransactionOptions{
328			Mode: &sppb.TransactionOptions_ReadOnly_{
329				ReadOnly: buildTransactionOptionsReadOnly(tb, true),
330			},
331		},
332	})
333	if err != nil {
334		return nil, toSpannerError(err)
335	}
336	tx = res.Id
337	if res.ReadTimestamp != nil {
338		rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
339	}
340
341	t := &BatchReadOnlyTransaction{
342		ReadOnlyTransaction: ReadOnlyTransaction{
343			tx:              tx,
344			txReadyOrClosed: make(chan struct{}),
345			state:           txActive,
346			rts:             rts,
347		},
348		ID: BatchReadOnlyTransactionID{
349			tid: tx,
350			sid: sh.getID(),
351			rts: rts,
352		},
353	}
354	t.txReadOnly.sh = sh
355	t.txReadOnly.txReadEnv = t
356	t.txReadOnly.qo = c.qo
357	return t, nil
358}
359
360// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from
361// BatchReadOnlyTransactionID
362func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
363	s, err := c.sc.sessionWithID(tid.sid)
364	if err != nil {
365		logf(c.logger, "unexpected error: %v\nThis is an indication of an internal error in the Spanner client library.", err)
366		// Use an invalid session. Preferably, this method should just return
367		// the error instead of this, but that would mean an API change.
368		s = &session{}
369	}
370	sh := &sessionHandle{session: s}
371
372	t := &BatchReadOnlyTransaction{
373		ReadOnlyTransaction: ReadOnlyTransaction{
374			tx:              tid.tid,
375			txReadyOrClosed: make(chan struct{}),
376			state:           txActive,
377			rts:             tid.rts,
378		},
379		ID: tid,
380	}
381	t.txReadOnly.sh = sh
382	t.txReadOnly.txReadEnv = t
383	t.txReadOnly.qo = c.qo
384	return t
385}
386
387type transactionInProgressKey struct{}
388
389func checkNestedTxn(ctx context.Context) error {
390	if ctx.Value(transactionInProgressKey{}) != nil {
391		return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions")
392	}
393	return nil
394}
395
// ReadWriteTransaction executes a read-write transaction, with retries as
// necessary.
//
// The function f will be called one or more times. It must not maintain
// any state between calls.
//
// If the transaction cannot be committed or if f returns an ABORTED error,
// ReadWriteTransaction will call f again. It will continue to call f until the
// transaction can be committed or the Context times out or is cancelled.  If f
// returns an error other than ABORTED, ReadWriteTransaction will abort the
// transaction and return the error.
//
// To limit the number of retries, set a deadline on the Context rather than
// using a fixed limit on the number of attempts. ReadWriteTransaction will
// retry as needed until that deadline is met.
//
// The returned commitTimestamp is the one produced by the last attempt.
// Calling ReadWriteTransaction with a context that already belongs to a
// transaction fails with FailedPrecondition (see checkNestedTxn).
//
// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
// more details.
func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
	// The deferred call reads the named result err, so the span records the
	// final outcome.
	defer func() { trace.EndSpan(ctx, err) }()
	// Reject calls made from inside another transaction's context.
	if err := checkNestedTxn(ctx); err != nil {
		return time.Time{}, err
	}
	var (
		// ts receives the commit timestamp of the most recent attempt.
		ts time.Time
		// sh is shared across attempts, so a retry can reuse the previous
		// attempt's session while that session is still usable.
		sh *sessionHandle
	)
	// Each call of the closure is one attempt; the helper presumably calls it
	// again on Aborted / Session-not-found errors, as its name suggests.
	err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
		var (
			err error
			t   *ReadWriteTransaction
		)
		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
			// Session handle hasn't been allocated or has been destroyed.
			sh, err = c.idleSessions.takeWriteSession(ctx)
			if err != nil {
				// If session retrieval fails, just fail the transaction.
				return err
			}
			// A freshly taken write session may already carry a transaction
			// ID (presumably a prepared transaction - TODO confirm); the
			// attempt starts from it.
			t = &ReadWriteTransaction{
				tx: sh.getTransactionID(),
			}
		} else {
			// Reusing the previous attempt's session: start without a
			// transaction ID and let begin() obtain one.
			t = &ReadWriteTransaction{}
		}
		t.txReadOnly.sh = sh
		t.txReadOnly.txReadEnv = t
		t.txReadOnly.qo = c.qo
		trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
			"Starting transaction attempt")
		if err = t.begin(ctx); err != nil {
			return err
		}
		ts, err = t.runInTransaction(ctx, f)
		return err
	})
	// Recycle the session (presumably returning it to the pool) once all
	// attempts are finished, whether or not the transaction succeeded.
	if sh != nil {
		sh.recycle()
	}
	return ts, err
}
458
// applyOption holds the settings that ApplyOption values configure for
// Client.Apply.
type applyOption struct {
	// atLeastOnce, when true, makes Client.Apply write the mutations through a
	// write-only transaction that may apply them more than once, instead of
	// running them inside a read-write transaction with replay protection.
	atLeastOnce bool
}

// An ApplyOption is an optional argument to Apply. Each option mutates the
// applyOption settings that Apply collects before it runs.
type ApplyOption func(*applyOption)

// ApplyAtLeastOnce returns an ApplyOption that removes replay protection.
//
// With this option, Apply may attempt to apply mutations more than once. If
// the mutations are not idempotent, a failure may be reported when a mutation
// was in fact applied more than once; for example, an insert may fail with
// ALREADY_EXISTS even though the row did not exist before Apply was called.
// For this reason, most users of the library will prefer not to use this
// option. However, ApplyAtLeastOnce requires only a single RPC, whereas
// Apply's default replay protection may require an additional RPC, so this
// option may be appropriate for latency-sensitive and/or high-throughput blind
// writing.
func ApplyAtLeastOnce() ApplyOption {
	enable := func(settings *applyOption) {
		settings.atLeastOnce = true
	}
	return enable
}
485
486// Apply applies a list of mutations atomically to the database.
487func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
488	ao := &applyOption{}
489	for _, opt := range opts {
490		opt(ao)
491	}
492
493	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply")
494	defer func() { trace.EndSpan(ctx, err) }()
495
496	if !ao.atLeastOnce {
497		return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
498			return t.BufferWrite(ms)
499		})
500	}
501	t := &writeOnlyTransaction{c.idleSessions}
502	return t.applyAtLeastOnce(ctx, ms...)
503}
504
// logf formats a message with Printf semantics and writes it to logger. If
// logger is nil, the message goes to the standard logger from the log package.
func logf(logger *log.Logger, format string, v ...interface{}) {
	if logger != nil {
		logger.Printf(format, v...)
		return
	}
	log.Printf(format, v...)
}
514