1package txn
2
3import (
4	"fmt"
5
6	"github.com/10gen/llmgo"
7	"github.com/10gen/llmgo/bson"
8)
9
10func flush(r *Runner, t *transaction) error {
11	f := &flusher{
12		Runner:   r,
13		goal:     t,
14		goalKeys: make(map[docKey]bool),
15		queue:    make(map[docKey][]token),
16		debugId:  debugPrefix(),
17	}
18	for _, dkey := range f.goal.docKeys() {
19		f.goalKeys[dkey] = true
20	}
21	return f.run()
22}
23
// flusher carries the state used while flushing one goal transaction.
// It embeds the Runner, whose collections (tc, sc, lc) it uses directly.
type flusher struct {
	*Runner
	// goal is the transaction passed to flush.
	goal     *transaction
	// goalKeys holds the document keys touched by goal.
	goalKeys map[docKey]bool
	// queue caches the last txn-queue read for each document key.
	queue    map[docKey][]token
	// debugId is prepended to every debug message of this flusher.
	debugId  string
}
31
// run flushes f.goal.
//
// It first calls recurse on the goal. recurse advances transactions without
// forcing them, and when a transaction reports pre-requisites it loads and
// recurses into the other transactions found in the queues of its documents.
// If the goal is still not done afterwards, run builds a successor graph from
// the queues collected in f.queue, orders it with tarjanSort, and advances
// the transactions with force set, so that transactions in a cycle can
// still progress.
//
// When chaosEnabled is set, handleChaos is deferred with a pointer to the
// returned error.
func (f *flusher) run() (err error) {
	if chaosEnabled {
		defer f.handleChaos(&err)
	}

	f.debugf("Processing %s", f.goal)
	// seen maps transaction ids to every transaction loaded so far. It is
	// filled by recurse and used below to validate queue tokens.
	seen := make(map[bson.ObjectId]*transaction)
	if err := f.recurse(f.goal, seen); err != nil {
		return err
	}
	if f.goal.done() {
		return nil
	}

	// Sparse workloads will generally be managed entirely by recurse.
	// Getting here means one or more transactions have dependencies
	// and perhaps cycles.

	// Build successors data for Tarjan's sort. Must consider
	// that entries in txn-queue are not necessarily valid.
	//
	// A token counts as valid only when its transaction is in seen and the
	// token's nonce matches the nonce of that loaded transaction. Within each
	// document queue, every valid token gets an edge to the next valid token
	// after it. ready stays true only if no edge points at the goal.
	successors := make(map[bson.ObjectId][]bson.ObjectId)
	ready := true
	for _, dqueue := range f.queue {
	NextPair:
		for i := 0; i < len(dqueue); i++ {
			pred := dqueue[i]
			predid := pred.id()
			predt := seen[predid]
			if predt == nil || predt.Nonce != pred.nonce() {
				continue
			}
			// Register every valid transaction as a node, even if it ends
			// up with no successors.
			predsuccids, ok := successors[predid]
			if !ok {
				successors[predid] = nil
			}

			for j := i + 1; j < len(dqueue); j++ {
				succ := dqueue[j]
				succid := succ.id()
				succt := seen[succid]
				if succt == nil || succt.Nonce != succ.nonce() {
					continue
				}
				if _, ok := successors[succid]; !ok {
					successors[succid] = nil
				}

				// Found a valid pred/succ pair.
				// Resume the outer loop at j, so succ becomes the next pred.
				i = j - 1
				for _, predsuccid := range predsuccids {
					if predsuccid == succid {
						continue NextPair
					}
				}
				successors[predid] = append(predsuccids, succid)
				if succid == f.goal.Id {
					// There are still pre-requisites to handle.
					ready = false
				}
				continue NextPair
			}
		}
	}
	f.debugf("Queues: %v", f.queue)
	f.debugf("Successors: %v", successors)
	if ready {
		f.debugf("Goal %s has no real pre-requisites", f.goal)
		return f.advance(f.goal, nil, true)
	}

	// Robert Tarjan's algorithm for detecting strongly-connected
	// components is used for topological sorting and detecting
	// cycles at once. The order in which transactions are applied
	// in commonly affected documents must be a global agreement.
	sorted := tarjanSort(successors)
	if debugEnabled {
		f.debugf("Tarjan output: %v", sorted)
	}
	// sorted is walked from its last component to its first.
	// NOTE(review): this presumes tarjanSort lists a component after the
	// components it points to, so walking backwards handles predecessors
	// first. Confirm against tarjanSort.
	//
	// pull accumulates already-handled transactions and is passed to advance
	// so that their leftover tokens can be pulled from the queues. A
	// single-transaction component is added before it is advanced. Members of
	// a larger component (a cycle) are added only after all of them have
	// been advanced.
	pull := make(map[bson.ObjectId]*transaction)
	for i := len(sorted) - 1; i >= 0; i-- {
		scc := sorted[i]
		f.debugf("Flushing %v", scc)
		if len(scc) == 1 {
			pull[scc[0]] = seen[scc[0]]
		}
		for _, id := range scc {
			if err := f.advance(seen[id], pull, true); err != nil {
				return err
			}
		}
		if len(scc) > 1 {
			for _, id := range scc {
				pull[id] = seen[id]
			}
		}
	}
	return nil
}
130
131func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction) error {
132	seen[t.Id] = t
133	err := f.advance(t, nil, false)
134	if err != errPreReqs {
135		return err
136	}
137	for _, dkey := range t.docKeys() {
138		for _, dtt := range f.queue[dkey] {
139			id := dtt.id()
140			if seen[id] != nil {
141				continue
142			}
143			qt, err := f.load(id)
144			if err != nil {
145				return err
146			}
147			err = f.recurse(qt, seen)
148			if err != nil {
149				return err
150			}
151		}
152	}
153	return nil
154}
155
156func (f *flusher) advance(t *transaction, pull map[bson.ObjectId]*transaction, force bool) error {
157	for {
158		switch t.State {
159		case tpreparing, tprepared:
160			revnos, err := f.prepare(t, force)
161			if err != nil {
162				return err
163			}
164			if t.State != tprepared {
165				continue
166			}
167			if err = f.assert(t, revnos, pull); err != nil {
168				return err
169			}
170			if t.State != tprepared {
171				continue
172			}
173			if err = f.checkpoint(t, revnos); err != nil {
174				return err
175			}
176		case tapplying:
177			return f.apply(t, pull)
178		case taborting:
179			return f.abortOrReload(t, nil, pull)
180		case tapplied, taborted:
181			return nil
182		default:
183			panic(fmt.Errorf("transaction in unknown state: %q", t.State))
184		}
185	}
186	panic("unreachable")
187}
188
// stash is a string-backed tag with the named values below.
// NOTE(review): neither the type nor its constants are referenced in the
// code visible in this file section. Confirm whether they are still used.
type stash string

