1package store
2
3import (
4	"context"
5	"errors"
6	"fmt"
7	"runtime"
8	"strconv"
9	"strings"
10	"sync"
11	"sync/atomic"
12	"time"
13
14	"github.com/docker/go-events"
15	"github.com/docker/go-metrics"
16	"github.com/docker/swarmkit/api"
17	pb "github.com/docker/swarmkit/api"
18	"github.com/docker/swarmkit/manager/state"
19	"github.com/docker/swarmkit/watch"
20	gogotypes "github.com/gogo/protobuf/types"
21	memdb "github.com/hashicorp/go-memdb"
22)
23
const (
	// Names of the memdb indexes queried in this file (the index argument of
	// Txn.First/Txn.Get). Presumably the object tables registered elsewhere in
	// the package declare indexes under these same names — confirm there.
	indexID           = "id"
	indexName         = "name"
	indexRuntime      = "runtime"
	indexServiceID    = "serviceid"
	indexNodeID       = "nodeid"
	indexSlot         = "slot"
	indexDesiredState = "desiredstate"
	indexTaskState    = "taskstate"
	indexRole         = "role"
	indexMembership   = "membership"
	indexNetwork      = "network"
	indexSecret       = "secret"
	indexConfig       = "config"
	indexKind         = "kind"
	indexCustom       = "custom"

	// prefix is appended to an index name to query that index by prefix,
	// e.g. indexID+prefix for byIDPrefix and indexName+prefix for
	// byNamePrefix.
	prefix = "_prefix"

	// MaxChangesPerTransaction is the number of changes after which a new
	// transaction should be started within Batch.
	MaxChangesPerTransaction = 200

	// MaxTransactionBytes is the maximum serialized transaction size.
	// Batch.Update starts a new transaction once its running size estimate
	// reaches three quarters of this value. Note that this is an untyped
	// floating-point constant whose value is exactly 1572864.
	MaxTransactionBytes = 1.5 * 1024 * 1024
)
50
var (
	// ErrExist is returned by create operations if the provided ID is already
	// taken.
	ErrExist = errors.New("object already exists")

	// ErrNotExist is returned by altering operations (update, delete) if the
	// provided ID is not found.
	ErrNotExist = errors.New("object does not exist")

	// ErrNameConflict is returned by create/update if the object name is
	// already in use by another object.
	ErrNameConflict = errors.New("name conflicts with an existing object")

	// ErrInvalidFindBy is returned if an unrecognized type is passed to Find.
	ErrInvalidFindBy = errors.New("invalid find argument type")

	// ErrSequenceConflict is returned when trying to update an object whose
	// meta version does not match the version of the object currently in the
	// store. The check is only made when the transaction carries a version.
	ErrSequenceConflict = errors.New("update out of sequence")

	// objectStorers lists every ObjectStoreConfig passed to register, in
	// registration order; Save, Restore and applyStoreAction iterate over it.
	// schema is the memdb schema NewMemoryStore creates the database from;
	// register adds each storer's table to it. errUnknownStoreAction is the
	// sentinel a storer's ApplyStoreAction returns for an action it does not
	// handle, so applyStoreAction moves on to the next storer.
	objectStorers []ObjectStoreConfig
	schema        = &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{},
	}
	errUnknownStoreAction = errors.New("unknown store action")

	// WedgeTimeout is the maximum amount of time the store lock may be
	// held before declaring a suspected deadlock.
	WedgeTimeout = 30 * time.Second

	// update()/write tx latency timer.
	updateLatencyTimer metrics.Timer

	// view()/read tx latency timer.
	viewLatencyTimer metrics.Timer

	// lookup() latency timer.
	lookupLatencyTimer metrics.Timer

	// Batch() latency timer.
	batchLatencyTimer metrics.Timer

	// timer to capture the duration for which the memory store mutex is locked.
	storeLockDurationTimer metrics.Timer
)
96
// init creates the store's latency and lock-duration timers in the
// "swarm"/"store" metrics namespace and registers that namespace with
// metrics.Register.
func init() {
	ns := metrics.NewNamespace("swarm", "store", nil)
	updateLatencyTimer = ns.NewTimer("write_tx_latency",
		"Raft store write tx latency.")
	viewLatencyTimer = ns.NewTimer("read_tx_latency",
		"Raft store read tx latency.")
	lookupLatencyTimer = ns.NewTimer("lookup_latency",
		"Raft store read latency.")
	batchLatencyTimer = ns.NewTimer("batch_latency",
		"Raft store batch latency.")
	storeLockDurationTimer = ns.NewTimer("memory_store_lock_duration",
		"Duration for which the raft memory store lock was held.")
	metrics.Register(ns)
}
111
112func register(os ObjectStoreConfig) {
113	objectStorers = append(objectStorers, os)
114	schema.Tables[os.Table.Name] = os.Table
115}
116
117// timedMutex wraps a sync.Mutex, and keeps track of when it was locked.
118type timedMutex struct {
119	sync.Mutex
120	lockedAt atomic.Value
121}
122
123func (m *timedMutex) Lock() {
124	m.Mutex.Lock()
125	m.lockedAt.Store(time.Now())
126}
127
128// Unlocks the timedMutex and captures the duration
129// for which it was locked in a metric.
130func (m *timedMutex) Unlock() {
131	unlockedTimestamp := m.lockedAt.Load()
132	m.lockedAt.Store(time.Time{})
133	m.Mutex.Unlock()
134	lockedFor := time.Since(unlockedTimestamp.(time.Time))
135	storeLockDurationTimer.Update(lockedFor)
136}
137
138func (m *timedMutex) LockedAt() time.Time {
139	lockedTimestamp := m.lockedAt.Load()
140	if lockedTimestamp == nil {
141		return time.Time{}
142	}
143	return lockedTimestamp.(time.Time)
144}
145
146// MemoryStore is a concurrency-safe, in-memory implementation of the Store
147// interface.
148type MemoryStore struct {
149	// updateLock must be held during an update transaction.
150	updateLock timedMutex
151
152	memDB *memdb.MemDB
153	queue *watch.Queue
154
155	proposer state.Proposer
156}
157
158// NewMemoryStore returns an in-memory store. The argument is an optional
159// Proposer which will be used to propagate changes to other members in a
160// cluster.
161func NewMemoryStore(proposer state.Proposer) *MemoryStore {
162	memDB, err := memdb.NewMemDB(schema)
163	if err != nil {
164		// This shouldn't fail
165		panic(err)
166	}
167
168	return &MemoryStore{
169		memDB:    memDB,
170		queue:    watch.NewQueue(),
171		proposer: proposer,
172	}
173}
174
175// Close closes the memory store and frees its associated resources.
176func (s *MemoryStore) Close() error {
177	return s.queue.Close()
178}
179
// fromArgs turns a single string argument into an index key: the string's
// bytes followed by a NUL terminator. The terminator is what distinguishes
// an exact key from a prefix key (see prefixFromArgs). It returns an error
// unless exactly one argument is given and that argument is a string.
func fromArgs(args ...interface{}) ([]byte, error) {
	if len(args) != 1 {
		return nil, fmt.Errorf("must provide only a single argument")
	}
	s, isString := args[0].(string)
	if !isString {
		return nil, fmt.Errorf("argument must be a string: %#v", args[0])
	}

	key := make([]byte, 0, len(s)+1)
	key = append(key, s...)
	return append(key, 0), nil
}

