1package state 2 3import ( 4 "fmt" 5 6 memdb "github.com/hashicorp/go-memdb" 7 8 "github.com/hashicorp/consul/agent/structs" 9) 10 11const ( 12 serviceNamesUsageTable = "service-names" 13) 14 15// usageTableSchema returns a new table schema used for tracking various indexes 16// for the Raft log. 17func usageTableSchema() *memdb.TableSchema { 18 return &memdb.TableSchema{ 19 Name: "usage", 20 Indexes: map[string]*memdb.IndexSchema{ 21 "id": { 22 Name: "id", 23 AllowMissing: false, 24 Unique: true, 25 Indexer: &memdb.StringFieldIndex{ 26 Field: "ID", 27 Lowercase: true, 28 }, 29 }, 30 }, 31 } 32} 33 34// UsageEntry represents a count of some arbitrary identifier within the 35// state store, along with the last seen index. 36type UsageEntry struct { 37 ID string 38 Index uint64 39 Count int 40} 41 42// ServiceUsage contains all of the usage data related to services 43type ServiceUsage struct { 44 Services int 45 ServiceInstances int 46 EnterpriseServiceUsage 47} 48 49type uniqueServiceState int 50 51const ( 52 NoChange uniqueServiceState = 0 53 Deleted uniqueServiceState = 1 54 Created uniqueServiceState = 2 55) 56 57// updateUsage takes a set of memdb changes and computes a delta for specific 58// usage metrics that we track. 59func updateUsage(tx WriteTxn, changes Changes) error { 60 usageDeltas := make(map[string]int) 61 serviceNameChanges := make(map[structs.ServiceName]int) 62 for _, change := range changes.Changes { 63 var delta int 64 if change.Created() { 65 delta = 1 66 } else if change.Deleted() { 67 delta = -1 68 } 69 70 switch change.Table { 71 case "nodes": 72 usageDeltas[change.Table] += delta 73 case tableServices: 74 svc := changeObject(change).(*structs.ServiceNode) 75 usageDeltas[change.Table] += delta 76 addEnterpriseServiceInstanceUsage(usageDeltas, change) 77 78 // Construct a mapping of all of the various service names that were 79 // changed, in order to compare it with the finished memdb state. 80 // Make sure to account for the fact that services can change their names. 81 if serviceNameChanged(change) { 82 serviceNameChanges[svc.CompoundServiceName()] += 1 83 before := change.Before.(*structs.ServiceNode) 84 serviceNameChanges[before.CompoundServiceName()] -= 1 85 } else { 86 serviceNameChanges[svc.CompoundServiceName()] += delta 87 } 88 } 89 } 90 91 serviceStates, err := updateServiceNameUsage(tx, usageDeltas, serviceNameChanges) 92 if err != nil { 93 return err 94 } 95 addEnterpriseServiceUsage(usageDeltas, serviceStates) 96 97 idx := changes.Index 98 // This will happen when restoring from a snapshot, just take the max index 99 // of the tables we are tracking. 100 if idx == 0 { 101 idx = maxIndexTxn(tx, "nodes", tableServices) 102 } 103 104 return writeUsageDeltas(tx, idx, usageDeltas) 105} 106 107func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) { 108 serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges)) 109 for svc, delta := range serviceNameChanges { 110 q := Query{Value: svc.Name, EnterpriseMeta: svc.EnterpriseMeta} 111 serviceIter, err := tx.Get(tableServices, indexService, q) 112 if err != nil { 113 return nil, err 114 } 115 116 // Count the number of service instances associated with the given service 117 // name at the end of this transaction, and compare that with how many were 118 // added/removed during the transaction. This allows us to handle a single 119 // transaction committing multiple changes related to a single service 120 // name. 121 var svcCount int 122 for service := serviceIter.Next(); service != nil; service = serviceIter.Next() { 123 svcCount += 1 124 } 125 126 var serviceState uniqueServiceState 127 switch { 128 case svcCount == 0: 129 // If no services exist, we know we deleted the last service 130 // instance. 131 serviceState = Deleted 132 usageDeltas[serviceNamesUsageTable] -= 1 133 case svcCount == delta: 134 // If the current number of service instances equals the number added, 135 // than we know we created a new service name. 136 serviceState = Created 137 usageDeltas[serviceNamesUsageTable] += 1 138 default: 139 serviceState = NoChange 140 } 141 142 serviceStates[svc] = serviceState 143 } 144 145 return serviceStates, nil 146} 147 148// serviceNameChanged returns a boolean that indicates whether the 149// provided change resulted in an update to the service's service name. 150func serviceNameChanged(change memdb.Change) bool { 151 if change.Updated() { 152 before := change.Before.(*structs.ServiceNode) 153 after := change.After.(*structs.ServiceNode) 154 return before.ServiceName != after.ServiceName 155 } 156 157 return false 158} 159 160// writeUsageDeltas will take in a map of IDs to deltas and update each 161// entry accordingly, checking for integer underflow. The index that is 162// passed in will be recorded on the entry as well. 163func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error { 164 for id, delta := range usageDeltas { 165 u, err := tx.First("usage", "id", id) 166 if err != nil { 167 return fmt.Errorf("failed to retrieve existing usage entry: %s", err) 168 } 169 170 if u == nil { 171 if delta < 0 { 172 // Don't return an error here, since we don't want to block updates 173 // from happening to the state store. But, set the delta to 0 so that 174 // we do not accidentally underflow the uint64 and begin reporting 175 // large numbers. 176 delta = 0 177 } 178 err := tx.Insert("usage", &UsageEntry{ 179 ID: id, 180 Count: delta, 181 Index: idx, 182 }) 183 if err != nil { 184 return fmt.Errorf("failed to update usage entry: %s", err) 185 } 186 } else if cur, ok := u.(*UsageEntry); ok { 187 updated := cur.Count + delta 188 if updated < 0 { 189 // Don't return an error here, since we don't want to block updates 190 // from happening to the state store. But, set the delta to 0 so that 191 // we do not accidentally underflow the uint64 and begin reporting 192 // large numbers. 193 updated = 0 194 } 195 err := tx.Insert("usage", &UsageEntry{ 196 ID: id, 197 Count: updated, 198 Index: idx, 199 }) 200 if err != nil { 201 return fmt.Errorf("failed to update usage entry: %s", err) 202 } 203 } 204 } 205 return nil 206} 207 208// NodeCount returns the latest seen Raft index, a count of the number of nodes 209// registered, and any errors. 210func (s *Store) NodeCount() (uint64, int, error) { 211 tx := s.db.ReadTxn() 212 defer tx.Abort() 213 214 nodeUsage, err := firstUsageEntry(tx, "nodes") 215 if err != nil { 216 return 0, 0, fmt.Errorf("failed nodes lookup: %s", err) 217 } 218 return nodeUsage.Index, nodeUsage.Count, nil 219} 220 221// ServiceUsage returns the latest seen Raft index, a compiled set of service 222// usage data, and any errors. 223func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { 224 tx := s.db.ReadTxn() 225 defer tx.Abort() 226 227 serviceInstances, err := firstUsageEntry(tx, tableServices) 228 if err != nil { 229 return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) 230 } 231 232 services, err := firstUsageEntry(tx, serviceNamesUsageTable) 233 if err != nil { 234 return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) 235 } 236 237 usage := ServiceUsage{ 238 ServiceInstances: serviceInstances.Count, 239 Services: services.Count, 240 } 241 results, err := compileEnterpriseUsage(tx, usage) 242 if err != nil { 243 return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) 244 } 245 246 return serviceInstances.Index, results, nil 247} 248 249func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) { 250 usage, err := tx.First("usage", "id", id) 251 if err != nil { 252 return nil, err 253 } 254 255 // If no elements have been inserted, the usage entry will not exist. We 256 // return a valid value so that can be certain the return value is not nil 257 // when no error has occurred. 258 if usage == nil { 259 return &UsageEntry{ID: id, Count: 0}, nil 260 } 261 262 realUsage, ok := usage.(*UsageEntry) 263 if !ok { 264 return nil, fmt.Errorf("failed usage lookup: type %T is not *UsageEntry", usage) 265 } 266 267 return realUsage, nil 268} 269