1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"fmt"
22	"log"
23	"os"
24	"regexp"
25	"time"
26
27	"cloud.google.com/go/internal/trace"
28	vkit "cloud.google.com/go/spanner/apiv1"
29	"google.golang.org/api/option"
30	sppb "google.golang.org/genproto/googleapis/spanner/v1"
31	"google.golang.org/grpc"
32	"google.golang.org/grpc/codes"
33	"google.golang.org/grpc/metadata"
34)
35
const (
	// endpoint is the default Cloud Spanner service address. NewClientWithConfig
	// passes it as the first endpoint option, before any caller-supplied
	// options are appended.
	endpoint = "spanner.googleapis.com:443"

	// resourcePrefixHeader is the name of the metadata header used to indicate
	// the resource being operated on. NewClientWithConfig pairs it with the
	// database name when building the session client's metadata.
	resourcePrefixHeader = "google-cloud-resource-prefix"
)
43
const (
	// Scope is the scope for Cloud Spanner Data API. NewClientWithConfig
	// requests this scope by default when creating the underlying clients.
	Scope = "https://www.googleapis.com/auth/spanner.data"

	// AdminScope is the scope for Cloud Spanner Admin APIs.
	AdminScope = "https://www.googleapis.com/auth/spanner.admin"
)
51
// validDBPattern matches fully qualified database names of the form
// projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID, where no
// ID segment may be empty or contain a slash. It is compiled once at package
// initialization.
var validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")
55
56func validDatabaseName(db string) error {
57	if matched := validDBPattern.MatchString(db); !matched {
58		return fmt.Errorf("database name %q should conform to pattern %q",
59			db, validDBPattern.String())
60	}
61	return nil
62}
63
// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
type Client struct {
	// sc is the session client built by NewClientWithConfig; it is used to
	// create sessions directly (see BatchReadOnlyTransaction) and is closed
	// by Close.
	sc           *sessionClient
	// idleSessions is the session pool handed to the transactions created by
	// this client. Close tolerates it being nil.
	idleSessions *sessionPool
	// logger is copied from ClientConfig.logger when the client is created.
	logger       *log.Logger
}
71
// ClientConfig has configurations for the client.
type ClientConfig struct {
	// NumChannels is the number of gRPC channels.
	// If zero, a reasonable default is used based on the execution environment.
	// NewClientWithConfig creates one underlying API client per channel.
	NumChannels int

	// SessionPoolConfig is the configuration for session pool.
	// NewClientWithConfig fills in MaxOpened (100 sessions per channel) and
	// MaxBurst (DefaultSessionPoolConfig.MaxBurst) when they are left at zero.
	SessionPoolConfig

	// SessionLabels for the sessions created by this client.
	// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session
	// for more info.
	// NewClientWithConfig copies the map, so later changes by the caller do
	// not affect the client.
	SessionLabels map[string]string

	// logger is the logger to use for this client. If it is nil, all logging
	// will be directed to the standard logger.
	logger *log.Logger
}
90
91// errDial returns error for dialing to Cloud Spanner.
92func errDial(ci int, err error) error {
93	e := toSpannerError(err).(*Error)
94	e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci))
95	return e
96}
97
98func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
99	existing, ok := metadata.FromOutgoingContext(ctx)
100	if ok {
101		md = metadata.Join(existing, md)
102	}
103	return metadata.NewOutgoingContext(ctx, md)
104}
105
106// NewClient creates a client to a database. A valid database name has the
107// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
108// a default configuration.
109func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
110	return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...)
111}
112
113// NewClientWithConfig creates a client to a database. A valid database name has
114// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
115func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
116	// Prepare gRPC channels.
117	if config.NumChannels == 0 {
118		config.NumChannels = numChannels
119	}
120
121	// Default configs for session pool.
122	if config.MaxOpened == 0 {
123		config.MaxOpened = uint64(config.NumChannels * 100)
124	}
125	if config.MaxBurst == 0 {
126		config.MaxBurst = DefaultSessionPoolConfig.MaxBurst
127	}
128
129	// Validate database path.
130	if err := validDatabaseName(database); err != nil {
131		return nil, err
132	}
133
134	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
135	defer func() { trace.EndSpan(ctx, err) }()
136
137	// Append emulator options if SPANNER_EMULATOR_HOST has been set.
138	if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
139		emulatorOpts := []option.ClientOption{
140			option.WithEndpoint(emulatorAddr),
141			option.WithGRPCDialOption(grpc.WithInsecure()),
142			option.WithoutAuthentication(),
143		}
144		opts = append(opts, emulatorOpts...)
145	}
146
147	// gRPC options.
148	allOpts := []option.ClientOption{
149		option.WithEndpoint(endpoint),
150		option.WithScopes(Scope),
151		option.WithGRPCDialOption(
152			grpc.WithDefaultCallOptions(
153				grpc.MaxCallSendMsgSize(100<<20),
154				grpc.MaxCallRecvMsgSize(100<<20),
155			),
156		),
157	}
158	allOpts = append(allOpts, opts...)
159
160	// TODO(deklerk): This should be replaced with a balancer with
161	// config.NumChannels connections, instead of config.NumChannels
162	// clients.
163	var clients []*vkit.Client
164	for i := 0; i < config.NumChannels; i++ {
165		client, err := vkit.NewClient(ctx, allOpts...)
166		if err != nil {
167			return nil, errDial(i, err)
168		}
169		clients = append(clients, client)
170	}
171
172	// TODO(loite): Remove as the original map cannot be changed by the user
173	// anyways, and the client library is also not changing it.
174	// Make a copy of labels.
175	sessionLabels := make(map[string]string)
176	for k, v := range config.SessionLabels {
177		sessionLabels[k] = v
178	}
179	// Create a session client.
180	sc := newSessionClient(clients, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger)
181	// Create a session pool.
182	config.SessionPoolConfig.sessionLabels = sessionLabels
183	sp, err := newSessionPool(sc, config.SessionPoolConfig)
184	if err != nil {
185		sc.close()
186		return nil, err
187	}
188	c = &Client{
189		sc:           sc,
190		idleSessions: sp,
191		logger:       config.logger,
192	}
193	return c, nil
194}
195
196// Close closes the client.
197func (c *Client) Close() {
198	if c.idleSessions != nil {
199		c.idleSessions.close()
200	}
201	c.sc.close()
202}
203
204// Single provides a read-only snapshot transaction optimized for the case
205// where only a single read or query is needed.  This is more efficient than
206// using ReadOnlyTransaction() for a single read or query.
207//
208// Single will use a strong TimestampBound by default. Use
209// ReadOnlyTransaction.WithTimestampBound to specify a different
210// TimestampBound. A non-strong bound can be used to reduce latency, or
211// "time-travel" to prior versions of the database, see the documentation of
212// TimestampBound for details.
213func (c *Client) Single() *ReadOnlyTransaction {
214	t := &ReadOnlyTransaction{singleUse: true, sp: c.idleSessions}
215	t.txReadOnly.txReadEnv = t
216	return t
217}
218
219// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for
220// multiple reads from the database.  You must call Close() when the
221// ReadOnlyTransaction is no longer needed to release resources on the server.
222//
223// ReadOnlyTransaction will use a strong TimestampBound by default.  Use
224// ReadOnlyTransaction.WithTimestampBound to specify a different
225// TimestampBound.  A non-strong bound can be used to reduce latency, or
226// "time-travel" to prior versions of the database, see the documentation of
227// TimestampBound for details.
228func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
229	t := &ReadOnlyTransaction{
230		singleUse:       false,
231		sp:              c.idleSessions,
232		txReadyOrClosed: make(chan struct{}),
233	}
234	t.txReadOnly.txReadEnv = t
235	return t
236}
237
238// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used
239// for partitioned reads or queries from a snapshot of the database. This is
240// useful in batch processing pipelines where one wants to divide the work of
241// reading from the database across multiple machines.
242//
243// Note: This transaction does not use the underlying session pool but creates a
244// new session each time, and the session is reused across clients.
245//
246// You should call Close() after the txn is no longer needed on local
247// client, and call Cleanup() when the txn is finished for all clients, to free
248// the session.
249func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) {
250	var (
251		tx  transactionID
252		rts time.Time
253		s   *session
254		sh  *sessionHandle
255		err error
256	)
257	defer func() {
258		if err != nil && sh != nil {
259			s.delete(ctx)
260		}
261	}()
262
263	// Create session.
264	s, err = c.sc.createSession(ctx)
265	if err != nil {
266		return nil, err
267	}
268	sh = &sessionHandle{session: s}
269
270	// Begin transaction.
271	res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
272		Session: sh.getID(),
273		Options: &sppb.TransactionOptions{
274			Mode: &sppb.TransactionOptions_ReadOnly_{
275				ReadOnly: buildTransactionOptionsReadOnly(tb, true),
276			},
277		},
278	})
279	if err != nil {
280		return nil, toSpannerError(err)
281	}
282	tx = res.Id
283	if res.ReadTimestamp != nil {
284		rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
285	}
286
287	t := &BatchReadOnlyTransaction{
288		ReadOnlyTransaction: ReadOnlyTransaction{
289			tx:              tx,
290			txReadyOrClosed: make(chan struct{}),
291			state:           txActive,
292			sh:              sh,
293			rts:             rts,
294		},
295		ID: BatchReadOnlyTransactionID{
296			tid: tx,
297			sid: sh.getID(),
298			rts: rts,
299		},
300	}
301	t.txReadOnly.txReadEnv = t
302	return t, nil
303}
304
305// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from
306// BatchReadOnlyTransactionID
307func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
308	s := c.sc.sessionWithID(tid.sid)
309	sh := &sessionHandle{session: s}
310
311	t := &BatchReadOnlyTransaction{
312		ReadOnlyTransaction: ReadOnlyTransaction{
313			tx:              tid.tid,
314			txReadyOrClosed: make(chan struct{}),
315			state:           txActive,
316			sh:              sh,
317			rts:             tid.rts,
318		},
319		ID: tid,
320	}
321	t.txReadOnly.txReadEnv = t
322	return t
323}
324
// transactionInProgressKey is the context key type that checkNestedTxn looks
// up: a non-nil value stored under transactionInProgressKey{} marks the
// context as already belonging to a transaction. The value is not set in this
// part of the file.
type transactionInProgressKey struct{}
326
327func checkNestedTxn(ctx context.Context) error {
328	if ctx.Value(transactionInProgressKey{}) != nil {
329		return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions")
330	}
331	return nil
332}
333
// ReadWriteTransaction executes a read-write transaction, with retries as
// necessary.
//
// The function f will be called one or more times. It must not maintain
// any state between calls.
//
// If the transaction cannot be committed or if f returns an ABORTED error,
// ReadWriteTransaction will call f again. It will continue to call f until the
// transaction can be committed or the Context times out or is cancelled.  If f
// returns an error other than ABORTED, ReadWriteTransaction will abort the
// transaction and return the error.
//
// To limit the number of retries, set a deadline on the Context rather than
// using a fixed limit on the number of attempts. ReadWriteTransaction will
// retry as needed until that deadline is met.
//
// A FailedPrecondition error is returned, without calling f, when ctx
// indicates that a transaction is already in progress (see checkNestedTxn).
//
// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
// more details.
func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
	// The closure reads the named result so the span records the final error.
	defer func() { trace.EndSpan(ctx, err) }()
	if err := checkNestedTxn(ctx); err != nil {
		return time.Time{}, err
	}
	// ts and sh are declared outside the retry closure so that their values
	// carry over from one attempt to the next and are visible after the
	// retry loop returns.
	var (
		ts time.Time
		sh *sessionHandle
	)
	err = runWithRetryOnAborted(ctx, func(ctx context.Context) error {
		var (
			err error
			t   *ReadWriteTransaction
		)
		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
			// Session handle hasn't been allocated or has been destroyed.
			sh, err = c.idleSessions.takeWriteSession(ctx)
			if err != nil {
				// If session retrieval fails, just fail the transaction.
				return err
			}
			// A freshly taken write session may already carry a transaction
			// ID; hand it to the transaction.
			t = &ReadWriteTransaction{
				sh: sh,
				tx: sh.getTransactionID(),
			}
		} else {
			// Reuse the session from the previous attempt; no transaction ID
			// is carried over.
			t = &ReadWriteTransaction{
				sh: sh,
			}
		}
		// The transaction serves as its own read environment.
		t.txReadOnly.txReadEnv = t
		trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
			"Starting transaction attempt")
		if err = t.begin(ctx); err != nil {
			return err
		}
		ts, err = t.runInTransaction(ctx, f)
		return err
	})
	// Hand the session back regardless of the outcome, if one was obtained.
	if sh != nil {
		sh.recycle()
	}
	return ts, err
}
397
// applyOption controls the behavior of Client.Apply. Client.Apply starts from
// the zero value and lets each supplied ApplyOption modify it.
type applyOption struct {
	// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
	// Spanner at least once. When false (the default), Client.Apply buffers
	// the mutations inside a ReadWriteTransaction instead.
	atLeastOnce bool
}
404
// An ApplyOption is an optional argument to Apply. It is a function that
// adjusts the settings Apply uses for a single call.
type ApplyOption func(*applyOption)
407
408// ApplyAtLeastOnce returns an ApplyOption that removes replay protection.
409//
410// With this option, Apply may attempt to apply mutations more than once; if
411// the mutations are not idempotent, this may lead to a failure being reported
412// when the mutation was applied more than once. For example, an insert may
413// fail with ALREADY_EXISTS even though the row did not exist before Apply was
414// called. For this reason, most users of the library will prefer not to use
415// this option.  However, ApplyAtLeastOnce requires only a single RPC, whereas
416// Apply's default replay protection may require an additional RPC.  So this
417// option may be appropriate for latency sensitive and/or high throughput blind
418// writing.
419func ApplyAtLeastOnce() ApplyOption {
420	return func(ao *applyOption) {
421		ao.atLeastOnce = true
422	}
423}
424
425// Apply applies a list of mutations atomically to the database.
426func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
427	ao := &applyOption{}
428	for _, opt := range opts {
429		opt(ao)
430	}
431	if !ao.atLeastOnce {
432		return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
433			return t.BufferWrite(ms)
434		})
435	}
436
437	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply")
438	defer func() { trace.EndSpan(ctx, err) }()
439	t := &writeOnlyTransaction{c.idleSessions}
440	return t.applyAtLeastOnce(ctx, ms...)
441}
442
// logf formats a message Printf-style and writes it to logger. A nil logger
// selects the standard logger of the log package.
func logf(logger *log.Logger, format string, v ...interface{}) {
	if logger != nil {
		logger.Printf(format, v...)
		return
	}
	log.Printf(format, v...)
}
452