// prefixFromArgs is like fromArgs but omits the NUL terminator, so the
// resulting key matches every indexed value that starts with the argument.
func prefixFromArgs(args ...interface{}) ([]byte, error) {
	key, err := fromArgs(args...)
	switch {
	case err != nil:
		return nil, err
	case len(key) == 0:
		return key, nil
	default:
		// Drop the trailing terminator; what remains is the prefix.
		return key[:len(key)-1], nil
	}
}
206
// ReadTx is a read transaction. Note that transaction does not imply
// any internal batching. It only means that the transaction presents a
// consistent view of the data that cannot be affected by other
// transactions.
type ReadTx interface {
	// lookup returns the object in table whose index matches id, or nil.
	// The memdb-backed implementation returns the stored object itself,
	// not a copy.
	lookup(table, index, id string) api.StoreObject
	// get returns a copy of the object with the given ID in table, or nil.
	get(table, id string) api.StoreObject
	// find passes a copy of every object in table selected by "by" to
	// appendResult. checkType validates table-specific selectors.
	find(table string, by By, checkType func(By) error, appendResult func(api.StoreObject)) error
}

// readTx implements ReadTx on top of a memdb transaction.
type readTx struct {
	memDBTx *memdb.Txn
}
220
221// View executes a read transaction.
222func (s *MemoryStore) View(cb func(ReadTx)) {
223	defer metrics.StartTimer(viewLatencyTimer)()
224	memDBTx := s.memDB.Txn(false)
225
226	readTx := readTx{
227		memDBTx: memDBTx,
228	}
229	cb(readTx)
230	memDBTx.Commit()
231}
232
// Tx is a read/write transaction. Note that transaction does not imply
// any internal batching. The purpose of this transaction is to give the
// user a guarantee that its changes won't be visible to other transactions
// until the transaction is over.
type Tx interface {
	ReadTx
	// create inserts a new object; it fails with ErrExist if the ID is taken.
	create(table string, o api.StoreObject) error
	// update replaces an existing object. It fails with ErrNotExist if the
	// object is absent, and with ErrSequenceConflict on a version mismatch.
	update(table string, o api.StoreObject) error
	// delete removes an object by ID; it fails with ErrNotExist if absent.
	delete(table, id string) error
}

