1package state
2
3import (
4	"reflect"
5	"strings"
6	"testing"
7	"time"
8
9	"github.com/stretchr/testify/require"
10
11	"github.com/hashicorp/consul/agent/structs"
12	"github.com/hashicorp/go-memdb"
13)
14
15func TestStateStore_ReapTombstones(t *testing.T) {
16	s := testStateStore(t)
17
18	// Create some KV pairs.
19	testSetKey(t, s, 1, "foo", "foo", nil)
20	testSetKey(t, s, 2, "foo/bar", "bar", nil)
21	testSetKey(t, s, 3, "foo/baz", "bar", nil)
22	testSetKey(t, s, 4, "foo/moo", "bar", nil)
23	testSetKey(t, s, 5, "foo/zoo", "bar", nil)
24
25	// Call a delete on some specific keys.
26	if err := s.KVSDelete(6, "foo/baz", nil); err != nil {
27		t.Fatalf("err: %s", err)
28	}
29	if err := s.KVSDelete(7, "foo/moo", nil); err != nil {
30		t.Fatalf("err: %s", err)
31	}
32
33	// Pull out the list and check the index, which should come from the
34	// tombstones.
35	idx, _, err := s.KVSList(nil, "foo/", nil)
36	if err != nil {
37		t.Fatalf("err: %s", err)
38	}
39	if idx != 7 {
40		t.Fatalf("bad index: %d", idx)
41	}
42
43	// Reap the tombstones <= 6.
44	if err := s.ReapTombstones(6); err != nil {
45		t.Fatalf("err: %s", err)
46	}
47
48	// Should still be good because 7 is in there.
49	idx, _, err = s.KVSList(nil, "foo/", nil)
50	if err != nil {
51		t.Fatalf("err: %s", err)
52	}
53	if idx != 7 {
54		t.Fatalf("bad index: %d", idx)
55	}
56
57	// Now reap them all.
58	if err := s.ReapTombstones(7); err != nil {
59		t.Fatalf("err: %s", err)
60	}
61
62	// At this point the sub index will slide backwards.
63	idx, _, err = s.KVSList(nil, "foo/", nil)
64	if err != nil {
65		t.Fatalf("err: %s", err)
66	}
67	if idx != 5 {
68		t.Fatalf("bad index: %d", idx)
69	}
70
71	// Make sure the tombstones are actually gone.
72	snap := s.Snapshot()
73	defer snap.Close()
74	stones, err := snap.Tombstones()
75	if err != nil {
76		t.Fatalf("err: %s", err)
77	}
78	if stones.Next() != nil {
79		t.Fatalf("unexpected extra tombstones")
80	}
81}
82
83func TestStateStore_GC(t *testing.T) {
84	// Build up a fast GC.
85	ttl := 10 * time.Millisecond
86	gran := 5 * time.Millisecond
87	gc, err := NewTombstoneGC(ttl, gran)
88	if err != nil {
89		t.Fatalf("err: %s", err)
90	}
91
92	// Enable it and attach it to the state store.
93	gc.SetEnabled(true)
94	s, err := NewStateStore(gc)
95	if err != nil {
96		t.Fatalf("err: %s", err)
97	}
98
99	// Create some KV pairs.
100	testSetKey(t, s, 1, "foo", "foo", nil)
101	testSetKey(t, s, 2, "foo/bar", "bar", nil)
102	testSetKey(t, s, 3, "foo/baz", "bar", nil)
103	testSetKey(t, s, 4, "foo/moo", "bar", nil)
104	testSetKey(t, s, 5, "foo/zoo", "bar", nil)
105
106	// Delete a key and make sure the GC sees it.
107	if err := s.KVSDelete(6, "foo/zoo", nil); err != nil {
108		t.Fatalf("err: %s", err)
109	}
110	select {
111	case idx := <-gc.ExpireCh():
112		if idx != 6 {
113			t.Fatalf("bad index: %d", idx)
114		}
115	case <-time.After(2 * ttl):
116		t.Fatalf("GC never fired")
117	}
118
119	// Check for the same behavior with a tree delete.
120	if err := s.KVSDeleteTree(7, "foo/moo", nil); err != nil {
121		t.Fatalf("err: %s", err)
122	}
123	select {
124	case idx := <-gc.ExpireCh():
125		if idx != 7 {
126			t.Fatalf("bad index: %d", idx)
127		}
128	case <-time.After(2 * ttl):
129		t.Fatalf("GC never fired")
130	}
131
132	// Check for the same behavior with a CAS delete.
133	if ok, err := s.KVSDeleteCAS(8, 3, "foo/baz", nil); !ok || err != nil {
134		t.Fatalf("err: %s", err)
135	}
136	select {
137	case idx := <-gc.ExpireCh():
138		if idx != 8 {
139			t.Fatalf("bad index: %d", idx)
140		}
141	case <-time.After(2 * ttl):
142		t.Fatalf("GC never fired")
143	}
144
145	// Finally, try it with an expiring session.
146	testRegisterNode(t, s, 9, "node1")
147	session := &structs.Session{
148		ID:       testUUID(),
149		Node:     "node1",
150		Behavior: structs.SessionKeysDelete,
151	}
152	if err := s.SessionCreate(10, session); err != nil {
153		t.Fatalf("err: %s", err)
154	}
155	d := &structs.DirEntry{
156		Key:     "lock",
157		Session: session.ID,
158	}
159	if ok, err := s.KVSLock(11, d); !ok || err != nil {
160		t.Fatalf("err: %v", err)
161	}
162	if err := s.SessionDestroy(12, session.ID, nil); err != nil {
163		t.Fatalf("err: %s", err)
164	}
165	select {
166	case idx := <-gc.ExpireCh():
167		if idx != 12 {
168			t.Fatalf("bad index: %d", idx)
169		}
170	case <-time.After(2 * ttl):
171		t.Fatalf("GC never fired")
172	}
173}
174
175func TestStateStore_KVSSet_KVSGet(t *testing.T) {
176	s := testStateStore(t)
177
178	// Get on an nonexistent key returns nil.
179	ws := memdb.NewWatchSet()
180	idx, result, err := s.KVSGet(ws, "foo", nil)
181	if result != nil || err != nil || idx != 0 {
182		t.Fatalf("expected (0, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
183	}
184
185	// Write a new K/V entry to the store.
186	entry := &structs.DirEntry{
187		Key:   "foo",
188		Value: []byte("bar"),
189	}
190	if err := s.KVSSet(1, entry); err != nil {
191		t.Fatalf("err: %s", err)
192	}
193	if !watchFired(ws) {
194		t.Fatalf("bad")
195	}
196
197	// Retrieve the K/V entry again.
198	ws = memdb.NewWatchSet()
199	idx, result, err = s.KVSGet(ws, "foo", nil)
200	if err != nil {
201		t.Fatalf("err: %s", err)
202	}
203	if result == nil {
204		t.Fatalf("expected k/v pair, got nothing")
205	}
206	if idx != 1 {
207		t.Fatalf("bad index: %d", idx)
208	}
209
210	// Check that the index was injected into the result.
211	if result.CreateIndex != 1 || result.ModifyIndex != 1 {
212		t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
213	}
214
215	// Check that the value matches.
216	if v := string(result.Value); v != "bar" {
217		t.Fatalf("expected 'bar', got: '%s'", v)
218	}
219
220	// Updating the entry works and changes the index.
221	update := &structs.DirEntry{
222		Key:   "foo",
223		Value: []byte("baz"),
224	}
225	if err := s.KVSSet(2, update); err != nil {
226		t.Fatalf("err: %s", err)
227	}
228	if !watchFired(ws) {
229		t.Fatalf("bad")
230	}
231
232	// Fetch the kv pair and check.
233	ws = memdb.NewWatchSet()
234	idx, result, err = s.KVSGet(ws, "foo", nil)
235	if err != nil {
236		t.Fatalf("err: %s", err)
237	}
238	if result.CreateIndex != 1 || result.ModifyIndex != 2 {
239		t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
240	}
241	if v := string(result.Value); v != "baz" {
242		t.Fatalf("expected 'baz', got '%s'", v)
243	}
244	if idx != 2 {
245		t.Fatalf("bad index: %d", idx)
246	}
247
248	// Attempt to set the session during an update.
249	update = &structs.DirEntry{
250		Key:     "foo",
251		Value:   []byte("zoo"),
252		Session: "nope",
253	}
254	if err := s.KVSSet(3, update); err != nil {
255		t.Fatalf("err: %s", err)
256	}
257	if !watchFired(ws) {
258		t.Fatalf("bad")
259	}
260
261	// Fetch the kv pair and check.
262	ws = memdb.NewWatchSet()
263	idx, result, err = s.KVSGet(ws, "foo", nil)
264	if err != nil {
265		t.Fatalf("err: %s", err)
266	}
267	if result.CreateIndex != 1 || result.ModifyIndex != 3 {
268		t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
269	}
270	if v := string(result.Value); v != "zoo" {
271		t.Fatalf("expected 'zoo', got '%s'", v)
272	}
273	if result.Session != "" {
274		t.Fatalf("expected empty session, got '%s", result.Session)
275	}
276	if idx != 3 {
277		t.Fatalf("bad index: %d", idx)
278	}
279
280	// Make a real session and then lock the key to set the session.
281	testRegisterNode(t, s, 4, "node1")
282	session := testUUID()
283	if err := s.SessionCreate(5, &structs.Session{ID: session, Node: "node1"}); err != nil {
284		t.Fatalf("err: %s", err)
285	}
286	update = &structs.DirEntry{
287		Key:     "foo",
288		Value:   []byte("locked"),
289		Session: session,
290	}
291	ok, err := s.KVSLock(6, update)
292	if !ok || err != nil {
293		t.Fatalf("didn't get the lock: %v %s", ok, err)
294	}
295	if !watchFired(ws) {
296		t.Fatalf("bad")
297	}
298
299	// Fetch the kv pair and check.
300	ws = memdb.NewWatchSet()
301	idx, result, err = s.KVSGet(ws, "foo", nil)
302	if err != nil {
303		t.Fatalf("err: %s", err)
304	}
305	if result.CreateIndex != 1 || result.ModifyIndex != 6 {
306		t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
307	}
308	if v := string(result.Value); v != "locked" {
309		t.Fatalf("expected 'zoo', got '%s'", v)
310	}
311	if result.Session != session {
312		t.Fatalf("expected session, got '%s", result.Session)
313	}
314	if idx != 6 {
315		t.Fatalf("bad index: %d", idx)
316	}
317
318	// Now make an update without the session and make sure it gets applied
319	// and doesn't take away the session (it is allowed to change the value).
320	update = &structs.DirEntry{
321		Key:   "foo",
322		Value: []byte("stoleit"),
323	}
324	if err := s.KVSSet(7, update); err != nil {
325		t.Fatalf("err: %s", err)
326	}
327	if !watchFired(ws) {
328		t.Fatalf("bad")
329	}
330
331	// Fetch the kv pair and check.
332	ws = memdb.NewWatchSet()
333	idx, result, err = s.KVSGet(ws, "foo", nil)
334	if err != nil {
335		t.Fatalf("err: %s", err)
336	}
337	if result.CreateIndex != 1 || result.ModifyIndex != 7 {
338		t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
339	}
340	if v := string(result.Value); v != "stoleit" {
341		t.Fatalf("expected 'zoo', got '%s'", v)
342	}
343	if result.Session != session {
344		t.Fatalf("expected session, got '%s", result.Session)
345	}
346	if idx != 7 {
347		t.Fatalf("bad index: %d", idx)
348	}
349
350	// Setting some unrelated key should not fire the watch.
351	testSetKey(t, s, 8, "other", "yup", nil)
352	if watchFired(ws) {
353		t.Fatalf("bad")
354	}
355
356	// Fetch a key that doesn't exist and make sure we get the right
357	// response.
358	idx, result, err = s.KVSGet(nil, "nope", nil)
359	if result != nil || err != nil || idx != 8 {
360		t.Fatalf("expected (8, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
361	}
362
363	// setting the same value again does not update the index
364	ws = memdb.NewWatchSet()
365	// Write a new K/V entry to the store.
366	entry = &structs.DirEntry{
367		Key:   "foo",
368		Value: []byte("bar"),
369	}
370	require.Nil(t, s.KVSSet(1, entry))
371	require.Nil(t, s.KVSSet(2, entry))
372
373	idx, _, err = s.KVSGet(ws, entry.Key, nil)
374	require.Nil(t, err)
375
376	require.Equal(t, uint64(1), idx)
377}
378
379func TestStateStore_KVSList(t *testing.T) {
380	s := testStateStore(t)
381
382	// Listing an empty KVS returns nothing
383	ws := memdb.NewWatchSet()
384	idx, entries, err := s.KVSList(ws, "", nil)
385	if idx != 0 || entries != nil || err != nil {
386		t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, entries, err)
387	}
388
389	// Create some KVS entries
390	testSetKey(t, s, 1, "foo", "foo", nil)
391	testSetKey(t, s, 2, "foo/bar", "bar", nil)
392	testSetKey(t, s, 3, "foo/bar/zip", "zip", nil)
393	testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp", nil)
394	testSetKey(t, s, 5, "foo/bar/baz", "baz", nil)
395	if !watchFired(ws) {
396		t.Fatalf("bad")
397	}
398
399	// List out all of the keys
400	idx, entries, err = s.KVSList(nil, "", nil)
401	if err != nil {
402		t.Fatalf("err: %s", err)
403	}
404	if idx != 5 {
405		t.Fatalf("bad index: %d", idx)
406	}
407
408	// Check that all of the keys were returned
409	if n := len(entries); n != 5 {
410		t.Fatalf("expected 5 kvs entries, got: %d", n)
411	}
412
413	// Try listing with a provided prefix
414	idx, entries, err = s.KVSList(nil, "foo/bar/zip", nil)
415	if err != nil {
416		t.Fatalf("err: %s", err)
417	}
418	if idx != 4 {
419		t.Fatalf("bad index: %d", idx)
420	}
421
422	// Check that only the keys in the prefix were returned
423	if n := len(entries); n != 2 {
424		t.Fatalf("expected 2 kvs entries, got: %d", n)
425	}
426	if entries[0].Key != "foo/bar/zip" || entries[1].Key != "foo/bar/zip/zorp" {
427		t.Fatalf("bad: %#v", entries)
428	}
429
430	// Delete a key and make sure the index comes from the tombstone.
431	ws = memdb.NewWatchSet()
432	idx, _, err = s.KVSList(ws, "foo/bar/baz", nil)
433	if err != nil {
434		t.Fatalf("err: %s", err)
435	}
436	if err := s.KVSDelete(6, "foo/bar/baz", nil); err != nil {
437		t.Fatalf("err: %s", err)
438	}
439	if !watchFired(ws) {
440		t.Fatalf("bad")
441	}
442	ws = memdb.NewWatchSet()
443	idx, _, err = s.KVSList(ws, "foo/bar/baz", nil)
444	if err != nil {
445		t.Fatalf("err: %s", err)
446	}
447	if idx != 6 {
448		t.Fatalf("bad index: %d", idx)
449	}
450
451	// Set a different key to bump the index. This shouldn't fire the
452	// watch since there's a different prefix.
453	testSetKey(t, s, 7, "some/other/key", "", nil)
454	if watchFired(ws) {
455		t.Fatalf("bad")
456	}
457
458	// Make sure we get the right index from the tombstone.
459	idx, _, err = s.KVSList(nil, "foo/bar/baz", nil)
460	if err != nil {
461		t.Fatalf("err: %s", err)
462	}
463	if idx != 6 {
464		t.Fatalf("bad index: %d", idx)
465	}
466
467	// Now reap the tombstones and make sure we get the latest index
468	// since there are no matching keys.
469	if err := s.ReapTombstones(6); err != nil {
470		t.Fatalf("err: %s", err)
471	}
472	idx, _, err = s.KVSList(nil, "foo/bar/baz", nil)
473	if err != nil {
474		t.Fatalf("err: %s", err)
475	}
476	if idx != 7 {
477		t.Fatalf("bad index: %d", idx)
478	}
479
480	// List all the keys to make sure the index is also correct.
481	idx, _, err = s.KVSList(nil, "", nil)
482	if err != nil {
483		t.Fatalf("err: %s", err)
484	}
485	if idx != 7 {
486		t.Fatalf("bad index: %d", idx)
487	}
488}
489
490func TestStateStore_KVSDelete(t *testing.T) {
491	s := testStateStore(t)
492
493	// Create some KV pairs
494	testSetKey(t, s, 1, "foo", "foo", nil)
495	testSetKey(t, s, 2, "foo/bar", "bar", nil)
496
497	// Call a delete on a specific key
498	if err := s.KVSDelete(3, "foo", nil); err != nil {
499		t.Fatalf("err: %s", err)
500	}
501
502	// The entry was removed from the state store
503	tx := s.db.Txn(false)
504	defer tx.Abort()
505	e, err := firstWithTxn(tx, "kvs", "id", "foo", nil)
506	if err != nil {
507		t.Fatalf("err: %s", err)
508	}
509	if e != nil {
510		t.Fatalf("expected kvs entry to be deleted, got: %#v", e)
511	}
512
513	// Try fetching the other keys to ensure they still exist
514	e, err = firstWithTxn(tx, "kvs", "id", "foo/bar", nil)
515	if err != nil {
516		t.Fatalf("err: %s", err)
517	}
518	if e == nil || string(e.(*structs.DirEntry).Value) != "bar" {
519		t.Fatalf("bad kvs entry: %#v", e)
520	}
521
522	// Check that the index table was updated
523	if idx := s.maxIndex("kvs"); idx != 3 {
524		t.Fatalf("bad index: %d", idx)
525	}
526
527	// Check that the tombstone was created and that prevents the index
528	// from sliding backwards.
529	idx, _, err := s.KVSList(nil, "foo", nil)
530	if err != nil {
531		t.Fatalf("err: %s", err)
532	}
533	if idx != 3 {
534		t.Fatalf("bad index: %d", idx)
535	}
536
537	// Now reap the tombstone and watch the index revert to the remaining
538	// foo/bar key's index.
539	if err := s.ReapTombstones(3); err != nil {
540		t.Fatalf("err: %s", err)
541	}
542	idx, _, err = s.KVSList(nil, "foo", nil)
543	if err != nil {
544		t.Fatalf("err: %s", err)
545	}
546	if idx != 2 {
547		t.Fatalf("bad index: %d", idx)
548	}
549
550	// Deleting a nonexistent key should be idempotent and not return an
551	// error
552	if err := s.KVSDelete(4, "foo", nil); err != nil {
553		t.Fatalf("err: %s", err)
554	}
555	if idx := s.maxIndex("kvs"); idx != 3 {
556		t.Fatalf("bad index: %d", idx)
557	}
558}
559
560func TestStateStore_KVSDeleteCAS(t *testing.T) {
561	s := testStateStore(t)
562
563	// Create some KV entries
564	testSetKey(t, s, 1, "foo", "foo", nil)
565	testSetKey(t, s, 2, "bar", "bar", nil)
566	testSetKey(t, s, 3, "baz", "baz", nil)
567
568	// Do a CAS delete with an index lower than the entry
569	ok, err := s.KVSDeleteCAS(4, 1, "bar", nil)
570	if ok || err != nil {
571		t.Fatalf("expected (false, nil), got: (%v, %#v)", ok, err)
572	}
573
574	// Check that the index is untouched and the entry
575	// has not been deleted.
576	idx, e, err := s.KVSGet(nil, "foo", nil)
577	if err != nil {
578		t.Fatalf("err: %s", err)
579	}
580	if e == nil {
581		t.Fatalf("expected a kvs entry, got nil")
582	}
583	if idx != 3 {
584		t.Fatalf("bad index: %d", idx)
585	}
586
587	// Do another CAS delete, this time with the correct index
588	// which should cause the delete to take place.
589	ok, err = s.KVSDeleteCAS(4, 2, "bar", nil)
590	if !ok || err != nil {
591		t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err)
592	}
593
594	// Entry was deleted and index was updated
595	idx, e, err = s.KVSGet(nil, "bar", nil)
596	if err != nil {
597		t.Fatalf("err: %s", err)
598	}
599	if e != nil {
600		t.Fatalf("entry should be deleted")
601	}
602	if idx != 4 {
603		t.Fatalf("bad index: %d", idx)
604	}
605
606	// Add another key to bump the index.
607	testSetKey(t, s, 5, "some/other/key", "baz", nil)
608
609	// Check that the tombstone was created and that prevents the index
610	// from sliding backwards.
611	idx, _, err = s.KVSList(nil, "bar", nil)
612	if err != nil {
613		t.Fatalf("err: %s", err)
614	}
615	if idx != 4 {
616		t.Fatalf("bad index: %d", idx)
617	}
618
619	// Now reap the tombstone and watch the index move up to the table
620	// index since there are no matching keys.
621	if err := s.ReapTombstones(4); err != nil {
622		t.Fatalf("err: %s", err)
623	}
624	idx, _, err = s.KVSList(nil, "bar", nil)
625	if err != nil {
626		t.Fatalf("err: %s", err)
627	}
628	if idx != 5 {
629		t.Fatalf("bad index: %d", idx)
630	}
631
632	// A delete on a nonexistent key should be idempotent and not return an
633	// error
634	ok, err = s.KVSDeleteCAS(6, 2, "bar", nil)
635	if !ok || err != nil {
636		t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err)
637	}
638	if idx := s.maxIndex("kvs"); idx != 5 {
639		t.Fatalf("bad index: %d", idx)
640	}
641}
642
643func TestStateStore_KVSSetCAS(t *testing.T) {
644	s := testStateStore(t)
645
646	// Doing a CAS with ModifyIndex != 0 and no existing entry
647	// is a no-op.
648	entry := &structs.DirEntry{
649		Key:   "foo",
650		Value: []byte("foo"),
651		RaftIndex: structs.RaftIndex{
652			CreateIndex: 1,
653			ModifyIndex: 1,
654		},
655	}
656	ok, err := s.KVSSetCAS(2, entry)
657	if ok || err != nil {
658		t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err)
659	}
660
661	// Check that nothing was actually stored
662	tx := s.db.Txn(false)
663	if e, err := firstWithTxn(tx, "kvs", "id", "foo", nil); e != nil || err != nil {
664		t.Fatalf("expected (nil, nil), got: (%#v, %#v)", e, err)
665	}
666	tx.Abort()
667
668	// Index was not updated
669	if idx := s.maxIndex("kvs"); idx != 0 {
670		t.Fatalf("bad index: %d", idx)
671	}
672
673	// Doing a CAS with a ModifyIndex of zero when no entry exists
674	// performs the set and saves into the state store.
675	entry = &structs.DirEntry{
676		Key:   "foo",
677		Value: []byte("foo"),
678		RaftIndex: structs.RaftIndex{
679			CreateIndex: 0,
680			ModifyIndex: 0,
681		},
682	}
683	ok, err = s.KVSSetCAS(2, entry)
684	if !ok || err != nil {
685		t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err)
686	}
687
688	// Entry was inserted
689	idx, entry, err := s.KVSGet(nil, "foo", nil)
690	if err != nil {
691		t.Fatalf("err: %s", err)
692	}
693	if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 {
694		t.Fatalf("bad entry: %#v", entry)
695	}
696	if idx != 2 {
697		t.Fatalf("bad index: %d", idx)
698	}
699
700	// Doing a CAS with a ModifyIndex of zero when an entry exists does
701	// not do anything.
702	entry = &structs.DirEntry{
703		Key:   "foo",
704		Value: []byte("foo"),
705		RaftIndex: structs.RaftIndex{
706			CreateIndex: 0,
707			ModifyIndex: 0,
708		},
709	}
710	ok, err = s.KVSSetCAS(3, entry)
711	if ok || err != nil {
712		t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err)
713	}
714
715	// Doing a CAS with a ModifyIndex which does not match the current
716	// index does not do anything.
717	entry = &structs.DirEntry{
718		Key:   "foo",
719		Value: []byte("bar"),
720		RaftIndex: structs.RaftIndex{
721			CreateIndex: 3,
722			ModifyIndex: 3,
723		},
724	}
725	ok, err = s.KVSSetCAS(3, entry)
726	if ok || err != nil {
727		t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err)
728	}
729
730	// Entry was not updated in the store
731	idx, entry, err = s.KVSGet(nil, "foo", nil)
732	if err != nil {
733		t.Fatalf("err: %s", err)
734	}
735	if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 {
736		t.Fatalf("bad entry: %#v", entry)
737	}
738	if idx != 2 {
739		t.Fatalf("bad index: %d", idx)
740	}
741
742	// Doing a CAS with the proper current index should make the
743	// modification.
744	entry = &structs.DirEntry{
745		Key:   "foo",
746		Value: []byte("bar"),
747		RaftIndex: structs.RaftIndex{
748			CreateIndex: 2,
749			ModifyIndex: 2,
750		},
751	}
752	ok, err = s.KVSSetCAS(3, entry)
753	if !ok || err != nil {
754		t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err)
755	}
756
757	// Entry was updated
758	idx, entry, err = s.KVSGet(nil, "foo", nil)
759	if err != nil {
760		t.Fatalf("err: %s", err)
761	}
762	if string(entry.Value) != "bar" || entry.CreateIndex != 2 || entry.ModifyIndex != 3 {
763		t.Fatalf("bad entry: %#v", entry)
764	}
765	if idx != 3 {
766		t.Fatalf("bad index: %d", idx)
767	}
768
769	// Attempt to update the session during the CAS.
770	entry = &structs.DirEntry{
771		Key:     "foo",
772		Value:   []byte("zoo"),
773		Session: "nope",
774		RaftIndex: structs.RaftIndex{
775			CreateIndex: 2,
776			ModifyIndex: 3,
777		},
778	}
779	ok, err = s.KVSSetCAS(4, entry)
780	if !ok || err != nil {
781		t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err)
782	}
783
784	// Entry was updated, but the session should have been ignored.
785	idx, entry, err = s.KVSGet(nil, "foo", nil)
786	if err != nil {
787		t.Fatalf("err: %s", err)
788	}
789	if string(entry.Value) != "zoo" || entry.CreateIndex != 2 || entry.ModifyIndex != 4 ||
790		entry.Session != "" {
791		t.Fatalf("bad entry: %#v", entry)
792	}
793	if idx != 4 {
794		t.Fatalf("bad index: %d", idx)
795	}
796
797	// Now lock it and try the update, which should keep the session.
798	testRegisterNode(t, s, 5, "node1")
799	session := testUUID()
800	if err := s.SessionCreate(6, &structs.Session{ID: session, Node: "node1"}); err != nil {
801		t.Fatalf("err: %s", err)
802	}
803	entry = &structs.DirEntry{
804		Key:     "foo",
805		Value:   []byte("locked"),
806		Session: session,
807		RaftIndex: structs.RaftIndex{
808			CreateIndex: 2,
809			ModifyIndex: 4,
810		},
811	}
812	ok, err = s.KVSLock(6, entry)
813	if !ok || err != nil {
814		t.Fatalf("didn't get the lock: %v %s", ok, err)
815	}
816	entry = &structs.DirEntry{
817		Key:   "foo",
818		Value: []byte("locked"),
819		RaftIndex: structs.RaftIndex{
820			CreateIndex: 2,
821			ModifyIndex: 6,
822		},
823	}
824	ok, err = s.KVSSetCAS(7, entry)
825	if !ok || err != nil {
826		t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err)
827	}
828
829	// Entry was updated, and the lock status should have stayed the same.
830	idx, entry, err = s.KVSGet(nil, "foo", nil)
831	if err != nil {
832		t.Fatalf("err: %s", err)
833	}
834	if string(entry.Value) != "locked" || entry.CreateIndex != 2 || entry.ModifyIndex != 7 ||
835		entry.Session != session {
836		t.Fatalf("bad entry: %#v", entry)
837	}
838	if idx != 7 {
839		t.Fatalf("bad index: %d", idx)
840	}
841}
842
843func TestStateStore_KVSDeleteTree(t *testing.T) {
844	s := testStateStore(t)
845
846	// Create kvs entries in the state store.
847	testSetKey(t, s, 1, "foo/bar", "bar", nil)
848	testSetKey(t, s, 2, "foo/bar/baz", "baz", nil)
849	testSetKey(t, s, 3, "foo/bar/zip", "zip", nil)
850	testSetKey(t, s, 4, "foo/zorp", "zorp", nil)
851
852	// Calling tree deletion which affects nothing does not
853	// modify the table index.
854	if err := s.KVSDeleteTree(9, "bar", nil); err != nil {
855		t.Fatalf("err: %s", err)
856	}
857	if idx := s.maxIndex("kvs"); idx != 4 {
858		t.Fatalf("bad index: %d", idx)
859	}
860
861	// Call tree deletion with a nested prefix.
862	if err := s.KVSDeleteTree(5, "foo/bar", nil); err != nil {
863		t.Fatalf("err: %s", err)
864	}
865
866	// Check that all the matching keys were deleted
867	tx := s.db.Txn(false)
868	defer tx.Abort()
869
870	entries, err := tx.Get("kvs", "id")
871	if err != nil {
872		t.Fatalf("err: %s", err)
873	}
874
875	num := 0
876	for entry := entries.Next(); entry != nil; entry = entries.Next() {
877		if entry.(*structs.DirEntry).Key != "foo/zorp" {
878			t.Fatalf("unexpected kvs entry: %#v", entry)
879		}
880		num++
881	}
882
883	if num != 1 {
884		t.Fatalf("expected 1 key, got: %d", num)
885	}
886
887	// Index should be updated if modifications are made
888	if idx := s.maxIndex("kvs"); idx != 5 {
889		t.Fatalf("bad index: %d", idx)
890	}
891
892	// Check that the tombstones ware created and that prevents the index
893	// from sliding backwards.
894	idx, _, err := s.KVSList(nil, "foo", nil)
895	if err != nil {
896		t.Fatalf("err: %s", err)
897	}
898	if idx != 5 {
899		t.Fatalf("bad index: %d", idx)
900	}
901
902	// Now reap the tombstones and watch the index revert to the remaining
903	// foo/zorp key's index.
904	if err := s.ReapTombstones(5); err != nil {
905		t.Fatalf("err: %s", err)
906	}
907	idx, _, err = s.KVSList(nil, "foo", nil)
908	if err != nil {
909		t.Fatalf("err: %s", err)
910	}
911	if idx != 4 {
912		t.Fatalf("bad index: %d", idx)
913	}
914}
915
916func TestStateStore_Watches_PrefixDelete(t *testing.T) {
917	s := testStateStore(t)
918
919	// Create some KVS entries
920	testSetKey(t, s, 1, "foo", "foo", nil)
921	testSetKey(t, s, 2, "foo/bar", "bar", nil)
922	testSetKey(t, s, 3, "foo/bar/zip", "zip", nil)
923	testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp", nil)
924	testSetKey(t, s, 5, "foo/bar/zip/zap", "zap", nil)
925	testSetKey(t, s, 6, "foo/nope", "nope", nil)
926
927	ws := memdb.NewWatchSet()
928	got, _, err := s.KVSList(ws, "foo/bar", nil)
929	if err != nil {
930		t.Fatalf("unexpected err: %s", err)
931	}
932	var wantIndex uint64 = 5
933	if got != wantIndex {
934		t.Fatalf("bad index: %d, expected %d", wantIndex, got)
935	}
936
937	// Delete a key and make sure the index comes from the tombstone.
938	if err := s.KVSDeleteTree(7, "foo/bar/zip", nil); err != nil {
939		t.Fatalf("unexpected err: %s", err)
940	}
941	// Make sure watch fires
942	if !watchFired(ws) {
943		t.Fatalf("expected watch to fire but it did not")
944	}
945
946	//Verify index matches tombstone
947	got, _, err = s.KVSList(ws, "foo/bar", nil)
948	if err != nil {
949		t.Fatalf("unexpected err: %s", err)
950	}
951	wantIndex = 7
952	if got != wantIndex {
953		t.Fatalf("bad index: %d, expected %d", got, wantIndex)
954	}
955	// Make sure watch fires
956	if !watchFired(ws) {
957		t.Fatalf("expected watch to fire but it did not")
958	}
959
960	// Reap tombstone and verify list on the same key reverts its index value
961	if err := s.ReapTombstones(wantIndex); err != nil {
962		t.Fatalf("err: %s", err)
963	}
964
965	got, _, err = s.KVSList(nil, "foo/bar", nil)
966	wantIndex = 2
967	if err != nil {
968		t.Fatalf("err: %s", err)
969	}
970	if got != wantIndex {
971		t.Fatalf("bad index: %d, expected %d", got, wantIndex)
972	}
973
974	// Set a different key to bump the index. This shouldn't fire the
975	// watch since there's a different prefix.
976	testSetKey(t, s, 8, "some/other/key", "", nil)
977
978	// Now ask for the index for a node within the prefix that was deleted
979	// We expect to get the max index in the tree
980	wantIndex = 8
981	ws = memdb.NewWatchSet()
982	got, _, err = s.KVSList(ws, "foo/bar/baz", nil)
983	if err != nil {
984		t.Fatalf("err: %s", err)
985	}
986	if watchFired(ws) {
987		t.Fatalf("Watch should not have fired")
988	}
989	if got != wantIndex {
990		t.Fatalf("bad index: %d, expected %d", got, wantIndex)
991	}
992
993	// List all the keys to make sure the index returned is the max index
994	got, _, err = s.KVSList(nil, "", nil)
995	if err != nil {
996		t.Fatalf("err: %s", err)
997	}
998	if got != wantIndex {
999		t.Fatalf("bad index: %d, expected %d", got, wantIndex)
1000	}
1001
1002	// Delete all the keys, special case where tombstones are not inserted
1003	if err := s.KVSDeleteTree(9, "", nil); err != nil {
1004		t.Fatalf("unexpected err: %s", err)
1005	}
1006	wantIndex = 9
1007	got, _, err = s.KVSList(nil, "/foo/bar", nil)
1008	if err != nil {
1009		t.Fatalf("err: %s", err)
1010	}
1011	if got != wantIndex {
1012		t.Fatalf("bad index: %d, expected %d", got, wantIndex)
1013	}
1014
1015}
1016
1017func TestStateStore_KVSLockDelay(t *testing.T) {
1018	s := testStateStore(t)
1019
1020	// KVSLockDelay is exercised in the lock/unlock and session invalidation
1021	// cases below, so we just do a basic check on a nonexistent key here.
1022	expires := s.KVSLockDelay("/not/there", nil)
1023	if expires.After(time.Now()) {
1024		t.Fatalf("bad: %v", expires)
1025	}
1026}
1027
1028func TestStateStore_KVSLock(t *testing.T) {
1029	s := testStateStore(t)
1030
1031	// Lock with no session should fail.
1032	ok, err := s.KVSLock(0, &structs.DirEntry{Key: "foo", Value: []byte("foo")})
1033	if ok || err == nil || !strings.Contains(err.Error(), "missing session") {
1034		t.Fatalf("didn't detect missing session: %v %s", ok, err)
1035	}
1036
1037	// Now try with a bogus session.
1038	ok, err = s.KVSLock(1, &structs.DirEntry{Key: "foo", Value: []byte("foo"), Session: testUUID()})
1039	if ok || err == nil || !strings.Contains(err.Error(), "invalid session") {
1040		t.Fatalf("didn't detect invalid session: %v %s", ok, err)
1041	}
1042
1043	// Make a real session.
1044	testRegisterNode(t, s, 2, "node1")
1045	session1 := testUUID()
1046	if err := s.SessionCreate(3, &structs.Session{ID: session1, Node: "node1"}); err != nil {
1047		t.Fatalf("err: %s", err)
1048	}
1049
1050	// Lock and make the key at the same time.
1051	ok, err = s.KVSLock(4, &structs.DirEntry{Key: "foo", Value: []byte("foo"), Session: session1})
1052	if !ok || err != nil {
1053		t.Fatalf("didn't get the lock: %v %s", ok, err)
1054	}
1055
1056	// Make sure the indexes got set properly.
1057	idx, result, err := s.KVSGet(nil, "foo", nil)
1058	if err != nil {
1059		t.Fatalf("err: %s", err)
1060	}
1061	if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 4 ||
1062		string(result.Value) != "foo" {
1063		t.Fatalf("bad entry: %#v", result)
1064	}
1065	if idx != 4 {
1066		t.Fatalf("bad index: %d", idx)
1067	}
1068
1069	// Re-locking with the same session should update the value and report
1070	// success.
1071	ok, err = s.KVSLock(5, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: session1})
1072	if !ok || err != nil {
1073		t.Fatalf("didn't handle locking an already-locked key: %v %s", ok, err)
1074	}
1075
1076	// Make sure the indexes got set properly, note that the lock index
1077	// won't go up since we didn't lock it again.
1078	idx, result, err = s.KVSGet(nil, "foo", nil)
1079	if err != nil {
1080		t.Fatalf("err: %s", err)
1081	}
1082	if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 5 ||
1083		string(result.Value) != "bar" {
1084		t.Fatalf("bad entry: %#v", result)
1085	}
1086	if idx != 5 {
1087		t.Fatalf("bad index: %d", idx)
1088	}
1089
1090	// Unlock and the re-lock.
1091	ok, err = s.KVSUnlock(6, &structs.DirEntry{Key: "foo", Value: []byte("baz"), Session: session1})
1092	if !ok || err != nil {
1093		t.Fatalf("didn't handle unlocking a locked key: %v %s", ok, err)
1094	}
1095	ok, err = s.KVSLock(7, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: session1})
1096	if !ok || err != nil {
1097		t.Fatalf("didn't get the lock: %v %s", ok, err)
1098	}
1099
1100	// Make sure the indexes got set properly.
1101	idx, result, err = s.KVSGet(nil, "foo", nil)
1102	if err != nil {
1103		t.Fatalf("err: %s", err)
1104	}
1105	if result.LockIndex != 2 || result.CreateIndex != 4 || result.ModifyIndex != 7 ||
1106		string(result.Value) != "zoo" {
1107		t.Fatalf("bad entry: %#v", result)
1108	}
1109	if idx != 7 {
1110		t.Fatalf("bad index: %d", idx)
1111	}
1112
1113	// Lock an existing key.
1114	testSetKey(t, s, 8, "bar", "bar", nil)
1115	ok, err = s.KVSLock(9, &structs.DirEntry{Key: "bar", Value: []byte("xxx"), Session: session1})
1116	if !ok || err != nil {
1117		t.Fatalf("didn't get the lock: %v %s", ok, err)
1118	}
1119
1120	// Make sure the indexes got set properly.
1121	idx, result, err = s.KVSGet(nil, "bar", nil)
1122	if err != nil {
1123		t.Fatalf("err: %s", err)
1124	}
1125	if result.LockIndex != 1 || result.CreateIndex != 8 || result.ModifyIndex != 9 ||
1126		string(result.Value) != "xxx" {
1127		t.Fatalf("bad entry: %#v", result)
1128	}
1129	if idx != 9 {
1130		t.Fatalf("bad index: %d", idx)
1131	}
1132
1133	// Attempting a re-lock with a different session should also fail.
1134	session2 := testUUID()
1135	if err := s.SessionCreate(10, &structs.Session{ID: session2, Node: "node1"}); err != nil {
1136		t.Fatalf("err: %s", err)
1137	}
1138
1139	// Re-locking should not return an error, but will report that it didn't
1140	// get the lock.
1141	ok, err = s.KVSLock(11, &structs.DirEntry{Key: "bar", Value: []byte("nope"), Session: session2})
1142	if ok || err != nil {
1143		t.Fatalf("didn't handle locking an already-locked key: %v %s", ok, err)
1144	}
1145
1146	// Make sure the indexes didn't update.
1147	idx, result, err = s.KVSGet(nil, "bar", nil)
1148	if err != nil {
1149		t.Fatalf("err: %s", err)
1150	}
1151	if result.LockIndex != 1 || result.CreateIndex != 8 || result.ModifyIndex != 9 ||
1152		string(result.Value) != "xxx" {
1153		t.Fatalf("bad entry: %#v", result)
1154	}
1155	if idx != 9 {
1156		t.Fatalf("bad index: %d", idx)
1157	}
1158}
1159
1160func TestStateStore_KVSUnlock(t *testing.T) {
1161	s := testStateStore(t)
1162
1163	// Unlock with no session should fail.
1164	ok, err := s.KVSUnlock(0, &structs.DirEntry{Key: "foo", Value: []byte("bar")})
1165	if ok || err == nil || !strings.Contains(err.Error(), "missing session") {
1166		t.Fatalf("didn't detect missing session: %v %s", ok, err)
1167	}
1168
1169	// Make a real session.
1170	testRegisterNode(t, s, 1, "node1")
1171	session1 := testUUID()
1172	if err := s.SessionCreate(2, &structs.Session{ID: session1, Node: "node1"}); err != nil {
1173		t.Fatalf("err: %s", err)
1174	}
1175
1176	// Unlock with a real session but no key should not return an error, but
1177	// will report it didn't unlock anything.
1178	ok, err = s.KVSUnlock(3, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: session1})
1179	if ok || err != nil {
1180		t.Fatalf("didn't handle unlocking a missing key: %v %s", ok, err)
1181	}
1182
1183	// Make a key and unlock it, without it being locked.
1184	testSetKey(t, s, 4, "foo", "bar", nil)
1185	ok, err = s.KVSUnlock(5, &structs.DirEntry{Key: "foo", Value: []byte("baz"), Session: session1})
1186	if ok || err != nil {
1187		t.Fatalf("didn't handle unlocking a non-locked key: %v %s", ok, err)
1188	}
1189
1190	// Make sure the indexes didn't update.
1191	idx, result, err := s.KVSGet(nil, "foo", nil)
1192	if err != nil {
1193		t.Fatalf("err: %s", err)
1194	}
1195	if result.LockIndex != 0 || result.CreateIndex != 4 || result.ModifyIndex != 4 ||
1196		string(result.Value) != "bar" {
1197		t.Fatalf("bad entry: %#v", result)
1198	}
1199	if idx != 4 {
1200		t.Fatalf("bad index: %d", idx)
1201	}
1202
1203	// Lock it with the first session.
1204	ok, err = s.KVSLock(6, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: session1})
1205	if !ok || err != nil {
1206		t.Fatalf("didn't get the lock: %v %s", ok, err)
1207	}
1208
1209	// Attempt an unlock with another session.
1210	session2 := testUUID()
1211	if err := s.SessionCreate(7, &structs.Session{ID: session2, Node: "node1"}); err != nil {
1212		t.Fatalf("err: %s", err)
1213	}
1214	ok, err = s.KVSUnlock(8, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: session2})
1215	if ok || err != nil {
1216		t.Fatalf("didn't handle unlocking with the wrong session: %v %s", ok, err)
1217	}
1218
1219	// Make sure the indexes didn't update.
1220	idx, result, err = s.KVSGet(nil, "foo", nil)
1221	if err != nil {
1222		t.Fatalf("err: %s", err)
1223	}
1224	if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 6 ||
1225		string(result.Value) != "bar" {
1226		t.Fatalf("bad entry: %#v", result)
1227	}
1228	if idx != 6 {
1229		t.Fatalf("bad index: %d", idx)
1230	}
1231
1232	// Now do the unlock with the correct session.
1233	ok, err = s.KVSUnlock(9, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: session1})
1234	if !ok || err != nil {
1235		t.Fatalf("didn't handle unlocking with the correct session: %v %s", ok, err)
1236	}
1237
1238	// Make sure the indexes got set properly.
1239	idx, result, err = s.KVSGet(nil, "foo", nil)
1240	if err != nil {
1241		t.Fatalf("err: %s", err)
1242	}
1243	if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 9 ||
1244		string(result.Value) != "zoo" {
1245		t.Fatalf("bad entry: %#v", result)
1246	}
1247	if idx != 9 {
1248		t.Fatalf("bad index: %d", idx)
1249	}
1250
1251	// Unlocking again should fail and not change anything.
1252	ok, err = s.KVSUnlock(10, &structs.DirEntry{Key: "foo", Value: []byte("nope"), Session: session1})
1253	if ok || err != nil {
1254		t.Fatalf("didn't handle unlocking with the previous session: %v %s", ok, err)
1255	}
1256
1257	// Make sure the indexes didn't update.
1258	idx, result, err = s.KVSGet(nil, "foo", nil)
1259	if err != nil {
1260		t.Fatalf("err: %s", err)
1261	}
1262	if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 9 ||
1263		string(result.Value) != "zoo" {
1264		t.Fatalf("bad entry: %#v", result)
1265	}
1266	if idx != 9 {
1267		t.Fatalf("bad index: %d", idx)
1268	}
1269}
1270
1271func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
1272	s := testStateStore(t)
1273
1274	// Build up some entries to seed.
1275	entries := structs.DirEntries{
1276		&structs.DirEntry{
1277			Key:   "aaa",
1278			Flags: 23,
1279			Value: []byte("hello"),
1280		},
1281		&structs.DirEntry{
1282			Key:   "bar/a",
1283			Value: []byte("one"),
1284		},
1285		&structs.DirEntry{
1286			Key:   "bar/b",
1287			Value: []byte("two"),
1288		},
1289		&structs.DirEntry{
1290			Key:   "bar/c",
1291			Value: []byte("three"),
1292		},
1293	}
1294	for i, entry := range entries {
1295		if err := s.KVSSet(uint64(i+1), entry); err != nil {
1296			t.Fatalf("err: %s", err)
1297		}
1298	}
1299
1300	// Make a node and session so we can test a locked key.
1301	testRegisterNode(t, s, 5, "node1")
1302	session := testUUID()
1303	if err := s.SessionCreate(6, &structs.Session{ID: session, Node: "node1"}); err != nil {
1304		t.Fatalf("err: %s", err)
1305	}
1306	entries[3].Session = session
1307	if ok, err := s.KVSLock(7, entries[3]); !ok || err != nil {
1308		t.Fatalf("didn't get the lock: %v %s", ok, err)
1309	}
1310
1311	// This is required for the compare later.
1312	entries[3].LockIndex = 1
1313
1314	// Snapshot the keys.
1315	snap := s.Snapshot()
1316	defer snap.Close()
1317
1318	// Alter the real state store.
1319	if err := s.KVSSet(8, &structs.DirEntry{Key: "aaa", Value: []byte("nope")}); err != nil {
1320		t.Fatalf("err: %s", err)
1321	}
1322
1323	// Verify the snapshot.
1324	if idx := snap.LastIndex(); idx != 6 {
1325		t.Fatalf("bad index: %d", idx)
1326	}
1327	iter, err := snap.KVs()
1328	if err != nil {
1329		t.Fatalf("err: %s", err)
1330	}
1331	var dump structs.DirEntries
1332	for entry := iter.Next(); entry != nil; entry = iter.Next() {
1333		dump = append(dump, entry.(*structs.DirEntry))
1334	}
1335	if !reflect.DeepEqual(dump, entries) {
1336		t.Fatalf("bad: %#v", dump)
1337	}
1338
1339	// Restore the values into a new state store.
1340	func() {
1341		s := testStateStore(t)
1342		restore := s.Restore()
1343		for _, entry := range dump {
1344			if err := restore.KVS(entry); err != nil {
1345				t.Fatalf("err: %s", err)
1346			}
1347		}
1348		restore.Commit()
1349
1350		// Read the restored keys back out and verify they match.
1351		idx, res, err := s.KVSList(nil, "", nil)
1352		if err != nil {
1353			t.Fatalf("err: %s", err)
1354		}
1355		if idx != 7 {
1356			t.Fatalf("bad index: %d", idx)
1357		}
1358		if !reflect.DeepEqual(res, entries) {
1359			t.Fatalf("bad: %#v", res)
1360		}
1361
1362		// Check that the index was updated.
1363		if idx := s.maxIndex("kvs"); idx != 7 {
1364			t.Fatalf("bad index: %d", idx)
1365		}
1366	}()
1367}
1368
1369func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
1370	s := testStateStore(t)
1371
1372	// Insert a key and then delete it to create a tombstone.
1373	testSetKey(t, s, 1, "foo/bar", "bar", nil)
1374	testSetKey(t, s, 2, "foo/bar/baz", "bar", nil)
1375	testSetKey(t, s, 3, "foo/bar/zoo", "bar", nil)
1376	if err := s.KVSDelete(4, "foo/bar", nil); err != nil {
1377		t.Fatalf("err: %s", err)
1378	}
1379
1380	// Snapshot the Tombstones.
1381	snap := s.Snapshot()
1382	defer snap.Close()
1383
1384	// Alter the real state store.
1385	if err := s.ReapTombstones(4); err != nil {
1386		t.Fatalf("err: %s", err)
1387	}
1388	idx, _, err := s.KVSList(nil, "foo/bar", nil)
1389	if err != nil {
1390		t.Fatalf("err: %s", err)
1391	}
1392	if idx != 3 {
1393		t.Fatalf("bad index: %d", idx)
1394	}
1395
1396	// Verify the snapshot.
1397	stones, err := snap.Tombstones()
1398	if err != nil {
1399		t.Fatalf("err: %s", err)
1400	}
1401	var dump []*Tombstone
1402	for stone := stones.Next(); stone != nil; stone = stones.Next() {
1403		dump = append(dump, stone.(*Tombstone))
1404	}
1405	if len(dump) != 1 {
1406		t.Fatalf("bad %#v", dump)
1407	}
1408	stone := dump[0]
1409	if stone.Key != "foo/bar" || stone.Index != 4 {
1410		t.Fatalf("bad: %#v", stone)
1411	}
1412
1413	// Restore the values into a new state store.
1414	func() {
1415		s := testStateStore(t)
1416		restore := s.Restore()
1417		for _, stone := range dump {
1418			if err := restore.Tombstone(stone); err != nil {
1419				t.Fatalf("err: %s", err)
1420			}
1421		}
1422		restore.Commit()
1423
1424		// See if the stone works properly in a list query.
1425		idx, _, err := s.KVSList(nil, "foo/bar", nil)
1426		if err != nil {
1427			t.Fatalf("err: %s", err)
1428		}
1429		if idx != 4 {
1430			t.Fatalf("bad index: %d", idx)
1431		}
1432
1433		// Make sure it reaps correctly. We should still get a 4 for
1434		// the index here because it will be using the last index from
1435		// the tombstone table.
1436		if err := s.ReapTombstones(4); err != nil {
1437			t.Fatalf("err: %s", err)
1438		}
1439		idx, _, err = s.KVSList(nil, "foo/bar", nil)
1440		if err != nil {
1441			t.Fatalf("err: %s", err)
1442		}
1443		if idx != 4 {
1444			t.Fatalf("bad index: %d", idx)
1445		}
1446
1447		// But make sure the tombstone is actually gone.
1448		snap := s.Snapshot()
1449		defer snap.Close()
1450		stones, err := snap.Tombstones()
1451		if err != nil {
1452			t.Fatalf("err: %s", err)
1453		}
1454		if stones.Next() != nil {
1455			t.Fatalf("unexpected extra tombstones")
1456		}
1457	}()
1458}
1459