1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"fmt"
22	"os"
23	"regexp"
24	"sync/atomic"
25	"time"
26
27	"cloud.google.com/go/internal/trace"
28	vkit "cloud.google.com/go/spanner/apiv1"
29	"google.golang.org/api/option"
30	sppb "google.golang.org/genproto/googleapis/spanner/v1"
31	"google.golang.org/grpc"
32	"google.golang.org/grpc/codes"
33	"google.golang.org/grpc/metadata"
34)
35
const (
	// endpoint is the default Cloud Spanner service address that
	// NewClientWithConfig passes to option.WithEndpoint.
	endpoint = "spanner.googleapis.com:443"

	// resourcePrefixHeader is the name of the metadata header used to indicate
	// the resource being operated on. NewClientWithConfig pairs it with the
	// database name to build the client's request metadata.
	resourcePrefixHeader = "google-cloud-resource-prefix"
)
43
const (
	// Scope is the scope for Cloud Spanner Data API. NewClientWithConfig
	// passes it to option.WithScopes as part of its default options.
	Scope = "https://www.googleapis.com/auth/spanner.data"

	// AdminScope is the scope for Cloud Spanner Admin APIs.
	AdminScope = "https://www.googleapis.com/auth/spanner.admin"
)
51
// validDBPattern matches fully qualified database names of the form
// projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID, where no
// identifier may be empty or contain a slash.
var validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")

