1package inmemory
2
3import (
4	"fmt"
5	"sync"
6	"sync/atomic"
7
8	"github.com/splitio/go-split-commons/v3/dtos"
9	"github.com/splitio/go-split-commons/v3/storage"
10	constants "github.com/splitio/go-split-commons/v3/telemetry"
11)
12
13type latencies struct {
14	// MethodLatencies
15	treatment            AtomicInt64Slice
16	treatments           AtomicInt64Slice
17	treatmentWithConfig  AtomicInt64Slice
18	treatmentsWithConfig AtomicInt64Slice
19	track                AtomicInt64Slice
20
21	// HTTPLatencies
22	splits           AtomicInt64Slice
23	segments         AtomicInt64Slice
24	impressions      AtomicInt64Slice
25	impressionsCount AtomicInt64Slice
26	events           AtomicInt64Slice
27	telemetry        AtomicInt64Slice
28	token            AtomicInt64Slice
29}
30
// counters holds the tallies that the storage bumps with sync/atomic
// operations. Field order is kept stable on purpose.
type counters struct {
	// Per-method tallies bumped by RecordException and drained by PopExceptions.
	treatment, treatments, treatmentWithConfig, treatmentsWithConfig, track int64

	// Push tallies, drained by PopAuthRejections / PopTokenRefreshes.
	authRejections, tokenRefreshes int64

	// Factory tallies, read (not reset) by GetBURTimeouts / GetNonReadyUsages.
	burTimeouts, nonReadyUsages int64
}
47
// records holds the int64 values that the storage reads and writes with
// sync/atomic operations. Field order is kept stable on purpose.
type records struct {
	// Impression totals accumulated by RecordImpressionsStats.
	impressionsQueued, impressionsDropped, impressionsDeduped int64

	// Event totals accumulated by RecordEventsStats.
	eventsQueued, eventsDropped int64

	// Last timestamp stored per resource by RecordSuccessfulSync.
	splits, segments, impressions, impressionsCount, events, token, telemetry int64

	// Value stored by RecordSessionLength.
	session int64
}
70
// TelemetryStorage is the in-memory telemetry storage.
//
// counters, latencies and records are updated without a lock (sync/atomic
// calls and AtomicInt64Slice methods). httpErrors, streamingEvents and tags
// are each guarded by the mutex declared right after them.
// RecordStreamingEvent and AddTag stop appending once the slices hold
// constants.MaxStreamingEvents and constants.MaxTags entries respectively.
//
// The struct embeds mutexes, so it must be used through a pointer.
type TelemetryStorage struct {
	counters             counters
	httpErrors           dtos.HTTPErrors
	mutexHTTPErrors      sync.RWMutex
	latencies            latencies
	records              records
	streamingEvents      []dtos.StreamingEvent // Max Length 20
	mutexStreamingEvents sync.RWMutex
	tags                 []string
	mutexTags            sync.RWMutex
}
83
84// NewTelemetryStorage builds in memory telemetry storage
85func NewTelemetryStorage() (storage.TelemetryStorage, error) {
86	treatmentLatencies, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
87	if err != nil {
88		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
89	}
90	treatmentWithConfigLatencies, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
91	if err != nil {
92		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
93	}
94	treatmentsLatencies, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
95	if err != nil {
96		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
97	}
98	treatmentsWithConfigLatencies, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
99	if err != nil {
100		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
101	}
102	track, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
103	if err != nil {
104		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
105	}
106
107	splits, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
108	if err != nil {
109		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
110	}
111	segments, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
112	if err != nil {
113		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
114	}
115	impressions, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
116	if err != nil {
117		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
118	}
119	impressionsCount, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
120	if err != nil {
121		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
122	}
123	events, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
124	if err != nil {
125		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
126	}
127	telemetry, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
128	if err != nil {
129		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
130	}
131	token, err := NewAtomicInt64Slice(constants.LatencyBucketCount)
132	if err != nil {
133		return nil, fmt.Errorf("could not create InMemory Storage, %w", err)
134	}
135
136	return &TelemetryStorage{
137		counters: counters{},
138		httpErrors: dtos.HTTPErrors{
139			Splits:           make(map[int]int64),
140			Segments:         make(map[int]int64),
141			Impressions:      make(map[int]int64),
142			ImpressionsCount: make(map[int]int64),
143			Events:           make(map[int]int64),
144			Token:            make(map[int]int64),
145			Telemetry:        make(map[int]int64),
146		},
147		mutexHTTPErrors: sync.RWMutex{},
148		latencies: latencies{
149			treatment:            treatmentLatencies,
150			treatmentWithConfig:  treatmentWithConfigLatencies,
151			treatments:           treatmentsLatencies,
152			treatmentsWithConfig: treatmentsWithConfigLatencies,
153			track:                track,
154
155			splits:           splits,
156			segments:         segments,
157			impressions:      impressions,
158			impressionsCount: impressionsCount,
159			events:           events,
160			token:            token,
161			telemetry:        telemetry,
162		},
163		records:              records{},
164		streamingEvents:      make([]dtos.StreamingEvent, 0, constants.MaxStreamingEvents),
165		mutexStreamingEvents: sync.RWMutex{},
166		tags:                 make([]string, 0, constants.MaxTags),
167		mutexTags:            sync.RWMutex{},
168	}, nil
169}
170
171// TELEMETRY STORAGE PRODUCER
172
// RecordConfigData is a no-op for the in-memory storage: configData is ignored
// and the method always returns nil.
func (i *TelemetryStorage) RecordConfigData(configData dtos.Config) error {
	// No-Op. Config Data will be sent directly to Split Servers. No need to store.
	return nil
}
177
// RecordLatency records one latency sample for an SDK method.
//
// latency is mapped to a histogram index with constants.Bucket and the
// matching slot of that method's histogram is incremented. method must be one
// of constants.Treatment, Treatments, TreatmentWithConfig,
// TreatmentsWithConfig or Track; any other value is silently ignored.
func (i *TelemetryStorage) RecordLatency(method string, latency int64) {
	// The bucket is computed up front, even when method turns out to be unknown.
	bucket := constants.Bucket(latency)
	switch method {
	case constants.Treatment:
		i.latencies.treatment.Incr(bucket)
	case constants.Treatments:
		i.latencies.treatments.Incr(bucket)
	case constants.TreatmentWithConfig:
		i.latencies.treatmentWithConfig.Incr(bucket)
	case constants.TreatmentsWithConfig:
		i.latencies.treatmentsWithConfig.Incr(bucket)
	case constants.Track:
		i.latencies.track.Incr(bucket)
	}
}
194
195// RecordException stores exceptions for method
196func (i *TelemetryStorage) RecordException(method string) {
197	switch method {
198	case constants.Treatment:
199		atomic.AddInt64(&i.counters.treatment, 1)
200	case constants.Treatments:
201		atomic.AddInt64(&i.counters.treatments, 1)
202	case constants.TreatmentWithConfig:
203		atomic.AddInt64(&i.counters.treatmentWithConfig, 1)
204	case constants.TreatmentsWithConfig:
205		atomic.AddInt64(&i.counters.treatmentsWithConfig, 1)
206	case constants.Track:
207		atomic.AddInt64(&i.counters.track, 1)
208	}
209}
210
211// RecordImpressionsStats records impressions by type
212func (i *TelemetryStorage) RecordImpressionsStats(dataType int, count int64) {
213	switch dataType {
214	case constants.ImpressionsDropped:
215		atomic.AddInt64(&i.records.impressionsDropped, count)
216	case constants.ImpressionsDeduped:
217		atomic.AddInt64(&i.records.impressionsDeduped, count)
218	case constants.ImpressionsQueued:
219		atomic.AddInt64(&i.records.impressionsQueued, count)
220	}
221}
222
223// RecordEventsStats recirds events by type
224func (i *TelemetryStorage) RecordEventsStats(dataType int, count int64) {
225	switch dataType {
226	case constants.EventsDropped:
227		atomic.AddInt64(&i.records.eventsDropped, count)
228	case constants.EventsQueued:
229		atomic.AddInt64(&i.records.eventsQueued, count)
230	}
231}
232
233// RecordSuccessfulSync records sync for resource
234func (i *TelemetryStorage) RecordSuccessfulSync(resource int, timestamp int64) {
235	switch resource {
236	case constants.SplitSync:
237		atomic.StoreInt64(&i.records.splits, timestamp)
238	case constants.SegmentSync:
239		atomic.StoreInt64(&i.records.segments, timestamp)
240	case constants.ImpressionSync:
241		atomic.StoreInt64(&i.records.impressions, timestamp)
242	case constants.ImpressionCountSync:
243		atomic.StoreInt64(&i.records.impressionsCount, timestamp)
244	case constants.EventSync:
245		atomic.StoreInt64(&i.records.events, timestamp)
246	case constants.TelemetrySync:
247		atomic.StoreInt64(&i.records.telemetry, timestamp)
248	case constants.TokenSync:
249		atomic.StoreInt64(&i.records.token, timestamp)
250	}
251}
252
253func (i *TelemetryStorage) createOrUpdate(status int, item map[int]int64) {
254	if item == nil {
255		item[status] = 1
256		return
257	}
258	item[status]++
259}
260
261// RecordSyncError records http error
262func (i *TelemetryStorage) RecordSyncError(resource int, status int) {
263	i.mutexHTTPErrors.Lock()
264	defer i.mutexHTTPErrors.Unlock()
265	switch resource {
266	case constants.SplitSync:
267		i.createOrUpdate(status, i.httpErrors.Splits)
268	case constants.SegmentSync:
269		i.createOrUpdate(status, i.httpErrors.Segments)
270	case constants.ImpressionSync:
271		i.createOrUpdate(status, i.httpErrors.Impressions)
272	case constants.ImpressionCountSync:
273		i.createOrUpdate(status, i.httpErrors.ImpressionsCount)
274	case constants.EventSync:
275		i.createOrUpdate(status, i.httpErrors.Events)
276	case constants.TelemetrySync:
277		i.createOrUpdate(status, i.httpErrors.Telemetry)
278	case constants.TokenSync:
279		i.createOrUpdate(status, i.httpErrors.Token)
280	}
281}
282
// RecordSyncLatency records one HTTP latency sample for a synchronized
// resource.
//
// latency is mapped to a histogram index with constants.Bucket and the
// matching slot of that resource's histogram is incremented. Resources other
// than the constants handled below are silently ignored.
func (i *TelemetryStorage) RecordSyncLatency(resource int, latency int64) {
	// The bucket is computed up front, even when resource turns out to be unknown.
	bucket := constants.Bucket(latency)
	switch resource {
	case constants.SplitSync:
		i.latencies.splits.Incr(bucket)
	case constants.SegmentSync:
		i.latencies.segments.Incr(bucket)
	case constants.ImpressionSync:
		i.latencies.impressions.Incr(bucket)
	case constants.ImpressionCountSync:
		i.latencies.impressionsCount.Incr(bucket)
	case constants.EventSync:
		i.latencies.events.Incr(bucket)
	case constants.TelemetrySync:
		i.latencies.telemetry.Incr(bucket)
	case constants.TokenSync:
		i.latencies.token.Incr(bucket)
	}
}
303
// RecordAuthRejections atomically adds one to the auth-rejections counter.
func (i *TelemetryStorage) RecordAuthRejections() {
	atomic.AddInt64(&i.counters.authRejections, 1)
}
308
// RecordTokenRefreshes atomically adds one to the token-refreshes counter.
func (i *TelemetryStorage) RecordTokenRefreshes() {
	atomic.AddInt64(&i.counters.tokenRefreshes, 1)
}
313
314// RecordStreamingEvent appends new streaming event
315func (i *TelemetryStorage) RecordStreamingEvent(event *dtos.StreamingEvent) {
316	if event == nil {
317		return
318	}
319	i.mutexStreamingEvents.Lock()
320	defer i.mutexStreamingEvents.Unlock()
321	if len(i.streamingEvents) < constants.MaxStreamingEvents {
322		i.streamingEvents = append(i.streamingEvents, *event)
323	}
324}
325
326// AddTag adds particular tag
327func (i *TelemetryStorage) AddTag(tag string) {
328	i.mutexTags.Lock()
329	defer i.mutexTags.Unlock()
330	if len(i.tags) < constants.MaxTags {
331		i.tags = append(i.tags, tag)
332	}
333}
334
// RecordSessionLength atomically stores session as the session length,
// overwriting any previously stored value.
func (i *TelemetryStorage) RecordSessionLength(session int64) {
	atomic.StoreInt64(&i.records.session, session)
}
339
// RecordNonReadyUsage atomically adds one to the non-ready usages counter.
func (i *TelemetryStorage) RecordNonReadyUsage() {
	atomic.AddInt64(&i.counters.nonReadyUsages, 1)
}
344
// RecordBURTimeout atomically adds one to the BUR timeouts counter.
func (i *TelemetryStorage) RecordBURTimeout() {
	atomic.AddInt64(&i.counters.burTimeouts, 1)
}
349
350// TELEMETRY STORAGE CONSUMER
351
352// PopLatencies gets and clears method latencies
353func (i *TelemetryStorage) PopLatencies() dtos.MethodLatencies {
354	return dtos.MethodLatencies{
355		Treatment:            i.latencies.treatment.FetchAndClearAll(),
356		Treatments:           i.latencies.treatments.FetchAndClearAll(),
357		TreatmentWithConfig:  i.latencies.treatmentWithConfig.FetchAndClearAll(),
358		TreatmentsWithConfig: i.latencies.treatmentsWithConfig.FetchAndClearAll(),
359		Track:                i.latencies.track.FetchAndClearAll(),
360	}
361}
362
363// PopExceptions gets and clears method exceptions
364func (i *TelemetryStorage) PopExceptions() dtos.MethodExceptions {
365	return dtos.MethodExceptions{
366		Treatment:            atomic.SwapInt64(&i.counters.treatment, 0),
367		Treatments:           atomic.SwapInt64(&i.counters.treatments, 0),
368		TreatmentWithConfig:  atomic.SwapInt64(&i.counters.treatmentWithConfig, 0),
369		TreatmentsWithConfig: atomic.SwapInt64(&i.counters.treatmentsWithConfig, 0),
370		Track:                atomic.SwapInt64(&i.counters.track, 0),
371	}
372}
373
374// GetImpressionsStats gets impressions by type
375func (i *TelemetryStorage) GetImpressionsStats(dataType int) int64 {
376	switch dataType {
377	case constants.ImpressionsDropped:
378		return atomic.LoadInt64(&i.records.impressionsDropped)
379	case constants.ImpressionsDeduped:
380		return atomic.LoadInt64(&i.records.impressionsDeduped)
381	case constants.ImpressionsQueued:
382		return atomic.LoadInt64(&i.records.impressionsQueued)
383	}
384	return 0
385}
386
387// GetEventsStats gets events by type
388func (i *TelemetryStorage) GetEventsStats(dataType int) int64 {
389	switch dataType {
390	case constants.EventsDropped:
391		return atomic.LoadInt64(&i.records.eventsDropped)
392	case constants.EventsQueued:
393		return atomic.LoadInt64(&i.records.eventsQueued)
394	}
395	return 0
396}
397
398// GetLastSynchronization gets last synchronization stats for fetchers and recorders
399func (i *TelemetryStorage) GetLastSynchronization() dtos.LastSynchronization {
400	return dtos.LastSynchronization{
401		Splits:           atomic.LoadInt64(&i.records.splits),
402		Segments:         atomic.LoadInt64(&i.records.segments),
403		Impressions:      atomic.LoadInt64(&i.records.impressions),
404		ImpressionsCount: atomic.LoadInt64(&i.records.impressionsCount),
405		Events:           atomic.LoadInt64(&i.records.events),
406		Telemetry:        atomic.LoadInt64(&i.records.telemetry),
407		Token:            atomic.LoadInt64(&i.records.token),
408	}
409}
410
411// PopHTTPErrors gets http errors
412func (i *TelemetryStorage) PopHTTPErrors() dtos.HTTPErrors {
413	i.mutexHTTPErrors.Lock()
414	defer i.mutexHTTPErrors.Unlock()
415	toReturn := i.httpErrors
416	i.httpErrors.Splits = make(map[int]int64)
417	i.httpErrors.Segments = make(map[int]int64)
418	i.httpErrors.Impressions = make(map[int]int64)
419	i.httpErrors.ImpressionsCount = make(map[int]int64)
420	i.httpErrors.Events = make(map[int]int64)
421	i.httpErrors.Telemetry = make(map[int]int64)
422	i.httpErrors.Token = make(map[int]int64)
423	return toReturn
424}
425
426// PopHTTPLatencies gets http latencies
427func (i *TelemetryStorage) PopHTTPLatencies() dtos.HTTPLatencies {
428	return dtos.HTTPLatencies{
429		Splits:           i.latencies.splits.FetchAndClearAll(),
430		Segments:         i.latencies.segments.FetchAndClearAll(),
431		Impressions:      i.latencies.impressions.FetchAndClearAll(),
432		ImpressionsCount: i.latencies.impressionsCount.FetchAndClearAll(),
433		Events:           i.latencies.events.FetchAndClearAll(),
434		Telemetry:        i.latencies.telemetry.FetchAndClearAll(),
435		Token:            i.latencies.token.FetchAndClearAll(),
436	}
437}
438
// PopAuthRejections returns the number of auth rejections recorded so far and
// atomically resets the counter to zero.
func (i *TelemetryStorage) PopAuthRejections() int64 {
	return atomic.SwapInt64(&i.counters.authRejections, 0)
}
443
// PopTokenRefreshes returns the number of token refreshes recorded so far and
// atomically resets the counter to zero.
func (i *TelemetryStorage) PopTokenRefreshes() int64 {
	return atomic.SwapInt64(&i.counters.tokenRefreshes, 0)
}
448
449// PopStreamingEvents gets streamingEvents data
450func (i *TelemetryStorage) PopStreamingEvents() []dtos.StreamingEvent {
451	i.mutexStreamingEvents.Lock()
452	defer i.mutexStreamingEvents.Unlock()
453	toReturn := i.streamingEvents
454	i.streamingEvents = make([]dtos.StreamingEvent, 0, constants.MaxStreamingEvents)
455	return toReturn
456}
457
458// PopTags gets total amount of tags
459func (i *TelemetryStorage) PopTags() []string {
460	i.mutexTags.Lock()
461	defer i.mutexTags.Unlock()
462	toReturn := i.tags
463	i.tags = make([]string, 0, constants.MaxTags)
464	return toReturn
465}
466
// GetSessionLength atomically reads the value last stored by
// RecordSessionLength; the value is not reset.
func (i *TelemetryStorage) GetSessionLength() int64 {
	return atomic.LoadInt64(&i.records.session)
}
471
// GetNonReadyUsages atomically reads the non-ready usages counter; the counter
// is not reset.
func (i *TelemetryStorage) GetNonReadyUsages() int64 {
	return atomic.LoadInt64(&i.counters.nonReadyUsages)
}
476
// GetBURTimeouts atomically reads the BUR timeouts counter; the counter is not
// reset.
func (i *TelemetryStorage) GetBURTimeouts() int64 {
	return atomic.LoadInt64(&i.counters.burTimeouts)
}
481