1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"fmt"
22	"log"
23	"os"
24	"regexp"
25	"time"
26
27	"cloud.google.com/go/internal/trace"
28	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
29	vkit "cloud.google.com/go/spanner/apiv1"
30	"google.golang.org/api/option"
31	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
32	sppb "google.golang.org/genproto/googleapis/spanner/v1"
33	field_mask "google.golang.org/genproto/protobuf/field_mask"
34	"google.golang.org/grpc"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/metadata"
37	"google.golang.org/grpc/status"
38)
39
// endpoint is the Cloud Spanner service address that NewClientWithConfig
// passes as its base endpoint option.
const endpoint = "spanner.googleapis.com:443"

// resourcePrefixHeader is the name of the metadata header used to indicate
// the resource being operated on.
const resourcePrefixHeader = "google-cloud-resource-prefix"
47
// Scope is the OAuth scope for the Cloud Spanner Data API.
const Scope = "https://www.googleapis.com/auth/spanner.data"

// AdminScope is the OAuth scope for the Cloud Spanner Admin APIs.
const AdminScope = "https://www.googleapis.com/auth/spanner.admin"
55
// validDBPattern matches a complete database path of the form
// projects/<project>/instances/<instance>/databases/<database>, anchored at
// both ends.
var validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")

// validInstancePattern matches the leading projects/<project>/instances/<instance>
// portion of a path. It is deliberately not anchored at the end so that the
// instance name can be extracted from a longer database path.
var validInstancePattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+")
60
61func validDatabaseName(db string) error {
62	if matched := validDBPattern.MatchString(db); !matched {
63		return fmt.Errorf("database name %q should conform to pattern %q",
64			db, validDBPattern.String())
65	}
66	return nil
67}
68
69func getInstanceName(db string) (string, error) {
70	matches := validInstancePattern.FindStringSubmatch(db)
71	if len(matches) == 0 {
72		return "", fmt.Errorf("Failed to retrieve instance name from %q according to pattern %q",
73			db, validInstancePattern.String())
74	}
75	return matches[0], nil
76}
77
// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
type Client struct {
	// sc creates sessions for this client and is closed by Close.
	sc *sessionClient
	// idleSessions is the session pool handed to the transactions created by
	// this client. Close tolerates a nil pool.
	idleSessions *sessionPool
	// logger is the logger taken from ClientConfig; nil means the standard
	// logger is used (see logf).
	logger *log.Logger
}
85
// ClientConfig has configurations for the client.
type ClientConfig struct {
	// NumChannels is the number of gRPC channels.
	// If zero, a reasonable default is used based on the execution environment.
	// NewClientWithConfig creates one underlying gRPC client per channel.
	NumChannels int

	// SessionPoolConfig is the configuration for session pool. It is
	// embedded, so its fields are accessed directly on ClientConfig. When
	// MaxOpened or MaxBurst is zero, NewClientWithConfig substitutes a
	// default (NumChannels*100 and DefaultSessionPoolConfig.MaxBurst
	// respectively).
	SessionPoolConfig

	// SessionLabels for the sessions created by this client.
	// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session
	// for more info.
	// NewClientWithConfig copies this map before using it.
	SessionLabels map[string]string

	// logger is the logger to use for this client. If it is nil, all logging
	// will be directed to the standard logger.
	logger *log.Logger
}
104
105// errDial returns error for dialing to Cloud Spanner.
106func errDial(ci int, err error) error {
107	e := toSpannerError(err).(*Error)
108	e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci))
109	return e
110}
111
112func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
113	existing, ok := metadata.FromOutgoingContext(ctx)
114	if ok {
115		md = metadata.Join(existing, md)
116	}
117	return metadata.NewOutgoingContext(ctx, md)
118}
119
120// getInstanceEndpoint returns an instance-specific endpoint if one exists. If
121// multiple endpoints exist, it returns the first one.
122func getInstanceEndpoint(ctx context.Context, database string, opts ...option.ClientOption) (string, error) {
123	instanceName, err := getInstanceName(database)
124	if err != nil {
125		return "", fmt.Errorf("Failed to resolve endpoint: %v", err)
126	}
127
128	c, err := instance.NewInstanceAdminClient(ctx, opts...)
129	if err != nil {
130		return "", err
131	}
132	defer c.Close()
133
134	req := &instancepb.GetInstanceRequest{
135		Name: instanceName,
136		FieldMask: &field_mask.FieldMask{
137			Paths: []string{"endpoint_uris"},
138		},
139	}
140
141	resp, err := c.GetInstance(ctx, req)
142	if err != nil {
143		return "", err
144	}
145
146	endpointURIs := resp.GetEndpointUris()
147
148	if len(endpointURIs) > 0 {
149		return endpointURIs[0], nil
150	}
151
152	// Return empty string when no endpoints exist.
153	return "", nil
154}
155
156// NewClient creates a client to a database. A valid database name has the
157// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
158// a default configuration.
159func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
160	return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...)
161}
162
163// NewClientWithConfig creates a client to a database. A valid database name has
164// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
165func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
166	// Prepare gRPC channels.
167	if config.NumChannels == 0 {
168		config.NumChannels = numChannels
169	}
170
171	// Default configs for session pool.
172	if config.MaxOpened == 0 {
173		config.MaxOpened = uint64(config.NumChannels * 100)
174	}
175	if config.MaxBurst == 0 {
176		config.MaxBurst = DefaultSessionPoolConfig.MaxBurst
177	}
178
179	// Validate database path.
180	if err := validDatabaseName(database); err != nil {
181		return nil, err
182	}
183
184	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
185	defer func() { trace.EndSpan(ctx, err) }()
186
187	// Append emulator options if SPANNER_EMULATOR_HOST has been set.
188	if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
189		emulatorOpts := []option.ClientOption{
190			option.WithEndpoint(emulatorAddr),
191			option.WithGRPCDialOption(grpc.WithInsecure()),
192			option.WithoutAuthentication(),
193		}
194		opts = append(opts, emulatorOpts...)
195	} else if os.Getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING") == "true" {
196		// Fetch the instance-specific endpoint.
197		reqOpts := []option.ClientOption{option.WithEndpoint(endpoint)}
198		reqOpts = append(reqOpts, opts...)
199		instanceEndpoint, err := getInstanceEndpoint(ctx, database, reqOpts...)
200
201		if err != nil {
202			// If there is a PermissionDenied error, fall back to use the global endpoint
203			// or the user-specified endpoint.
204			if status.Code(err) == codes.PermissionDenied {
205				logf(config.logger, `
206Warning: The client library attempted to connect to an endpoint closer to your
207Cloud Spanner data but was unable to do so. The client library will fall back
208and route requests to the endpoint given in the client options, which may
209result in increased latency. We recommend including the scope
210https://www.googleapis.com/auth/spanner.admin so that the client library can
211get an instance-specific endpoint and efficiently route requests.
212`)
213			} else {
214				return nil, err
215			}
216		}
217
218		if instanceEndpoint != "" {
219			opts = append(opts, option.WithEndpoint(instanceEndpoint))
220		}
221	}
222
223	// gRPC options.
224	allOpts := []option.ClientOption{
225		option.WithEndpoint(endpoint),
226		option.WithScopes(Scope),
227		option.WithGRPCDialOption(
228			grpc.WithDefaultCallOptions(
229				grpc.MaxCallSendMsgSize(100<<20),
230				grpc.MaxCallRecvMsgSize(100<<20),
231			),
232		),
233	}
234	allOpts = append(allOpts, opts...)
235
236	// TODO(deklerk): This should be replaced with a balancer with
237	// config.NumChannels connections, instead of config.NumChannels
238	// clients.
239	var clients []*vkit.Client
240	for i := 0; i < config.NumChannels; i++ {
241		client, err := vkit.NewClient(ctx, allOpts...)
242		if err != nil {
243			return nil, errDial(i, err)
244		}
245		clients = append(clients, client)
246	}
247
248	// TODO(loite): Remove as the original map cannot be changed by the user
249	// anyways, and the client library is also not changing it.
250	// Make a copy of labels.
251	sessionLabels := make(map[string]string)
252	for k, v := range config.SessionLabels {
253		sessionLabels[k] = v
254	}
255	// Create a session client.
256	sc := newSessionClient(clients, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger)
257	// Create a session pool.
258	config.SessionPoolConfig.sessionLabels = sessionLabels
259	sp, err := newSessionPool(sc, config.SessionPoolConfig)
260	if err != nil {
261		sc.close()
262		return nil, err
263	}
264	c = &Client{
265		sc:           sc,
266		idleSessions: sp,
267		logger:       config.logger,
268	}
269	return c, nil
270}
271
272// Close closes the client.
273func (c *Client) Close() {
274	if c.idleSessions != nil {
275		c.idleSessions.close()
276	}
277	c.sc.close()
278}
279
280// Single provides a read-only snapshot transaction optimized for the case
281// where only a single read or query is needed.  This is more efficient than
282// using ReadOnlyTransaction() for a single read or query.
283//
284// Single will use a strong TimestampBound by default. Use
285// ReadOnlyTransaction.WithTimestampBound to specify a different
286// TimestampBound. A non-strong bound can be used to reduce latency, or
287// "time-travel" to prior versions of the database, see the documentation of
288// TimestampBound for details.
289func (c *Client) Single() *ReadOnlyTransaction {
290	t := &ReadOnlyTransaction{singleUse: true}
291	t.txReadOnly.sp = c.idleSessions
292	t.txReadOnly.txReadEnv = t
293	t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
294		if t.sh == nil {
295			return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
296		}
297		// Remove the session that returned 'Session not found' from the pool.
298		t.sh.destroy()
299		// Reset the transaction, acquire a new session and retry.
300		t.state = txNew
301		sh, _, err := t.acquire(ctx)
302		if err != nil {
303			return err
304		}
305		t.sh = sh
306		return nil
307	}
308	return t
309}
310
311// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for
312// multiple reads from the database.  You must call Close() when the
313// ReadOnlyTransaction is no longer needed to release resources on the server.
314//
315// ReadOnlyTransaction will use a strong TimestampBound by default.  Use
316// ReadOnlyTransaction.WithTimestampBound to specify a different
317// TimestampBound.  A non-strong bound can be used to reduce latency, or
318// "time-travel" to prior versions of the database, see the documentation of
319// TimestampBound for details.
320func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
321	t := &ReadOnlyTransaction{
322		singleUse:       false,
323		txReadyOrClosed: make(chan struct{}),
324	}
325	t.txReadOnly.sp = c.idleSessions
326	t.txReadOnly.txReadEnv = t
327	return t
328}
329
330// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used
331// for partitioned reads or queries from a snapshot of the database. This is
332// useful in batch processing pipelines where one wants to divide the work of
333// reading from the database across multiple machines.
334//
335// Note: This transaction does not use the underlying session pool but creates a
336// new session each time, and the session is reused across clients.
337//
338// You should call Close() after the txn is no longer needed on local
339// client, and call Cleanup() when the txn is finished for all clients, to free
340// the session.
341func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) {
342	var (
343		tx  transactionID
344		rts time.Time
345		s   *session
346		sh  *sessionHandle
347		err error
348	)
349	defer func() {
350		if err != nil && sh != nil {
351			s.delete(ctx)
352		}
353	}()
354
355	// Create session.
356	s, err = c.sc.createSession(ctx)
357	if err != nil {
358		return nil, err
359	}
360	sh = &sessionHandle{session: s}
361
362	// Begin transaction.
363	res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
364		Session: sh.getID(),
365		Options: &sppb.TransactionOptions{
366			Mode: &sppb.TransactionOptions_ReadOnly_{
367				ReadOnly: buildTransactionOptionsReadOnly(tb, true),
368			},
369		},
370	})
371	if err != nil {
372		return nil, toSpannerError(err)
373	}
374	tx = res.Id
375	if res.ReadTimestamp != nil {
376		rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
377	}
378
379	t := &BatchReadOnlyTransaction{
380		ReadOnlyTransaction: ReadOnlyTransaction{
381			tx:              tx,
382			txReadyOrClosed: make(chan struct{}),
383			state:           txActive,
384			rts:             rts,
385		},
386		ID: BatchReadOnlyTransactionID{
387			tid: tx,
388			sid: sh.getID(),
389			rts: rts,
390		},
391	}
392	t.txReadOnly.sh = sh
393	t.txReadOnly.txReadEnv = t
394	return t, nil
395}
396
397// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from
398// BatchReadOnlyTransactionID
399func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
400	s := c.sc.sessionWithID(tid.sid)
401	sh := &sessionHandle{session: s}
402
403	t := &BatchReadOnlyTransaction{
404		ReadOnlyTransaction: ReadOnlyTransaction{
405			tx:              tid.tid,
406			txReadyOrClosed: make(chan struct{}),
407			state:           txActive,
408			rts:             tid.rts,
409		},
410		ID: tid,
411	}
412	t.txReadOnly.sh = sh
413	t.txReadOnly.txReadEnv = t
414	return t
415}
416
// transactionInProgressKey is the context key that checkNestedTxn looks up
// to detect a transaction that is already running on the context.
// NOTE(review): the code that stores a value under this key is not in this
// part of the file — confirm where it is set.
type transactionInProgressKey struct{}
418
419func checkNestedTxn(ctx context.Context) error {
420	if ctx.Value(transactionInProgressKey{}) != nil {
421		return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions")
422	}
423	return nil
424}
425
426// ReadWriteTransaction executes a read-write transaction, with retries as
427// necessary.
428//
429// The function f will be called one or more times. It must not maintain
430// any state between calls.
431//
432// If the transaction cannot be committed or if f returns an ABORTED error,
433// ReadWriteTransaction will call f again. It will continue to call f until the
434// transaction can be committed or the Context times out or is cancelled.  If f
435// returns an error other than ABORTED, ReadWriteTransaction will abort the
436// transaction and return the error.
437//
438// To limit the number of retries, set a deadline on the Context rather than
439// using a fixed limit on the number of attempts. ReadWriteTransaction will
440// retry as needed until that deadline is met.
441//
442// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
443// more details.
444func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
445	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
446	defer func() { trace.EndSpan(ctx, err) }()
447	if err := checkNestedTxn(ctx); err != nil {
448		return time.Time{}, err
449	}
450	var (
451		ts time.Time
452		sh *sessionHandle
453	)
454	err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
455		var (
456			err error
457			t   *ReadWriteTransaction
458		)
459		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
460			// Session handle hasn't been allocated or has been destroyed.
461			sh, err = c.idleSessions.takeWriteSession(ctx)
462			if err != nil {
463				// If session retrieval fails, just fail the transaction.
464				return err
465			}
466			t = &ReadWriteTransaction{
467				tx: sh.getTransactionID(),
468			}
469		} else {
470			t = &ReadWriteTransaction{}
471		}
472		t.txReadOnly.sh = sh
473		t.txReadOnly.txReadEnv = t
474		trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
475			"Starting transaction attempt")
476		if err = t.begin(ctx); err != nil {
477			return err
478		}
479		ts, err = t.runInTransaction(ctx, f)
480		return err
481	})
482	if sh != nil {
483		sh.recycle()
484	}
485	return ts, err
486}
487
// applyOption controls the behavior of Client.Apply. Its zero value selects
// the default behavior, in which Apply runs the mutations inside a
// read-write transaction.
type applyOption struct {
	// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
	// Spanner at least once.
	atLeastOnce bool
}
494
// An ApplyOption is an optional argument to Apply. Each option is a function
// that Apply calls on its option set before executing the mutations.
type ApplyOption func(*applyOption)
497
498// ApplyAtLeastOnce returns an ApplyOption that removes replay protection.
499//
500// With this option, Apply may attempt to apply mutations more than once; if
501// the mutations are not idempotent, this may lead to a failure being reported
502// when the mutation was applied more than once. For example, an insert may
503// fail with ALREADY_EXISTS even though the row did not exist before Apply was
504// called. For this reason, most users of the library will prefer not to use
505// this option.  However, ApplyAtLeastOnce requires only a single RPC, whereas
506// Apply's default replay protection may require an additional RPC.  So this
507// option may be appropriate for latency sensitive and/or high throughput blind
508// writing.
509func ApplyAtLeastOnce() ApplyOption {
510	return func(ao *applyOption) {
511		ao.atLeastOnce = true
512	}
513}
514
515// Apply applies a list of mutations atomically to the database.
516func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
517	ao := &applyOption{}
518	for _, opt := range opts {
519		opt(ao)
520	}
521
522	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply")
523	defer func() { trace.EndSpan(ctx, err) }()
524
525	if !ao.atLeastOnce {
526		return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
527			return t.BufferWrite(ms)
528		})
529	}
530	t := &writeOnlyTransaction{c.idleSessions}
531	return t.applyAtLeastOnce(ctx, ms...)
532}
533
// logf formats a message with Printf semantics and writes it to logger. When
// logger is nil the message goes to the standard logger of package log.
func logf(logger *log.Logger, format string, v ...interface{}) {
	if logger != nil {
		logger.Printf(format, v...)
		return
	}
	log.Printf(format, v...)
}
543