1// Copyright 2014 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package datastore
16
17import (
18	"context"
19	"errors"
20
21	"cloud.google.com/go/internal/trace"
22	pb "google.golang.org/genproto/googleapis/datastore/v1"
23	"google.golang.org/grpc/codes"
24	"google.golang.org/grpc/status"
25)
26
27// ErrConcurrentTransaction is returned when a transaction is rolled back due
28// to a conflict with a concurrent transaction.
29var ErrConcurrentTransaction = errors.New("datastore: concurrent transaction")
30
31var errExpiredTransaction = errors.New("datastore: transaction expired")
32
33type transactionSettings struct {
34	attempts int
35	readOnly bool
36	prevID   []byte // ID of the transaction to retry
37}
38
39// newTransactionSettings creates a transactionSettings with a given TransactionOption slice.
40// Unconfigured options will be set to default values.
41func newTransactionSettings(opts []TransactionOption) *transactionSettings {
42	s := &transactionSettings{attempts: 3}
43	for _, o := range opts {
44		if o == nil {
45			panic("nil TransactionOption")
46		}
47		o.apply(s)
48	}
49	return s
50}
51
52// TransactionOption configures the way a transaction is executed.
53type TransactionOption interface {
54	apply(*transactionSettings)
55}
56
57// MaxAttempts returns a TransactionOption that overrides the default 3 attempt times.
58func MaxAttempts(attempts int) TransactionOption {
59	return maxAttempts(attempts)
60}
61
62type maxAttempts int
63
64func (w maxAttempts) apply(s *transactionSettings) {
65	if w > 0 {
66		s.attempts = int(w)
67	}
68}
69
70// ReadOnly is a TransactionOption that marks the transaction as read-only.
71var ReadOnly TransactionOption
72
73func init() {
74	ReadOnly = readOnly{}
75}
76
77type readOnly struct{}
78
79func (readOnly) apply(s *transactionSettings) {
80	s.readOnly = true
81}
82
83// Transaction represents a set of datastore operations to be committed atomically.
84//
85// Operations are enqueued by calling the Put and Delete methods on Transaction
86// (or their Multi-equivalents).  These operations are only committed when the
87// Commit method is invoked. To ensure consistency, reads must be performed by
88// using Transaction's Get method or by using the Transaction method when
89// building a query.
90//
91// A Transaction must be committed or rolled back exactly once.
92type Transaction struct {
93	id        []byte
94	client    *Client
95	ctx       context.Context
96	mutations []*pb.Mutation      // The mutations to apply.
97	pending   map[int]*PendingKey // Map from mutation index to incomplete keys pending transaction completion.
98}
99
100// NewTransaction starts a new transaction.
101func (c *Client) NewTransaction(ctx context.Context, opts ...TransactionOption) (t *Transaction, err error) {
102	ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.NewTransaction")
103	defer func() { trace.EndSpan(ctx, err) }()
104
105	for _, o := range opts {
106		if _, ok := o.(maxAttempts); ok {
107			return nil, errors.New("datastore: NewTransaction does not accept MaxAttempts option")
108		}
109	}
110	return c.newTransaction(ctx, newTransactionSettings(opts))
111}
112
113func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_ *Transaction, err error) {
114	req := &pb.BeginTransactionRequest{ProjectId: c.dataset}
115	if s.readOnly {
116		ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadOnlyTransaction")
117		defer func() { trace.EndSpan(ctx, err) }()
118
119		req.TransactionOptions = &pb.TransactionOptions{
120			Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: &pb.TransactionOptions_ReadOnly{}},
121		}
122	} else if s.prevID != nil {
123		ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadWriteTransaction")
124		defer func() { trace.EndSpan(ctx, err) }()
125
126		req.TransactionOptions = &pb.TransactionOptions{
127			Mode: &pb.TransactionOptions_ReadWrite_{ReadWrite: &pb.TransactionOptions_ReadWrite{
128				PreviousTransaction: s.prevID,
129			}},
130		}
131	}
132	resp, err := c.client.BeginTransaction(ctx, req)
133	if err != nil {
134		return nil, err
135	}
136	return &Transaction{
137		id:        resp.Transaction,
138		ctx:       ctx,
139		client:    c,
140		mutations: nil,
141		pending:   make(map[int]*PendingKey),
142	}, nil
143}
144
145// RunInTransaction runs f in a transaction. f is invoked with a Transaction
146// that f should use for all the transaction's datastore operations.
147//
148// f must not call Commit or Rollback on the provided Transaction.
149//
150// If f returns nil, RunInTransaction commits the transaction,
151// returning the Commit and a nil error if it succeeds. If the commit fails due
152// to a conflicting transaction, RunInTransaction retries f with a new
153// Transaction. It gives up and returns ErrConcurrentTransaction after three
154// failed attempts (or as configured with MaxAttempts).
155//
156// If f returns non-nil, then the transaction will be rolled back and
157// RunInTransaction will return the same error. The function f is not retried.
158//
159// Note that when f returns, the transaction is not committed. Calling code
160// must not assume that any of f's changes have been committed until
161// RunInTransaction returns nil.
162//
163// Since f may be called multiple times, f should usually be idempotent – that
164// is, it should have the same result when called multiple times. Note that
165// Transaction.Get will append when unmarshalling slice fields, so it is not
166// necessarily idempotent.
167func (c *Client) RunInTransaction(ctx context.Context, f func(tx *Transaction) error, opts ...TransactionOption) (cmt *Commit, err error) {
168	ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.RunInTransaction")
169	defer func() { trace.EndSpan(ctx, err) }()
170
171	settings := newTransactionSettings(opts)
172	for n := 0; n < settings.attempts; n++ {
173		tx, err := c.newTransaction(ctx, settings)
174		if err != nil {
175			return nil, err
176		}
177		if err := f(tx); err != nil {
178			_ = tx.Rollback()
179			return nil, err
180		}
181		if cmt, err := tx.Commit(); err != ErrConcurrentTransaction {
182			return cmt, err
183		}
184		// Pass this transaction's ID to the retry transaction to preserve
185		// transaction priority.
186		if !settings.readOnly {
187			settings.prevID = tx.id
188		}
189	}
190	return nil, ErrConcurrentTransaction
191}
192
193// Commit applies the enqueued operations atomically.
194func (t *Transaction) Commit() (c *Commit, err error) {
195	t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Commit")
196	defer func() { trace.EndSpan(t.ctx, err) }()
197
198	if t.id == nil {
199		return nil, errExpiredTransaction
200	}
201	req := &pb.CommitRequest{
202		ProjectId:           t.client.dataset,
203		TransactionSelector: &pb.CommitRequest_Transaction{Transaction: t.id},
204		Mutations:           t.mutations,
205		Mode:                pb.CommitRequest_TRANSACTIONAL,
206	}
207	resp, err := t.client.client.Commit(t.ctx, req)
208	if status.Code(err) == codes.Aborted {
209		return nil, ErrConcurrentTransaction
210	}
211	t.id = nil // mark the transaction as expired
212	if err != nil {
213		return nil, err
214	}
215
216	c = &Commit{}
217	// Copy any newly minted keys into the returned keys.
218	for i, p := range t.pending {
219		if i >= len(resp.MutationResults) || resp.MutationResults[i].Key == nil {
220			return nil, errors.New("datastore: internal error: server returned the wrong mutation results")
221		}
222		key, err := protoToKey(resp.MutationResults[i].Key)
223		if err != nil {
224			return nil, errors.New("datastore: internal error: server returned an invalid key")
225		}
226		p.key = key
227		p.commit = c
228	}
229
230	return c, nil
231}
232
233// Rollback abandons a pending transaction.
234func (t *Transaction) Rollback() (err error) {
235	t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Rollback")
236	defer func() { trace.EndSpan(t.ctx, err) }()
237
238	if t.id == nil {
239		return errExpiredTransaction
240	}
241	id := t.id
242	t.id = nil
243	_, err = t.client.client.Rollback(t.ctx, &pb.RollbackRequest{
244		ProjectId:   t.client.dataset,
245		Transaction: id,
246	})
247	return err
248}
249
250// Get is the transaction-specific version of the package function Get.
251// All reads performed during the transaction will come from a single consistent
252// snapshot. Furthermore, if the transaction is set to a serializable isolation
253// level, another transaction cannot concurrently modify the data that is read
254// or modified by this transaction.
255func (t *Transaction) Get(key *Key, dst interface{}) (err error) {
256	t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Get")
257	defer func() { trace.EndSpan(t.ctx, err) }()
258
259	opts := &pb.ReadOptions{
260		ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id},
261	}
262	err = t.client.get(t.ctx, []*Key{key}, []interface{}{dst}, opts)
263	if me, ok := err.(MultiError); ok {
264		return me[0]
265	}
266	return err
267}
268
269// GetMulti is a batch version of Get.
270func (t *Transaction) GetMulti(keys []*Key, dst interface{}) (err error) {
271	t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.GetMulti")
272	defer func() { trace.EndSpan(t.ctx, err) }()
273
274	if t.id == nil {
275		return errExpiredTransaction
276	}
277	opts := &pb.ReadOptions{
278		ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id},
279	}
280	return t.client.get(t.ctx, keys, dst, opts)
281}
282
283// Put is the transaction-specific version of the package function Put.
284//
285// Put returns a PendingKey which can be resolved into a Key using the
286// return value from a successful Commit. If key is an incomplete key, the
287// returned pending key will resolve to a unique key generated by the
288// datastore.
289func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) {
290	h, err := t.PutMulti([]*Key{key}, []interface{}{src})
291	if err != nil {
292		if me, ok := err.(MultiError); ok {
293			return nil, me[0]
294		}
295		return nil, err
296	}
297	return h[0], nil
298}
299
300// PutMulti is a batch version of Put. One PendingKey is returned for each
301// element of src in the same order.
302// TODO(jba): rewrite in terms of Mutate.
303func (t *Transaction) PutMulti(keys []*Key, src interface{}) (ret []*PendingKey, err error) {
304	if t.id == nil {
305		return nil, errExpiredTransaction
306	}
307	mutations, err := putMutations(keys, src)
308	if err != nil {
309		return nil, err
310	}
311	origin := len(t.mutations)
312	t.mutations = append(t.mutations, mutations...)
313
314	// Prepare the returned handles, pre-populating where possible.
315	ret = make([]*PendingKey, len(keys))
316	for i, key := range keys {
317		p := &PendingKey{}
318		if key.Incomplete() {
319			// This key will be in the final commit result.
320			t.pending[origin+i] = p
321		} else {
322			p.key = key
323		}
324		ret[i] = p
325	}
326
327	return ret, nil
328}
329
330// Delete is the transaction-specific version of the package function Delete.
331// Delete enqueues the deletion of the entity for the given key, to be
332// committed atomically upon calling Commit.
333func (t *Transaction) Delete(key *Key) error {
334	err := t.DeleteMulti([]*Key{key})
335	if me, ok := err.(MultiError); ok {
336		return me[0]
337	}
338	return err
339}
340
341// DeleteMulti is a batch version of Delete.
342// TODO(jba): rewrite in terms of Mutate.
343func (t *Transaction) DeleteMulti(keys []*Key) (err error) {
344	if t.id == nil {
345		return errExpiredTransaction
346	}
347	mutations, err := deleteMutations(keys)
348	if err != nil {
349		return err
350	}
351	t.mutations = append(t.mutations, mutations...)
352	return nil
353}
354
355// Mutate adds the mutations to the transaction. They will all be applied atomically
356// upon calling Commit. Mutate returns a PendingKey for each Mutation in the argument
357// list, in the same order. PendingKeys for Delete mutations are always nil.
358//
359// If any of the mutations are invalid, Mutate returns a MultiError with the errors.
360// Mutate returns a MultiError in this case even if there is only one Mutation.
361//
362// For an example, see Client.Mutate.
363func (t *Transaction) Mutate(muts ...*Mutation) ([]*PendingKey, error) {
364	if t.id == nil {
365		return nil, errExpiredTransaction
366	}
367	pmuts, err := mutationProtos(muts)
368	if err != nil {
369		return nil, err
370	}
371	origin := len(t.mutations)
372	t.mutations = append(t.mutations, pmuts...)
373	// Prepare the returned handles, pre-populating where possible.
374	ret := make([]*PendingKey, len(muts))
375	for i, mut := range muts {
376		if mut.isDelete() {
377			continue
378		}
379		p := &PendingKey{}
380		if mut.key.Incomplete() {
381			// This key will be in the final commit result.
382			t.pending[origin+i] = p
383		} else {
384			p.key = mut.key
385		}
386		ret[i] = p
387	}
388	return ret, nil
389}
390
391// Commit represents the result of a committed transaction.
392type Commit struct{}
393
394// Key resolves a pending key handle into a final key.
395func (c *Commit) Key(p *PendingKey) *Key {
396	if p == nil { // if called on a *PendingKey from a Delete mutation
397		return nil
398	}
399	// If p.commit is nil, the PendingKey did not come from an incomplete key,
400	// so p.key is valid.
401	if p.commit != nil && c != p.commit {
402		panic("PendingKey was not created by corresponding transaction")
403	}
404	return p.key
405}
406
407// PendingKey represents the key for newly-inserted entity. It can be
408// resolved into a Key by calling the Key method of Commit.
409type PendingKey struct {
410	key    *Key
411	commit *Commit
412}
413