// Package txn implements support for multi-document transactions.
2//
3// For details check the following blog post:
4//
5//     http://blog.labix.org/2012/08/22/multi-doc-transactions-for-mongodb
6//
7package txn
8
9import (
10	"encoding/binary"
11	"fmt"
12	"reflect"
13	"sort"
14	"strings"
15	"sync"
16
17	"gopkg.in/mgo.v2"
18	"gopkg.in/mgo.v2/bson"
19
20	crand "crypto/rand"
21	mrand "math/rand"
22)
23
// state identifies the stage a transaction is currently in.
type state int

const (
	tpreparing state = 1 // One or more documents not prepared
	tprepared  state = 2 // Prepared but not yet ready to run
	taborting  state = 3 // Assertions failed, cleaning up
	tapplying  state = 4 // Changes are in progress
	taborted   state = 5 // Pre-conditions failed, nothing done
	tapplied   state = 6 // All changes applied
)

// String returns the lowercase name of s. It panics with an error
// value when s is not one of the states declared above.
func (s state) String() string {
	// Index 0 is intentionally left empty: no state has that value.
	names := [...]string{
		tpreparing: "preparing",
		tprepared:  "prepared",
		taborting:  "aborting",
		tapplying:  "applying",
		taborted:   "aborted",
		tapplied:   "applied",
	}
	if s >= tpreparing && s <= tapplied {
		return names[s]
	}
	panic(fmt.Errorf("unknown state: %d", s))
}
52
// rand is the package-wide pseudo-random generator used when creating
// nonces; randmu must be held while calling into it.
var (
	rand   *mrand.Rand
	randmu sync.Mutex
)

