1package state
2
3import (
4	crand "crypto/rand"
5	"fmt"
6	"testing"
7	"time"
8
9	"github.com/hashicorp/consul/agent/structs"
10	"github.com/hashicorp/consul/types"
11	"github.com/hashicorp/go-memdb"
12	"github.com/stretchr/testify/require"
13)
14
// testUUID returns a random, UUID-shaped string (8-4-4-4-12 hex digits) built
// from 16 bytes of crypto/rand output. It panics if the random source fails.
func testUUID() string {
	var raw [16]byte
	if _, err := crand.Read(raw[:]); err != nil {
		panic(fmt.Errorf("failed to read random bytes: %v", err))
	}

	const layout = "%08x-%04x-%04x-%04x-%12x"
	return fmt.Sprintf(layout, raw[0:4], raw[4:6], raw[6:8], raw[8:10], raw[10:16])
}
28
29func snapshotIndexes(snap *Snapshot) ([]*IndexEntry, error) {
30	iter, err := snap.Indexes()
31	if err != nil {
32		return nil, err
33	}
34	var indexes []*IndexEntry
35	for index := iter.Next(); index != nil; index = iter.Next() {
36		indexes = append(indexes, index.(*IndexEntry))
37	}
38	return indexes, nil
39}
40
41func restoreIndexes(indexes []*IndexEntry, r *Restore) error {
42	for _, index := range indexes {
43		if err := r.IndexRestore(index); err != nil {
44			return err
45		}
46	}
47	return nil
48}
49
50func testStateStore(t *testing.T) *Store {
51	s, err := NewStateStore(nil)
52	if err != nil {
53		t.Fatalf("err: %s", err)
54	}
55	if s == nil {
56		t.Fatalf("missing state store")
57	}
58	return s
59}
60
61func testRegisterNode(t *testing.T, s *Store, idx uint64, nodeID string) {
62	testRegisterNodeWithMeta(t, s, idx, nodeID, nil)
63}
64
65// testRegisterNodeWithChange registers a node and ensures it gets different from previous registration
66func testRegisterNodeWithChange(t *testing.T, s *Store, idx uint64, nodeID string) {
67	testRegisterNodeWithMeta(t, s, idx, nodeID, map[string]string{
68		"version": string(idx),
69	})
70}
71
72func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string, meta map[string]string) {
73	node := &structs.Node{Node: nodeID, Meta: meta}
74	if err := s.EnsureNode(idx, node); err != nil {
75		t.Fatalf("err: %s", err)
76	}
77
78	tx := s.db.Txn(false)
79	defer tx.Abort()
80	n, err := tx.First("nodes", "id", nodeID)
81	if err != nil {
82		t.Fatalf("err: %s", err)
83	}
84	if result, ok := n.(*structs.Node); !ok || result.Node != nodeID {
85		t.Fatalf("bad node: %#v", result)
86	}
87}
88
89// testRegisterServiceWithChange registers a service and allow ensuring the consul index is updated
90// even if service already exists if using `modifyAccordingIndex`.
91// This is done by setting the transaction ID in "version" meta so service will be updated if it already exists
92func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) {
93	meta := make(map[string]string)
94	if modifyAccordingIndex {
95		meta["version"] = string(idx)
96	}
97	svc := &structs.NodeService{
98		ID:      serviceID,
99		Service: serviceID,
100		Address: "1.1.1.1",
101		Port:    1111,
102		Meta:    meta,
103	}
104	if err := s.EnsureService(idx, nodeID, svc); err != nil {
105		t.Fatalf("err: %s", err)
106	}
107
108	tx := s.db.Txn(false)
109	defer tx.Abort()
110	_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID)
111	if err != nil {
112		t.Fatalf("err: %s", err)
113	}
114	if result, ok := service.(*structs.ServiceNode); !ok ||
115		result.Node != nodeID ||
116		result.ServiceID != serviceID {
117		t.Fatalf("bad service: %#v", result)
118	}
119}
120
121// testRegisterService register a service with given transaction idx
122// If the service already exists, transaction number might not be increased
123// Use `testRegisterServiceWithChange()` if you want perform a registration that
124// ensures the transaction is updated by setting idx in Meta of Service
125func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
126	testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false)
127}
128
129func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
130	svc := &structs.NodeService{
131		ID:      serviceID,
132		Service: serviceID,
133		Kind:    structs.ServiceKindIngressGateway,
134		Address: "1.1.1.1",
135		Port:    1111,
136	}
137	if err := s.EnsureService(idx, nodeID, svc); err != nil {
138		t.Fatalf("err: %s", err)
139	}
140
141	tx := s.db.Txn(false)
142	defer tx.Abort()
143	_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID)
144	if err != nil {
145		t.Fatalf("err: %s", err)
146	}
147	if result, ok := service.(*structs.ServiceNode); !ok ||
148		result.Node != nodeID ||
149		result.ServiceID != serviceID {
150		t.Fatalf("bad service: %#v", result)
151	}
152}
153
154func testRegisterCheck(t *testing.T, s *Store, idx uint64,
155	nodeID string, serviceID string, checkID types.CheckID, state string) {
156	chk := &structs.HealthCheck{
157		Node:      nodeID,
158		CheckID:   checkID,
159		ServiceID: serviceID,
160		Status:    state,
161	}
162	if err := s.EnsureCheck(idx, chk); err != nil {
163		t.Fatalf("err: %s", err)
164	}
165
166	tx := s.db.Txn(false)
167	defer tx.Abort()
168	_, c, err := firstWatchCompoundWithTxn(tx, "checks", "id", nil, nodeID, string(checkID))
169	if err != nil {
170		t.Fatalf("err: %s", err)
171	}
172	if result, ok := c.(*structs.HealthCheck); !ok ||
173		result.Node != nodeID ||
174		result.ServiceID != serviceID ||
175		result.CheckID != checkID {
176		t.Fatalf("bad check: %#v", result)
177	}
178}
179
180func testRegisterSidecarProxy(t *testing.T, s *Store, idx uint64, nodeID string, targetServiceID string) {
181	svc := &structs.NodeService{
182		ID:      targetServiceID + "-sidecar-proxy",
183		Service: targetServiceID + "-sidecar-proxy",
184		Port:    20000,
185		Kind:    structs.ServiceKindConnectProxy,
186		Proxy: structs.ConnectProxyConfig{
187			DestinationServiceName: targetServiceID,
188			DestinationServiceID:   targetServiceID,
189		},
190	}
191	require.NoError(t, s.EnsureService(idx, nodeID, svc))
192}
193
194func testRegisterConnectNativeService(t *testing.T, s *Store, idx uint64, nodeID string, serviceID string) {
195	svc := &structs.NodeService{
196		ID:      serviceID,
197		Service: serviceID,
198		Port:    1111,
199		Connect: structs.ServiceConnect{
200			Native: true,
201		},
202	}
203	require.NoError(t, s.EnsureService(idx, nodeID, svc))
204}
205
206func testSetKey(t *testing.T, s *Store, idx uint64, key, value string, entMeta *structs.EnterpriseMeta) {
207	entry := &structs.DirEntry{
208		Key:   key,
209		Value: []byte(value),
210	}
211	if entMeta != nil {
212		entry.EnterpriseMeta = *entMeta
213	}
214
215	if err := s.KVSSet(idx, entry); err != nil {
216		t.Fatalf("err: %s", err)
217	}
218
219	tx := s.db.Txn(false)
220	defer tx.Abort()
221	e, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
222	if err != nil {
223		t.Fatalf("err: %s", err)
224	}
225	if result, ok := e.(*structs.DirEntry); !ok || result.Key != key {
226		t.Fatalf("bad kvs entry: %#v", result)
227	}
228}
229
230// watchFired is a helper for unit tests that returns if the given watch set
231// fired (it doesn't care which watch actually fired). This uses a fixed
232// timeout since we already expect the event happened before calling this and
233// just need to distinguish a fire from a timeout. We do need a little time to
234// allow the watch to set up any goroutines, though.
235func watchFired(ws memdb.WatchSet) bool {
236	timedOut := ws.Watch(time.After(50 * time.Millisecond))
237	return !timedOut
238}
239
240func TestStateStore_Restore_Abort(t *testing.T) {
241	s := testStateStore(t)
242
243	// The detailed restore functions are tested below, this just checks
244	// that abort works.
245	restore := s.Restore()
246	entry := &structs.DirEntry{
247		Key:   "foo",
248		Value: []byte("bar"),
249		RaftIndex: structs.RaftIndex{
250			ModifyIndex: 5,
251		},
252	}
253	if err := restore.KVS(entry); err != nil {
254		t.Fatalf("err: %s", err)
255	}
256	restore.Abort()
257
258	idx, entries, err := s.KVSList(nil, "", nil)
259	if err != nil {
260		t.Fatalf("err: %s", err)
261	}
262	if idx != 0 {
263		t.Fatalf("bad index: %d", idx)
264	}
265	if len(entries) != 0 {
266		t.Fatalf("bad: %#v", entries)
267	}
268}
269
270func TestStateStore_Abandon(t *testing.T) {
271	s := testStateStore(t)
272	abandonCh := s.AbandonCh()
273	s.Abandon()
274	select {
275	case <-abandonCh:
276	default:
277		t.Fatalf("bad")
278	}
279}
280
281func TestStateStore_maxIndex(t *testing.T) {
282	s := testStateStore(t)
283
284	testRegisterNode(t, s, 0, "foo")
285	testRegisterNode(t, s, 1, "bar")
286	testRegisterService(t, s, 2, "foo", "consul")
287
288	if max := s.maxIndex("nodes", "services"); max != 2 {
289		t.Fatalf("bad max: %d", max)
290	}
291}
292
293func TestStateStore_indexUpdateMaxTxn(t *testing.T) {
294	s := testStateStore(t)
295
296	testRegisterNode(t, s, 0, "foo")
297	testRegisterNode(t, s, 1, "bar")
298
299	tx := s.db.Txn(true)
300	if err := indexUpdateMaxTxn(tx, 3, "nodes"); err != nil {
301		t.Fatalf("err: %s", err)
302	}
303	tx.Commit()
304
305	if max := s.maxIndex("nodes"); max != 3 {
306		t.Fatalf("bad max: %d", max)
307	}
308}
309