const (
	stashStable stash = ""
	stashInsert stash = "insert"
	stashRemove stash = "remove"
)
196
// txnInfo receives the transaction bookkeeping fields of a live document
// or of its stash document (see txnFields).
type txnInfo struct {
	// Queue is the document's txn-queue of transaction tokens.
	Queue  []token       `bson:"txn-queue"`
	// Revno is txn-revno. A zero value read from the stash is treated as -1
	// by the flusher.
	Revno  int64         `bson:"txn-revno,omitempty"`
	// Insert is the id of a transaction whose insert is in progress.
	Insert bson.ObjectId `bson:"txn-insert,omitempty"`
	// Remove is the id of a transaction whose remove is in progress.
	Remove bson.ObjectId `bson:"txn-remove,omitempty"`
}
203
// stashState is a string-backed tag with the named values below.
// NOTE(review): not referenced in the code visible in this file section.
// Confirm whether it is still used.
type stashState string

const (
	stashNew       stashState = ""
	stashInserting stashState = "inserting"
)
210
211var txnFields = bson.D{{"txn-queue", 1}, {"txn-revno", 1}, {"txn-remove", 1}, {"txn-insert", 1}}
212
213var errPreReqs = fmt.Errorf("transaction has pre-requisites and force is false")
214
// prepare injects t's id onto txn-queue for all affected documents
// and collects the current txn-queue and txn-revno values during
// the process. If the prepared txn-queue indicates that there are
// pre-requisite transactions to be applied and the force parameter
// is false, errPreReqs will be returned. Otherwise, the current
// tip revision numbers for all the documents are returned.
//
// If t is not in the preparing state, the work is delegated to rescan.
// The token added to every queue is tokenFor(t). Once every document holds
// it, the token's nonce is saved on the transaction document together with
// the prepared state. If that save loses a race, t is reloaded instead.
func (f *flusher) prepare(t *transaction, force bool) (revnos []int64, err error) {
	if t.State != tpreparing {
		return f.rescan(t, force)
	}
	f.debugf("Preparing %s", t)

	// dkeys being sorted means stable iteration across all runners. This
	// isn't strictly required, but reduces the chances of cycles.
	dkeys := t.docKeys()

	// revno records, per document, the txn-revno observed once tt is in
	// that document's queue.
	revno := make(map[docKey]int64)
	info := txnInfo{}
	tt := tokenFor(t)
NextDoc:
	for _, dkey := range dkeys {
		change := mgo.Change{
			Update:    bson.D{{"$addToSet", bson.D{{"txn-queue", tt}}}},
			ReturnNew: true,
		}
		c := f.tc.Database.C(dkey.C)
		cquery := c.FindId(dkey.Id).Select(txnFields)

	RetryDoc:
		// First try the live document, without upserting.
		change.Upsert = false
		chaos("")
		if _, err := cquery.Apply(change, &info); err == nil {
			if info.Remove == "" {
				// Fast path, unless workload is insert/remove heavy.
				revno[dkey] = info.Revno
				f.queue[dkey] = info.Queue
				f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
				continue NextDoc
			} else {
				// Handle remove in progress before preparing it.
				if err := f.loadAndApply(info.Remove); err != nil {
					return nil, err
				}
				goto RetryDoc
			}
		} else if err != mgo.ErrNotFound {
			return nil, err
		}

		// Document missing. Use stash collection.
		// The stash document is keyed by the whole docKey and is upserted
		// if it does not exist yet.
		change.Upsert = true
		chaos("")
		_, err := f.sc.FindId(dkey).Apply(change, &info)
		if err != nil {
			return nil, err
		}
		if info.Insert != "" {
			// Handle insert in progress before preparing it.
			if err := f.loadAndApply(info.Insert); err != nil {
				return nil, err
			}
			goto RetryDoc
		}

		// Must confirm stash is still in use and is the same one
		// prepared, since applying a remove overwrites the stash.
		docFound := false
		stashFound := false
		if err = c.FindId(dkey.Id).Select(txnFields).One(&info); err == nil {
			docFound = true
		} else if err != mgo.ErrNotFound {
			return nil, err
		} else if err = f.sc.FindId(dkey).One(&info); err == nil {
			stashFound = true
			if info.Revno == 0 {
				// Missing revno in the stash only happens when it
				// has been upserted, in which case it defaults to -1.
				// Txn-inserted documents get revno -1 while in the stash
				// for the first time, and -revno+1 == 2 when they go live.
				info.Revno = -1
			}
		} else if err != mgo.ErrNotFound {
			return nil, err
		}

		// tt only counts as prepared if it is in the queue of a live document
		// with no remove in progress, or of a stash with no insert in progress.
		if docFound && info.Remove == "" || stashFound && info.Insert == "" {
			for _, dtt := range info.Queue {
				if dtt != tt {
					continue
				}
				// Found tt properly prepared.
				if stashFound {
					f.debugf("[B] Prepared document %v on stash with revno %d and queue: %v", dkey, info.Revno, info.Queue)
				} else {
					f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
				}
				revno[dkey] = info.Revno
				f.queue[dkey] = info.Queue
				continue NextDoc
			}
		}

		// The stash wasn't valid and tt got overwritten. Try again.
		// Any error returned by unstashToken is ignored here.
		f.unstashToken(tt, dkey)
		goto RetryDoc
	}

	// Save the prepared nonce onto t.
	// The update only matches while t is still preparing. A miss means
	// another runner changed the state first.
	nonce := tt.nonce()
	qdoc := bson.D{{"_id", t.Id}, {"s", tpreparing}}
	udoc := bson.D{{"$set", bson.D{{"s", tprepared}, {"n", nonce}}}}
	chaos("set-prepared")
	err = f.tc.Update(qdoc, udoc)
	if err == nil {
		t.State = tprepared
		t.Nonce = nonce
	} else if err == mgo.ErrNotFound {
		f.debugf("Can't save nonce of %s: LOST RACE", tt)
		if err := f.reload(t); err != nil {
			return nil, err
		} else if t.State == tpreparing {
			panic("can't save nonce yet transaction is still preparing")
		} else if t.State != tprepared {
			return t.Revnos, nil
		}
		// Another runner prepared t with its own nonce. Continue with the
		// token built from the reloaded transaction.
		tt = t.token()
	} else if err != nil {
		return nil, err
	}

	prereqs, found := f.hasPreReqs(tt, dkeys)
	if !found {
		// Must only happen when reloading above.
		return f.rescan(t, force)
	} else if prereqs && !force {
		f.debugf("Prepared queue with %s [has prereqs & not forced].", tt)
		return nil, errPreReqs
	}
	revnos = assembledRevnos(t.Ops, revno)
	if !prereqs {
		f.debugf("Prepared queue with %s [no prereqs]. Revnos: %v", tt, revnos)
	} else {
		f.debugf("Prepared queue with %s [forced] Revnos: %v", tt, revnos)
	}
	return revnos, nil
}
361
362func (f *flusher) unstashToken(tt token, dkey docKey) error {
363	qdoc := bson.D{{"_id", dkey}, {"txn-queue", tt}}
364	udoc := bson.D{{"$pull", bson.D{{"txn-queue", tt}}}}
365	chaos("")
366	if err := f.sc.Update(qdoc, udoc); err == nil {
367		chaos("")
368		err = f.sc.Remove(bson.D{{"_id", dkey}, {"txn-queue", bson.D{}}})
369	} else if err != mgo.ErrNotFound {
370		return err
371	}
372	return nil
373}
374
375func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) {
376	f.debugf("Rescanning %s", t)
377	if t.State != tprepared {
378		panic(fmt.Errorf("rescanning transaction in invalid state: %q", t.State))
379	}
380
381	// dkeys being sorted means stable iteration across all
382	// runners. This isn't strictly required, but reduces the chances
383	// of cycles.
384	dkeys := t.docKeys()
385
386	tt := t.token()
387	if !force {
388		prereqs, found := f.hasPreReqs(tt, dkeys)
389		if found && prereqs {
390			// Its state is already known.
391			return nil, errPreReqs
392		}
393	}
394
395	revno := make(map[docKey]int64)
396	info := txnInfo{}
397	for _, dkey := range dkeys {
398		const retries = 3
399		retry := -1
400
401	RetryDoc:
402		retry++
403		c := f.tc.Database.C(dkey.C)
404		if err := c.FindId(dkey.Id).Select(txnFields).One(&info); err == mgo.ErrNotFound {
405			// Document is missing. Look in stash.
406			chaos("")
407			if err := f.sc.FindId(dkey).One(&info); err == mgo.ErrNotFound {
408				// Stash also doesn't exist. Maybe someone applied it.
409				if err := f.reload(t); err != nil {
410					return nil, err
411				} else if t.State != tprepared {
412					return t.Revnos, err
413				}
414				// Not applying either.
415				if retry < retries {
416					// Retry since there might be an insert/remove race.
417					goto RetryDoc
418				}
419				// Neither the doc nor the stash seem to exist.
420				return nil, fmt.Errorf("cannot find document %v for applying transaction %s", dkey, t)
421			} else if err != nil {
422				return nil, err
423			}
424			// Stash found.
425			if info.Insert != "" {
426				// Handle insert in progress before assuming ordering is good.
427				if err := f.loadAndApply(info.Insert); err != nil {
428					return nil, err
429				}
430				goto RetryDoc
431			}
432			if info.Revno == 0 {
433				// Missing revno in the stash means -1.
434				info.Revno = -1
435			}
436		} else if err != nil {
437			return nil, err
438		} else if info.Remove != "" {
439			// Handle remove in progress before assuming ordering is good.
440			if err := f.loadAndApply(info.Remove); err != nil {
441				return nil, err
442			}
443			goto RetryDoc
444		}
445		revno[dkey] = info.Revno
446
447		found := false
448		for _, id := range info.Queue {
449			if id == tt {
450				found = true
451				break
452			}
453		}
454		f.queue[dkey] = info.Queue
455		if !found {
456			// Rescanned transaction id was not in the queue. This could mean one
457			// of three things:
458			//  1) The transaction was applied and popped by someone else. This is
459			//     the common case.
460			//  2) We've read an out-of-date queue from the stash. This can happen
461			//     when someone else was paused for a long while preparing another
462			//     transaction for this document, and improperly upserted to the
463			//     stash when unpaused (after someone else inserted the document).
464			//     This is rare but possible.
465			//  3) There's an actual bug somewhere, or outside interference. Worst
466			//     possible case.
467			f.debugf("Rescanned document %v misses %s in queue: %v", dkey, tt, info.Queue)
468			err := f.reload(t)
469			if t.State == tpreparing || t.State == tprepared {
470				if retry < retries {
471					// Case 2.
472					goto RetryDoc
473				}
474				// Case 3.
475				return nil, fmt.Errorf("cannot find transaction %s in queue for document %v", t, dkey)
476			}
477			// Case 1.
478			return t.Revnos, err
479		}
480	}
481
482	prereqs, found := f.hasPreReqs(tt, dkeys)
483	if !found {
484		panic("rescanning loop guarantees that this can't happen")
485	} else if prereqs && !force {
486		f.debugf("Rescanned queue with %s: has prereqs, not forced", tt)
487		return nil, errPreReqs
488	}
489	revnos = assembledRevnos(t.Ops, revno)
490	if !prereqs {
491		f.debugf("Rescanned queue with %s: no prereqs, revnos: %v", tt, revnos)
492	} else {
493		f.debugf("Rescanned queue with %s: has prereqs, forced, revnos: %v", tt, revnos)
494	}
495	return revnos, nil
496}
497
498func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 {
499	revnos := make([]int64, len(ops))
500	for i, op := range ops {
501		dkey := op.docKey()
502		revnos[i] = revno[dkey]
503		drevno := revno[dkey]
504		switch {
505		case op.Insert != nil && drevno < 0:
506			revno[dkey] = -drevno + 1
507		case op.Update != nil && drevno >= 0:
508			revno[dkey] = drevno + 1
509		case op.Remove && drevno >= 0:
510			revno[dkey] = -drevno - 1
511		}
512	}
513	return revnos
514}
515
516func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) {
517	found = true
518NextDoc:
519	for _, dkey := range dkeys {
520		for _, dtt := range f.queue[dkey] {
521			if dtt == tt {
522				continue NextDoc
523			} else if dtt.id() != tt.id() {
524				prereqs = true
525			}
526		}
527		found = false
528	}
529	return
530}
531
532func (f *flusher) reload(t *transaction) error {
533	var newt transaction
534	query := f.tc.FindId(t.Id)
535	query.Select(bson.D{{"s", 1}, {"n", 1}, {"r", 1}})
536	if err := query.One(&newt); err != nil {
537		return fmt.Errorf("failed to reload transaction: %v", err)
538	}
539	t.State = newt.State
540	t.Nonce = newt.Nonce
541	t.Revnos = newt.Revnos
542	f.debugf("Reloaded %s: %q", t, t.State)
543	return nil
544}
545
546func (f *flusher) loadAndApply(id bson.ObjectId) error {
547	t, err := f.load(id)
548	if err != nil {
549		return err
550	}
551	return f.advance(t, nil, true)
552}
553
554// assert verifies that all assertions in t match the content that t
555// will be applied upon. If an assertion fails, the transaction state
556// is changed to aborted.
557func (f *flusher) assert(t *transaction, revnos []int64, pull map[bson.ObjectId]*transaction) error {
558	f.debugf("Asserting %s with revnos %v", t, revnos)
559	if t.State != tprepared {
560		panic(fmt.Errorf("asserting transaction in invalid state: %q", t.State))
561	}
562	qdoc := make(bson.D, 3)
563	revno := make(map[docKey]int64)
564	for i, op := range t.Ops {
565		dkey := op.docKey()
566		if _, ok := revno[dkey]; !ok {
567			revno[dkey] = revnos[i]
568		}
569		if op.Assert == nil {
570			continue
571		}
572		if op.Assert == DocMissing {
573			if revnos[i] >= 0 {
574				return f.abortOrReload(t, revnos, pull)
575			}
576			continue
577		}
578		if op.Insert != nil {
579			return fmt.Errorf("Insert can only Assert txn.DocMissing", op.Assert)
580		}
581		// if revnos[i] < 0 { abort }?
582
583		qdoc = append(qdoc[:0], bson.DocElem{"_id", op.Id})
584		if op.Assert != DocMissing {
585			var revnoq interface{}
586			if n := revno[dkey]; n == 0 {
587				revnoq = bson.D{{"$exists", false}}
588			} else {
589				revnoq = n
590			}
591			// XXX Add tt to the query here, once we're sure it's all working.
592			//     Not having it increases the chances of breaking on bad logic.
593			qdoc = append(qdoc, bson.DocElem{"txn-revno", revnoq})
594			if op.Assert != DocExists {
595				qdoc = append(qdoc, bson.DocElem{"$or", []interface{}{op.Assert}})
596			}
597		}
598
599		c := f.tc.Database.C(op.C)
600		if err := c.Find(qdoc).Select(bson.D{{"_id", 1}}).One(nil); err == mgo.ErrNotFound {
601			// Assertion failed or someone else started applying.
602			return f.abortOrReload(t, revnos, pull)
603		} else if err != nil {
604			return err
605		}
606	}
607	f.debugf("Asserting %s succeeded", t)
608	return nil
609}
610
// abortOrReload moves t towards the aborted state.
//
// If t is prepared, it first tries to switch it to aborting. If that update
// finds no prepared transaction document, t is reloaded, and unless the
// reloaded state is aborting, abortOrReload returns without aborting
// (returning only a reload error, if any). When revnos is non-empty, tokens
// of the transactions in pull (t itself when pull is nil) are pulled once
// from the queue of every document t touches: from the stash when that
// op's revno is negative, and from the live collection otherwise. Finally
// t is marked aborted. A transaction document that cannot be found at that
// point is not an error.
//
// It panics if t is neither prepared nor aborting.
func (f *flusher) abortOrReload(t *transaction, revnos []int64, pull map[bson.ObjectId]*transaction) (err error) {
	f.debugf("Aborting or reloading %s (was %q)", t, t.State)
	if t.State == tprepared {
		qdoc := bson.D{{"_id", t.Id}, {"s", tprepared}}
		udoc := bson.D{{"$set", bson.D{{"s", taborting}}}}
		chaos("set-aborting")
		if err = f.tc.Update(qdoc, udoc); err == nil {
			t.State = taborting
		} else if err == mgo.ErrNotFound {
			// Someone else changed the state first. Only continue if they
			// also decided to abort.
			if err = f.reload(t); err != nil || t.State != taborting {
				f.debugf("Won't abort %s. Reloaded state: %q", t, t.State)
				return err
			}
		} else {
			return err
		}
	} else if t.State != taborting {
		panic(fmt.Errorf("aborting transaction in invalid state: %q", t.State))
	}

	if len(revnos) > 0 {
		if pull == nil {
			pull = map[bson.ObjectId]*transaction{t.Id: t}
		}
		// seen ensures each document is handled once, using the revno of
		// the first op that touches it.
		seen := make(map[docKey]bool)
		for i, op := range t.Ops {
			dkey := op.docKey()
			if seen[op.docKey()] {
				continue
			}
			seen[dkey] = true

			pullAll := tokensToPull(f.queue[dkey], pull, "")
			if len(pullAll) == 0 {
				continue
			}
			udoc := bson.D{{"$pullAll", bson.D{{"txn-queue", pullAll}}}}
			chaos("")
			if revnos[i] < 0 {
				err = f.sc.UpdateId(dkey, udoc)
			} else {
				c := f.tc.Database.C(dkey.C)
				err = c.UpdateId(dkey.Id, udoc)
			}
			if err != nil && err != mgo.ErrNotFound {
				return err
			}
		}
	}
	udoc := bson.D{{"$set", bson.D{{"s", taborted}}}}
	chaos("set-aborted")
	if err := f.tc.UpdateId(t.Id, udoc); err != nil && err != mgo.ErrNotFound {
		return err
	}
	t.State = taborted
	f.debugf("Aborted %s", t)
	return nil
}
669
670func (f *flusher) checkpoint(t *transaction, revnos []int64) error {
671	var debugRevnos map[docKey][]int64
672	if debugEnabled {
673		debugRevnos = make(map[docKey][]int64)
674		for i, op := range t.Ops {
675			dkey := op.docKey()
676			debugRevnos[dkey] = append(debugRevnos[dkey], revnos[i])
677		}
678		f.debugf("Ready to apply %s. Saving revnos %v", t, debugRevnos)
679	}
680
681	// Save in t the txn-revno values the transaction must run on.
682	qdoc := bson.D{{"_id", t.Id}, {"s", tprepared}}
683	udoc := bson.D{{"$set", bson.D{{"s", tapplying}, {"r", revnos}}}}
684	chaos("set-applying")
685	err := f.tc.Update(qdoc, udoc)
686	if err == nil {
687		t.State = tapplying
688		t.Revnos = revnos
689		f.debugf("Ready to apply %s. Saving revnos %v: DONE", t, debugRevnos)
690	} else if err == mgo.ErrNotFound {
691		f.debugf("Ready to apply %s. Saving revnos %v: LOST RACE", t, debugRevnos)
692		return f.reload(t)
693	}
694	return nil
695}
696
// apply executes the ops of t, which must be in the applying state, on
// documents whose txn-revno matches t.Revnos and whose txn-queue contains
// t's token. For update ops, tokens of the transactions in pull (t itself
// when pull is nil) are pulled from the queue, except t's own token. An op
// that matches no document, or that hits a duplicate key, is treated as
// already applied by someone else ("MISS"). When a change log collection
// is configured (f.lc), a log document listing the changed ids and their
// new revnos per collection is inserted. Finally t is marked applied, and
// errors from that last update are ignored.
//
// apply panics if t is not applying.
func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) error {
	f.debugf("Applying transaction %s", t)
	if t.State != tapplying {
		panic(fmt.Errorf("applying transaction in invalid state: %q", t.State))
	}
	if pull == nil {
		pull = map[bson.ObjectId]*transaction{t.Id: t}
	}

	// logRevnos starts as a copy of t.Revnos and receives the new revno of
	// each op that changes a document.
	logRevnos := append([]int64(nil), t.Revnos...)
	logDoc := bson.D{{"_id", t.Id}}

	tt := tokenFor(t)
	for i := range t.Ops {
		op := &t.Ops[i]
		dkey := op.docKey()
		dqueue := f.queue[dkey]
		revno := t.Revnos[i]

		var opName string
		if debugEnabled {
			opName = op.name()
			f.debugf("Applying %s op %d (%s) on %v with txn-revno %d", t, i, opName, dkey, revno)
		}

		c := f.tc.Database.C(op.C)

		qdoc := bson.D{{"_id", dkey.Id}, {"txn-revno", revno}, {"txn-queue", tt}}
		if op.Insert != nil {
			// Inserts are matched against the stash, which is keyed by the
			// whole docKey.
			qdoc[0].Value = dkey
			if revno == -1 {
				qdoc[1].Value = bson.D{{"$exists", false}}
			}
		} else if revno == 0 {
			// There's no document with revno 0. The only way to see it is
			// when an existent document participates in a transaction the
			// first time. Txn-inserted documents get revno -1 while in the
			// stash for the first time, and -revno+1 == 2 when they go live.
			qdoc[1].Value = bson.D{{"$exists", false}}
		}

		pullAll := tokensToPull(dqueue, pull, tt)

		var d bson.D
		var outcome string
		var err error
		switch {
		case op.Update != nil:
			if revno < 0 {
				err = mgo.ErrNotFound
				f.debugf("Won't try to apply update op; negative revision means the document is missing or stashed")
			} else {
				newRevno := revno + 1
				logRevnos[i] = newRevno
				if d, err = objToDoc(op.Update); err != nil {
					return err
				}
				if d, err = addToDoc(d, "$pullAll", bson.D{{"txn-queue", pullAll}}); err != nil {
					return err
				}
				if d, err = addToDoc(d, "$set", bson.D{{"txn-revno", newRevno}}); err != nil {
					return err
				}
				chaos("")
				err = c.Update(qdoc, d)
			}
		case op.Remove:
			if revno < 0 {
				err = mgo.ErrNotFound
			} else {
				newRevno := -revno - 1
				logRevnos[i] = newRevno
				// Push a fresh nonce onto the stash (creating it if needed),
				// so the stash update below only applies while that nonce
				// is still present.
				nonce := newNonce()
				stash := txnInfo{}
				change := mgo.Change{
					Update:    bson.D{{"$push", bson.D{{"n", nonce}}}},
					Upsert:    true,
					ReturnNew: true,
				}
				if _, err = f.sc.FindId(dkey).Apply(change, &stash); err != nil {
					return err
				}
				change = mgo.Change{
					Update:    bson.D{{"$set", bson.D{{"txn-remove", t.Id}}}},
					ReturnNew: true,
				}
				var info txnInfo
				if _, err = c.Find(qdoc).Apply(change, &info); err == nil {
					// The document still exists so the stash previously
					// observed was either out of date or necessarily
					// contained the token being applied.
					f.debugf("Marked document %v to be removed on revno %d with queue: %v", dkey, info.Revno, info.Queue)
					updated := false
					if !hasToken(stash.Queue, tt) {
						var set, unset bson.D
						if revno == 0 {
							// Missing revno in stash means -1.
							set = bson.D{{"txn-queue", info.Queue}}
							unset = bson.D{{"n", 1}, {"txn-revno", 1}}
						} else {
							set = bson.D{{"txn-queue", info.Queue}, {"txn-revno", newRevno}}
							unset = bson.D{{"n", 1}}
						}
						qdoc := bson.D{{"_id", dkey}, {"n", nonce}}
						udoc := bson.D{{"$set", set}, {"$unset", unset}}
						if err = f.sc.Update(qdoc, udoc); err == nil {
							updated = true
						} else if err != mgo.ErrNotFound {
							return err
						}
					}
					if updated {
						f.debugf("Updated stash for document %v with revno %d and queue: %v", dkey, newRevno, info.Queue)
					} else {
						f.debugf("Stash for document %v was up-to-date", dkey)
					}
					// This uses the outer qdoc (live document query); the
					// stash query above is scoped to its if block.
					err = c.Remove(qdoc)
				}
			}
		case op.Insert != nil:
			if revno >= 0 {
				err = mgo.ErrNotFound
			} else {
				newRevno := -revno + 1
				logRevnos[i] = newRevno
				if d, err = objToDoc(op.Insert); err != nil {
					return err
				}
				change := mgo.Change{
					Update:    bson.D{{"$set", bson.D{{"txn-insert", t.Id}}}},
					ReturnNew: true,
				}
				chaos("")
				var info txnInfo
				if _, err = f.sc.Find(qdoc).Apply(change, &info); err == nil {
					f.debugf("Stash for document %v has revno %d and queue: %v", dkey, info.Revno, info.Queue)
					d = setInDoc(d, bson.D{{"_id", op.Id}, {"txn-revno", newRevno}, {"txn-queue", info.Queue}})
					// Unlikely yet unfortunate race in here if this gets seriously
					// delayed. If someone inserts+removes meanwhile, this will
					// reinsert, and there's no way to avoid that while keeping the
					// collection clean or compromising sharding. applyOps can solve
					// the former, but it can't shard (SERVER-1439).
					chaos("insert")
					err = c.Insert(d)
					if err == nil || mgo.IsDup(err) {
						if err == nil {
							// NOTE(review): this logs the stash revno
							// (info.Revno), not newRevno written above.
							f.debugf("New document %v inserted with revno %d and queue: %v", dkey, info.Revno, info.Queue)
						} else {
							f.debugf("Document %v already existed", dkey)
						}
						chaos("")
						if err = f.sc.Remove(qdoc); err == nil {
							f.debugf("Stash for document %v removed", dkey)
						}
					}
				}
			}
		case op.Assert != nil:
			// Pure assertion. No changes to apply.
		}
		if err == nil {
			outcome = "DONE"
		} else if err == mgo.ErrNotFound || mgo.IsDup(err) {
			outcome = "MISS"
			err = nil
		} else {
			outcome = err.Error()
		}
		if debugEnabled {
			f.debugf("Applying %s op %d (%s) on %v with txn-revno %d: %s", t, i, opName, dkey, revno, outcome)
		}
		if err != nil {
			return err
		}

		if f.lc != nil && op.isChange() {
			// Add change to the log document.
			// Entries are grouped per collection as {d: [ids], r: [revnos]}.
			var dr bson.D
			for li := range logDoc {
				elem := &logDoc[li]
				if elem.Name == op.C {
					dr = elem.Value.(bson.D)
					break
				}
			}
			if dr == nil {
				logDoc = append(logDoc, bson.DocElem{op.C, bson.D{{"d", []interface{}{}}, {"r", []int64{}}}})
				dr = logDoc[len(logDoc)-1].Value.(bson.D)
			}
			// dr shares its elements with logDoc, so these writes update it.
			dr[0].Value = append(dr[0].Value.([]interface{}), op.Id)
			dr[1].Value = append(dr[1].Value.([]int64), logRevnos[i])
		}
	}
	t.State = tapplied

	if f.lc != nil {
		// Insert log document into the changelog collection.
		f.debugf("Inserting %s into change log", t)
		err := f.lc.Insert(logDoc)
		if err != nil && !mgo.IsDup(err) {
			return err
		}
	}

	// It's been applied, so errors are ignored here. It's fine for someone
	// else to win the race and mark it as applied, and it's also fine for
	// it to remain pending until a later point when someone will perceive
	// it has been applied and mark it at such.
	f.debugf("Marking %s as applied", t)
	chaos("set-applied")
	f.tc.Update(bson.D{{"_id", t.Id}, {"s", tapplying}}, bson.D{{"$set", bson.D{{"s", tapplied}}}})
	return nil
}
910
911func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token {
912	var result []token
913	for j := len(dqueue) - 1; j >= 0; j-- {
914		dtt := dqueue[j]
915		if dtt == dontPull {
916			continue
917		}
918		if _, ok := pull[dtt.id()]; ok {
919			// It was handled before and this is a leftover invalid
920			// nonce in the queue. Cherry-pick it out.
921			result = append(result, dtt)
922		}
923	}
924	return result
925}
926
927func objToDoc(obj interface{}) (d bson.D, err error) {
928	data, err := bson.Marshal(obj)
929	if err != nil {
930		return nil, err
931	}
932	err = bson.Unmarshal(data, &d)
933	if err != nil {
934		return nil, err
935	}
936	return d, err
937}
938
939func addToDoc(doc bson.D, key string, add bson.D) (bson.D, error) {
940	for i := range doc {
941		elem := &doc[i]
942		if elem.Name != key {
943			continue
944		}
945		if old, ok := elem.Value.(bson.D); ok {
946			elem.Value = append(old, add...)
947			return doc, nil
948		} else {
949			return nil, fmt.Errorf("invalid %q value in change document: %#v", key, elem.Value)
950		}
951	}
952	return append(doc, bson.DocElem{key, add}), nil
953}
954
955func setInDoc(doc bson.D, set bson.D) bson.D {
956	dlen := len(doc)
957NextS:
958	for s := range set {
959		sname := set[s].Name
960		for d := 0; d < dlen; d++ {
961			if doc[d].Name == sname {
962				doc[d].Value = set[s].Value
963				continue NextS
964			}
965		}
966		doc = append(doc, set[s])
967	}
968	return doc
969}
970
971func hasToken(tokens []token, tt token) bool {
972	for _, ttt := range tokens {
973		if ttt == tt {
974			return true
975		}
976	}
977	return false
978}
979
980func (f *flusher) debugf(format string, args ...interface{}) {
981	if !debugEnabled {
982		return
983	}
984	debugf(f.debugId+format, args...)
985}
986