// validDatabaseName reports whether db is a fully qualified database name.
// It returns nil when db matches validDBPattern, and otherwise an error that
// quotes both the rejected name and the expected pattern.
func validDatabaseName(db string) error {
	if validDBPattern.MatchString(db) {
		return nil
	}
	pattern := validDBPattern.String()
	return fmt.Errorf("database name %q should conform to pattern %q", db, pattern)
}
63
// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
type Client struct {
	// rr is the round-robin counter advanced by rrNext; it must be accessed
	// through atomic operations. clients holds the vkit clients that rrNext
	// cycles through; NewClientWithConfig creates one per configured channel.
	rr      uint32
	clients []*vkit.Client

	// database is the name of the database this client was created for.
	database string
	// md is the metadata to be sent with each request. idleSessions is the
	// session pool created by NewClientWithConfig; it is closed by Close and
	// handed to the transactions started from this client.
	md           metadata.MD
	idleSessions *sessionPool
	// sessionLabels for the sessions created by this client.
	sessionLabels map[string]string
}
78
// ClientConfig has configurations for the client.
type ClientConfig struct {
	// NumChannels is the number of gRPC channels.
	// If zero, a reasonable default is used based on the execution environment.
	NumChannels int

	// SessionPoolConfig is the configuration for session pool.
	// When left at zero, NewClientWithConfig defaults MaxOpened to
	// NumChannels*100 and MaxBurst to 10.
	SessionPoolConfig

	// SessionLabels for the sessions created by this client.
	// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session
	// for more info.
	// NewClientWithConfig copies the map into the client.
	SessionLabels map[string]string
}
93
94// errDial returns error for dialing to Cloud Spanner.
95func errDial(ci int, err error) error {
96	e := toSpannerError(err).(*Error)
97	e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci))
98	return e
99}
100
101func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
102	existing, ok := metadata.FromOutgoingContext(ctx)
103	if ok {
104		md = metadata.Join(existing, md)
105	}
106	return metadata.NewOutgoingContext(ctx, md)
107}
108
109// NewClient creates a client to a database. A valid database name has the
110// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
111// a default configuration.
112func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
113	return NewClientWithConfig(ctx, database, ClientConfig{}, opts...)
114}
115
116// NewClientWithConfig creates a client to a database. A valid database name has
117// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
118func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
119	c = &Client{
120		database: database,
121		md:       metadata.Pairs(resourcePrefixHeader, database),
122	}
123
124	// Make a copy of labels.
125	c.sessionLabels = make(map[string]string)
126	for k, v := range config.SessionLabels {
127		c.sessionLabels[k] = v
128	}
129
130	// Prepare gRPC channels.
131	if config.NumChannels == 0 {
132		config.NumChannels = numChannels
133	}
134
135	// Default configs for session pool.
136	if config.MaxOpened == 0 {
137		config.MaxOpened = uint64(config.NumChannels * 100)
138	}
139	if config.MaxBurst == 0 {
140		config.MaxBurst = 10
141	}
142
143	// Validate database path.
144	if err := validDatabaseName(database); err != nil {
145		return nil, err
146	}
147
148	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
149	defer func() { trace.EndSpan(ctx, err) }()
150
151	// Append emulator options if SPANNER_EMULATOR_HOST has been set.
152	if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
153		emulatorOpts := []option.ClientOption{
154			option.WithEndpoint(emulatorAddr),
155			option.WithGRPCDialOption(grpc.WithInsecure()),
156			option.WithoutAuthentication(),
157		}
158		opts = append(opts, emulatorOpts...)
159	}
160
161	// gRPC options.
162	allOpts := []option.ClientOption{
163		option.WithEndpoint(endpoint),
164		option.WithScopes(Scope),
165		option.WithGRPCDialOption(
166			grpc.WithDefaultCallOptions(
167				grpc.MaxCallSendMsgSize(100<<20),
168				grpc.MaxCallRecvMsgSize(100<<20),
169			),
170		),
171	}
172	allOpts = append(allOpts, opts...)
173
174	// TODO(deklerk): This should be replaced with a balancer with
175	// config.NumChannels connections, instead of config.NumChannels
176	// clients.
177	for i := 0; i < config.NumChannels; i++ {
178		client, err := vkit.NewClient(ctx, allOpts...)
179		if err != nil {
180			return nil, errDial(i, err)
181		}
182		c.clients = append(c.clients, client)
183	}
184
185	// Prepare session pool.
186	// TODO: support more loadbalancing options.
187	config.SessionPoolConfig.getRPCClient = func() (*vkit.Client, error) {
188		return c.rrNext(), nil
189	}
190	config.SessionPoolConfig.sessionLabels = c.sessionLabels
191	sp, err := newSessionPool(database, config.SessionPoolConfig, c.md)
192	if err != nil {
193		c.Close()
194		return nil, err
195	}
196	c.idleSessions = sp
197	return c, nil
198}
199
200// rrNext returns the next available vkit Cloud Spanner RPC client in a
201// round-robin manner.
202func (c *Client) rrNext() *vkit.Client {
203	return c.clients[atomic.AddUint32(&c.rr, 1)%uint32(len(c.clients))]
204}
205
206// Close closes the client.
207func (c *Client) Close() {
208	if c.idleSessions != nil {
209		c.idleSessions.close()
210	}
211	for _, gpc := range c.clients {
212		gpc.Close()
213	}
214}
215
216// Single provides a read-only snapshot transaction optimized for the case
217// where only a single read or query is needed.  This is more efficient than
218// using ReadOnlyTransaction() for a single read or query.
219//
220// Single will use a strong TimestampBound by default. Use
221// ReadOnlyTransaction.WithTimestampBound to specify a different
222// TimestampBound. A non-strong bound can be used to reduce latency, or
223// "time-travel" to prior versions of the database, see the documentation of
224// TimestampBound for details.
225func (c *Client) Single() *ReadOnlyTransaction {
226	t := &ReadOnlyTransaction{singleUse: true, sp: c.idleSessions}
227	t.txReadOnly.txReadEnv = t
228	return t
229}
230
231// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for
232// multiple reads from the database.  You must call Close() when the
233// ReadOnlyTransaction is no longer needed to release resources on the server.
234//
235// ReadOnlyTransaction will use a strong TimestampBound by default.  Use
236// ReadOnlyTransaction.WithTimestampBound to specify a different
237// TimestampBound.  A non-strong bound can be used to reduce latency, or
238// "time-travel" to prior versions of the database, see the documentation of
239// TimestampBound for details.
240func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
241	t := &ReadOnlyTransaction{
242		singleUse:       false,
243		sp:              c.idleSessions,
244		txReadyOrClosed: make(chan struct{}),
245	}
246	t.txReadOnly.txReadEnv = t
247	return t
248}
249
250// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used
251// for partitioned reads or queries from a snapshot of the database. This is
252// useful in batch processing pipelines where one wants to divide the work of
253// reading from the database across multiple machines.
254//
255// Note: This transaction does not use the underlying session pool but creates a
256// new session each time, and the session is reused across clients.
257//
258// You should call Close() after the txn is no longer needed on local
259// client, and call Cleanup() when the txn is finished for all clients, to free
260// the session.
261func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) {
262	var (
263		tx  transactionID
264		rts time.Time
265		s   *session
266		sh  *sessionHandle
267		err error
268	)
269	defer func() {
270		if err != nil && sh != nil {
271			s.delete(ctx)
272		}
273	}()
274
275	// Create session.
276	sc := c.rrNext()
277	s, err = createSession(ctx, sc, c.database, c.sessionLabels, c.md)
278	if err != nil {
279		return nil, err
280	}
281	sh = &sessionHandle{session: s}
282
283	// Begin transaction.
284	res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
285		Session: sh.getID(),
286		Options: &sppb.TransactionOptions{
287			Mode: &sppb.TransactionOptions_ReadOnly_{
288				ReadOnly: buildTransactionOptionsReadOnly(tb, true),
289			},
290		},
291	})
292	if err != nil {
293		return nil, toSpannerError(err)
294	}
295	tx = res.Id
296	if res.ReadTimestamp != nil {
297		rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
298	}
299
300	t := &BatchReadOnlyTransaction{
301		ReadOnlyTransaction: ReadOnlyTransaction{
302			tx:              tx,
303			txReadyOrClosed: make(chan struct{}),
304			state:           txActive,
305			sh:              sh,
306			rts:             rts,
307		},
308		ID: BatchReadOnlyTransactionID{
309			tid: tx,
310			sid: sh.getID(),
311			rts: rts,
312		},
313	}
314	t.txReadOnly.txReadEnv = t
315	return t, nil
316}
317
318// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from
319// BatchReadOnlyTransactionID
320func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
321	sc := c.rrNext()
322	s := &session{valid: true, client: sc, id: tid.sid, createTime: time.Now(), md: c.md}
323	sh := &sessionHandle{session: s}
324
325	t := &BatchReadOnlyTransaction{
326		ReadOnlyTransaction: ReadOnlyTransaction{
327			tx:              tid.tid,
328			txReadyOrClosed: make(chan struct{}),
329			state:           txActive,
330			sh:              sh,
331			rts:             tid.rts,
332		},
333		ID: tid,
334	}
335	t.txReadOnly.txReadEnv = t
336	return t
337}
338
// transactionInProgressKey is the context key that checkNestedTxn looks up; a
// non-nil value stored under it marks the context as already belonging to a
// transaction.
// NOTE(review): the code that stores the value is not visible here —
// presumably the read-write transaction runner; confirm.
type transactionInProgressKey struct{}
340
341func checkNestedTxn(ctx context.Context) error {
342	if ctx.Value(transactionInProgressKey{}) != nil {
343		return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions")
344	}
345	return nil
346}
347
348// ReadWriteTransaction executes a read-write transaction, with retries as
349// necessary.
350//
351// The function f will be called one or more times. It must not maintain
352// any state between calls.
353//
354// If the transaction cannot be committed or if f returns an ABORTED error,
355// ReadWriteTransaction will call f again. It will continue to call f until the
356// transaction can be committed or the Context times out or is cancelled.  If f
357// returns an error other than ABORTED, ReadWriteTransaction will abort the
358// transaction and return the error.
359//
360// To limit the number of retries, set a deadline on the Context rather than
361// using a fixed limit on the number of attempts. ReadWriteTransaction will
362// retry as needed until that deadline is met.
363//
364// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
365// more details.
366func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
367	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
368	defer func() { trace.EndSpan(ctx, err) }()
369	if err := checkNestedTxn(ctx); err != nil {
370		return time.Time{}, err
371	}
372	var (
373		ts time.Time
374		sh *sessionHandle
375	)
376	err = runWithRetryOnAborted(ctx, func(ctx context.Context) error {
377		var (
378			err error
379			t   *ReadWriteTransaction
380		)
381		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
382			// Session handle hasn't been allocated or has been destroyed.
383			sh, err = c.idleSessions.takeWriteSession(ctx)
384			if err != nil {
385				// If session retrieval fails, just fail the transaction.
386				return err
387			}
388			t = &ReadWriteTransaction{
389				sh: sh,
390				tx: sh.getTransactionID(),
391			}
392		} else {
393			t = &ReadWriteTransaction{
394				sh: sh,
395			}
396		}
397		t.txReadOnly.txReadEnv = t
398		trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
399			"Starting transaction attempt")
400		if err = t.begin(ctx); err != nil {
401			return err
402		}
403		ts, err = t.runInTransaction(ctx, f)
404		return err
405	})
406	if sh != nil {
407		sh.recycle()
408	}
409	return ts, err
410}
411
// applyOption collects the settings that change how Client.Apply behaves.
type applyOption struct {
	// atLeastOnce, when true, makes Client.Apply execute the mutations on
	// Cloud Spanner at least once.
	atLeastOnce bool
}