// tx is the memdb-backed implementation of Tx.
type tx struct {
	readTx
	// curVersion is stamped into object metadata by create and update. When
	// it is nil, metadata is left as-is (see touchMeta).
	curVersion *api.Version
	// changelist records one event per change made in this transaction, in
	// order. It is published to the watch queue after commit and, when a
	// proposer is used, converted to store actions and proposed first.
	changelist []api.Event
}
249
250// changelistBetweenVersions returns the changes after "from" up to and
251// including "to".
252func (s *MemoryStore) changelistBetweenVersions(from, to api.Version) ([]api.Event, error) {
253	if s.proposer == nil {
254		return nil, errors.New("store does not support versioning")
255	}
256	changes, err := s.proposer.ChangesBetween(from, to)
257	if err != nil {
258		return nil, err
259	}
260
261	var changelist []api.Event
262
263	for _, change := range changes {
264		for _, sa := range change.StoreActions {
265			event, err := api.EventFromStoreAction(sa, nil)
266			if err != nil {
267				return nil, err
268			}
269			changelist = append(changelist, event)
270		}
271		changelist = append(changelist, state.EventCommit{Version: change.Version.Copy()})
272	}
273
274	return changelist, nil
275}
276
277// ApplyStoreActions updates a store based on StoreAction messages.
278func (s *MemoryStore) ApplyStoreActions(actions []api.StoreAction) error {
279	s.updateLock.Lock()
280	memDBTx := s.memDB.Txn(true)
281
282	tx := tx{
283		readTx: readTx{
284			memDBTx: memDBTx,
285		},
286	}
287
288	for _, sa := range actions {
289		if err := applyStoreAction(&tx, sa); err != nil {
290			memDBTx.Abort()
291			s.updateLock.Unlock()
292			return err
293		}
294	}
295
296	memDBTx.Commit()
297
298	for _, c := range tx.changelist {
299		s.queue.Publish(c)
300	}
301	if len(tx.changelist) != 0 {
302		s.queue.Publish(state.EventCommit{})
303	}
304	s.updateLock.Unlock()
305	return nil
306}
307
308func applyStoreAction(tx Tx, sa api.StoreAction) error {
309	for _, os := range objectStorers {
310		err := os.ApplyStoreAction(tx, sa)
311		if err != errUnknownStoreAction {
312			return err
313		}
314	}
315
316	return errors.New("unrecognized action type")
317}
318
319func (s *MemoryStore) update(proposer state.Proposer, cb func(Tx) error) error {
320	defer metrics.StartTimer(updateLatencyTimer)()
321	s.updateLock.Lock()
322	memDBTx := s.memDB.Txn(true)
323
324	var curVersion *api.Version
325
326	if proposer != nil {
327		curVersion = proposer.GetVersion()
328	}
329
330	var tx tx
331	tx.init(memDBTx, curVersion)
332
333	err := cb(&tx)
334
335	if err == nil {
336		if proposer == nil {
337			memDBTx.Commit()
338		} else {
339			var sa []api.StoreAction
340			sa, err = tx.changelistStoreActions()
341
342			if err == nil {
343				if len(sa) != 0 {
344					err = proposer.ProposeValue(context.Background(), sa, func() {
345						memDBTx.Commit()
346					})
347				} else {
348					memDBTx.Commit()
349				}
350			}
351		}
352	}
353
354	if err == nil {
355		for _, c := range tx.changelist {
356			s.queue.Publish(c)
357		}
358		if len(tx.changelist) != 0 {
359			if proposer != nil {
360				curVersion = proposer.GetVersion()
361			}
362
363			s.queue.Publish(state.EventCommit{Version: curVersion})
364		}
365	} else {
366		memDBTx.Abort()
367	}
368	s.updateLock.Unlock()
369	return err
370}
371
372func (s *MemoryStore) updateLocal(cb func(Tx) error) error {
373	return s.update(nil, cb)
374}
375
376// Update executes a read/write transaction.
377func (s *MemoryStore) Update(cb func(Tx) error) error {
378	return s.update(s.proposer, cb)
379}
380
// Batch provides a mechanism to batch updates to a store. A Batch is created
// by MemoryStore.Batch and handed to its callback.
type Batch struct {
	// tx is the transaction currently accumulating changes, and store is the
	// store this batch writes to.
	tx    tx
	store *MemoryStore
	// applied counts the Update calls whose callback returned nil.
	applied int
	// transactionSizeEstimate is the running count of the size of the
	// current transaction.
	transactionSizeEstimate int
	// changelistLen is the last known length of the transaction's
	// changelist. err records a failed commit; once it is set, Update
	// returns it without running its callback.
	changelistLen int
	err           error
}
395
396// Update adds a single change to a batch. Each call to Update is atomic, but
397// different calls to Update may be spread across multiple transactions to
398// circumvent transaction size limits.
399func (batch *Batch) Update(cb func(Tx) error) error {
400	if batch.err != nil {
401		return batch.err
402	}
403
404	if err := cb(&batch.tx); err != nil {
405		return err
406	}
407
408	batch.applied++
409
410	for batch.changelistLen < len(batch.tx.changelist) {
411		sa, err := api.NewStoreAction(batch.tx.changelist[batch.changelistLen])
412		if err != nil {
413			return err
414		}
415		batch.transactionSizeEstimate += sa.Size()
416		batch.changelistLen++
417	}
418
419	if batch.changelistLen >= MaxChangesPerTransaction || batch.transactionSizeEstimate >= (MaxTransactionBytes*3)/4 {
420		if err := batch.commit(); err != nil {
421			return err
422		}
423
424		// Yield the update lock
425		batch.store.updateLock.Unlock()
426		runtime.Gosched()
427		batch.store.updateLock.Lock()
428
429		batch.newTx()
430	}
431
432	return nil
433}
434
435func (batch *Batch) newTx() {
436	var curVersion *api.Version
437
438	if batch.store.proposer != nil {
439		curVersion = batch.store.proposer.GetVersion()
440	}
441
442	batch.tx.init(batch.store.memDB.Txn(true), curVersion)
443	batch.transactionSizeEstimate = 0
444	batch.changelistLen = 0
445}
446
447func (batch *Batch) commit() error {
448	if batch.store.proposer != nil {
449		var sa []api.StoreAction
450		sa, batch.err = batch.tx.changelistStoreActions()
451
452		if batch.err == nil {
453			if len(sa) != 0 {
454				batch.err = batch.store.proposer.ProposeValue(context.Background(), sa, func() {
455					batch.tx.memDBTx.Commit()
456				})
457			} else {
458				batch.tx.memDBTx.Commit()
459			}
460		}
461	} else {
462		batch.tx.memDBTx.Commit()
463	}
464
465	if batch.err != nil {
466		batch.tx.memDBTx.Abort()
467		return batch.err
468	}
469
470	for _, c := range batch.tx.changelist {
471		batch.store.queue.Publish(c)
472	}
473	if len(batch.tx.changelist) != 0 {
474		batch.store.queue.Publish(state.EventCommit{})
475	}
476
477	return nil
478}
479
480// Batch performs one or more transactions that allow reads and writes
481// It invokes a callback that is passed a Batch object. The callback may
482// call batch.Update for each change it wants to make as part of the
483// batch. The changes in the batch may be split over multiple
484// transactions if necessary to keep transactions below the size limit.
485// Batch holds a lock over the state, but will yield this lock every
486// it creates a new transaction to allow other writers to proceed.
487// Thus, unrelated changes to the state may occur between calls to
488// batch.Update.
489//
490// This method allows the caller to iterate over a data set and apply
491// changes in sequence without holding the store write lock for an
492// excessive time, or producing a transaction that exceeds the maximum
493// size.
494//
495// If Batch returns an error, no guarantees are made about how many updates
496// were committed successfully.
497func (s *MemoryStore) Batch(cb func(*Batch) error) error {
498	defer metrics.StartTimer(batchLatencyTimer)()
499	s.updateLock.Lock()
500
501	batch := Batch{
502		store: s,
503	}
504	batch.newTx()
505
506	if err := cb(&batch); err != nil {
507		batch.tx.memDBTx.Abort()
508		s.updateLock.Unlock()
509		return err
510	}
511
512	err := batch.commit()
513	s.updateLock.Unlock()
514	return err
515}
516
517func (tx *tx) init(memDBTx *memdb.Txn, curVersion *api.Version) {
518	tx.memDBTx = memDBTx
519	tx.curVersion = curVersion
520	tx.changelist = nil
521}
522
523func (tx tx) changelistStoreActions() ([]api.StoreAction, error) {
524	var actions []api.StoreAction
525
526	for _, c := range tx.changelist {
527		sa, err := api.NewStoreAction(c)
528		if err != nil {
529			return nil, err
530		}
531		actions = append(actions, sa)
532	}
533
534	return actions, nil
535}
536
537// lookup is an internal typed wrapper around memdb.
538func (tx readTx) lookup(table, index, id string) api.StoreObject {
539	defer metrics.StartTimer(lookupLatencyTimer)()
540	j, err := tx.memDBTx.First(table, index, id)
541	if err != nil {
542		return nil
543	}
544	if j != nil {
545		return j.(api.StoreObject)
546	}
547	return nil
548}
549
550// create adds a new object to the store.
551// Returns ErrExist if the ID is already taken.
552func (tx *tx) create(table string, o api.StoreObject) error {
553	if tx.lookup(table, indexID, o.GetID()) != nil {
554		return ErrExist
555	}
556
557	copy := o.CopyStoreObject()
558	meta := copy.GetMeta()
559	if err := touchMeta(&meta, tx.curVersion); err != nil {
560		return err
561	}
562	copy.SetMeta(meta)
563
564	err := tx.memDBTx.Insert(table, copy)
565	if err == nil {
566		tx.changelist = append(tx.changelist, copy.EventCreate())
567		o.SetMeta(meta)
568	}
569	return err
570}
571
572// Update updates an existing object in the store.
573// Returns ErrNotExist if the object doesn't exist.
574func (tx *tx) update(table string, o api.StoreObject) error {
575	oldN := tx.lookup(table, indexID, o.GetID())
576	if oldN == nil {
577		return ErrNotExist
578	}
579
580	meta := o.GetMeta()
581
582	if tx.curVersion != nil {
583		if oldN.GetMeta().Version != meta.Version {
584			return ErrSequenceConflict
585		}
586	}
587
588	copy := o.CopyStoreObject()
589	if err := touchMeta(&meta, tx.curVersion); err != nil {
590		return err
591	}
592	copy.SetMeta(meta)
593
594	err := tx.memDBTx.Insert(table, copy)
595	if err == nil {
596		tx.changelist = append(tx.changelist, copy.EventUpdate(oldN))
597		o.SetMeta(meta)
598	}
599	return err
600}
601
602// Delete removes an object from the store.
603// Returns ErrNotExist if the object doesn't exist.
604func (tx *tx) delete(table, id string) error {
605	n := tx.lookup(table, indexID, id)
606	if n == nil {
607		return ErrNotExist
608	}
609
610	err := tx.memDBTx.Delete(table, n)
611	if err == nil {
612		tx.changelist = append(tx.changelist, n.EventDelete())
613	}
614	return err
615}
616
617// Get looks up an object by ID.
618// Returns nil if the object doesn't exist.
619func (tx readTx) get(table, id string) api.StoreObject {
620	o := tx.lookup(table, indexID, id)
621	if o == nil {
622		return nil
623	}
624	return o.CopyStoreObject()
625}
626
627// findIterators returns a slice of iterators. The union of items from these
628// iterators provides the result of the query.
629func (tx readTx) findIterators(table string, by By, checkType func(By) error) ([]memdb.ResultIterator, error) {
630	switch by.(type) {
631	case byAll, orCombinator: // generic types
632	default: // all other types
633		if err := checkType(by); err != nil {
634			return nil, err
635		}
636	}
637
638	switch v := by.(type) {
639	case byAll:
640		it, err := tx.memDBTx.Get(table, indexID)
641		if err != nil {
642			return nil, err
643		}
644		return []memdb.ResultIterator{it}, nil
645	case orCombinator:
646		var iters []memdb.ResultIterator
647		for _, subBy := range v.bys {
648			it, err := tx.findIterators(table, subBy, checkType)
649			if err != nil {
650				return nil, err
651			}
652			iters = append(iters, it...)
653		}
654		return iters, nil
655	case byName:
656		it, err := tx.memDBTx.Get(table, indexName, strings.ToLower(string(v)))
657		if err != nil {
658			return nil, err
659		}
660		return []memdb.ResultIterator{it}, nil
661	case byIDPrefix:
662		it, err := tx.memDBTx.Get(table, indexID+prefix, string(v))
663		if err != nil {
664			return nil, err
665		}
666		return []memdb.ResultIterator{it}, nil
667	case byNamePrefix:
668		it, err := tx.memDBTx.Get(table, indexName+prefix, strings.ToLower(string(v)))
669		if err != nil {
670			return nil, err
671		}
672		return []memdb.ResultIterator{it}, nil
673	case byRuntime:
674		it, err := tx.memDBTx.Get(table, indexRuntime, string(v))
675		if err != nil {
676			return nil, err
677		}
678		return []memdb.ResultIterator{it}, nil
679	case byNode:
680		it, err := tx.memDBTx.Get(table, indexNodeID, string(v))
681		if err != nil {
682			return nil, err
683		}
684		return []memdb.ResultIterator{it}, nil
685	case byService:
686		it, err := tx.memDBTx.Get(table, indexServiceID, string(v))
687		if err != nil {
688			return nil, err
689		}
690		return []memdb.ResultIterator{it}, nil
691	case bySlot:
692		it, err := tx.memDBTx.Get(table, indexSlot, v.serviceID+"\x00"+strconv.FormatUint(v.slot, 10))
693		if err != nil {
694			return nil, err
695		}
696		return []memdb.ResultIterator{it}, nil
697	case byDesiredState:
698		it, err := tx.memDBTx.Get(table, indexDesiredState, strconv.FormatInt(int64(v), 10))
699		if err != nil {
700			return nil, err
701		}
702		return []memdb.ResultIterator{it}, nil
703	case byTaskState:
704		it, err := tx.memDBTx.Get(table, indexTaskState, strconv.FormatInt(int64(v), 10))
705		if err != nil {
706			return nil, err
707		}
708		return []memdb.ResultIterator{it}, nil
709	case byRole:
710		it, err := tx.memDBTx.Get(table, indexRole, strconv.FormatInt(int64(v), 10))
711		if err != nil {
712			return nil, err
713		}
714		return []memdb.ResultIterator{it}, nil
715	case byMembership:
716		it, err := tx.memDBTx.Get(table, indexMembership, strconv.FormatInt(int64(v), 10))
717		if err != nil {
718			return nil, err
719		}
720		return []memdb.ResultIterator{it}, nil
721	case byReferencedNetworkID:
722		it, err := tx.memDBTx.Get(table, indexNetwork, string(v))
723		if err != nil {
724			return nil, err
725		}
726		return []memdb.ResultIterator{it}, nil
727	case byReferencedSecretID:
728		it, err := tx.memDBTx.Get(table, indexSecret, string(v))
729		if err != nil {
730			return nil, err
731		}
732		return []memdb.ResultIterator{it}, nil
733	case byReferencedConfigID:
734		it, err := tx.memDBTx.Get(table, indexConfig, string(v))
735		if err != nil {
736			return nil, err
737		}
738		return []memdb.ResultIterator{it}, nil
739	case byKind:
740		it, err := tx.memDBTx.Get(table, indexKind, string(v))
741		if err != nil {
742			return nil, err
743		}
744		return []memdb.ResultIterator{it}, nil
745	case byCustom:
746		var key string
747		if v.objType != "" {
748			key = v.objType + "|" + v.index + "|" + v.value
749		} else {
750			key = v.index + "|" + v.value
751		}
752		it, err := tx.memDBTx.Get(table, indexCustom, key)
753		if err != nil {
754			return nil, err
755		}
756		return []memdb.ResultIterator{it}, nil
757	case byCustomPrefix:
758		var key string
759		if v.objType != "" {
760			key = v.objType + "|" + v.index + "|" + v.value
761		} else {
762			key = v.index + "|" + v.value
763		}
764		it, err := tx.memDBTx.Get(table, indexCustom+prefix, key)
765		if err != nil {
766			return nil, err
767		}
768		return []memdb.ResultIterator{it}, nil
769	default:
770		return nil, ErrInvalidFindBy
771	}
772}
773
774// find selects a set of objects calls a callback for each matching object.
775func (tx readTx) find(table string, by By, checkType func(By) error, appendResult func(api.StoreObject)) error {
776	fromResultIterators := func(its ...memdb.ResultIterator) {
777		ids := make(map[string]struct{})
778		for _, it := range its {
779			for {
780				obj := it.Next()
781				if obj == nil {
782					break
783				}
784				o := obj.(api.StoreObject)
785				id := o.GetID()
786				if _, exists := ids[id]; !exists {
787					appendResult(o.CopyStoreObject())
788					ids[id] = struct{}{}
789				}
790			}
791		}
792	}
793
794	iters, err := tx.findIterators(table, by, checkType)
795	if err != nil {
796		return err
797	}
798
799	fromResultIterators(iters...)
800
801	return nil
802}
803
804// Save serializes the data in the store.
805func (s *MemoryStore) Save(tx ReadTx) (*pb.StoreSnapshot, error) {
806	var snapshot pb.StoreSnapshot
807	for _, os := range objectStorers {
808		if err := os.Save(tx, &snapshot); err != nil {
809			return nil, err
810		}
811	}
812
813	return &snapshot, nil
814}
815
816// Restore sets the contents of the store to the serialized data in the
817// argument.
818func (s *MemoryStore) Restore(snapshot *pb.StoreSnapshot) error {
819	return s.updateLocal(func(tx Tx) error {
820		for _, os := range objectStorers {
821			if err := os.Restore(tx, snapshot); err != nil {
822				return err
823			}
824		}
825		return nil
826	})
827}
828
829// WatchQueue returns the publish/subscribe queue.
830func (s *MemoryStore) WatchQueue() *watch.Queue {
831	return s.queue
832}
833
834// ViewAndWatch calls a callback which can observe the state of this
835// MemoryStore. It also returns a channel that will return further events from
836// this point so the snapshot can be kept up to date. The watch channel must be
837// released with watch.StopWatch when it is no longer needed. The channel is
838// guaranteed to get all events after the moment of the snapshot, and only
839// those events.
840func ViewAndWatch(store *MemoryStore, cb func(ReadTx) error, specifiers ...api.Event) (watch chan events.Event, cancel func(), err error) {
841	// Using Update to lock the store and guarantee consistency between
842	// the watcher and the the state seen by the callback. snapshotReadTx
843	// exposes this Tx as a ReadTx so the callback can't modify it.
844	err = store.Update(func(tx Tx) error {
845		if err := cb(tx); err != nil {
846			return err
847		}
848		watch, cancel = state.Watch(store.WatchQueue(), specifiers...)
849		return nil
850	})
851	if watch != nil && err != nil {
852		cancel()
853		cancel = nil
854		watch = nil
855	}
856	return
857}
858
859// WatchFrom returns a channel that will return past events from starting
860// from "version", and new events until the channel is closed. If "version"
861// is nil, this function is equivalent to
862//
863//     state.Watch(store.WatchQueue(), specifiers...).
864//
865// If the log has been compacted and it's not possible to produce the exact
866// set of events leading from "version" to the current state, this function
867// will return an error, and the caller should re-sync.
868//
869// The watch channel must be released with watch.StopWatch when it is no
870// longer needed.
871func WatchFrom(store *MemoryStore, version *api.Version, specifiers ...api.Event) (chan events.Event, func(), error) {
872	if version == nil {
873		ch, cancel := state.Watch(store.WatchQueue(), specifiers...)
874		return ch, cancel, nil
875	}
876
877	if store.proposer == nil {
878		return nil, nil, errors.New("store does not support versioning")
879	}
880
881	var (
882		curVersion  *api.Version
883		watch       chan events.Event
884		cancelWatch func()
885	)
886	// Using Update to lock the store
887	err := store.Update(func(tx Tx) error {
888		// Get current version
889		curVersion = store.proposer.GetVersion()
890		// Start the watch with the store locked so events cannot be
891		// missed
892		watch, cancelWatch = state.Watch(store.WatchQueue(), specifiers...)
893		return nil
894	})
895	if watch != nil && err != nil {
896		cancelWatch()
897		return nil, nil, err
898	}
899
900	if curVersion == nil {
901		cancelWatch()
902		return nil, nil, errors.New("could not get current version from store")
903	}
904
905	changelist, err := store.changelistBetweenVersions(*version, *curVersion)
906	if err != nil {
907		cancelWatch()
908		return nil, nil, err
909	}
910
911	ch := make(chan events.Event)
912	stop := make(chan struct{})
913	cancel := func() {
914		close(stop)
915	}
916
917	go func() {
918		defer cancelWatch()
919
920		matcher := state.Matcher(specifiers...)
921		for _, change := range changelist {
922			if matcher(change) {
923				select {
924				case ch <- change:
925				case <-stop:
926					return
927				}
928			}
929		}
930
931		for {
932			select {
933			case <-stop:
934				return
935			case e := <-watch:
936				ch <- e
937			}
938		}
939	}()
940
941	return ch, cancel, nil
942}
943
944// touchMeta updates an object's timestamps when necessary and bumps the version
945// if provided.
946func touchMeta(meta *api.Meta, version *api.Version) error {
947	// Skip meta update if version is not defined as it means we're applying
948	// from raft or restoring from a snapshot.
949	if version == nil {
950		return nil
951	}
952
953	now, err := gogotypes.TimestampProto(time.Now())
954	if err != nil {
955		return err
956	}
957
958	meta.Version = *version
959
960	// Updated CreatedAt if not defined
961	if meta.CreatedAt == nil {
962		meta.CreatedAt = now
963	}
964
965	meta.UpdatedAt = now
966
967	return nil
968}
969
970// Wedged returns true if the store lock has been held for a long time,
971// possibly indicating a deadlock.
972func (s *MemoryStore) Wedged() bool {
973	lockedAt := s.updateLock.LockedAt()
974	if lockedAt.IsZero() {
975		return false
976	}
977
978	return time.Since(lockedAt) > WedgeTimeout
979}
980