1// Copyright 2014 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package datastore 16 17import ( 18 "context" 19 "errors" 20 21 "cloud.google.com/go/internal/trace" 22 pb "google.golang.org/genproto/googleapis/datastore/v1" 23 "google.golang.org/grpc/codes" 24 "google.golang.org/grpc/status" 25) 26 27// ErrConcurrentTransaction is returned when a transaction is rolled back due 28// to a conflict with a concurrent transaction. 29var ErrConcurrentTransaction = errors.New("datastore: concurrent transaction") 30 31var errExpiredTransaction = errors.New("datastore: transaction expired") 32 33type transactionSettings struct { 34 attempts int 35 readOnly bool 36 prevID []byte // ID of the transaction to retry 37} 38 39// newTransactionSettings creates a transactionSettings with a given TransactionOption slice. 40// Unconfigured options will be set to default values. 41func newTransactionSettings(opts []TransactionOption) *transactionSettings { 42 s := &transactionSettings{attempts: 3} 43 for _, o := range opts { 44 if o == nil { 45 panic("nil TransactionOption") 46 } 47 o.apply(s) 48 } 49 return s 50} 51 52// TransactionOption configures the way a transaction is executed. 53type TransactionOption interface { 54 apply(*transactionSettings) 55} 56 57// MaxAttempts returns a TransactionOption that overrides the default 3 attempt times. 58func MaxAttempts(attempts int) TransactionOption { 59 return maxAttempts(attempts) 60} 61 62type maxAttempts int 63 64func (w maxAttempts) apply(s *transactionSettings) { 65 if w > 0 { 66 s.attempts = int(w) 67 } 68} 69 70// ReadOnly is a TransactionOption that marks the transaction as read-only. 71var ReadOnly TransactionOption 72 73func init() { 74 ReadOnly = readOnly{} 75} 76 77type readOnly struct{} 78 79func (readOnly) apply(s *transactionSettings) { 80 s.readOnly = true 81} 82 83// Transaction represents a set of datastore operations to be committed atomically. 84// 85// Operations are enqueued by calling the Put and Delete methods on Transaction 86// (or their Multi-equivalents). These operations are only committed when the 87// Commit method is invoked. To ensure consistency, reads must be performed by 88// using Transaction's Get method or by using the Transaction method when 89// building a query. 90// 91// A Transaction must be committed or rolled back exactly once. 92type Transaction struct { 93 id []byte 94 client *Client 95 ctx context.Context 96 mutations []*pb.Mutation // The mutations to apply. 97 pending map[int]*PendingKey // Map from mutation index to incomplete keys pending transaction completion. 98} 99 100// NewTransaction starts a new transaction. 101func (c *Client) NewTransaction(ctx context.Context, opts ...TransactionOption) (t *Transaction, err error) { 102 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.NewTransaction") 103 defer func() { trace.EndSpan(ctx, err) }() 104 105 for _, o := range opts { 106 if _, ok := o.(maxAttempts); ok { 107 return nil, errors.New("datastore: NewTransaction does not accept MaxAttempts option") 108 } 109 } 110 return c.newTransaction(ctx, newTransactionSettings(opts)) 111} 112 113func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_ *Transaction, err error) { 114 req := &pb.BeginTransactionRequest{ProjectId: c.dataset} 115 if s.readOnly { 116 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadOnlyTransaction") 117 defer func() { trace.EndSpan(ctx, err) }() 118 119 req.TransactionOptions = &pb.TransactionOptions{ 120 Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: &pb.TransactionOptions_ReadOnly{}}, 121 } 122 } else if s.prevID != nil { 123 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadWriteTransaction") 124 defer func() { trace.EndSpan(ctx, err) }() 125 126 req.TransactionOptions = &pb.TransactionOptions{ 127 Mode: &pb.TransactionOptions_ReadWrite_{ReadWrite: &pb.TransactionOptions_ReadWrite{ 128 PreviousTransaction: s.prevID, 129 }}, 130 } 131 } 132 resp, err := c.client.BeginTransaction(ctx, req) 133 if err != nil { 134 return nil, err 135 } 136 return &Transaction{ 137 id: resp.Transaction, 138 ctx: ctx, 139 client: c, 140 mutations: nil, 141 pending: make(map[int]*PendingKey), 142 }, nil 143} 144 145// RunInTransaction runs f in a transaction. f is invoked with a Transaction 146// that f should use for all the transaction's datastore operations. 147// 148// f must not call Commit or Rollback on the provided Transaction. 149// 150// If f returns nil, RunInTransaction commits the transaction, 151// returning the Commit and a nil error if it succeeds. If the commit fails due 152// to a conflicting transaction, RunInTransaction retries f with a new 153// Transaction. It gives up and returns ErrConcurrentTransaction after three 154// failed attempts (or as configured with MaxAttempts). 155// 156// If f returns non-nil, then the transaction will be rolled back and 157// RunInTransaction will return the same error. The function f is not retried. 158// 159// Note that when f returns, the transaction is not committed. Calling code 160// must not assume that any of f's changes have been committed until 161// RunInTransaction returns nil. 162// 163// Since f may be called multiple times, f should usually be idempotent – that 164// is, it should have the same result when called multiple times. Note that 165// Transaction.Get will append when unmarshalling slice fields, so it is not 166// necessarily idempotent. 167func (c *Client) RunInTransaction(ctx context.Context, f func(tx *Transaction) error, opts ...TransactionOption) (cmt *Commit, err error) { 168 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.RunInTransaction") 169 defer func() { trace.EndSpan(ctx, err) }() 170 171 settings := newTransactionSettings(opts) 172 for n := 0; n < settings.attempts; n++ { 173 tx, err := c.newTransaction(ctx, settings) 174 if err != nil { 175 return nil, err 176 } 177 if err := f(tx); err != nil { 178 _ = tx.Rollback() 179 return nil, err 180 } 181 if cmt, err := tx.Commit(); err != ErrConcurrentTransaction { 182 return cmt, err 183 } 184 // Pass this transaction's ID to the retry transaction to preserve 185 // transaction priority. 186 if !settings.readOnly { 187 settings.prevID = tx.id 188 } 189 } 190 return nil, ErrConcurrentTransaction 191} 192 193// Commit applies the enqueued operations atomically. 194func (t *Transaction) Commit() (c *Commit, err error) { 195 t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Commit") 196 defer func() { trace.EndSpan(t.ctx, err) }() 197 198 if t.id == nil { 199 return nil, errExpiredTransaction 200 } 201 req := &pb.CommitRequest{ 202 ProjectId: t.client.dataset, 203 TransactionSelector: &pb.CommitRequest_Transaction{Transaction: t.id}, 204 Mutations: t.mutations, 205 Mode: pb.CommitRequest_TRANSACTIONAL, 206 } 207 resp, err := t.client.client.Commit(t.ctx, req) 208 if status.Code(err) == codes.Aborted { 209 return nil, ErrConcurrentTransaction 210 } 211 t.id = nil // mark the transaction as expired 212 if err != nil { 213 return nil, err 214 } 215 216 // Copy any newly minted keys into the returned keys. 217 for i, p := range t.pending { 218 if i >= len(resp.MutationResults) || resp.MutationResults[i].Key == nil { 219 return nil, errors.New("datastore: internal error: server returned the wrong mutation results") 220 } 221 key, err := protoToKey(resp.MutationResults[i].Key) 222 if err != nil { 223 return nil, errors.New("datastore: internal error: server returned an invalid key") 224 } 225 p.key = key 226 p.commit = c 227 } 228 229 return c, nil 230} 231 232// Rollback abandons a pending transaction. 233func (t *Transaction) Rollback() (err error) { 234 t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Rollback") 235 defer func() { trace.EndSpan(t.ctx, err) }() 236 237 if t.id == nil { 238 return errExpiredTransaction 239 } 240 id := t.id 241 t.id = nil 242 _, err = t.client.client.Rollback(t.ctx, &pb.RollbackRequest{ 243 ProjectId: t.client.dataset, 244 Transaction: id, 245 }) 246 return err 247} 248 249// Get is the transaction-specific version of the package function Get. 250// All reads performed during the transaction will come from a single consistent 251// snapshot. Furthermore, if the transaction is set to a serializable isolation 252// level, another transaction cannot concurrently modify the data that is read 253// or modified by this transaction. 254func (t *Transaction) Get(key *Key, dst interface{}) (err error) { 255 t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Get") 256 defer func() { trace.EndSpan(t.ctx, err) }() 257 258 opts := &pb.ReadOptions{ 259 ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id}, 260 } 261 err = t.client.get(t.ctx, []*Key{key}, []interface{}{dst}, opts) 262 if me, ok := err.(MultiError); ok { 263 return me[0] 264 } 265 return err 266} 267 268// GetMulti is a batch version of Get. 269func (t *Transaction) GetMulti(keys []*Key, dst interface{}) (err error) { 270 t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.GetMulti") 271 defer func() { trace.EndSpan(t.ctx, err) }() 272 273 if t.id == nil { 274 return errExpiredTransaction 275 } 276 opts := &pb.ReadOptions{ 277 ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id}, 278 } 279 return t.client.get(t.ctx, keys, dst, opts) 280} 281 282// Put is the transaction-specific version of the package function Put. 283// 284// Put returns a PendingKey which can be resolved into a Key using the 285// return value from a successful Commit. If key is an incomplete key, the 286// returned pending key will resolve to a unique key generated by the 287// datastore. 288func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) { 289 h, err := t.PutMulti([]*Key{key}, []interface{}{src}) 290 if err != nil { 291 if me, ok := err.(MultiError); ok { 292 return nil, me[0] 293 } 294 return nil, err 295 } 296 return h[0], nil 297} 298 299// PutMulti is a batch version of Put. One PendingKey is returned for each 300// element of src in the same order. 301// TODO(jba): rewrite in terms of Mutate. 302func (t *Transaction) PutMulti(keys []*Key, src interface{}) (ret []*PendingKey, err error) { 303 if t.id == nil { 304 return nil, errExpiredTransaction 305 } 306 mutations, err := putMutations(keys, src) 307 if err != nil { 308 return nil, err 309 } 310 origin := len(t.mutations) 311 t.mutations = append(t.mutations, mutations...) 312 313 // Prepare the returned handles, pre-populating where possible. 314 ret = make([]*PendingKey, len(keys)) 315 for i, key := range keys { 316 p := &PendingKey{} 317 if key.Incomplete() { 318 // This key will be in the final commit result. 319 t.pending[origin+i] = p 320 } else { 321 p.key = key 322 } 323 ret[i] = p 324 } 325 326 return ret, nil 327} 328 329// Delete is the transaction-specific version of the package function Delete. 330// Delete enqueues the deletion of the entity for the given key, to be 331// committed atomically upon calling Commit. 332func (t *Transaction) Delete(key *Key) error { 333 err := t.DeleteMulti([]*Key{key}) 334 if me, ok := err.(MultiError); ok { 335 return me[0] 336 } 337 return err 338} 339 340// DeleteMulti is a batch version of Delete. 341// TODO(jba): rewrite in terms of Mutate. 342func (t *Transaction) DeleteMulti(keys []*Key) (err error) { 343 if t.id == nil { 344 return errExpiredTransaction 345 } 346 mutations, err := deleteMutations(keys) 347 if err != nil { 348 return err 349 } 350 t.mutations = append(t.mutations, mutations...) 351 return nil 352} 353 354// Mutate adds the mutations to the transaction. They will all be applied atomically 355// upon calling Commit. Mutate returns a PendingKey for each Mutation in the argument 356// list, in the same order. PendingKeys for Delete mutations are always nil. 357// 358// If any of the mutations are invalid, Mutate returns a MultiError with the errors. 359// Mutate returns a MultiError in this case even if there is only one Mutation. 360// 361// For an example, see Client.Mutate. 362func (t *Transaction) Mutate(muts ...*Mutation) ([]*PendingKey, error) { 363 if t.id == nil { 364 return nil, errExpiredTransaction 365 } 366 pmuts, err := mutationProtos(muts) 367 if err != nil { 368 return nil, err 369 } 370 origin := len(t.mutations) 371 t.mutations = append(t.mutations, pmuts...) 372 // Prepare the returned handles, pre-populating where possible. 373 ret := make([]*PendingKey, len(muts)) 374 for i, mut := range muts { 375 if mut.isDelete() { 376 continue 377 } 378 p := &PendingKey{} 379 if mut.key.Incomplete() { 380 // This key will be in the final commit result. 381 t.pending[origin+i] = p 382 } else { 383 p.key = mut.key 384 } 385 ret[i] = p 386 } 387 return ret, nil 388} 389 390// Commit represents the result of a committed transaction. 391type Commit struct{} 392 393// Key resolves a pending key handle into a final key. 394func (c *Commit) Key(p *PendingKey) *Key { 395 if p == nil { // if called on a *PendingKey from a Delete mutation 396 return nil 397 } 398 // If p.commit is nil, the PendingKey did not come from an incomplete key, 399 // so p.key is valid. 400 if p.commit != nil && c != p.commit { 401 panic("PendingKey was not created by corresponding transaction") 402 } 403 return p.key 404} 405 406// PendingKey represents the key for newly-inserted entity. It can be 407// resolved into a Key by calling the Key method of Commit. 408type PendingKey struct { 409 key *Key 410 commit *Commit 411} 412