// An ApplyOption is an optional argument to Apply.
type ApplyOption func(*applyOption)

// ApplyAtLeastOnce returns an ApplyOption that removes replay protection.
//
// With this option set, Apply may try to apply the mutations more than once.
// If the mutations are not idempotent, a failure may then be reported even
// though the mutation was applied: for example, an insert may fail with
// ALREADY_EXISTS although the row did not exist before Apply was called. For
// this reason, most users of the library will prefer not to use this option.
// On the other hand, ApplyAtLeastOnce needs only a single RPC, whereas the
// default replay protection of Apply may need an additional one, so the
// option may be appropriate for latency sensitive and/or high throughput
// blind writing.
func ApplyAtLeastOnce() ApplyOption {
	enable := func(settings *applyOption) {
		settings.atLeastOnce = true
	}
	return enable
}
438
439// Apply applies a list of mutations atomically to the database.
440func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
441	ao := &applyOption{}
442	for _, opt := range opts {
443		opt(ao)
444	}
445	if !ao.atLeastOnce {
446		return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
447			return t.BufferWrite(ms)
448		})
449	}
450
451	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply")
452	defer func() { trace.EndSpan(ctx, err) }()
453	t := &writeOnlyTransaction{c.idleSessions}
454	return t.applyAtLeastOnce(ctx, ms...)
455}
456