// init seeds rand with eight bytes taken from the cryptographic
// random source, interpreted as a big-endian integer. It panics if
// those bytes cannot be read.
func init() {
	var raw [8]byte
	if _, err := crand.Read(raw[:]); err != nil {
		panic(err)
	}
	seed := int64(binary.BigEndian.Uint64(raw[:]))
	rand = mrand.New(mrand.NewSource(seed))
}
64
// transaction is the in-memory form of a transaction document.
//
// Id is the document's _id, State the stage the transaction is in,
// Info the optional caller-supplied value stored under "i", Ops the
// operations that make up the transaction, and Nonce the value that is
// combined with Id to build the transaction token.
//
// NOTE(review): Revnos presumably holds per-document revision numbers
// recorded while the transaction is applied; it is not used in this
// file, so confirm against the code that fills it in.
type transaction struct {
	Id     bson.ObjectId `bson:"_id"`
	State  state         `bson:"s"`
	Info   interface{}   `bson:"i,omitempty"`
	Ops    []Op          `bson:"o"`
	Nonce  string        `bson:"n,omitempty"`
	Revnos []int64       `bson:"r,omitempty"`

	// docKeysCached caches the result of the docKeys method.
	docKeysCached docKeys
}
75
76func (t *transaction) String() string {
77	if t.Nonce == "" {
78		return t.Id.Hex()
79	}
80	return string(t.token())
81}
82
83func (t *transaction) done() bool {
84	return t.State == tapplied || t.State == taborted
85}
86
87func (t *transaction) token() token {
88	if t.Nonce == "" {
89		panic("transaction has no nonce")
90	}
91	return tokenFor(t)
92}
93
94func (t *transaction) docKeys() docKeys {
95	if t.docKeysCached != nil {
96		return t.docKeysCached
97	}
98	dkeys := make(docKeys, 0, len(t.Ops))
99NextOp:
100	for _, op := range t.Ops {
101		dkey := op.docKey()
102		for i := range dkeys {
103			if dkey == dkeys[i] {
104				continue NextOp
105			}
106		}
107		dkeys = append(dkeys, dkey)
108	}
109	sort.Sort(dkeys)
110	t.docKeysCached = dkeys
111	return dkeys
112}
113
114// tokenFor returns a unique transaction token that
115// is composed by t's id and a nonce. If t already has
116// a nonce assigned to it, it will be used, otherwise
117// a new nonce will be generated.
118func tokenFor(t *transaction) token {
119	nonce := t.Nonce
120	if nonce == "" {
121		nonce = newNonce()
122	}
123	return token(t.Id.Hex() + "_" + nonce)
124}
125
126func newNonce() string {
127	randmu.Lock()
128	r := rand.Uint32()
129	randmu.Unlock()
130	n := make([]byte, 8)
131	for i := uint(0); i < 8; i++ {
132		n[i] = "0123456789abcdef"[(r>>(4*i))&0xf]
133	}
134	return string(n)
135}
136
137type token string
138
139func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) }
140func (tt token) nonce() string     { return string(tt[25:]) }
141
// Op represents an operation to a single document that may be
// applied as part of a transaction with other operations.
type Op struct {
	// C and Id identify the collection and document this operation
	// refers to. Id is matched against the "_id" document field.
	C  string      `bson:"c"`
	Id interface{} `bson:"d"`

	// Assert optionally holds a query document that is used to
	// test the operation document at the time the transaction is
	// going to be applied. The assertions for all operations in
	// a transaction are tested before any changes take place,
	// and the transaction is entirely aborted if any of them
	// fails. This is also the only way to prevent a transaction
	// from being applied (the transaction continues despite
	// the outcome of Insert, Update, and Remove).
	Assert interface{} `bson:"a,omitempty"`

	// The Insert, Update and Remove fields describe the mutation
	// intended by the operation. At most one of them may be set
	// per operation. If none are set, Assert must be set and the
	// operation becomes a read-only test.
	//
	// Insert holds the document to be inserted at the time the
	// transaction is applied. The Id field will be inserted
	// into the document automatically as its _id field. The
	// transaction will continue even if the document already
	// exists. Use Assert with txn.DocMissing if the insertion is
	// required.
	//
	// Update holds the update document to be applied at the time
	// the transaction is applied. The transaction will continue
	// even if a document with Id is missing. Use Assert to
	// test for the document presence or its contents.
	//
	// Remove indicates whether to remove the document with Id.
	// The transaction continues even if the document doesn't yet
	// exist at the time the transaction is applied. Use Assert
	// with txn.DocExists to make sure it will be removed.
	Insert interface{} `bson:"i,omitempty"`
	Update interface{} `bson:"u,omitempty"`
	Remove bool        `bson:"r,omitempty"`
}
185
186func (op *Op) isChange() bool {
187	return op.Update != nil || op.Insert != nil || op.Remove
188}
189
190func (op *Op) docKey() docKey {
191	return docKey{op.C, op.Id}
192}
193
194func (op *Op) name() string {
195	switch {
196	case op.Update != nil:
197		return "update"
198	case op.Insert != nil:
199		return "insert"
200	case op.Remove:
201		return "remove"
202	case op.Assert != nil:
203		return "assert"
204	}
205	return "none"
206}
207
const (
	// DocExists and DocMissing are the special values that may be
	// used as an operation's Assert to require that the document
	// with the given Id exists or does not exist, respectively.
	DocExists  = "d+"
	DocMissing = "d-"
)
215
// A Runner applies operations as part of a transaction onto any number
// of collections within a database. See the Run method for details.
//
// A Runner is created with NewRunner, which sets the transaction and
// stash collections. The log collection stays nil until ChangeLog is
// called.
type Runner struct {
	tc *mgo.Collection // txns
	sc *mgo.Collection // stash
	lc *mgo.Collection // log
}
223
224// NewRunner returns a new transaction runner that uses tc to hold its
225// transactions.
226//
227// Multiple transaction collections may exist in a single database, but
228// all collections that are touched by operations in a given transaction
229// collection must be handled exclusively by it.
230//
231// A second collection with the same name of tc but suffixed by ".stash"
232// will be used for implementing the transactional behavior of insert
233// and remove operations.
234func NewRunner(tc *mgo.Collection) *Runner {
235	return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil}
236}
237
// ErrAborted is the error returned by Run and Resume when the
// transaction ended up in the aborted state.
var ErrAborted = fmt.Errorf("transaction aborted")
239
240// Run creates a new transaction with ops and runs it immediately.
241// The id parameter specifies the transaction id, and may be written
242// down ahead of time to later verify the success of the change and
243// resume it, when the procedure is interrupted for any reason. If
244// empty, a random id will be generated.
245// The info parameter, if not nil, is included under the "i"
246// field of the transaction document.
247//
248// Operations across documents are not atomically applied, but are
249// guaranteed to be eventually all applied in the order provided or
250// all aborted, as long as the affected documents are only modified
251// through transactions. If documents are simultaneously modified
252// by transactions and out of transactions the behavior is undefined.
253//
254// If Run returns no errors, all operations were applied successfully.
255// If it returns ErrAborted, one or more operations can't be applied
256// and the transaction was entirely aborted with no changes performed.
257// Otherwise, if the transaction is interrupted while running for any
258// reason, it may be resumed explicitly or by attempting to apply
259// another transaction on any of the documents targeted by ops, as
260// long as the interruption was made after the transaction document
261// itself was inserted. Run Resume with the obtained transaction id
262// to confirm whether the transaction was applied or not.
263//
264// Any number of transactions may be run concurrently, with one
265// runner or many.
266func (r *Runner) Run(ops []Op, id bson.ObjectId, info interface{}) (err error) {
267	const efmt = "error in transaction op %d: %s"
268	for i := range ops {
269		op := &ops[i]
270		if op.C == "" || op.Id == nil {
271			return fmt.Errorf(efmt, i, "C or Id missing")
272		}
273		changes := 0
274		if op.Insert != nil {
275			changes++
276		}
277		if op.Update != nil {
278			changes++
279		}
280		if op.Remove {
281			changes++
282		}
283		if changes > 1 {
284			return fmt.Errorf(efmt, i, "more than one of Insert/Update/Remove set")
285		}
286		if changes == 0 && op.Assert == nil {
287			return fmt.Errorf(efmt, i, "none of Assert/Insert/Update/Remove set")
288		}
289	}
290	if id == "" {
291		id = bson.NewObjectId()
292	}
293
294	// Insert transaction sooner rather than later, to stay on the safer side.
295	t := transaction{
296		Id:    id,
297		Ops:   ops,
298		State: tpreparing,
299		Info:  info,
300	}
301	if err = r.tc.Insert(&t); err != nil {
302		return err
303	}
304	if err = flush(r, &t); err != nil {
305		return err
306	}
307	if t.State == taborted {
308		return ErrAborted
309	} else if t.State != tapplied {
310		panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State))
311	}
312	return nil
313}
314
315// ResumeAll resumes all pending transactions. All ErrAborted errors
316// from individual transactions are ignored.
317func (r *Runner) ResumeAll() (err error) {
318	debugf("Resuming all unfinished transactions")
319	iter := r.tc.Find(bson.D{{"s", bson.D{{"$in", []state{tpreparing, tprepared, tapplying}}}}}).Iter()
320	var t transaction
321	for iter.Next(&t) {
322		if t.State == tapplied || t.State == taborted {
323			continue
324		}
325		debugf("Resuming %s from %q", t.Id, t.State)
326		if err := flush(r, &t); err != nil {
327			return err
328		}
329		if !t.done() {
330			panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State))
331		}
332	}
333	return nil
334}
335
336// Resume resumes the transaction with id. It returns mgo.ErrNotFound
337// if the transaction is not found. Otherwise, it has the same semantics
338// of the Run method after the transaction is inserted.
339func (r *Runner) Resume(id bson.ObjectId) (err error) {
340	t, err := r.load(id)
341	if err != nil {
342		return err
343	}
344	if !t.done() {
345		debugf("Resuming %s from %q", t, t.State)
346		if err := flush(r, t); err != nil {
347			return err
348		}
349	}
350	if t.State == taborted {
351		return ErrAborted
352	} else if t.State != tapplied {
353		panic(fmt.Errorf("invalid state for %s after flush: %q", t, t.State))
354	}
355	return nil
356}
357
// ChangeLog enables logging of changes to the given collection
// every time a transaction that modifies content is done being
// applied. It does so by recording logc as the runner's log
// collection.
//
// Saved documents are in the format:
//
//     {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}}
//
// The document revision is the value of the txn-revno field after
// the change has been applied. Negative values indicate the document
// was not present in the collection. Revisions will not change when
// updates or removes are applied to missing documents or inserts are
// attempted when the document isn't present.
//
// NOTE(review): the last clause presumably means inserts attempted
// when the document is already present; confirm against the code
// that applies the operations.
func (r *Runner) ChangeLog(logc *mgo.Collection) {
	r.lc = logc
}
374
375// PurgeMissing removes from collections any state that refers to transaction
376// documents that for whatever reason have been lost from the system (removed
377// by accident or lost in a hard crash, for example).
378//
379// This method should very rarely be needed, if at all, and should never be
380// used during the normal operation of an application. Its purpose is to put
381// a system that has seen unavoidable corruption back in a working state.
382func (r *Runner) PurgeMissing(collections ...string) error {
383	type M map[string]interface{}
384	type S []interface{}
385
386	type TDoc struct {
387		Id       interface{} "_id"
388		TxnQueue []string    "txn-queue"
389	}
390
391	found := make(map[bson.ObjectId]bool)
392
393	sort.Strings(collections)
394	for _, collection := range collections {
395		c := r.tc.Database.C(collection)
396		iter := c.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter()
397		var tdoc TDoc
398		for iter.Next(&tdoc) {
399			for _, txnToken := range tdoc.TxnQueue {
400				txnId := bson.ObjectIdHex(txnToken[:24])
401				if found[txnId] {
402					continue
403				}
404				if r.tc.FindId(txnId).One(nil) == nil {
405					found[txnId] = true
406					continue
407				}
408				logf("WARNING: purging from document %s/%v the missing transaction id %s", collection, tdoc.Id, txnId)
409				err := c.UpdateId(tdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
410				if err != nil {
411					return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
412				}
413			}
414		}
415		if err := iter.Close(); err != nil {
416			return fmt.Errorf("transaction queue iteration error for %s: %v", collection, err)
417		}
418	}
419
420	type StashTDoc struct {
421		Id       docKey   "_id"
422		TxnQueue []string "txn-queue"
423	}
424
425	iter := r.sc.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter()
426	var stdoc StashTDoc
427	for iter.Next(&stdoc) {
428		for _, txnToken := range stdoc.TxnQueue {
429			txnId := bson.ObjectIdHex(txnToken[:24])
430			if found[txnId] {
431				continue
432			}
433			if r.tc.FindId(txnId).One(nil) == nil {
434				found[txnId] = true
435				continue
436			}
437			logf("WARNING: purging from stash document %s/%v the missing transaction id %s", stdoc.Id.C, stdoc.Id.Id, txnId)
438			err := r.sc.UpdateId(stdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
439			if err != nil {
440				return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
441			}
442		}
443	}
444	if err := iter.Close(); err != nil {
445		return fmt.Errorf("transaction stash iteration error: %v", err)
446	}
447
448	return nil
449}
450
451func (r *Runner) load(id bson.ObjectId) (*transaction, error) {
452	var t transaction
453	err := r.tc.FindId(id).One(&t)
454	if err == mgo.ErrNotFound {
455		return nil, fmt.Errorf("cannot find transaction %s", id)
456	} else if err != nil {
457		return nil, err
458	}
459	return &t, nil
460}
461
// typeNature classifies the kinds of values supported as document ids.
// Ids of different natures are ordered by the numeric value of their
// nature.
type typeNature int

const (
	// The order of these values matters. Transactions
	// from applications using different ordering will
	// be incompatible with each other.
	_ typeNature = iota
	natureString
	natureInt
	natureFloat
	natureBool
	natureStruct
)

// valueNature returns v converted to the canonical Go type of its
// nature, together with that nature: string for strings, int64 for
// all signed and unsigned integers, float64 for floats, bool for
// booleans, and v itself for structs. It panics when v is of any
// other kind, including nil.
//
// Unsigned values above the int64 range wrap around to negative
// numbers in the conversion.
func valueNature(v interface{}) (value interface{}, nature typeNature) {
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.String:
		return rv.String(), natureString
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return rv.Int(), natureInt
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return int64(rv.Uint()), natureInt
	case reflect.Float32, reflect.Float64:
		return rv.Float(), natureFloat
	case reflect.Bool:
		return rv.Bool(), natureBool
	case reflect.Struct:
		return v, natureStruct
	}
	panic("document id type unsupported by txn: " + rv.Kind().String())
}

// docKey identifies a document by collection name and document id.
type docKey struct {
	C  string
	Id interface{}
}

// docKeys is a list of document keys that can be sorted with the sort
// package: first by collection name, then by id as ordered by valuecmp.
type docKeys []docKey

func (ks docKeys) Len() int      { return len(ks) }
func (ks docKeys) Swap(i, j int) { ks[i], ks[j] = ks[j], ks[i] }
func (ks docKeys) Less(i, j int) bool {
	a, b := ks[i], ks[j]
	if a.C != b.C {
		return a.C < b.C
	}
	return valuecmp(a.Id, b.Id) == -1
}

// valuecmp compares two document ids and returns -1, 0 or 1 when a
// sorts before, the same as, or after b. Values of different natures
// are ordered by nature; values of the same nature are ordered by
// their canonical value, with false sorting before true and structs
// ordered by structcmp. It panics for unsupported kinds of values.
func valuecmp(a, b interface{}) int {
	av, an := valueNature(a)
	bv, bn := valueNature(b)
	if an < bn {
		return -1
	}
	if an > bn {
		return 1
	}

	if av == bv {
		return 0
	}
	var less bool
	switch an {
	case natureString:
		less = av.(string) < bv.(string)
	case natureInt:
		less = av.(int64) < bv.(int64)
	case natureFloat:
		less = av.(float64) < bv.(float64)
	case natureBool:
		less = !av.(bool) && bv.(bool)
	case natureStruct:
		// structcmp may find the structs equivalent even though they
		// are not identical, e.g. when only unexported fields differ.
		return structcmp(av, bv)
	default:
		panic("unreachable")
	}
	if less {
		return -1
	}
	return 1
}

// structcmp compares the struct values a and b and returns -1, 0 or 1.
//
// Only exported fields take part in the comparison, in declaration
// order. The fields of both structs are paired up by position; for
// each pair the values are compared with valuecmp and, when those are
// equal, the field names (as given by getFieldName) are compared. The
// first difference found decides the result. When every pair is equal
// the struct with fewer exported fields sorts first, and structs with
// the same number of exported fields are equal.
//
// Two values of the same struct type are ordered exactly as before.
// For structs of different shapes, an earlier version compared a
// field index against the wrong field count and could report both
// a > b and b > a; the rule above replaces that behavior.
func structcmp(a, b interface{}) int {
	type field struct {
		name  string
		value interface{}
	}
	// exported lists the exported fields of the struct v in
	// declaration order.
	exported := func(v interface{}) []field {
		rv := reflect.ValueOf(v)
		rt := rv.Type()
		fields := make([]field, 0, rt.NumField())
		for i := 0; i < rt.NumField(); i++ {
			sf := rt.Field(i)
			if isExported(sf.Name) {
				fields = append(fields, field{getFieldName(sf), rv.Field(i).Interface()})
			}
		}
		return fields
	}

	afields, bfields := exported(a), exported(b)
	for i := 0; i < len(afields) && i < len(bfields); i++ {
		if n := valuecmp(afields[i].value, bfields[i].value); n != 0 {
			return n
		}
		if afields[i].name < bfields[i].name {
			return -1
		}
		if afields[i].name > bfields[i].name {
			return 1
		}
	}
	switch {
	case len(afields) < len(bfields):
		return -1
	case len(afields) > len(bfields):
		return 1
	}
	return 0
}

// isExported reports whether name starts with an ASCII uppercase
// letter. The empty name is not exported.
func isExported(name string) bool {
	if name == "" {
		return false
	}
	a := name[0]
	return a >= 'A' && a <= 'Z'
}

// getFieldName returns the key used for field f: the part of its bson
// struct tag before the first comma or, when that is empty, the
// lowercased Go field name.
func getFieldName(f reflect.StructField) string {
	name := f.Tag.Get("bson")
	if i := strings.Index(name, ","); i >= 0 {
		name = name[:i]
	}
	if name == "" {
		name = strings.ToLower(f.Name)
	}
	return name
}
612