1// Copyright 2014 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package datastore
16
17import (
18	"context"
19	"errors"
20
21	"cloud.google.com/go/internal/trace"
22	pb "google.golang.org/genproto/googleapis/datastore/v1"
23	"google.golang.org/grpc/codes"
24	"google.golang.org/grpc/status"
25)
26
27// ErrConcurrentTransaction is returned when a transaction is rolled back due
28// to a conflict with a concurrent transaction.
29var ErrConcurrentTransaction = errors.New("datastore: concurrent transaction")
30
31var errExpiredTransaction = errors.New("datastore: transaction expired")
32
33type transactionSettings struct {
34	attempts int
35	readOnly bool
36	prevID   []byte // ID of the transaction to retry
37}
38
39// newTransactionSettings creates a transactionSettings with a given TransactionOption slice.
40// Unconfigured options will be set to default values.
41func newTransactionSettings(opts []TransactionOption) *transactionSettings {
42	s := &transactionSettings{attempts: 3}
43	for _, o := range opts {
44		if o == nil {
45			panic("nil TransactionOption")
46		}
47		o.apply(s)
48	}
49	return s
50}
51
52// TransactionOption configures the way a transaction is executed.
53type TransactionOption interface {
54	apply(*transactionSettings)
55}
56
57// MaxAttempts returns a TransactionOption that overrides the default 3 attempt times.
58func MaxAttempts(attempts int) TransactionOption {
59	return maxAttempts(attempts)
60}
61
62type maxAttempts int
63
64func (w maxAttempts) apply(s *transactionSettings) {
65	if w > 0 {
66		s.attempts = int(w)
67	}
68}
69
70// ReadOnly is a TransactionOption that marks the transaction as read-only.
71var ReadOnly TransactionOption
72
73func init() {
74	ReadOnly = readOnly{}
75}
76
77type readOnly struct{}
78
79func (readOnly) apply(s *transactionSettings) {
80	s.readOnly = true
81}
82
83// Transaction represents a set of datastore operations to be committed atomically.
84//
85// Operations are enqueued by calling the Put and Delete methods on Transaction
86// (or their Multi-equivalents).  These operations are only committed when the
87// Commit method is invoked. To ensure consistency, reads must be performed by
88// using Transaction's Get method or by using the Transaction method when
89// building a query.
90//
91// A Transaction must be committed or rolled back exactly once.
92type Transaction struct {
93	id        []byte
94	client    *Client
95	ctx       context.Context
96	mutations []*pb.Mutation      // The mutations to apply.
97	pending   map[int]*PendingKey // Map from mutation index to incomplete keys pending transaction completion.
98}
99
100// NewTransaction starts a new transaction.
101func (c *Client) NewTransaction(ctx context.Context, opts ...TransactionOption) (t *Transaction, err error) {
102	ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.NewTransaction")
103	defer func() { trace.EndSpan(ctx, err) }()
104
105	for _, o := range opts {
106		if _, ok := o.(maxAttempts); ok {
107			return nil, errors.New("datastore: NewTransaction does not accept MaxAttempts option")
108		}
109	}
110	return c.newTransaction(ctx, newTransactionSettings(opts))
111}
112
113func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_ *Transaction, err error) {
114	req := &pb.BeginTransactionRequest{ProjectId: c.dataset}
115	if s.readOnly {
116		ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadOnlyTransaction")
117		defer func() { trace.EndSpan(ctx, err) }()
118
119		req.TransactionOptions = &pb.TransactionOptions{
120			Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: &pb.TransactionOptions_ReadOnly{}},
121		}
122	} else if s.prevID != nil {
123		ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadWriteTransaction")
124		defer func() { trace.EndSpan(ctx, err) }()
125
126		req.TransactionOptions = &pb.TransactionOptions{
127			Mode: &pb.TransactionOptions_ReadWrite_{ReadWrite: &pb.TransactionOptions_ReadWrite{
128				PreviousTransaction: s.prevID,
129			}},
130		}
131	}
132	resp, err := c.client.BeginTransaction(ctx, req)
133	if err != nil {
134		return nil, err
135	}
136	return &Transaction{
137		id:        resp.Transaction,
138		ctx:       ctx,
139		client:    c,
140		mutations: nil,
141		pending:   make(map[int]*PendingKey),
142	}, nil
143}
144
145// RunInTransaction runs f in a transaction. f is invoked with a Transaction
146// that f should use for all the transaction's datastore operations.
147//
148// f must not call Commit or Rollback on the provided Transaction.
149//
150// If f returns nil, RunInTransaction commits the transaction,
151// returning the Commit and a nil error if it succeeds. If the commit fails due
152// to a conflicting transaction, RunInTransaction retries f with a new
153// Transaction. It gives up and returns ErrConcurrentTransaction after three
154// failed attempts (or as configured with MaxAttempts).
155//
156// If f returns non-nil, then the transaction will be rolled back and
157// RunInTransaction will return the same error. The function f is not retried.
158//
159// Note that when f returns, the transaction is not committed. Calling code
160// must not assume that any of f's changes have been committed until
161// RunInTransaction returns nil.
162//
163// Since f may be called multiple times, f should usually be idempotent – that
164// is, it should have the same result when called multiple times. Note that
165// Transaction.Get will append when unmarshalling slice fields, so it is not
166// necessarily idempotent.
167func (c *Client) RunInTransaction(ctx context.Context, f func(tx *Transaction) error, opts ...TransactionOption) (cmt *Commit, err error) {
168	ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.RunInTransaction")
169	defer func() { trace.EndSpan(ctx, err) }()
170
171	settings := newTransactionSettings(opts)
172	for n := 0; n < settings.attempts; n++ {
173		tx, err := c.newTransaction(ctx, settings)
174		if err != nil {
175			return nil, err
176		}
177		if err := f(tx); err != nil {
178			_ = tx.Rollback()
179			return nil, err
180		}
181		if cmt, err := tx.Commit(); err != ErrConcurrentTransaction {
182			return cmt, err
183		}
184		// Pass this transaction's ID to the retry transaction to preserve
185		// transaction priority.
186		if !settings.readOnly {
187			settings.prevID = tx.id
188		}
189	}
190	return nil, ErrConcurrentTransaction
191}
192
193// Commit applies the enqueued operations atomically.
194func (t *Transaction) Commit() (c *Commit, err error) {
195	t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Commit")
196	defer func() { trace.EndSpan(t.ctx, err) }()
197
198	if t.id == nil {
199		return nil, errExpiredTransaction
200	}
201	req := &pb.CommitRequest{
202		ProjectId:           t.client.dataset,
203		TransactionSelector: &pb.CommitRequest_Transaction{Transaction: t.id},
204		Mutations:           t.mutations,
205		Mode:                pb.CommitRequest_TRANSACTIONAL,
206	}
207	resp, err := t.client.client.Commit(t.ctx, req)
208	if status.Code(err) == codes.Aborted {
209		return nil, ErrConcurrentTransaction
210	}
211	t.id = nil // mark the transaction as expired
212	if err != nil {
213		return nil, err
214	}
215
216	// Copy any newly minted keys into the returned keys.
217	for i, p := range t.pending {
218		if i >= len(resp.MutationResults) || resp.MutationResults[i].Key == nil {
219			return nil, errors.New("datastore: internal error: server returned the wrong mutation results")
220		}
221		key, err := protoToKey(resp.MutationResults[i].Key)
222		if err != nil {
223			return nil, errors.New("datastore: internal error: server returned an invalid key")
224		}
225		p.key = key
226		p.commit = c
227	}
228
229	return c, nil
230}
231
232// Rollback abandons a pending transaction.
233func (t *Transaction) Rollback() (err error) {
234	t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Rollback")
235	defer func() { trace.EndSpan(t.ctx, err) }()
236
237	if t.id == nil {
238		return errExpiredTransaction
239	}
240	id := t.id
241	t.id = nil
242	_, err = t.client.client.Rollback(t.ctx, &pb.RollbackRequest{
243		ProjectId:   t.client.dataset,
244		Transaction: id,
245	})
246	return err
247}
248
249// Get is the transaction-specific version of the package function Get.
250// All reads performed during the transaction will come from a single consistent
251// snapshot. Furthermore, if the transaction is set to a serializable isolation
252// level, another transaction cannot concurrently modify the data that is read
253// or modified by this transaction.
254func (t *Transaction) Get(key *Key, dst interface{}) (err error) {
255	t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Get")
256	defer func() { trace.EndSpan(t.ctx, err) }()
257
258	opts := &pb.ReadOptions{
259		ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id},
260	}
261	err = t.client.get(t.ctx, []*Key{key}, []interface{}{dst}, opts)
262	if me, ok := err.(MultiError); ok {
263		return me[0]
264	}
265	return err
266}
267
268// GetMulti is a batch version of Get.
269func (t *Transaction) GetMulti(keys []*Key, dst interface{}) (err error) {
270	t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.GetMulti")
271	defer func() { trace.EndSpan(t.ctx, err) }()
272
273	if t.id == nil {
274		return errExpiredTransaction
275	}
276	opts := &pb.ReadOptions{
277		ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id},
278	}
279	return t.client.get(t.ctx, keys, dst, opts)
280}
281
282// Put is the transaction-specific version of the package function Put.
283//
284// Put returns a PendingKey which can be resolved into a Key using the
285// return value from a successful Commit. If key is an incomplete key, the
286// returned pending key will resolve to a unique key generated by the
287// datastore.
288func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) {
289	h, err := t.PutMulti([]*Key{key}, []interface{}{src})
290	if err != nil {
291		if me, ok := err.(MultiError); ok {
292			return nil, me[0]
293		}
294		return nil, err
295	}
296	return h[0], nil
297}
298
299// PutMulti is a batch version of Put. One PendingKey is returned for each
300// element of src in the same order.
301// TODO(jba): rewrite in terms of Mutate.
302func (t *Transaction) PutMulti(keys []*Key, src interface{}) (ret []*PendingKey, err error) {
303	if t.id == nil {
304		return nil, errExpiredTransaction
305	}
306	mutations, err := putMutations(keys, src)
307	if err != nil {
308		return nil, err
309	}
310	origin := len(t.mutations)
311	t.mutations = append(t.mutations, mutations...)
312
313	// Prepare the returned handles, pre-populating where possible.
314	ret = make([]*PendingKey, len(keys))
315	for i, key := range keys {
316		p := &PendingKey{}
317		if key.Incomplete() {
318			// This key will be in the final commit result.
319			t.pending[origin+i] = p
320		} else {
321			p.key = key
322		}
323		ret[i] = p
324	}
325
326	return ret, nil
327}
328
329// Delete is the transaction-specific version of the package function Delete.
330// Delete enqueues the deletion of the entity for the given key, to be
331// committed atomically upon calling Commit.
332func (t *Transaction) Delete(key *Key) error {
333	err := t.DeleteMulti([]*Key{key})
334	if me, ok := err.(MultiError); ok {
335		return me[0]
336	}
337	return err
338}
339
340// DeleteMulti is a batch version of Delete.
341// TODO(jba): rewrite in terms of Mutate.
342func (t *Transaction) DeleteMulti(keys []*Key) (err error) {
343	if t.id == nil {
344		return errExpiredTransaction
345	}
346	mutations, err := deleteMutations(keys)
347	if err != nil {
348		return err
349	}
350	t.mutations = append(t.mutations, mutations...)
351	return nil
352}
353
354// Mutate adds the mutations to the transaction. They will all be applied atomically
355// upon calling Commit. Mutate returns a PendingKey for each Mutation in the argument
356// list, in the same order. PendingKeys for Delete mutations are always nil.
357//
358// If any of the mutations are invalid, Mutate returns a MultiError with the errors.
359// Mutate returns a MultiError in this case even if there is only one Mutation.
360//
361// For an example, see Client.Mutate.
362func (t *Transaction) Mutate(muts ...*Mutation) ([]*PendingKey, error) {
363	if t.id == nil {
364		return nil, errExpiredTransaction
365	}
366	pmuts, err := mutationProtos(muts)
367	if err != nil {
368		return nil, err
369	}
370	origin := len(t.mutations)
371	t.mutations = append(t.mutations, pmuts...)
372	// Prepare the returned handles, pre-populating where possible.
373	ret := make([]*PendingKey, len(muts))
374	for i, mut := range muts {
375		if mut.isDelete() {
376			continue
377		}
378		p := &PendingKey{}
379		if mut.key.Incomplete() {
380			// This key will be in the final commit result.
381			t.pending[origin+i] = p
382		} else {
383			p.key = mut.key
384		}
385		ret[i] = p
386	}
387	return ret, nil
388}
389
390// Commit represents the result of a committed transaction.
391type Commit struct{}
392
393// Key resolves a pending key handle into a final key.
394func (c *Commit) Key(p *PendingKey) *Key {
395	if p == nil { // if called on a *PendingKey from a Delete mutation
396		return nil
397	}
398	// If p.commit is nil, the PendingKey did not come from an incomplete key,
399	// so p.key is valid.
400	if p.commit != nil && c != p.commit {
401		panic("PendingKey was not created by corresponding transaction")
402	}
403	return p.key
404}
405
406// PendingKey represents the key for newly-inserted entity. It can be
407// resolved into a Key by calling the Key method of Commit.
408type PendingKey struct {
409	key    *Key
410	commit *Commit
411}
412