1// The txn package implements support for multi-document transactions. 2// 3// For details check the following blog post: 4// 5// http://blog.labix.org/2012/08/22/multi-doc-transactions-for-mongodb 6// 7package txn 8 9import ( 10 "encoding/binary" 11 "fmt" 12 "reflect" 13 "sort" 14 "strings" 15 "sync" 16 17 "gopkg.in/mgo.v2" 18 "gopkg.in/mgo.v2/bson" 19 20 crand "crypto/rand" 21 mrand "math/rand" 22) 23 24type state int 25 26const ( 27 tpreparing state = 1 // One or more documents not prepared 28 tprepared state = 2 // Prepared but not yet ready to run 29 taborting state = 3 // Assertions failed, cleaning up 30 tapplying state = 4 // Changes are in progress 31 taborted state = 5 // Pre-conditions failed, nothing done 32 tapplied state = 6 // All changes applied 33) 34 35func (s state) String() string { 36 switch s { 37 case tpreparing: 38 return "preparing" 39 case tprepared: 40 return "prepared" 41 case taborting: 42 return "aborting" 43 case tapplying: 44 return "applying" 45 case taborted: 46 return "aborted" 47 case tapplied: 48 return "applied" 49 } 50 panic(fmt.Errorf("unknown state: %d", s)) 51} 52 53var rand *mrand.Rand 54var randmu sync.Mutex 55 56func init() { 57 var seed int64 58 err := binary.Read(crand.Reader, binary.BigEndian, &seed) 59 if err != nil { 60 panic(err) 61 } 62 rand = mrand.New(mrand.NewSource(seed)) 63} 64 65type transaction struct { 66 Id bson.ObjectId `bson:"_id"` 67 State state `bson:"s"` 68 Info interface{} `bson:"i,omitempty"` 69 Ops []Op `bson:"o"` 70 Nonce string `bson:"n,omitempty"` 71 Revnos []int64 `bson:"r,omitempty"` 72 73 docKeysCached docKeys 74} 75 76func (t *transaction) String() string { 77 if t.Nonce == "" { 78 return t.Id.Hex() 79 } 80 return string(t.token()) 81} 82 83func (t *transaction) done() bool { 84 return t.State == tapplied || t.State == taborted 85} 86 87func (t *transaction) token() token { 88 if t.Nonce == "" { 89 panic("transaction has no nonce") 90 } 91 return tokenFor(t) 92} 93 94func (t *transaction) docKeys() docKeys { 95 if t.docKeysCached != nil { 96 return t.docKeysCached 97 } 98 dkeys := make(docKeys, 0, len(t.Ops)) 99NextOp: 100 for _, op := range t.Ops { 101 dkey := op.docKey() 102 for i := range dkeys { 103 if dkey == dkeys[i] { 104 continue NextOp 105 } 106 } 107 dkeys = append(dkeys, dkey) 108 } 109 sort.Sort(dkeys) 110 t.docKeysCached = dkeys 111 return dkeys 112} 113 114// tokenFor returns a unique transaction token that 115// is composed by t's id and a nonce. If t already has 116// a nonce assigned to it, it will be used, otherwise 117// a new nonce will be generated. 118func tokenFor(t *transaction) token { 119 nonce := t.Nonce 120 if nonce == "" { 121 nonce = newNonce() 122 } 123 return token(t.Id.Hex() + "_" + nonce) 124} 125 126func newNonce() string { 127 randmu.Lock() 128 r := rand.Uint32() 129 randmu.Unlock() 130 n := make([]byte, 8) 131 for i := uint(0); i < 8; i++ { 132 n[i] = "0123456789abcdef"[(r>>(4*i))&0xf] 133 } 134 return string(n) 135} 136 137type token string 138 139func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) } 140func (tt token) nonce() string { return string(tt[25:]) } 141 142// Op represents an operation to a single document that may be 143// applied as part of a transaction with other operations. 144type Op struct { 145 // C and Id identify the collection and document this operation 146 // refers to. Id is matched against the "_id" document field. 147 C string `bson:"c"` 148 Id interface{} `bson:"d"` 149 150 // Assert optionally holds a query document that is used to 151 // test the operation document at the time the transaction is 152 // going to be applied. The assertions for all operations in 153 // a transaction are tested before any changes take place, 154 // and the transaction is entirely aborted if any of them 155 // fails. This is also the only way to prevent a transaction 156 // from being being applied (the transaction continues despite 157 // the outcome of Insert, Update, and Remove). 158 Assert interface{} `bson:"a,omitempty"` 159 160 // The Insert, Update and Remove fields describe the mutation 161 // intended by the operation. At most one of them may be set 162 // per operation. If none are set, Assert must be set and the 163 // operation becomes a read-only test. 164 // 165 // Insert holds the document to be inserted at the time the 166 // transaction is applied. The Id field will be inserted 167 // into the document automatically as its _id field. The 168 // transaction will continue even if the document already 169 // exists. Use Assert with txn.DocMissing if the insertion is 170 // required. 171 // 172 // Update holds the update document to be applied at the time 173 // the transaction is applied. The transaction will continue 174 // even if a document with Id is missing. Use Assert to 175 // test for the document presence or its contents. 176 // 177 // Remove indicates whether to remove the document with Id. 178 // The transaction continues even if the document doesn't yet 179 // exist at the time the transaction is applied. Use Assert 180 // with txn.DocExists to make sure it will be removed. 181 Insert interface{} `bson:"i,omitempty"` 182 Update interface{} `bson:"u,omitempty"` 183 Remove bool `bson:"r,omitempty"` 184} 185 186func (op *Op) isChange() bool { 187 return op.Update != nil || op.Insert != nil || op.Remove 188} 189 190func (op *Op) docKey() docKey { 191 return docKey{op.C, op.Id} 192} 193 194func (op *Op) name() string { 195 switch { 196 case op.Update != nil: 197 return "update" 198 case op.Insert != nil: 199 return "insert" 200 case op.Remove: 201 return "remove" 202 case op.Assert != nil: 203 return "assert" 204 } 205 return "none" 206} 207 208const ( 209 // DocExists and DocMissing may be used on an operation's 210 // Assert value to assert that the document with the given 211 // Id exists or does not exist, respectively. 212 DocExists = "d+" 213 DocMissing = "d-" 214) 215 216// A Runner applies operations as part of a transaction onto any number 217// of collections within a database. See the Run method for details. 218type Runner struct { 219 tc *mgo.Collection // txns 220 sc *mgo.Collection // stash 221 lc *mgo.Collection // log 222} 223 224// NewRunner returns a new transaction runner that uses tc to hold its 225// transactions. 226// 227// Multiple transaction collections may exist in a single database, but 228// all collections that are touched by operations in a given transaction 229// collection must be handled exclusively by it. 230// 231// A second collection with the same name of tc but suffixed by ".stash" 232// will be used for implementing the transactional behavior of insert 233// and remove operations. 234func NewRunner(tc *mgo.Collection) *Runner { 235 return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil} 236} 237 238var ErrAborted = fmt.Errorf("transaction aborted") 239 240// Run creates a new transaction with ops and runs it immediately. 241// The id parameter specifies the transaction id, and may be written 242// down ahead of time to later verify the success of the change and 243// resume it, when the procedure is interrupted for any reason. If 244// empty, a random id will be generated. 245// The info parameter, if not nil, is included under the "i" 246// field of the transaction document. 247// 248// Operations across documents are not atomically applied, but are 249// guaranteed to be eventually all applied in the order provided or 250// all aborted, as long as the affected documents are only modified 251// through transactions. If documents are simultaneously modified 252// by transactions and out of transactions the behavior is undefined. 253// 254// If Run returns no errors, all operations were applied successfully. 255// If it returns ErrAborted, one or more operations can't be applied 256// and the transaction was entirely aborted with no changes performed. 257// Otherwise, if the transaction is interrupted while running for any 258// reason, it may be resumed explicitly or by attempting to apply 259// another transaction on any of the documents targeted by ops, as 260// long as the interruption was made after the transaction document 261// itself was inserted. Run Resume with the obtained transaction id 262// to confirm whether the transaction was applied or not. 263// 264// Any number of transactions may be run concurrently, with one 265// runner or many. 266func (r *Runner) Run(ops []Op, id bson.ObjectId, info interface{}) (err error) { 267 const efmt = "error in transaction op %d: %s" 268 for i := range ops { 269 op := &ops[i] 270 if op.C == "" || op.Id == nil { 271 return fmt.Errorf(efmt, i, "C or Id missing") 272 } 273 changes := 0 274 if op.Insert != nil { 275 changes++ 276 } 277 if op.Update != nil { 278 changes++ 279 } 280 if op.Remove { 281 changes++ 282 } 283 if changes > 1 { 284 return fmt.Errorf(efmt, i, "more than one of Insert/Update/Remove set") 285 } 286 if changes == 0 && op.Assert == nil { 287 return fmt.Errorf(efmt, i, "none of Assert/Insert/Update/Remove set") 288 } 289 } 290 if id == "" { 291 id = bson.NewObjectId() 292 } 293 294 // Insert transaction sooner rather than later, to stay on the safer side. 295 t := transaction{ 296 Id: id, 297 Ops: ops, 298 State: tpreparing, 299 Info: info, 300 } 301 if err = r.tc.Insert(&t); err != nil { 302 return err 303 } 304 if err = flush(r, &t); err != nil { 305 return err 306 } 307 if t.State == taborted { 308 return ErrAborted 309 } else if t.State != tapplied { 310 panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State)) 311 } 312 return nil 313} 314 315// ResumeAll resumes all pending transactions. All ErrAborted errors 316// from individual transactions are ignored. 317func (r *Runner) ResumeAll() (err error) { 318 debugf("Resuming all unfinished transactions") 319 iter := r.tc.Find(bson.D{{"s", bson.D{{"$in", []state{tpreparing, tprepared, tapplying}}}}}).Iter() 320 var t transaction 321 for iter.Next(&t) { 322 if t.State == tapplied || t.State == taborted { 323 continue 324 } 325 debugf("Resuming %s from %q", t.Id, t.State) 326 if err := flush(r, &t); err != nil { 327 return err 328 } 329 if !t.done() { 330 panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State)) 331 } 332 } 333 return nil 334} 335 336// Resume resumes the transaction with id. It returns mgo.ErrNotFound 337// if the transaction is not found. Otherwise, it has the same semantics 338// of the Run method after the transaction is inserted. 339func (r *Runner) Resume(id bson.ObjectId) (err error) { 340 t, err := r.load(id) 341 if err != nil { 342 return err 343 } 344 if !t.done() { 345 debugf("Resuming %s from %q", t, t.State) 346 if err := flush(r, t); err != nil { 347 return err 348 } 349 } 350 if t.State == taborted { 351 return ErrAborted 352 } else if t.State != tapplied { 353 panic(fmt.Errorf("invalid state for %s after flush: %q", t, t.State)) 354 } 355 return nil 356} 357 358// ChangeLog enables logging of changes to the given collection 359// every time a transaction that modifies content is done being 360// applied. 361// 362// Saved documents are in the format: 363// 364// {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}} 365// 366// The document revision is the value of the txn-revno field after 367// the change has been applied. Negative values indicate the document 368// was not present in the collection. Revisions will not change when 369// updates or removes are applied to missing documents or inserts are 370// attempted when the document isn't present. 371func (r *Runner) ChangeLog(logc *mgo.Collection) { 372 r.lc = logc 373} 374 375// PurgeMissing removes from collections any state that refers to transaction 376// documents that for whatever reason have been lost from the system (removed 377// by accident or lost in a hard crash, for example). 378// 379// This method should very rarely be needed, if at all, and should never be 380// used during the normal operation of an application. Its purpose is to put 381// a system that has seen unavoidable corruption back in a working state. 382func (r *Runner) PurgeMissing(collections ...string) error { 383 type M map[string]interface{} 384 type S []interface{} 385 386 type TDoc struct { 387 Id interface{} "_id" 388 TxnQueue []string "txn-queue" 389 } 390 391 found := make(map[bson.ObjectId]bool) 392 393 sort.Strings(collections) 394 for _, collection := range collections { 395 c := r.tc.Database.C(collection) 396 iter := c.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter() 397 var tdoc TDoc 398 for iter.Next(&tdoc) { 399 for _, txnToken := range tdoc.TxnQueue { 400 txnId := bson.ObjectIdHex(txnToken[:24]) 401 if found[txnId] { 402 continue 403 } 404 if r.tc.FindId(txnId).One(nil) == nil { 405 found[txnId] = true 406 continue 407 } 408 logf("WARNING: purging from document %s/%v the missing transaction id %s", collection, tdoc.Id, txnId) 409 err := c.UpdateId(tdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}}) 410 if err != nil { 411 return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err) 412 } 413 } 414 } 415 if err := iter.Close(); err != nil { 416 return fmt.Errorf("transaction queue iteration error for %s: %v", collection, err) 417 } 418 } 419 420 type StashTDoc struct { 421 Id docKey "_id" 422 TxnQueue []string "txn-queue" 423 } 424 425 iter := r.sc.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter() 426 var stdoc StashTDoc 427 for iter.Next(&stdoc) { 428 for _, txnToken := range stdoc.TxnQueue { 429 txnId := bson.ObjectIdHex(txnToken[:24]) 430 if found[txnId] { 431 continue 432 } 433 if r.tc.FindId(txnId).One(nil) == nil { 434 found[txnId] = true 435 continue 436 } 437 logf("WARNING: purging from stash document %s/%v the missing transaction id %s", stdoc.Id.C, stdoc.Id.Id, txnId) 438 err := r.sc.UpdateId(stdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}}) 439 if err != nil { 440 return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err) 441 } 442 } 443 } 444 if err := iter.Close(); err != nil { 445 return fmt.Errorf("transaction stash iteration error: %v", err) 446 } 447 448 return nil 449} 450 451func (r *Runner) load(id bson.ObjectId) (*transaction, error) { 452 var t transaction 453 err := r.tc.FindId(id).One(&t) 454 if err == mgo.ErrNotFound { 455 return nil, fmt.Errorf("cannot find transaction %s", id) 456 } else if err != nil { 457 return nil, err 458 } 459 return &t, nil 460} 461 462type typeNature int 463 464const ( 465 // The order of these values matters. Transactions 466 // from applications using different ordering will 467 // be incompatible with each other. 468 _ typeNature = iota 469 natureString 470 natureInt 471 natureFloat 472 natureBool 473 natureStruct 474) 475 476func valueNature(v interface{}) (value interface{}, nature typeNature) { 477 rv := reflect.ValueOf(v) 478 switch rv.Kind() { 479 case reflect.String: 480 return rv.String(), natureString 481 case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: 482 return rv.Int(), natureInt 483 case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: 484 return int64(rv.Uint()), natureInt 485 case reflect.Float32, reflect.Float64: 486 return rv.Float(), natureFloat 487 case reflect.Bool: 488 return rv.Bool(), natureBool 489 case reflect.Struct: 490 return v, natureStruct 491 } 492 panic("document id type unsupported by txn: " + rv.Kind().String()) 493} 494 495type docKey struct { 496 C string 497 Id interface{} 498} 499 500type docKeys []docKey 501 502func (ks docKeys) Len() int { return len(ks) } 503func (ks docKeys) Swap(i, j int) { ks[i], ks[j] = ks[j], ks[i] } 504func (ks docKeys) Less(i, j int) bool { 505 a, b := ks[i], ks[j] 506 if a.C != b.C { 507 return a.C < b.C 508 } 509 return valuecmp(a.Id, b.Id) == -1 510} 511 512func valuecmp(a, b interface{}) int { 513 av, an := valueNature(a) 514 bv, bn := valueNature(b) 515 if an < bn { 516 return -1 517 } 518 if an > bn { 519 return 1 520 } 521 522 if av == bv { 523 return 0 524 } 525 var less bool 526 switch an { 527 case natureString: 528 less = av.(string) < bv.(string) 529 case natureInt: 530 less = av.(int64) < bv.(int64) 531 case natureFloat: 532 less = av.(float64) < bv.(float64) 533 case natureBool: 534 less = !av.(bool) && bv.(bool) 535 case natureStruct: 536 less = structcmp(av, bv) == -1 537 default: 538 panic("unreachable") 539 } 540 if less { 541 return -1 542 } 543 return 1 544} 545 546func structcmp(a, b interface{}) int { 547 av := reflect.ValueOf(a) 548 bv := reflect.ValueOf(b) 549 550 var ai, bi = 0, 0 551 var an, bn = av.NumField(), bv.NumField() 552 var avi, bvi interface{} 553 var af, bf reflect.StructField 554 for { 555 for ai < an { 556 af = av.Type().Field(ai) 557 if isExported(af.Name) { 558 avi = av.Field(ai).Interface() 559 ai++ 560 break 561 } 562 ai++ 563 } 564 for bi < bn { 565 bf = bv.Type().Field(bi) 566 if isExported(bf.Name) { 567 bvi = bv.Field(bi).Interface() 568 bi++ 569 break 570 } 571 bi++ 572 } 573 if n := valuecmp(avi, bvi); n != 0 { 574 return n 575 } 576 nameA := getFieldName(af) 577 nameB := getFieldName(bf) 578 if nameA < nameB { 579 return -1 580 } 581 if nameA > nameB { 582 return 1 583 } 584 if ai == an && bi == bn { 585 return 0 586 } 587 if ai == an || bi == bn { 588 if ai == bn { 589 return -1 590 } 591 return 1 592 } 593 } 594 panic("unreachable") 595} 596 597func isExported(name string) bool { 598 a := name[0] 599 return a >= 'A' && a <= 'Z' 600} 601 602func getFieldName(f reflect.StructField) string { 603 name := f.Tag.Get("bson") 604 if i := strings.Index(name, ","); i >= 0 { 605 name = name[:i] 606 } 607 if name == "" { 608 name = strings.ToLower(f.Name) 609 } 610 return name 611} 612