1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"errors"
22	"fmt"
23	"reflect"
24	"strings"
25	"sync"
26	"testing"
27	"time"
28
29	. "cloud.google.com/go/spanner/internal/testutil"
30	"github.com/golang/protobuf/ptypes"
31	"github.com/google/go-cmp/cmp"
32	"google.golang.org/genproto/googleapis/rpc/errdetails"
33	sppb "google.golang.org/genproto/googleapis/spanner/v1"
34	"google.golang.org/grpc/codes"
35	gstatus "google.golang.org/grpc/status"
36)
37
38// Single can only be used once.
39func TestSingle(t *testing.T) {
40	t.Parallel()
41	ctx := context.Background()
42	server, client, teardown := setupMockedTestServer(t)
43	defer teardown()
44
45	txn := client.Single()
46	defer txn.Close()
47	_, _, e := txn.acquire(ctx)
48	if e != nil {
49		t.Fatalf("Acquire for single use, got %v, want nil.", e)
50	}
51	_, _, e = txn.acquire(ctx)
52	if wantErr := errTxClosed(); !testEqual(e, wantErr) {
53		t.Fatalf("Second acquire for single use, got %v, want %v.", e, wantErr)
54	}
55
56	// Only one CreateSessionRequest is sent.
57	if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{&sppb.CreateSessionRequest{}}); err != nil {
58		t.Fatal(err)
59	}
60}
61
62// Re-using ReadOnlyTransaction: can recover from acquire failure.
63func TestReadOnlyTransaction_RecoverFromFailure(t *testing.T) {
64	t.Parallel()
65	ctx := context.Background()
66	server, client, teardown := setupMockedTestServer(t)
67	defer teardown()
68
69	txn := client.ReadOnlyTransaction()
70	defer txn.Close()
71
72	// First request will fail.
73	errUsr := gstatus.Error(codes.Unknown, "error")
74	server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
75		SimulatedExecutionTime{
76			Errors: []error{errUsr},
77		})
78
79	_, _, e := txn.acquire(ctx)
80	if wantErr := toSpannerError(errUsr); !testEqual(e, wantErr) {
81		t.Fatalf("Acquire for multi use, got %v, want %v.", e, wantErr)
82	}
83	_, _, e = txn.acquire(ctx)
84	if e != nil {
85		t.Fatalf("Acquire for multi use, got %v, want nil.", e)
86	}
87}
88
89// ReadOnlyTransaction: can not be used after close.
90func TestReadOnlyTransaction_UseAfterClose(t *testing.T) {
91	t.Parallel()
92	ctx := context.Background()
93	_, client, teardown := setupMockedTestServer(t)
94	defer teardown()
95
96	txn := client.ReadOnlyTransaction()
97	txn.Close()
98
99	_, _, e := txn.acquire(ctx)
100	if wantErr := errTxClosed(); !testEqual(e, wantErr) {
101		t.Fatalf("Second acquire for multi use, got %v, want %v.", e, wantErr)
102	}
103}
104
105// ReadOnlyTransaction: can be acquired concurrently.
106func TestReadOnlyTransaction_Concurrent(t *testing.T) {
107	t.Parallel()
108	ctx := context.Background()
109	server, client, teardown := setupMockedTestServer(t)
110	defer teardown()
111	txn := client.ReadOnlyTransaction()
112	defer txn.Close()
113
114	server.TestSpanner.Freeze()
115	var (
116		sh1 *sessionHandle
117		sh2 *sessionHandle
118		ts1 *sppb.TransactionSelector
119		ts2 *sppb.TransactionSelector
120		wg  = sync.WaitGroup{}
121	)
122	acquire := func(sh **sessionHandle, ts **sppb.TransactionSelector) {
123		defer wg.Done()
124		var e error
125		*sh, *ts, e = txn.acquire(ctx)
126		if e != nil {
127			t.Errorf("Concurrent acquire for multiuse, got %v, expect nil.", e)
128		}
129	}
130	wg.Add(2)
131	go acquire(&sh1, &ts1)
132	go acquire(&sh2, &ts2)
133
134	// TODO(deklerk): Get rid of this.
135	<-time.After(100 * time.Millisecond)
136
137	server.TestSpanner.Unfreeze()
138	wg.Wait()
139	if sh1.session.id != sh2.session.id {
140		t.Fatalf("Expected acquire to get same session handle, got %v and %v.", sh1, sh2)
141	}
142	if !testEqual(ts1, ts2) {
143		t.Fatalf("Expected acquire to get same transaction selector, got %v and %v.", ts1, ts2)
144	}
145}
146
147func TestApply_Single(t *testing.T) {
148	t.Parallel()
149	ctx := context.Background()
150	server, client, teardown := setupMockedTestServer(t)
151	defer teardown()
152
153	ms := []*Mutation{
154		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
155		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
156	}
157	if _, e := client.Apply(ctx, ms, ApplyAtLeastOnce()); e != nil {
158		t.Fatalf("applyAtLeastOnce retry on abort, got %v, want nil.", e)
159	}
160
161	if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
162		&sppb.CreateSessionRequest{},
163		&sppb.CommitRequest{},
164	}); err != nil {
165		t.Fatal(err)
166	}
167}
168
169// Transaction retries on abort.
170func TestApply_RetryOnAbort(t *testing.T) {
171	ctx := context.Background()
172	t.Parallel()
173	server, client, teardown := setupMockedTestServer(t)
174	defer teardown()
175
176	// First commit will fail, and the retry will begin a new transaction.
177	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
178		SimulatedExecutionTime{
179			Errors: []error{newAbortedErrorWithMinimalRetryDelay()},
180		})
181
182	ms := []*Mutation{
183		Insert("Accounts", []string{"AccountId"}, []interface{}{int64(1)}),
184	}
185
186	if _, e := client.Apply(ctx, ms); e != nil {
187		t.Fatalf("ReadWriteTransaction retry on abort, got %v, want nil.", e)
188	}
189
190	if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
191		&sppb.CreateSessionRequest{},
192		&sppb.BeginTransactionRequest{},
193		&sppb.CommitRequest{}, // First commit fails.
194		&sppb.BeginTransactionRequest{},
195		&sppb.CommitRequest{}, // Second commit succeeds.
196	}); err != nil {
197		t.Fatal(err)
198	}
199}
200
201// Tests that SessionNotFound errors are retried.
202func TestTransaction_SessionNotFound(t *testing.T) {
203	t.Parallel()
204	ctx := context.Background()
205	server, client, teardown := setupMockedTestServer(t)
206	defer teardown()
207
208	serverErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
209	server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
210		SimulatedExecutionTime{
211			Errors: []error{serverErr, serverErr, serverErr},
212		})
213	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
214		SimulatedExecutionTime{
215			Errors: []error{serverErr},
216		})
217
218	txn := client.ReadOnlyTransaction()
219	defer txn.Close()
220
221	var wantErr error
222	if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) {
223		t.Fatalf("Expect acquire to succeed, got %v, want %v.", got, wantErr)
224	}
225
226	// The server error should lead to a retry of the BeginTransaction call and
227	// a valid session handle to be returned that will be used by the following
228	// requests. Note that calling txn.Query(...) does not actually send the
229	// query to the (mock) server. That is done at the first call to
230	// RowIterator.Next. The following statement only verifies that the
231	// transaction is in a valid state and received a valid session handle.
232	if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) {
233		t.Fatalf("Expect Query to succeed, got %v, want %v.", got.err, wantErr)
234	}
235
236	if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) {
237		t.Fatalf("Expect Read to succeed, got %v, want %v.", got.err, wantErr)
238	}
239
240	wantErr = toSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"))
241	ms := []*Mutation{
242		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
243		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
244	}
245	_, got := client.Apply(ctx, ms, ApplyAtLeastOnce())
246	if !cmp.Equal(wantErr, got,
247		cmp.AllowUnexported(Error{}), cmp.FilterPath(func(path cmp.Path) bool {
248			// Ignore statusError Details and Error.trailers.
249			if strings.Contains(path.GoString(), "{*spanner.Error}.err.(*status.statusError).Details") {
250				return true
251			}
252			if strings.Contains(path.GoString(), "{*spanner.Error}.trailers") {
253				return true
254			}
255			return false
256		}, cmp.Ignore())) {
257		t.Fatalf("Expect Apply to fail\nGot:  %v\nWant: %v\n", got, wantErr)
258	}
259}
260
261// When an error is returned from the closure sent into ReadWriteTransaction, it
262// kicks off a rollback.
263func TestReadWriteTransaction_ErrorReturned(t *testing.T) {
264	t.Parallel()
265	ctx := context.Background()
266	server, client, teardown := setupMockedTestServer(t)
267	defer teardown()
268
269	want := errors.New("an error")
270	_, got := client.ReadWriteTransaction(ctx, func(context.Context, *ReadWriteTransaction) error {
271		return want
272	})
273	if got != want {
274		t.Fatalf("got %+v, want %+v", got, want)
275	}
276	requests := drainRequestsFromServer(server.TestSpanner)
277	if err := compareRequests([]interface{}{
278		&sppb.CreateSessionRequest{},
279		&sppb.BeginTransactionRequest{},
280		&sppb.RollbackRequest{}}, requests); err != nil {
281		// TODO: remove this once the session pool maintainer has been changed
282		// so that is doesn't delete sessions already during the first
283		// maintenance window.
284		// If we failed to get 3, it might have because - due to timing - we got
285		// a fourth request. If this request is DeleteSession, that's OK and
286		// expected.
287		if err := compareRequests([]interface{}{
288			&sppb.CreateSessionRequest{},
289			&sppb.BeginTransactionRequest{},
290			&sppb.RollbackRequest{},
291			&sppb.DeleteSessionRequest{}}, requests); err != nil {
292			t.Fatal(err)
293		}
294	}
295}
296
297func TestBatchDML_WithMultipleDML(t *testing.T) {
298	t.Parallel()
299	ctx := context.Background()
300	server, client, teardown := setupMockedTestServer(t)
301	defer teardown()
302
303	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
304		if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil {
305			return err
306		}
307		if _, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}, {SQL: UpdateBarSetFoo}}); err != nil {
308			return err
309		}
310		if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil {
311			return err
312		}
313		_, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}})
314		return err
315	})
316	if err != nil {
317		t.Fatal(err)
318	}
319
320	gotReqs, err := shouldHaveReceived(server.TestSpanner, []interface{}{
321		&sppb.CreateSessionRequest{},
322		&sppb.BeginTransactionRequest{},
323		&sppb.ExecuteSqlRequest{},
324		&sppb.ExecuteBatchDmlRequest{},
325		&sppb.ExecuteSqlRequest{},
326		&sppb.ExecuteBatchDmlRequest{},
327		&sppb.CommitRequest{},
328	})
329	if err != nil {
330		t.Fatal(err)
331	}
332
333	if got, want := gotReqs[2].(*sppb.ExecuteSqlRequest).Seqno, int64(1); got != want {
334		t.Errorf("got %d, want %d", got, want)
335	}
336	if got, want := gotReqs[3].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(2); got != want {
337		t.Errorf("got %d, want %d", got, want)
338	}
339	if got, want := gotReqs[4].(*sppb.ExecuteSqlRequest).Seqno, int64(3); got != want {
340		t.Errorf("got %d, want %d", got, want)
341	}
342	if got, want := gotReqs[5].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(4); got != want {
343		t.Errorf("got %d, want %d", got, want)
344	}
345}
346
347// shouldHaveReceived asserts that exactly expectedRequests were present in
348// the server's ReceivedRequests channel. It only looks at type, not contents.
349//
350// Note: this in-place modifies serverClientMock by popping items off the
351// ReceivedRequests channel.
352func shouldHaveReceived(server InMemSpannerServer, want []interface{}) ([]interface{}, error) {
353	got := drainRequestsFromServer(server)
354	return got, compareRequests(want, got)
355}
356
357// Compares expected requests (want) with actual requests (got).
358func compareRequests(want []interface{}, got []interface{}) error {
359	if len(got) != len(want) {
360		var gotMsg string
361		for _, r := range got {
362			gotMsg += fmt.Sprintf("%v: %+v]\n", reflect.TypeOf(r), r)
363		}
364
365		var wantMsg string
366		for _, r := range want {
367			wantMsg += fmt.Sprintf("%v: %+v]\n", reflect.TypeOf(r), r)
368		}
369
370		return fmt.Errorf("got %d requests, want %d requests:\ngot:\n%s\nwant:\n%s", len(got), len(want), gotMsg, wantMsg)
371	}
372
373	for i, want := range want {
374		if reflect.TypeOf(got[i]) != reflect.TypeOf(want) {
375			return fmt.Errorf("request %d: got %+v, want %+v", i, reflect.TypeOf(got[i]), reflect.TypeOf(want))
376		}
377	}
378	return nil
379}
380
381func drainRequestsFromServer(server InMemSpannerServer) []interface{} {
382	var reqs []interface{}
383loop:
384	for {
385		select {
386		case req := <-server.ReceivedRequests():
387			reqs = append(reqs, req)
388		default:
389			break loop
390		}
391	}
392	return reqs
393}
394
395func newAbortedErrorWithMinimalRetryDelay() error {
396	st := gstatus.New(codes.Aborted, "Transaction has been aborted")
397	retry := &errdetails.RetryInfo{
398		RetryDelay: ptypes.DurationProto(time.Nanosecond),
399	}
400	st, _ = st.WithDetails(retry)
401	return st.Err()
402}
403