1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"fmt"
22	"log"
23	"os"
24	"regexp"
25	"time"
26
27	"cloud.google.com/go/internal/trace"
28	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
29	"google.golang.org/api/option"
30	gtransport "google.golang.org/api/transport/grpc"
31	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
32	sppb "google.golang.org/genproto/googleapis/spanner/v1"
33	field_mask "google.golang.org/genproto/protobuf/field_mask"
34	"google.golang.org/grpc"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/metadata"
37	"google.golang.org/grpc/status"
38)
39
const (
	// endpoint is the global Cloud Spanner API address. NewClientWithConfig
	// passes it as the first endpoint option, so any endpoint supplied later
	// (user options, emulator host, instance-specific endpoint) overrides it.
	endpoint = "spanner.googleapis.com:443"

	// resourcePrefixHeader is the name of the metadata header used to indicate
	// the resource being operated on. NewClientWithConfig pairs it with the
	// database name when building the session client's metadata.
	resourcePrefixHeader = "google-cloud-resource-prefix"
)
47
const (
	// Scope is the scope for Cloud Spanner Data API. NewClientWithConfig
	// requests this scope by default.
	Scope = "https://www.googleapis.com/auth/spanner.data"

	// AdminScope is the scope for Cloud Spanner Admin APIs.
	AdminScope = "https://www.googleapis.com/auth/spanner.admin"
)
55
var (
	// validDBPattern matches a complete database path of the form
	// projects/PROJECT/instances/INSTANCE/databases/DATABASE. It is anchored
	// at both ends and captures the three path components as named groups.
	validDBPattern       = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)/databases/(?P<database>[^/]+)$")
	// validInstancePattern matches the projects/PROJECT/instances/INSTANCE
	// prefix of a path. It is anchored only at the start, so it also matches
	// when the input continues past the instance segment (for example a full
	// database path).
	validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)")
)
60
61func validDatabaseName(db string) error {
62	if matched := validDBPattern.MatchString(db); !matched {
63		return fmt.Errorf("database name %q should conform to pattern %q",
64			db, validDBPattern.String())
65	}
66	return nil
67}
68
69func getInstanceName(db string) (string, error) {
70	matches := validInstancePattern.FindStringSubmatch(db)
71	if len(matches) == 0 {
72		return "", fmt.Errorf("Failed to retrieve instance name from %q according to pattern %q",
73			db, validInstancePattern.String())
74	}
75	return matches[0], nil
76}
77
78func parseDatabaseName(db string) (project, instance, database string, err error) {
79	matches := validDBPattern.FindStringSubmatch(db)
80	if len(matches) == 0 {
81		return "", "", "", fmt.Errorf("Failed to parse database name from %q according to pattern %q",
82			db, validDBPattern.String())
83	}
84	return matches[1], matches[2], matches[3], nil
85}
86
// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
type Client struct {
	// sc is the session client created for the database; it is closed by
	// Close.
	sc           *sessionClient
	// idleSessions is the session pool that transactions created from this
	// client draw their sessions from. Close tolerates it being nil.
	idleSessions *sessionPool
	// logger receives client log output; nil means the standard logger
	// (see logf).
	logger       *log.Logger
	// qo holds the query options copied into every transaction created by
	// this client.
	qo           QueryOptions
}
95
// ClientConfig has configurations for the client.
type ClientConfig struct {
	// NumChannels is the number of gRPC channels.
	// If zero, a reasonable default is used based on the execution environment.
	// If non-zero and the dialed connection pool ends up with a different
	// size (for example because option.WithGRPCConnectionPool was also
	// given with another value), NewClientWithConfig fails with an
	// InvalidArgument error.
	//
	// Deprecated: The Spanner client now uses a pool of gRPC connections. Use
	// option.WithGRPCConnectionPool(numConns) instead to specify the number of
	// connections the client should use. The client will default to a
	// reasonable default if this option is not specified.
	NumChannels int

	// SessionPoolConfig is the configuration for session pool.
	// NewClientWithConfig fills in MaxOpened and MaxBurst when they are zero.
	SessionPoolConfig

	// SessionLabels for the sessions created by this client.
	// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session
	// for more info.
	SessionLabels map[string]string

	// QueryOptions is the configuration for executing a sql query. The
	// SPANNER_OPTIMIZER_VERSION environment variable, when set, overrides
	// the optimizer version given here.
	QueryOptions QueryOptions

	// logger is the logger to use for this client. If it is nil, all logging
	// will be directed to the standard logger.
	logger *log.Logger
}
122
123// errDial returns error for dialing to Cloud Spanner.
124func errDial(ci int, err error) error {
125	e := toSpannerError(err).(*Error)
126	e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci))
127	return e
128}
129
130func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
131	existing, ok := metadata.FromOutgoingContext(ctx)
132	if ok {
133		md = metadata.Join(existing, md)
134	}
135	return metadata.NewOutgoingContext(ctx, md)
136}
137
138// getInstanceEndpoint returns an instance-specific endpoint if one exists. If
139// multiple endpoints exist, it returns the first one.
140func getInstanceEndpoint(ctx context.Context, database string, opts ...option.ClientOption) (string, error) {
141	instanceName, err := getInstanceName(database)
142	if err != nil {
143		return "", fmt.Errorf("Failed to resolve endpoint: %v", err)
144	}
145
146	c, err := instance.NewInstanceAdminClient(ctx, opts...)
147	if err != nil {
148		return "", err
149	}
150	defer c.Close()
151
152	req := &instancepb.GetInstanceRequest{
153		Name: instanceName,
154		FieldMask: &field_mask.FieldMask{
155			Paths: []string{"endpoint_uris"},
156		},
157	}
158
159	resp, err := c.GetInstance(ctx, req)
160	if err != nil {
161		return "", err
162	}
163
164	endpointURIs := resp.GetEndpointUris()
165
166	if len(endpointURIs) > 0 {
167		return endpointURIs[0], nil
168	}
169
170	// Return empty string when no endpoints exist.
171	return "", nil
172}
173
174// NewClient creates a client to a database. A valid database name has the
175// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
176// a default configuration.
177func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
178	return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...)
179}
180
181// NewClientWithConfig creates a client to a database. A valid database name has
182// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
183func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
184	// Validate database path.
185	if err := validDatabaseName(database); err != nil {
186		return nil, err
187	}
188
189	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
190	defer func() { trace.EndSpan(ctx, err) }()
191
192	// Append emulator options if SPANNER_EMULATOR_HOST has been set.
193	if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
194		emulatorOpts := []option.ClientOption{
195			option.WithEndpoint(emulatorAddr),
196			option.WithGRPCDialOption(grpc.WithInsecure()),
197			option.WithoutAuthentication(),
198		}
199		opts = append(opts, emulatorOpts...)
200	} else if os.Getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING") == "true" {
201		// Fetch the instance-specific endpoint.
202		reqOpts := []option.ClientOption{option.WithEndpoint(endpoint)}
203		reqOpts = append(reqOpts, opts...)
204		instanceEndpoint, err := getInstanceEndpoint(ctx, database, reqOpts...)
205
206		if err != nil {
207			// If there is a PermissionDenied error, fall back to use the global endpoint
208			// or the user-specified endpoint.
209			if status.Code(err) == codes.PermissionDenied {
210				logf(config.logger, `
211Warning: The client library attempted to connect to an endpoint closer to your
212Cloud Spanner data but was unable to do so. The client library will fall back
213and route requests to the endpoint given in the client options, which may
214result in increased latency. We recommend including the scope
215https://www.googleapis.com/auth/spanner.admin so that the client library can
216get an instance-specific endpoint and efficiently route requests.
217`)
218			} else {
219				return nil, err
220			}
221		}
222
223		if instanceEndpoint != "" {
224			opts = append(opts, option.WithEndpoint(instanceEndpoint))
225		}
226	}
227
228	// Prepare gRPC channels.
229	configuredNumChannels := config.NumChannels
230	if config.NumChannels == 0 {
231		config.NumChannels = numChannels
232	}
233	// gRPC options.
234	allOpts := []option.ClientOption{
235		option.WithEndpoint(endpoint),
236		option.WithScopes(Scope),
237		option.WithGRPCDialOption(
238			grpc.WithDefaultCallOptions(
239				grpc.MaxCallSendMsgSize(100<<20),
240				grpc.MaxCallRecvMsgSize(100<<20),
241			),
242		),
243		option.WithGRPCConnectionPool(config.NumChannels),
244	}
245	// opts will take precedence above allOpts, as the values in opts will be
246	// applied after the values in allOpts.
247	allOpts = append(allOpts, opts...)
248	pool, err := gtransport.DialPool(ctx, allOpts...)
249	if configuredNumChannels > 0 && pool.Num() != config.NumChannels {
250		pool.Close()
251		return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value.", config.NumChannels, pool.Num())
252	}
253
254	// TODO(loite): Remove as the original map cannot be changed by the user
255	// anyways, and the client library is also not changing it.
256	// Make a copy of labels.
257	sessionLabels := make(map[string]string)
258	for k, v := range config.SessionLabels {
259		sessionLabels[k] = v
260	}
261
262	// Default configs for session pool.
263	if config.MaxOpened == 0 {
264		config.MaxOpened = uint64(pool.Num() * 100)
265	}
266	if config.MaxBurst == 0 {
267		config.MaxBurst = DefaultSessionPoolConfig.MaxBurst
268	}
269	// Create a session client.
270	sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger)
271	// Create a session pool.
272	config.SessionPoolConfig.sessionLabels = sessionLabels
273	sp, err := newSessionPool(sc, config.SessionPoolConfig)
274	if err != nil {
275		sc.close()
276		return nil, err
277	}
278	c = &Client{
279		sc:           sc,
280		idleSessions: sp,
281		logger:       config.logger,
282		qo:           getQueryOptions(config.QueryOptions),
283	}
284	return c, nil
285}
286
287// getQueryOptions returns the query options overwritten by the environment
288// variables if exist. The input parameter is the query options set by users
289// via application-level configuration. If the environment variables are set,
290// this will return the overwritten query options.
291func getQueryOptions(opts QueryOptions) QueryOptions {
292	opv := os.Getenv("SPANNER_OPTIMIZER_VERSION")
293	if opv != "" {
294		if opts.Options == nil {
295			opts.Options = &sppb.ExecuteSqlRequest_QueryOptions{}
296		}
297		opts.Options.OptimizerVersion = opv
298	}
299	return opts
300}
301
302// Close closes the client.
303func (c *Client) Close() {
304	if c.idleSessions != nil {
305		c.idleSessions.close()
306	}
307	c.sc.close()
308}
309
310// Single provides a read-only snapshot transaction optimized for the case
311// where only a single read or query is needed.  This is more efficient than
312// using ReadOnlyTransaction() for a single read or query.
313//
314// Single will use a strong TimestampBound by default. Use
315// ReadOnlyTransaction.WithTimestampBound to specify a different
316// TimestampBound. A non-strong bound can be used to reduce latency, or
317// "time-travel" to prior versions of the database, see the documentation of
318// TimestampBound for details.
319func (c *Client) Single() *ReadOnlyTransaction {
320	t := &ReadOnlyTransaction{singleUse: true}
321	t.txReadOnly.sp = c.idleSessions
322	t.txReadOnly.txReadEnv = t
323	t.txReadOnly.qo = c.qo
324	t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
325		if t.sh == nil {
326			return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
327		}
328		// Remove the session that returned 'Session not found' from the pool.
329		t.sh.destroy()
330		// Reset the transaction, acquire a new session and retry.
331		t.state = txNew
332		sh, _, err := t.acquire(ctx)
333		if err != nil {
334			return err
335		}
336		t.sh = sh
337		return nil
338	}
339	return t
340}
341
342// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for
343// multiple reads from the database.  You must call Close() when the
344// ReadOnlyTransaction is no longer needed to release resources on the server.
345//
346// ReadOnlyTransaction will use a strong TimestampBound by default.  Use
347// ReadOnlyTransaction.WithTimestampBound to specify a different
348// TimestampBound.  A non-strong bound can be used to reduce latency, or
349// "time-travel" to prior versions of the database, see the documentation of
350// TimestampBound for details.
351func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
352	t := &ReadOnlyTransaction{
353		singleUse:       false,
354		txReadyOrClosed: make(chan struct{}),
355	}
356	t.txReadOnly.sp = c.idleSessions
357	t.txReadOnly.txReadEnv = t
358	t.txReadOnly.qo = c.qo
359	return t
360}
361
362// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used
363// for partitioned reads or queries from a snapshot of the database. This is
364// useful in batch processing pipelines where one wants to divide the work of
365// reading from the database across multiple machines.
366//
367// Note: This transaction does not use the underlying session pool but creates a
368// new session each time, and the session is reused across clients.
369//
370// You should call Close() after the txn is no longer needed on local
371// client, and call Cleanup() when the txn is finished for all clients, to free
372// the session.
373func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) {
374	var (
375		tx  transactionID
376		rts time.Time
377		s   *session
378		sh  *sessionHandle
379		err error
380	)
381	defer func() {
382		if err != nil && sh != nil {
383			s.delete(ctx)
384		}
385	}()
386
387	// Create session.
388	s, err = c.sc.createSession(ctx)
389	if err != nil {
390		return nil, err
391	}
392	sh = &sessionHandle{session: s}
393
394	// Begin transaction.
395	res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
396		Session: sh.getID(),
397		Options: &sppb.TransactionOptions{
398			Mode: &sppb.TransactionOptions_ReadOnly_{
399				ReadOnly: buildTransactionOptionsReadOnly(tb, true),
400			},
401		},
402	})
403	if err != nil {
404		return nil, toSpannerError(err)
405	}
406	tx = res.Id
407	if res.ReadTimestamp != nil {
408		rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
409	}
410
411	t := &BatchReadOnlyTransaction{
412		ReadOnlyTransaction: ReadOnlyTransaction{
413			tx:              tx,
414			txReadyOrClosed: make(chan struct{}),
415			state:           txActive,
416			rts:             rts,
417		},
418		ID: BatchReadOnlyTransactionID{
419			tid: tx,
420			sid: sh.getID(),
421			rts: rts,
422		},
423	}
424	t.txReadOnly.sh = sh
425	t.txReadOnly.txReadEnv = t
426	t.txReadOnly.qo = c.qo
427	return t, nil
428}
429
430// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from
431// BatchReadOnlyTransactionID
432func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
433	s, err := c.sc.sessionWithID(tid.sid)
434	if err != nil {
435		logf(c.logger, "unexpected error: %v\nThis is an indication of an internal error in the Spanner client library.", err)
436		// Use an invalid session. Preferably, this method should just return
437		// the error instead of this, but that would mean an API change.
438		s = &session{}
439	}
440	sh := &sessionHandle{session: s}
441
442	t := &BatchReadOnlyTransaction{
443		ReadOnlyTransaction: ReadOnlyTransaction{
444			tx:              tid.tid,
445			txReadyOrClosed: make(chan struct{}),
446			state:           txActive,
447			rts:             tid.rts,
448		},
449		ID: tid,
450	}
451	t.txReadOnly.sh = sh
452	t.txReadOnly.txReadEnv = t
453	t.txReadOnly.qo = c.qo
454	return t
455}
456
// transactionInProgressKey is the context key type that checkNestedTxn looks
// up to detect that a transaction is already running on a context.
// NOTE(review): the code that stores this key is not in this part of the
// file; presumably the read-write transaction runner sets it — confirm.
type transactionInProgressKey struct{}
458
459func checkNestedTxn(ctx context.Context) error {
460	if ctx.Value(transactionInProgressKey{}) != nil {
461		return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions")
462	}
463	return nil
464}
465
466// ReadWriteTransaction executes a read-write transaction, with retries as
467// necessary.
468//
469// The function f will be called one or more times. It must not maintain
470// any state between calls.
471//
472// If the transaction cannot be committed or if f returns an ABORTED error,
473// ReadWriteTransaction will call f again. It will continue to call f until the
474// transaction can be committed or the Context times out or is cancelled.  If f
475// returns an error other than ABORTED, ReadWriteTransaction will abort the
476// transaction and return the error.
477//
478// To limit the number of retries, set a deadline on the Context rather than
479// using a fixed limit on the number of attempts. ReadWriteTransaction will
480// retry as needed until that deadline is met.
481//
482// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
483// more details.
484func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
485	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
486	defer func() { trace.EndSpan(ctx, err) }()
487	if err := checkNestedTxn(ctx); err != nil {
488		return time.Time{}, err
489	}
490	var (
491		ts time.Time
492		sh *sessionHandle
493	)
494	err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
495		var (
496			err error
497			t   *ReadWriteTransaction
498		)
499		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
500			// Session handle hasn't been allocated or has been destroyed.
501			sh, err = c.idleSessions.takeWriteSession(ctx)
502			if err != nil {
503				// If session retrieval fails, just fail the transaction.
504				return err
505			}
506			t = &ReadWriteTransaction{
507				tx: sh.getTransactionID(),
508			}
509		} else {
510			t = &ReadWriteTransaction{}
511		}
512		t.txReadOnly.sh = sh
513		t.txReadOnly.txReadEnv = t
514		t.txReadOnly.qo = c.qo
515		trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
516			"Starting transaction attempt")
517		if err = t.begin(ctx); err != nil {
518			return err
519		}
520		ts, err = t.runInTransaction(ctx, f)
521		return err
522	})
523	if sh != nil {
524		sh.recycle()
525	}
526	return ts, err
527}
528
// applyOption controls the behavior of Client.Apply. Its zero value selects
// the default behavior; ApplyOption functions modify it.
type applyOption struct {
	// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
	// Spanner at least once. When false, Client.Apply buffers the mutations
	// inside a read-write transaction instead.
	atLeastOnce bool
}
535
// An ApplyOption is an optional argument to Apply. Each option is a function
// that Apply calls, in the order given, on its applyOption settings.
type ApplyOption func(*applyOption)
538
539// ApplyAtLeastOnce returns an ApplyOption that removes replay protection.
540//
541// With this option, Apply may attempt to apply mutations more than once; if
542// the mutations are not idempotent, this may lead to a failure being reported
543// when the mutation was applied more than once. For example, an insert may
544// fail with ALREADY_EXISTS even though the row did not exist before Apply was
545// called. For this reason, most users of the library will prefer not to use
546// this option.  However, ApplyAtLeastOnce requires only a single RPC, whereas
547// Apply's default replay protection may require an additional RPC.  So this
548// option may be appropriate for latency sensitive and/or high throughput blind
549// writing.
550func ApplyAtLeastOnce() ApplyOption {
551	return func(ao *applyOption) {
552		ao.atLeastOnce = true
553	}
554}
555
556// Apply applies a list of mutations atomically to the database.
557func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
558	ao := &applyOption{}
559	for _, opt := range opts {
560		opt(ao)
561	}
562
563	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply")
564	defer func() { trace.EndSpan(ctx, err) }()
565
566	if !ao.atLeastOnce {
567		return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
568			return t.BufferWrite(ms)
569		})
570	}
571	t := &writeOnlyTransaction{c.idleSessions}
572	return t.applyAtLeastOnce(ctx, ms...)
573}
574
// logf formats a message Printf-style and writes it to logger. A nil logger
// sends the message to the standard logger of the log package instead.
func logf(logger *log.Logger, format string, v ...interface{}) {
	if logger != nil {
		logger.Printf(format, v...)
		return
	}
	log.Printf(format, v...)
}
584