1package state 2 3import ( 4 crand "crypto/rand" 5 "fmt" 6 "testing" 7 "time" 8 9 "github.com/hashicorp/consul/agent/structs" 10 "github.com/hashicorp/consul/types" 11 "github.com/hashicorp/go-memdb" 12 "github.com/stretchr/testify/require" 13) 14 15func testUUID() string { 16 buf := make([]byte, 16) 17 if _, err := crand.Read(buf); err != nil { 18 panic(fmt.Errorf("failed to read random bytes: %v", err)) 19 } 20 21 return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x", 22 buf[0:4], 23 buf[4:6], 24 buf[6:8], 25 buf[8:10], 26 buf[10:16]) 27} 28 29func snapshotIndexes(snap *Snapshot) ([]*IndexEntry, error) { 30 iter, err := snap.Indexes() 31 if err != nil { 32 return nil, err 33 } 34 var indexes []*IndexEntry 35 for index := iter.Next(); index != nil; index = iter.Next() { 36 indexes = append(indexes, index.(*IndexEntry)) 37 } 38 return indexes, nil 39} 40 41func restoreIndexes(indexes []*IndexEntry, r *Restore) error { 42 for _, index := range indexes { 43 if err := r.IndexRestore(index); err != nil { 44 return err 45 } 46 } 47 return nil 48} 49 50func testStateStore(t *testing.T) *Store { 51 s, err := NewStateStore(nil) 52 if err != nil { 53 t.Fatalf("err: %s", err) 54 } 55 if s == nil { 56 t.Fatalf("missing state store") 57 } 58 return s 59} 60 61func testRegisterNode(t *testing.T, s *Store, idx uint64, nodeID string) { 62 testRegisterNodeWithMeta(t, s, idx, nodeID, nil) 63} 64 65// testRegisterNodeWithChange registers a node and ensures it gets different from previous registration 66func testRegisterNodeWithChange(t *testing.T, s *Store, idx uint64, nodeID string) { 67 testRegisterNodeWithMeta(t, s, idx, nodeID, map[string]string{ 68 "version": string(idx), 69 }) 70} 71 72func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string, meta map[string]string) { 73 node := &structs.Node{Node: nodeID, Meta: meta} 74 if err := s.EnsureNode(idx, node); err != nil { 75 t.Fatalf("err: %s", err) 76 } 77 78 tx := s.db.Txn(false) 79 defer tx.Abort() 80 n, err := tx.First("nodes", "id", nodeID) 81 if err != nil { 82 t.Fatalf("err: %s", err) 83 } 84 if result, ok := n.(*structs.Node); !ok || result.Node != nodeID { 85 t.Fatalf("bad node: %#v", result) 86 } 87} 88 89// testRegisterServiceWithChange registers a service and allow ensuring the consul index is updated 90// even if service already exists if using `modifyAccordingIndex`. 91// This is done by setting the transaction ID in "version" meta so service will be updated if it already exists 92func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) { 93 meta := make(map[string]string) 94 if modifyAccordingIndex { 95 meta["version"] = string(idx) 96 } 97 svc := &structs.NodeService{ 98 ID: serviceID, 99 Service: serviceID, 100 Address: "1.1.1.1", 101 Port: 1111, 102 Meta: meta, 103 } 104 if err := s.EnsureService(idx, nodeID, svc); err != nil { 105 t.Fatalf("err: %s", err) 106 } 107 108 tx := s.db.Txn(false) 109 defer tx.Abort() 110 _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID) 111 if err != nil { 112 t.Fatalf("err: %s", err) 113 } 114 if result, ok := service.(*structs.ServiceNode); !ok || 115 result.Node != nodeID || 116 result.ServiceID != serviceID { 117 t.Fatalf("bad service: %#v", result) 118 } 119} 120 121// testRegisterService register a service with given transaction idx 122// If the service already exists, transaction number might not be increased 123// Use `testRegisterServiceWithChange()` if you want perform a registration that 124// ensures the transaction is updated by setting idx in Meta of Service 125func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { 126 testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false) 127} 128 129func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { 130 svc := &structs.NodeService{ 131 ID: serviceID, 132 Service: serviceID, 133 Kind: structs.ServiceKindIngressGateway, 134 Address: "1.1.1.1", 135 Port: 1111, 136 } 137 if err := s.EnsureService(idx, nodeID, svc); err != nil { 138 t.Fatalf("err: %s", err) 139 } 140 141 tx := s.db.Txn(false) 142 defer tx.Abort() 143 _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID) 144 if err != nil { 145 t.Fatalf("err: %s", err) 146 } 147 if result, ok := service.(*structs.ServiceNode); !ok || 148 result.Node != nodeID || 149 result.ServiceID != serviceID { 150 t.Fatalf("bad service: %#v", result) 151 } 152} 153 154func testRegisterCheck(t *testing.T, s *Store, idx uint64, 155 nodeID string, serviceID string, checkID types.CheckID, state string) { 156 chk := &structs.HealthCheck{ 157 Node: nodeID, 158 CheckID: checkID, 159 ServiceID: serviceID, 160 Status: state, 161 } 162 if err := s.EnsureCheck(idx, chk); err != nil { 163 t.Fatalf("err: %s", err) 164 } 165 166 tx := s.db.Txn(false) 167 defer tx.Abort() 168 _, c, err := firstWatchCompoundWithTxn(tx, "checks", "id", nil, nodeID, string(checkID)) 169 if err != nil { 170 t.Fatalf("err: %s", err) 171 } 172 if result, ok := c.(*structs.HealthCheck); !ok || 173 result.Node != nodeID || 174 result.ServiceID != serviceID || 175 result.CheckID != checkID { 176 t.Fatalf("bad check: %#v", result) 177 } 178} 179 180func testRegisterSidecarProxy(t *testing.T, s *Store, idx uint64, nodeID string, targetServiceID string) { 181 svc := &structs.NodeService{ 182 ID: targetServiceID + "-sidecar-proxy", 183 Service: targetServiceID + "-sidecar-proxy", 184 Port: 20000, 185 Kind: structs.ServiceKindConnectProxy, 186 Proxy: structs.ConnectProxyConfig{ 187 DestinationServiceName: targetServiceID, 188 DestinationServiceID: targetServiceID, 189 }, 190 } 191 require.NoError(t, s.EnsureService(idx, nodeID, svc)) 192} 193 194func testRegisterConnectNativeService(t *testing.T, s *Store, idx uint64, nodeID string, serviceID string) { 195 svc := &structs.NodeService{ 196 ID: serviceID, 197 Service: serviceID, 198 Port: 1111, 199 Connect: structs.ServiceConnect{ 200 Native: true, 201 }, 202 } 203 require.NoError(t, s.EnsureService(idx, nodeID, svc)) 204} 205 206func testSetKey(t *testing.T, s *Store, idx uint64, key, value string, entMeta *structs.EnterpriseMeta) { 207 entry := &structs.DirEntry{ 208 Key: key, 209 Value: []byte(value), 210 } 211 if entMeta != nil { 212 entry.EnterpriseMeta = *entMeta 213 } 214 215 if err := s.KVSSet(idx, entry); err != nil { 216 t.Fatalf("err: %s", err) 217 } 218 219 tx := s.db.Txn(false) 220 defer tx.Abort() 221 e, err := firstWithTxn(tx, "kvs", "id", key, entMeta) 222 if err != nil { 223 t.Fatalf("err: %s", err) 224 } 225 if result, ok := e.(*structs.DirEntry); !ok || result.Key != key { 226 t.Fatalf("bad kvs entry: %#v", result) 227 } 228} 229 230// watchFired is a helper for unit tests that returns if the given watch set 231// fired (it doesn't care which watch actually fired). This uses a fixed 232// timeout since we already expect the event happened before calling this and 233// just need to distinguish a fire from a timeout. We do need a little time to 234// allow the watch to set up any goroutines, though. 235func watchFired(ws memdb.WatchSet) bool { 236 timedOut := ws.Watch(time.After(50 * time.Millisecond)) 237 return !timedOut 238} 239 240func TestStateStore_Restore_Abort(t *testing.T) { 241 s := testStateStore(t) 242 243 // The detailed restore functions are tested below, this just checks 244 // that abort works. 245 restore := s.Restore() 246 entry := &structs.DirEntry{ 247 Key: "foo", 248 Value: []byte("bar"), 249 RaftIndex: structs.RaftIndex{ 250 ModifyIndex: 5, 251 }, 252 } 253 if err := restore.KVS(entry); err != nil { 254 t.Fatalf("err: %s", err) 255 } 256 restore.Abort() 257 258 idx, entries, err := s.KVSList(nil, "", nil) 259 if err != nil { 260 t.Fatalf("err: %s", err) 261 } 262 if idx != 0 { 263 t.Fatalf("bad index: %d", idx) 264 } 265 if len(entries) != 0 { 266 t.Fatalf("bad: %#v", entries) 267 } 268} 269 270func TestStateStore_Abandon(t *testing.T) { 271 s := testStateStore(t) 272 abandonCh := s.AbandonCh() 273 s.Abandon() 274 select { 275 case <-abandonCh: 276 default: 277 t.Fatalf("bad") 278 } 279} 280 281func TestStateStore_maxIndex(t *testing.T) { 282 s := testStateStore(t) 283 284 testRegisterNode(t, s, 0, "foo") 285 testRegisterNode(t, s, 1, "bar") 286 testRegisterService(t, s, 2, "foo", "consul") 287 288 if max := s.maxIndex("nodes", "services"); max != 2 { 289 t.Fatalf("bad max: %d", max) 290 } 291} 292 293func TestStateStore_indexUpdateMaxTxn(t *testing.T) { 294 s := testStateStore(t) 295 296 testRegisterNode(t, s, 0, "foo") 297 testRegisterNode(t, s, 1, "bar") 298 299 tx := s.db.Txn(true) 300 if err := indexUpdateMaxTxn(tx, 3, "nodes"); err != nil { 301 t.Fatalf("err: %s", err) 302 } 303 tx.Commit() 304 305 if max := s.maxIndex("nodes"); max != 3 { 306 t.Fatalf("bad max: %d", max) 307 } 308} 309