1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"errors"
22	"fmt"
23	"reflect"
24	"strings"
25	"sync"
26	"testing"
27	"time"
28
29	. "cloud.google.com/go/spanner/internal/testutil"
30	"github.com/golang/protobuf/ptypes"
31	"google.golang.org/api/iterator"
32	"google.golang.org/genproto/googleapis/rpc/errdetails"
33	sppb "google.golang.org/genproto/googleapis/spanner/v1"
34	"google.golang.org/grpc/codes"
35	"google.golang.org/grpc/status"
36	gstatus "google.golang.org/grpc/status"
37)
38
39// Single can only be used once.
40func TestSingle(t *testing.T) {
41	t.Parallel()
42	ctx := context.Background()
43	server, client, teardown := setupMockedTestServer(t)
44	defer teardown()
45
46	txn := client.Single()
47	defer txn.Close()
48	_, _, e := txn.acquire(ctx)
49	if e != nil {
50		t.Fatalf("Acquire for single use, got %v, want nil.", e)
51	}
52	_, _, e = txn.acquire(ctx)
53	if wantErr := errTxClosed(); !testEqual(e, wantErr) {
54		t.Fatalf("Second acquire for single use, got %v, want %v.", e, wantErr)
55	}
56
57	// Only one BatchCreateSessionsRequest is sent.
58	if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{&sppb.BatchCreateSessionsRequest{}}); err != nil {
59		t.Fatal(err)
60	}
61}
62
63// Re-using ReadOnlyTransaction: can recover from acquire failure.
64func TestReadOnlyTransaction_RecoverFromFailure(t *testing.T) {
65	t.Parallel()
66	ctx := context.Background()
67	server, client, teardown := setupMockedTestServer(t)
68	defer teardown()
69
70	txn := client.ReadOnlyTransaction()
71	defer txn.Close()
72
73	// First request will fail.
74	errUsr := gstatus.Error(codes.Unknown, "error")
75	server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
76		SimulatedExecutionTime{
77			Errors: []error{errUsr},
78		})
79
80	_, _, e := txn.acquire(ctx)
81	if wantErr := toSpannerError(errUsr); !testEqual(e, wantErr) {
82		t.Fatalf("Acquire for multi use, got %v, want %v.", e, wantErr)
83	}
84	_, _, e = txn.acquire(ctx)
85	if e != nil {
86		t.Fatalf("Acquire for multi use, got %v, want nil.", e)
87	}
88}
89
90// ReadOnlyTransaction: can not be used after close.
91func TestReadOnlyTransaction_UseAfterClose(t *testing.T) {
92	t.Parallel()
93	ctx := context.Background()
94	_, client, teardown := setupMockedTestServer(t)
95	defer teardown()
96
97	txn := client.ReadOnlyTransaction()
98	txn.Close()
99
100	_, _, e := txn.acquire(ctx)
101	if wantErr := errTxClosed(); !testEqual(e, wantErr) {
102		t.Fatalf("Second acquire for multi use, got %v, want %v.", e, wantErr)
103	}
104}
105
106// ReadOnlyTransaction: can be acquired concurrently.
107func TestReadOnlyTransaction_Concurrent(t *testing.T) {
108	t.Parallel()
109	ctx := context.Background()
110	server, client, teardown := setupMockedTestServer(t)
111	defer teardown()
112	txn := client.ReadOnlyTransaction()
113	defer txn.Close()
114
115	server.TestSpanner.Freeze()
116	var (
117		sh1 *sessionHandle
118		sh2 *sessionHandle
119		ts1 *sppb.TransactionSelector
120		ts2 *sppb.TransactionSelector
121		wg  = sync.WaitGroup{}
122	)
123	acquire := func(sh **sessionHandle, ts **sppb.TransactionSelector) {
124		defer wg.Done()
125		var e error
126		*sh, *ts, e = txn.acquire(ctx)
127		if e != nil {
128			t.Errorf("Concurrent acquire for multiuse, got %v, expect nil.", e)
129		}
130	}
131	wg.Add(2)
132	go acquire(&sh1, &ts1)
133	go acquire(&sh2, &ts2)
134
135	// TODO(deklerk): Get rid of this.
136	<-time.After(100 * time.Millisecond)
137
138	server.TestSpanner.Unfreeze()
139	wg.Wait()
140	if sh1.session.id != sh2.session.id {
141		t.Fatalf("Expected acquire to get same session handle, got %v and %v.", sh1, sh2)
142	}
143	if !testEqual(ts1, ts2) {
144		t.Fatalf("Expected acquire to get same transaction selector, got %v and %v.", ts1, ts2)
145	}
146}
147
148func TestApply_Single(t *testing.T) {
149	t.Parallel()
150	ctx := context.Background()
151	server, client, teardown := setupMockedTestServer(t)
152	defer teardown()
153
154	ms := []*Mutation{
155		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
156		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
157	}
158	if _, e := client.Apply(ctx, ms, ApplyAtLeastOnce()); e != nil {
159		t.Fatalf("applyAtLeastOnce retry on abort, got %v, want nil.", e)
160	}
161
162	if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
163		&sppb.BatchCreateSessionsRequest{},
164		&sppb.CommitRequest{},
165	}); err != nil {
166		t.Fatal(err)
167	}
168}
169
170// Transaction retries on abort.
171func TestApply_RetryOnAbort(t *testing.T) {
172	ctx := context.Background()
173	t.Parallel()
174	server, client, teardown := setupMockedTestServer(t)
175	defer teardown()
176
177	// First commit will fail, and the retry will begin a new transaction.
178	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
179		SimulatedExecutionTime{
180			Errors: []error{newAbortedErrorWithMinimalRetryDelay()},
181		})
182
183	ms := []*Mutation{
184		Insert("Accounts", []string{"AccountId"}, []interface{}{int64(1)}),
185	}
186
187	if _, e := client.Apply(ctx, ms); e != nil {
188		t.Fatalf("ReadWriteTransaction retry on abort, got %v, want nil.", e)
189	}
190
191	if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
192		&sppb.BatchCreateSessionsRequest{},
193		&sppb.BeginTransactionRequest{},
194		&sppb.CommitRequest{}, // First commit fails.
195		&sppb.BeginTransactionRequest{},
196		&sppb.CommitRequest{}, // Second commit succeeds.
197	}); err != nil {
198		t.Fatal(err)
199	}
200}
201
202// Tests that SessionNotFound errors are retried.
203func TestTransaction_SessionNotFound(t *testing.T) {
204	t.Parallel()
205	ctx := context.Background()
206	server, client, teardown := setupMockedTestServer(t)
207	defer teardown()
208
209	serverErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
210	server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
211		SimulatedExecutionTime{
212			Errors: []error{serverErr, serverErr, serverErr},
213		})
214	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
215		SimulatedExecutionTime{
216			Errors: []error{serverErr},
217		})
218
219	txn := client.ReadOnlyTransaction()
220	defer txn.Close()
221
222	var wantErr error
223	if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) {
224		t.Fatalf("Expect acquire to succeed, got %v, want %v.", got, wantErr)
225	}
226
227	// The server error should lead to a retry of the BeginTransaction call and
228	// a valid session handle to be returned that will be used by the following
229	// requests. Note that calling txn.Query(...) does not actually send the
230	// query to the (mock) server. That is done at the first call to
231	// RowIterator.Next. The following statement only verifies that the
232	// transaction is in a valid state and received a valid session handle.
233	if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) {
234		t.Fatalf("Expect Query to succeed, got %v, want %v.", got.err, wantErr)
235	}
236
237	if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) {
238		t.Fatalf("Expect Read to succeed, got %v, want %v.", got.err, wantErr)
239	}
240
241	wantErr = toSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"))
242	ms := []*Mutation{
243		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
244		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
245	}
246	_, got := client.Apply(ctx, ms, ApplyAtLeastOnce())
247	if !testEqual(wantErr, got) {
248		t.Fatalf("Expect Apply to fail\nGot:  %v\nWant: %v\n", got, wantErr)
249	}
250}
251
252// When an error is returned from the closure sent into ReadWriteTransaction, it
253// kicks off a rollback.
254func TestReadWriteTransaction_ErrorReturned(t *testing.T) {
255	t.Parallel()
256	ctx := context.Background()
257	server, client, teardown := setupMockedTestServer(t)
258	defer teardown()
259
260	want := errors.New("an error")
261	_, got := client.ReadWriteTransaction(ctx, func(context.Context, *ReadWriteTransaction) error {
262		return want
263	})
264	if got != want {
265		t.Fatalf("got %+v, want %+v", got, want)
266	}
267	requests := drainRequestsFromServer(server.TestSpanner)
268	if err := compareRequests([]interface{}{
269		&sppb.BatchCreateSessionsRequest{},
270		&sppb.BeginTransactionRequest{},
271		&sppb.RollbackRequest{}}, requests); err != nil {
272		// TODO: remove this once the session pool maintainer has been changed
273		// so that is doesn't delete sessions already during the first
274		// maintenance window.
275		// If we failed to get 3, it might have because - due to timing - we got
276		// a fourth request. If this request is DeleteSession, that's OK and
277		// expected.
278		if err := compareRequests([]interface{}{
279			&sppb.BatchCreateSessionsRequest{},
280			&sppb.BeginTransactionRequest{},
281			&sppb.RollbackRequest{},
282			&sppb.DeleteSessionRequest{}}, requests); err != nil {
283			t.Fatal(err)
284		}
285	}
286}
287
288func TestBatchDML_WithMultipleDML(t *testing.T) {
289	t.Parallel()
290	ctx := context.Background()
291	server, client, teardown := setupMockedTestServer(t)
292	defer teardown()
293
294	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
295		if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil {
296			return err
297		}
298		if _, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}, {SQL: UpdateBarSetFoo}}); err != nil {
299			return err
300		}
301		if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil {
302			return err
303		}
304		_, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}})
305		return err
306	})
307	if err != nil {
308		t.Fatal(err)
309	}
310
311	gotReqs, err := shouldHaveReceived(server.TestSpanner, []interface{}{
312		&sppb.BatchCreateSessionsRequest{},
313		&sppb.BeginTransactionRequest{},
314		&sppb.ExecuteSqlRequest{},
315		&sppb.ExecuteBatchDmlRequest{},
316		&sppb.ExecuteSqlRequest{},
317		&sppb.ExecuteBatchDmlRequest{},
318		&sppb.CommitRequest{},
319	})
320	if err != nil {
321		t.Fatal(err)
322	}
323
324	if got, want := gotReqs[2].(*sppb.ExecuteSqlRequest).Seqno, int64(1); got != want {
325		t.Errorf("got %d, want %d", got, want)
326	}
327	if got, want := gotReqs[3].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(2); got != want {
328		t.Errorf("got %d, want %d", got, want)
329	}
330	if got, want := gotReqs[4].(*sppb.ExecuteSqlRequest).Seqno, int64(3); got != want {
331		t.Errorf("got %d, want %d", got, want)
332	}
333	if got, want := gotReqs[5].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(4); got != want {
334		t.Errorf("got %d, want %d", got, want)
335	}
336}
337
338// When an Aborted error happens during a commit, it does not kick off a
339// rollback.
340func TestReadWriteStmtBasedTransaction_CommitAbortedErrorReturned(t *testing.T) {
341	t.Parallel()
342	ctx := context.Background()
343	server, client, teardown := setupMockedTestServer(t)
344	defer teardown()
345	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
346		SimulatedExecutionTime{
347			Errors: []error{status.Errorf(codes.Aborted, "Transaction aborted")},
348		})
349
350	txn, err := NewReadWriteStmtBasedTransaction(ctx, client)
351	if err != nil {
352		t.Fatalf("got an error: %v", err)
353	}
354	_, err = txn.Commit(ctx)
355	if status.Code(err) != codes.Aborted || !strings.Contains(err.Error(), "Transaction aborted") {
356		t.Fatalf("got an incorrect error: %v", err)
357	}
358
359	requests := drainRequestsFromServer(server.TestSpanner)
360	if err := compareRequests([]interface{}{
361		&sppb.BatchCreateSessionsRequest{},
362		&sppb.BeginTransactionRequest{},
363		&sppb.CommitRequest{}}, requests); err != nil {
364		// TODO: remove this once the session pool maintainer has been changed
365		// so that is doesn't delete sessions already during the first
366		// maintenance window.
367		// If we failed to get 4, it might have because - due to timing - we got
368		// a fourth request. If this request is DeleteSession, that's OK and
369		// expected.
370		if err := compareRequests([]interface{}{
371			&sppb.BatchCreateSessionsRequest{},
372			&sppb.BeginTransactionRequest{},
373			&sppb.CommitRequest{},
374			&sppb.DeleteSessionRequest{}}, requests); err != nil {
375			t.Fatal(err)
376		}
377	}
378}
379
380// When a non-aborted error happens during a commit, it kicks off a rollback.
381func TestReadWriteStmtBasedTransaction_CommitNonAbortedErrorReturned(t *testing.T) {
382	t.Parallel()
383	ctx := context.Background()
384	server, client, teardown := setupMockedTestServer(t)
385	defer teardown()
386	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
387		SimulatedExecutionTime{
388			Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
389		})
390
391	txn, err := NewReadWriteStmtBasedTransaction(ctx, client)
392	if err != nil {
393		t.Fatalf("got an error: %v", err)
394	}
395	_, err = txn.Commit(ctx)
396	if status.Code(err) != codes.NotFound || !strings.Contains(err.Error(), "Session not found") {
397		t.Fatalf("got an incorrect error: %v", err)
398	}
399
400	requests := drainRequestsFromServer(server.TestSpanner)
401	if err := compareRequests([]interface{}{
402		&sppb.BatchCreateSessionsRequest{},
403		&sppb.BeginTransactionRequest{},
404		&sppb.CommitRequest{},
405		&sppb.RollbackRequest{}}, requests); err != nil {
406		// TODO: remove this once the session pool maintainer has been changed
407		// so that is doesn't delete sessions already during the first
408		// maintenance window.
409		// If we failed to get 4, it might have because - due to timing - we got
410		// a fourth request. If this request is DeleteSession, that's OK and
411		// expected.
412		if err := compareRequests([]interface{}{
413			&sppb.BatchCreateSessionsRequest{},
414			&sppb.BeginTransactionRequest{},
415			&sppb.CommitRequest{},
416			&sppb.RollbackRequest{},
417			&sppb.DeleteSessionRequest{}}, requests); err != nil {
418			t.Fatal(err)
419		}
420	}
421}
422
423func TestReadWriteStmtBasedTransaction(t *testing.T) {
424	t.Parallel()
425
426	rowCount, attempts, err := testReadWriteStmtBasedTransaction(t, make(map[string]SimulatedExecutionTime))
427	if err != nil {
428		t.Fatalf("transaction failed to commit: %v", err)
429	}
430	if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
431		t.Fatalf("Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
432	}
433	if g, w := attempts, 1; g != w {
434		t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w)
435	}
436}
437
438func TestReadWriteStmtBasedTransaction_CommitAborted(t *testing.T) {
439	t.Parallel()
440	rowCount, attempts, err := testReadWriteStmtBasedTransaction(t, map[string]SimulatedExecutionTime{
441		MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}},
442	})
443	if err != nil {
444		t.Fatalf("transaction failed to commit: %v", err)
445	}
446	if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
447		t.Fatalf("Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
448	}
449	if g, w := attempts, 2; g != w {
450		t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w)
451	}
452}
453
454func testReadWriteStmtBasedTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) (rowCount int64, attempts int, err error) {
455	server, client, teardown := setupMockedTestServer(t)
456	defer teardown()
457	for method, exec := range executionTimes {
458		server.TestSpanner.PutExecutionTime(method, exec)
459	}
460	ctx := context.Background()
461
462	f := func(tx *ReadWriteStmtBasedTransaction) (int64, error) {
463		iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
464		defer iter.Stop()
465		rowCount := int64(0)
466		for {
467			row, err := iter.Next()
468			if err == iterator.Done {
469				break
470			}
471			if err != nil {
472				return 0, err
473			}
474			var singerID, albumID int64
475			var albumTitle string
476			if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
477				return 0, err
478			}
479			rowCount++
480		}
481		return rowCount, nil
482	}
483
484	for {
485		attempts++
486		tx, err := NewReadWriteStmtBasedTransaction(ctx, client)
487		if err != nil {
488			return 0, attempts, fmt.Errorf("failed to begin a transaction: %v", err)
489		}
490		rowCount, err = f(tx)
491		if err != nil && status.Code(err) != codes.Aborted {
492			tx.Rollback(ctx)
493			return 0, attempts, err
494		} else if err == nil {
495			_, err = tx.Commit(ctx)
496			if err == nil {
497				break
498			} else if status.Code(err) != codes.Aborted {
499				return 0, attempts, err
500			}
501		}
502		// Set a default sleep time if the server delay is absent.
503		delay := 10 * time.Millisecond
504		if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
505			delay = serverDelay
506		}
507		time.Sleep(delay)
508	}
509
510	return rowCount, attempts, err
511}
512
513func TestBatchDML_StatementBased_WithMultipleDML(t *testing.T) {
514	t.Parallel()
515	ctx := context.Background()
516	server, client, teardown := setupMockedTestServer(t)
517	defer teardown()
518
519	tx, err := NewReadWriteStmtBasedTransaction(ctx, client)
520	if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil {
521		tx.Rollback(ctx)
522		t.Fatal(err)
523	}
524	if _, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}, {SQL: UpdateBarSetFoo}}); err != nil {
525		tx.Rollback(ctx)
526		t.Fatal(err)
527	}
528	if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil {
529		tx.Rollback(ctx)
530		t.Fatal(err)
531	}
532	if _, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}}); err != nil {
533		tx.Rollback(ctx)
534		t.Fatal(err)
535	}
536	_, err = tx.Commit(ctx)
537	if err != nil {
538		t.Fatal(err)
539	}
540
541	gotReqs, err := shouldHaveReceived(server.TestSpanner, []interface{}{
542		&sppb.BatchCreateSessionsRequest{},
543		&sppb.BeginTransactionRequest{},
544		&sppb.ExecuteSqlRequest{},
545		&sppb.ExecuteBatchDmlRequest{},
546		&sppb.ExecuteSqlRequest{},
547		&sppb.ExecuteBatchDmlRequest{},
548		&sppb.CommitRequest{},
549	})
550	if err != nil {
551		t.Fatal(err)
552	}
553
554	if got, want := gotReqs[2].(*sppb.ExecuteSqlRequest).Seqno, int64(1); got != want {
555		t.Errorf("got %d, want %d", got, want)
556	}
557	if got, want := gotReqs[3].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(2); got != want {
558		t.Errorf("got %d, want %d", got, want)
559	}
560	if got, want := gotReqs[4].(*sppb.ExecuteSqlRequest).Seqno, int64(3); got != want {
561		t.Errorf("got %d, want %d", got, want)
562	}
563	if got, want := gotReqs[5].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(4); got != want {
564		t.Errorf("got %d, want %d", got, want)
565	}
566}
567
568// shouldHaveReceived asserts that exactly expectedRequests were present in
569// the server's ReceivedRequests channel. It only looks at type, not contents.
570//
571// Note: this in-place modifies serverClientMock by popping items off the
572// ReceivedRequests channel.
573func shouldHaveReceived(server InMemSpannerServer, want []interface{}) ([]interface{}, error) {
574	got := drainRequestsFromServer(server)
575	return got, compareRequests(want, got)
576}
577
578// Compares expected requests (want) with actual requests (got).
579func compareRequests(want []interface{}, got []interface{}) error {
580	if len(got) != len(want) {
581		var gotMsg string
582		for _, r := range got {
583			gotMsg += fmt.Sprintf("%v: %+v]\n", reflect.TypeOf(r), r)
584		}
585
586		var wantMsg string
587		for _, r := range want {
588			wantMsg += fmt.Sprintf("%v: %+v]\n", reflect.TypeOf(r), r)
589		}
590
591		return fmt.Errorf("got %d requests, want %d requests:\ngot:\n%s\nwant:\n%s", len(got), len(want), gotMsg, wantMsg)
592	}
593
594	for i, want := range want {
595		if reflect.TypeOf(got[i]) != reflect.TypeOf(want) {
596			return fmt.Errorf("request %d: got %+v, want %+v", i, reflect.TypeOf(got[i]), reflect.TypeOf(want))
597		}
598	}
599	return nil
600}
601
602func drainRequestsFromServer(server InMemSpannerServer) []interface{} {
603	var reqs []interface{}
604loop:
605	for {
606		select {
607		case req := <-server.ReceivedRequests():
608			reqs = append(reqs, req)
609		default:
610			break loop
611		}
612	}
613	return reqs
614}
615
616func newAbortedErrorWithMinimalRetryDelay() error {
617	st := gstatus.New(codes.Aborted, "Transaction has been aborted")
618	retry := &errdetails.RetryInfo{
619		RetryDelay: ptypes.DurationProto(time.Nanosecond),
620	}
621	st, _ = st.WithDetails(retry)
622	return st.Err()
623}
624