1package state
2
3import (
4	"fmt"
5
6	memdb "github.com/hashicorp/go-memdb"
7
8	"github.com/hashicorp/consul/agent/structs"
9)
10
11const (
12	serviceNamesUsageTable = "service-names"
13)
14
15// usageTableSchema returns a new table schema used for tracking various indexes
16// for the Raft log.
17func usageTableSchema() *memdb.TableSchema {
18	return &memdb.TableSchema{
19		Name: "usage",
20		Indexes: map[string]*memdb.IndexSchema{
21			"id": {
22				Name:         "id",
23				AllowMissing: false,
24				Unique:       true,
25				Indexer: &memdb.StringFieldIndex{
26					Field:     "ID",
27					Lowercase: true,
28				},
29			},
30		},
31	}
32}
33
34// UsageEntry represents a count of some arbitrary identifier within the
35// state store, along with the last seen index.
36type UsageEntry struct {
37	ID    string
38	Index uint64
39	Count int
40}
41
42// ServiceUsage contains all of the usage data related to services
43type ServiceUsage struct {
44	Services         int
45	ServiceInstances int
46	EnterpriseServiceUsage
47}
48
49type uniqueServiceState int
50
51const (
52	NoChange uniqueServiceState = 0
53	Deleted  uniqueServiceState = 1
54	Created  uniqueServiceState = 2
55)
56
57// updateUsage takes a set of memdb changes and computes a delta for specific
58// usage metrics that we track.
59func updateUsage(tx WriteTxn, changes Changes) error {
60	usageDeltas := make(map[string]int)
61	serviceNameChanges := make(map[structs.ServiceName]int)
62	for _, change := range changes.Changes {
63		var delta int
64		if change.Created() {
65			delta = 1
66		} else if change.Deleted() {
67			delta = -1
68		}
69
70		switch change.Table {
71		case "nodes":
72			usageDeltas[change.Table] += delta
73		case tableServices:
74			svc := changeObject(change).(*structs.ServiceNode)
75			usageDeltas[change.Table] += delta
76			addEnterpriseServiceInstanceUsage(usageDeltas, change)
77
78			// Construct a mapping of all of the various service names that were
79			// changed, in order to compare it with the finished memdb state.
80			// Make sure to account for the fact that services can change their names.
81			if serviceNameChanged(change) {
82				serviceNameChanges[svc.CompoundServiceName()] += 1
83				before := change.Before.(*structs.ServiceNode)
84				serviceNameChanges[before.CompoundServiceName()] -= 1
85			} else {
86				serviceNameChanges[svc.CompoundServiceName()] += delta
87			}
88		}
89	}
90
91	serviceStates, err := updateServiceNameUsage(tx, usageDeltas, serviceNameChanges)
92	if err != nil {
93		return err
94	}
95	addEnterpriseServiceUsage(usageDeltas, serviceStates)
96
97	idx := changes.Index
98	// This will happen when restoring from a snapshot, just take the max index
99	// of the tables we are tracking.
100	if idx == 0 {
101		idx = maxIndexTxn(tx, "nodes", tableServices)
102	}
103
104	return writeUsageDeltas(tx, idx, usageDeltas)
105}
106
107func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
108	serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))
109	for svc, delta := range serviceNameChanges {
110		q := Query{Value: svc.Name, EnterpriseMeta: svc.EnterpriseMeta}
111		serviceIter, err := tx.Get(tableServices, indexService, q)
112		if err != nil {
113			return nil, err
114		}
115
116		// Count the number of service instances associated with the given service
117		// name at the end of this transaction, and compare that with how many were
118		// added/removed during the transaction. This allows us to handle a single
119		// transaction committing multiple changes related to a single service
120		// name.
121		var svcCount int
122		for service := serviceIter.Next(); service != nil; service = serviceIter.Next() {
123			svcCount += 1
124		}
125
126		var serviceState uniqueServiceState
127		switch {
128		case svcCount == 0:
129			// If no services exist, we know we deleted the last service
130			// instance.
131			serviceState = Deleted
132			usageDeltas[serviceNamesUsageTable] -= 1
133		case svcCount == delta:
134			// If the current number of service instances equals the number added,
135			// than we know we created a new service name.
136			serviceState = Created
137			usageDeltas[serviceNamesUsageTable] += 1
138		default:
139			serviceState = NoChange
140		}
141
142		serviceStates[svc] = serviceState
143	}
144
145	return serviceStates, nil
146}
147
148// serviceNameChanged returns a boolean that indicates whether the
149// provided change resulted in an update to the service's service name.
150func serviceNameChanged(change memdb.Change) bool {
151	if change.Updated() {
152		before := change.Before.(*structs.ServiceNode)
153		after := change.After.(*structs.ServiceNode)
154		return before.ServiceName != after.ServiceName
155	}
156
157	return false
158}
159
160// writeUsageDeltas will take in a map of IDs to deltas and update each
161// entry accordingly, checking for integer underflow. The index that is
162// passed in will be recorded on the entry as well.
163func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error {
164	for id, delta := range usageDeltas {
165		u, err := tx.First("usage", "id", id)
166		if err != nil {
167			return fmt.Errorf("failed to retrieve existing usage entry: %s", err)
168		}
169
170		if u == nil {
171			if delta < 0 {
172				// Don't return an error here, since we don't want to block updates
173				// from happening to the state store. But, set the delta to 0 so that
174				// we do not accidentally underflow the uint64 and begin reporting
175				// large numbers.
176				delta = 0
177			}
178			err := tx.Insert("usage", &UsageEntry{
179				ID:    id,
180				Count: delta,
181				Index: idx,
182			})
183			if err != nil {
184				return fmt.Errorf("failed to update usage entry: %s", err)
185			}
186		} else if cur, ok := u.(*UsageEntry); ok {
187			updated := cur.Count + delta
188			if updated < 0 {
189				// Don't return an error here, since we don't want to block updates
190				// from happening to the state store. But, set the delta to 0 so that
191				// we do not accidentally underflow the uint64 and begin reporting
192				// large numbers.
193				updated = 0
194			}
195			err := tx.Insert("usage", &UsageEntry{
196				ID:    id,
197				Count: updated,
198				Index: idx,
199			})
200			if err != nil {
201				return fmt.Errorf("failed to update usage entry: %s", err)
202			}
203		}
204	}
205	return nil
206}
207
208// NodeCount returns the latest seen Raft index, a count of the number of nodes
209// registered, and any errors.
210func (s *Store) NodeCount() (uint64, int, error) {
211	tx := s.db.ReadTxn()
212	defer tx.Abort()
213
214	nodeUsage, err := firstUsageEntry(tx, "nodes")
215	if err != nil {
216		return 0, 0, fmt.Errorf("failed nodes lookup: %s", err)
217	}
218	return nodeUsage.Index, nodeUsage.Count, nil
219}
220
221// ServiceUsage returns the latest seen Raft index, a compiled set of service
222// usage data, and any errors.
223func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
224	tx := s.db.ReadTxn()
225	defer tx.Abort()
226
227	serviceInstances, err := firstUsageEntry(tx, tableServices)
228	if err != nil {
229		return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
230	}
231
232	services, err := firstUsageEntry(tx, serviceNamesUsageTable)
233	if err != nil {
234		return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
235	}
236
237	usage := ServiceUsage{
238		ServiceInstances: serviceInstances.Count,
239		Services:         services.Count,
240	}
241	results, err := compileEnterpriseUsage(tx, usage)
242	if err != nil {
243		return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
244	}
245
246	return serviceInstances.Index, results, nil
247}
248
249func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) {
250	usage, err := tx.First("usage", "id", id)
251	if err != nil {
252		return nil, err
253	}
254
255	// If no elements have been inserted, the usage entry will not exist. We
256	// return a valid value so that can be certain the return value is not nil
257	// when no error has occurred.
258	if usage == nil {
259		return &UsageEntry{ID: id, Count: 0}, nil
260	}
261
262	realUsage, ok := usage.(*UsageEntry)
263	if !ok {
264		return nil, fmt.Errorf("failed usage lookup: type %T is not *UsageEntry", usage)
265	}
266
267	return realUsage, nil
268}
269