1// Copyright 2014 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package datastore 16 17import ( 18 "context" 19 "errors" 20 21 "cloud.google.com/go/internal/trace" 22 pb "google.golang.org/genproto/googleapis/datastore/v1" 23 "google.golang.org/grpc/codes" 24 "google.golang.org/grpc/status" 25) 26 27// ErrConcurrentTransaction is returned when a transaction is rolled back due 28// to a conflict with a concurrent transaction. 29var ErrConcurrentTransaction = errors.New("datastore: concurrent transaction") 30 31var errExpiredTransaction = errors.New("datastore: transaction expired") 32 33type transactionSettings struct { 34 attempts int 35 readOnly bool 36 prevID []byte // ID of the transaction to retry 37} 38 39// newTransactionSettings creates a transactionSettings with a given TransactionOption slice. 40// Unconfigured options will be set to default values. 41func newTransactionSettings(opts []TransactionOption) *transactionSettings { 42 s := &transactionSettings{attempts: 3} 43 for _, o := range opts { 44 if o == nil { 45 panic("nil TransactionOption") 46 } 47 o.apply(s) 48 } 49 return s 50} 51 52// TransactionOption configures the way a transaction is executed. 53type TransactionOption interface { 54 apply(*transactionSettings) 55} 56 57// MaxAttempts returns a TransactionOption that overrides the default 3 attempt times. 58func MaxAttempts(attempts int) TransactionOption { 59 return maxAttempts(attempts) 60} 61 62type maxAttempts int 63 64func (w maxAttempts) apply(s *transactionSettings) { 65 if w > 0 { 66 s.attempts = int(w) 67 } 68} 69 70// ReadOnly is a TransactionOption that marks the transaction as read-only. 71var ReadOnly TransactionOption 72 73func init() { 74 ReadOnly = readOnly{} 75} 76 77type readOnly struct{} 78 79func (readOnly) apply(s *transactionSettings) { 80 s.readOnly = true 81} 82 83// Transaction represents a set of datastore operations to be committed atomically. 84// 85// Operations are enqueued by calling the Put and Delete methods on Transaction 86// (or their Multi-equivalents). These operations are only committed when the 87// Commit method is invoked. To ensure consistency, reads must be performed by 88// using Transaction's Get method or by using the Transaction method when 89// building a query. 90// 91// A Transaction must be committed or rolled back exactly once. 92type Transaction struct { 93 id []byte 94 client *Client 95 ctx context.Context 96 mutations []*pb.Mutation // The mutations to apply. 97 pending map[int]*PendingKey // Map from mutation index to incomplete keys pending transaction completion. 98} 99 100// NewTransaction starts a new transaction. 101func (c *Client) NewTransaction(ctx context.Context, opts ...TransactionOption) (t *Transaction, err error) { 102 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.NewTransaction") 103 defer func() { trace.EndSpan(ctx, err) }() 104 105 for _, o := range opts { 106 if _, ok := o.(maxAttempts); ok { 107 return nil, errors.New("datastore: NewTransaction does not accept MaxAttempts option") 108 } 109 } 110 return c.newTransaction(ctx, newTransactionSettings(opts)) 111} 112 113func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_ *Transaction, err error) { 114 req := &pb.BeginTransactionRequest{ProjectId: c.dataset} 115 if s.readOnly { 116 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadOnlyTransaction") 117 defer func() { trace.EndSpan(ctx, err) }() 118 119 req.TransactionOptions = &pb.TransactionOptions{ 120 Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: &pb.TransactionOptions_ReadOnly{}}, 121 } 122 } else if s.prevID != nil { 123 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadWriteTransaction") 124 defer func() { trace.EndSpan(ctx, err) }() 125 126 req.TransactionOptions = &pb.TransactionOptions{ 127 Mode: &pb.TransactionOptions_ReadWrite_{ReadWrite: &pb.TransactionOptions_ReadWrite{ 128 PreviousTransaction: s.prevID, 129 }}, 130 } 131 } 132 resp, err := c.client.BeginTransaction(ctx, req) 133 if err != nil { 134 return nil, err 135 } 136 return &Transaction{ 137 id: resp.Transaction, 138 ctx: ctx, 139 client: c, 140 mutations: nil, 141 pending: make(map[int]*PendingKey), 142 }, nil 143} 144 145// RunInTransaction runs f in a transaction. f is invoked with a Transaction 146// that f should use for all the transaction's datastore operations. 147// 148// f must not call Commit or Rollback on the provided Transaction. 149// 150// If f returns nil, RunInTransaction commits the transaction, 151// returning the Commit and a nil error if it succeeds. If the commit fails due 152// to a conflicting transaction, RunInTransaction retries f with a new 153// Transaction. It gives up and returns ErrConcurrentTransaction after three 154// failed attempts (or as configured with MaxAttempts). 155// 156// If f returns non-nil, then the transaction will be rolled back and 157// RunInTransaction will return the same error. The function f is not retried. 158// 159// Note that when f returns, the transaction is not committed. Calling code 160// must not assume that any of f's changes have been committed until 161// RunInTransaction returns nil. 162// 163// Since f may be called multiple times, f should usually be idempotent – that 164// is, it should have the same result when called multiple times. Note that 165// Transaction.Get will append when unmarshalling slice fields, so it is not 166// necessarily idempotent. 167func (c *Client) RunInTransaction(ctx context.Context, f func(tx *Transaction) error, opts ...TransactionOption) (cmt *Commit, err error) { 168 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.RunInTransaction") 169 defer func() { trace.EndSpan(ctx, err) }() 170 171 settings := newTransactionSettings(opts) 172 for n := 0; n < settings.attempts; n++ { 173 tx, err := c.newTransaction(ctx, settings) 174 if err != nil { 175 return nil, err 176 } 177 if err := f(tx); err != nil { 178 _ = tx.Rollback() 179 return nil, err 180 } 181 if cmt, err := tx.Commit(); err != ErrConcurrentTransaction { 182 return cmt, err 183 } 184 // Pass this transaction's ID to the retry transaction to preserve 185 // transaction priority. 186 if !settings.readOnly { 187 settings.prevID = tx.id 188 } 189 } 190 return nil, ErrConcurrentTransaction 191} 192 193// Commit applies the enqueued operations atomically. 194func (t *Transaction) Commit() (c *Commit, err error) { 195 t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Commit") 196 defer func() { trace.EndSpan(t.ctx, err) }() 197 198 if t.id == nil { 199 return nil, errExpiredTransaction 200 } 201 req := &pb.CommitRequest{ 202 ProjectId: t.client.dataset, 203 TransactionSelector: &pb.CommitRequest_Transaction{Transaction: t.id}, 204 Mutations: t.mutations, 205 Mode: pb.CommitRequest_TRANSACTIONAL, 206 } 207 resp, err := t.client.client.Commit(t.ctx, req) 208 if status.Code(err) == codes.Aborted { 209 return nil, ErrConcurrentTransaction 210 } 211 t.id = nil // mark the transaction as expired 212 if err != nil { 213 return nil, err 214 } 215 216 c = &Commit{} 217 // Copy any newly minted keys into the returned keys. 218 for i, p := range t.pending { 219 if i >= len(resp.MutationResults) || resp.MutationResults[i].Key == nil { 220 return nil, errors.New("datastore: internal error: server returned the wrong mutation results") 221 } 222 key, err := protoToKey(resp.MutationResults[i].Key) 223 if err != nil { 224 return nil, errors.New("datastore: internal error: server returned an invalid key") 225 } 226 p.key = key 227 p.commit = c 228 } 229 230 return c, nil 231} 232 233// Rollback abandons a pending transaction. 234func (t *Transaction) Rollback() (err error) { 235 t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Rollback") 236 defer func() { trace.EndSpan(t.ctx, err) }() 237 238 if t.id == nil { 239 return errExpiredTransaction 240 } 241 id := t.id 242 t.id = nil 243 _, err = t.client.client.Rollback(t.ctx, &pb.RollbackRequest{ 244 ProjectId: t.client.dataset, 245 Transaction: id, 246 }) 247 return err 248} 249 250// Get is the transaction-specific version of the package function Get. 251// All reads performed during the transaction will come from a single consistent 252// snapshot. Furthermore, if the transaction is set to a serializable isolation 253// level, another transaction cannot concurrently modify the data that is read 254// or modified by this transaction. 255func (t *Transaction) Get(key *Key, dst interface{}) (err error) { 256 t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Get") 257 defer func() { trace.EndSpan(t.ctx, err) }() 258 259 opts := &pb.ReadOptions{ 260 ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id}, 261 } 262 err = t.client.get(t.ctx, []*Key{key}, []interface{}{dst}, opts) 263 if me, ok := err.(MultiError); ok { 264 return me[0] 265 } 266 return err 267} 268 269// GetMulti is a batch version of Get. 270func (t *Transaction) GetMulti(keys []*Key, dst interface{}) (err error) { 271 t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.GetMulti") 272 defer func() { trace.EndSpan(t.ctx, err) }() 273 274 if t.id == nil { 275 return errExpiredTransaction 276 } 277 opts := &pb.ReadOptions{ 278 ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id}, 279 } 280 return t.client.get(t.ctx, keys, dst, opts) 281} 282 283// Put is the transaction-specific version of the package function Put. 284// 285// Put returns a PendingKey which can be resolved into a Key using the 286// return value from a successful Commit. If key is an incomplete key, the 287// returned pending key will resolve to a unique key generated by the 288// datastore. 289func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) { 290 h, err := t.PutMulti([]*Key{key}, []interface{}{src}) 291 if err != nil { 292 if me, ok := err.(MultiError); ok { 293 return nil, me[0] 294 } 295 return nil, err 296 } 297 return h[0], nil 298} 299 300// PutMulti is a batch version of Put. One PendingKey is returned for each 301// element of src in the same order. 302// TODO(jba): rewrite in terms of Mutate. 303func (t *Transaction) PutMulti(keys []*Key, src interface{}) (ret []*PendingKey, err error) { 304 if t.id == nil { 305 return nil, errExpiredTransaction 306 } 307 mutations, err := putMutations(keys, src) 308 if err != nil { 309 return nil, err 310 } 311 origin := len(t.mutations) 312 t.mutations = append(t.mutations, mutations...) 313 314 // Prepare the returned handles, pre-populating where possible. 315 ret = make([]*PendingKey, len(keys)) 316 for i, key := range keys { 317 p := &PendingKey{} 318 if key.Incomplete() { 319 // This key will be in the final commit result. 320 t.pending[origin+i] = p 321 } else { 322 p.key = key 323 } 324 ret[i] = p 325 } 326 327 return ret, nil 328} 329 330// Delete is the transaction-specific version of the package function Delete. 331// Delete enqueues the deletion of the entity for the given key, to be 332// committed atomically upon calling Commit. 333func (t *Transaction) Delete(key *Key) error { 334 err := t.DeleteMulti([]*Key{key}) 335 if me, ok := err.(MultiError); ok { 336 return me[0] 337 } 338 return err 339} 340 341// DeleteMulti is a batch version of Delete. 342// TODO(jba): rewrite in terms of Mutate. 343func (t *Transaction) DeleteMulti(keys []*Key) (err error) { 344 if t.id == nil { 345 return errExpiredTransaction 346 } 347 mutations, err := deleteMutations(keys) 348 if err != nil { 349 return err 350 } 351 t.mutations = append(t.mutations, mutations...) 352 return nil 353} 354 355// Mutate adds the mutations to the transaction. They will all be applied atomically 356// upon calling Commit. Mutate returns a PendingKey for each Mutation in the argument 357// list, in the same order. PendingKeys for Delete mutations are always nil. 358// 359// If any of the mutations are invalid, Mutate returns a MultiError with the errors. 360// Mutate returns a MultiError in this case even if there is only one Mutation. 361// 362// For an example, see Client.Mutate. 363func (t *Transaction) Mutate(muts ...*Mutation) ([]*PendingKey, error) { 364 if t.id == nil { 365 return nil, errExpiredTransaction 366 } 367 pmuts, err := mutationProtos(muts) 368 if err != nil { 369 return nil, err 370 } 371 origin := len(t.mutations) 372 t.mutations = append(t.mutations, pmuts...) 373 // Prepare the returned handles, pre-populating where possible. 374 ret := make([]*PendingKey, len(muts)) 375 for i, mut := range muts { 376 if mut.isDelete() { 377 continue 378 } 379 p := &PendingKey{} 380 if mut.key.Incomplete() { 381 // This key will be in the final commit result. 382 t.pending[origin+i] = p 383 } else { 384 p.key = mut.key 385 } 386 ret[i] = p 387 } 388 return ret, nil 389} 390 391// Commit represents the result of a committed transaction. 392type Commit struct{} 393 394// Key resolves a pending key handle into a final key. 395func (c *Commit) Key(p *PendingKey) *Key { 396 if p == nil { // if called on a *PendingKey from a Delete mutation 397 return nil 398 } 399 // If p.commit is nil, the PendingKey did not come from an incomplete key, 400 // so p.key is valid. 401 if p.commit != nil && c != p.commit { 402 panic("PendingKey was not created by corresponding transaction") 403 } 404 return p.key 405} 406 407// PendingKey represents the key for newly-inserted entity. It can be 408// resolved into a Key by calling the Key method of Commit. 409type PendingKey struct { 410 key *Key 411 commit *Commit 412} 413