1package inmemory 2 3import ( 4 "fmt" 5 "sync" 6 "sync/atomic" 7 8 "github.com/splitio/go-split-commons/v3/dtos" 9 "github.com/splitio/go-split-commons/v3/storage" 10 constants "github.com/splitio/go-split-commons/v3/telemetry" 11) 12 13type latencies struct { 14 // MethodLatencies 15 treatment AtomicInt64Slice 16 treatments AtomicInt64Slice 17 treatmentWithConfig AtomicInt64Slice 18 treatmentsWithConfig AtomicInt64Slice 19 track AtomicInt64Slice 20 21 // HTTPLatencies 22 splits AtomicInt64Slice 23 segments AtomicInt64Slice 24 impressions AtomicInt64Slice 25 impressionsCount AtomicInt64Slice 26 events AtomicInt64Slice 27 telemetry AtomicInt64Slice 28 token AtomicInt64Slice 29} 30 31type counters struct { 32 // Evaluation Counters 33 treatment int64 34 treatments int64 35 treatmentWithConfig int64 36 treatmentsWithConfig int64 37 track int64 38 39 // Push Counters 40 authRejections int64 41 tokenRefreshes int64 42 43 // Factory Counters 44 burTimeouts int64 45 nonReadyUsages int64 46} 47 48type records struct { 49 // Impressions Data 50 impressionsQueued int64 51 impressionsDropped int64 52 impressionsDeduped int64 53 54 // Events Data 55 eventsQueued int64 56 eventsDropped int64 57 58 // LastSynchronization 59 splits int64 60 segments int64 61 impressions int64 62 impressionsCount int64 63 events int64 64 token int64 65 telemetry int64 66 67 // SDK 68 session int64 69} 70 71// TelemetryStorage In Memory Telemetry Storage struct 72type TelemetryStorage struct { 73 counters counters 74 httpErrors dtos.HTTPErrors 75 mutexHTTPErrors sync.RWMutex 76 latencies latencies 77 records records 78 streamingEvents []dtos.StreamingEvent // Max Length 20 79 mutexStreamingEvents sync.RWMutex 80 tags []string 81 mutexTags sync.RWMutex 82} 83 84// NewTelemetryStorage builds in memory telemetry storage 85func NewTelemetryStorage() (storage.TelemetryStorage, error) { 86 treatmentLatencies, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 87 if err != nil { 88 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 89 } 90 treatmentWithConfigLatencies, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 91 if err != nil { 92 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 93 } 94 treatmentsLatencies, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 95 if err != nil { 96 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 97 } 98 treatmentsWithConfigLatencies, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 99 if err != nil { 100 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 101 } 102 track, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 103 if err != nil { 104 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 105 } 106 107 splits, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 108 if err != nil { 109 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 110 } 111 segments, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 112 if err != nil { 113 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 114 } 115 impressions, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 116 if err != nil { 117 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 118 } 119 impressionsCount, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 120 if err != nil { 121 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 122 } 123 events, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 124 if err != nil { 125 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 126 } 127 telemetry, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 128 if err != nil { 129 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 130 } 131 token, err := NewAtomicInt64Slice(constants.LatencyBucketCount) 132 if err != nil { 133 return nil, fmt.Errorf("could not create InMemory Storage, %w", err) 134 } 135 136 return &TelemetryStorage{ 137 counters: counters{}, 138 httpErrors: dtos.HTTPErrors{ 139 Splits: make(map[int]int64), 140 Segments: make(map[int]int64), 141 Impressions: make(map[int]int64), 142 ImpressionsCount: make(map[int]int64), 143 Events: make(map[int]int64), 144 Token: make(map[int]int64), 145 Telemetry: make(map[int]int64), 146 }, 147 mutexHTTPErrors: sync.RWMutex{}, 148 latencies: latencies{ 149 treatment: treatmentLatencies, 150 treatmentWithConfig: treatmentWithConfigLatencies, 151 treatments: treatmentsLatencies, 152 treatmentsWithConfig: treatmentsWithConfigLatencies, 153 track: track, 154 155 splits: splits, 156 segments: segments, 157 impressions: impressions, 158 impressionsCount: impressionsCount, 159 events: events, 160 token: token, 161 telemetry: telemetry, 162 }, 163 records: records{}, 164 streamingEvents: make([]dtos.StreamingEvent, 0, constants.MaxStreamingEvents), 165 mutexStreamingEvents: sync.RWMutex{}, 166 tags: make([]string, 0, constants.MaxTags), 167 mutexTags: sync.RWMutex{}, 168 }, nil 169} 170 171// TELEMETRY STORAGE PRODUCER 172 173func (i *TelemetryStorage) RecordConfigData(configData dtos.Config) error { 174 // No-Op. Config Data will be sent directly to Split Servers. No need to store. 175 return nil 176} 177 178// RecordLatency stores latency for method 179func (i *TelemetryStorage) RecordLatency(method string, latency int64) { 180 bucket := constants.Bucket(latency) 181 switch method { 182 case constants.Treatment: 183 i.latencies.treatment.Incr(bucket) 184 case constants.Treatments: 185 i.latencies.treatments.Incr(bucket) 186 case constants.TreatmentWithConfig: 187 i.latencies.treatmentWithConfig.Incr(bucket) 188 case constants.TreatmentsWithConfig: 189 i.latencies.treatmentsWithConfig.Incr(bucket) 190 case constants.Track: 191 i.latencies.track.Incr(bucket) 192 } 193} 194 195// RecordException stores exceptions for method 196func (i *TelemetryStorage) RecordException(method string) { 197 switch method { 198 case constants.Treatment: 199 atomic.AddInt64(&i.counters.treatment, 1) 200 case constants.Treatments: 201 atomic.AddInt64(&i.counters.treatments, 1) 202 case constants.TreatmentWithConfig: 203 atomic.AddInt64(&i.counters.treatmentWithConfig, 1) 204 case constants.TreatmentsWithConfig: 205 atomic.AddInt64(&i.counters.treatmentsWithConfig, 1) 206 case constants.Track: 207 atomic.AddInt64(&i.counters.track, 1) 208 } 209} 210 211// RecordImpressionsStats records impressions by type 212func (i *TelemetryStorage) RecordImpressionsStats(dataType int, count int64) { 213 switch dataType { 214 case constants.ImpressionsDropped: 215 atomic.AddInt64(&i.records.impressionsDropped, count) 216 case constants.ImpressionsDeduped: 217 atomic.AddInt64(&i.records.impressionsDeduped, count) 218 case constants.ImpressionsQueued: 219 atomic.AddInt64(&i.records.impressionsQueued, count) 220 } 221} 222 223// RecordEventsStats recirds events by type 224func (i *TelemetryStorage) RecordEventsStats(dataType int, count int64) { 225 switch dataType { 226 case constants.EventsDropped: 227 atomic.AddInt64(&i.records.eventsDropped, count) 228 case constants.EventsQueued: 229 atomic.AddInt64(&i.records.eventsQueued, count) 230 } 231} 232 233// RecordSuccessfulSync records sync for resource 234func (i *TelemetryStorage) RecordSuccessfulSync(resource int, timestamp int64) { 235 switch resource { 236 case constants.SplitSync: 237 atomic.StoreInt64(&i.records.splits, timestamp) 238 case constants.SegmentSync: 239 atomic.StoreInt64(&i.records.segments, timestamp) 240 case constants.ImpressionSync: 241 atomic.StoreInt64(&i.records.impressions, timestamp) 242 case constants.ImpressionCountSync: 243 atomic.StoreInt64(&i.records.impressionsCount, timestamp) 244 case constants.EventSync: 245 atomic.StoreInt64(&i.records.events, timestamp) 246 case constants.TelemetrySync: 247 atomic.StoreInt64(&i.records.telemetry, timestamp) 248 case constants.TokenSync: 249 atomic.StoreInt64(&i.records.token, timestamp) 250 } 251} 252 253func (i *TelemetryStorage) createOrUpdate(status int, item map[int]int64) { 254 if item == nil { 255 item[status] = 1 256 return 257 } 258 item[status]++ 259} 260 261// RecordSyncError records http error 262func (i *TelemetryStorage) RecordSyncError(resource int, status int) { 263 i.mutexHTTPErrors.Lock() 264 defer i.mutexHTTPErrors.Unlock() 265 switch resource { 266 case constants.SplitSync: 267 i.createOrUpdate(status, i.httpErrors.Splits) 268 case constants.SegmentSync: 269 i.createOrUpdate(status, i.httpErrors.Segments) 270 case constants.ImpressionSync: 271 i.createOrUpdate(status, i.httpErrors.Impressions) 272 case constants.ImpressionCountSync: 273 i.createOrUpdate(status, i.httpErrors.ImpressionsCount) 274 case constants.EventSync: 275 i.createOrUpdate(status, i.httpErrors.Events) 276 case constants.TelemetrySync: 277 i.createOrUpdate(status, i.httpErrors.Telemetry) 278 case constants.TokenSync: 279 i.createOrUpdate(status, i.httpErrors.Token) 280 } 281} 282 283// RecordSyncLatency records http error 284func (i *TelemetryStorage) RecordSyncLatency(resource int, latency int64) { 285 bucket := constants.Bucket(latency) 286 switch resource { 287 case constants.SplitSync: 288 i.latencies.splits.Incr(bucket) 289 case constants.SegmentSync: 290 i.latencies.segments.Incr(bucket) 291 case constants.ImpressionSync: 292 i.latencies.impressions.Incr(bucket) 293 case constants.ImpressionCountSync: 294 i.latencies.impressionsCount.Incr(bucket) 295 case constants.EventSync: 296 i.latencies.events.Incr(bucket) 297 case constants.TelemetrySync: 298 i.latencies.telemetry.Incr(bucket) 299 case constants.TokenSync: 300 i.latencies.token.Incr(bucket) 301 } 302} 303 304// RecordAuthRejections records auth rejections 305func (i *TelemetryStorage) RecordAuthRejections() { 306 atomic.AddInt64(&i.counters.authRejections, 1) 307} 308 309// RecordTokenRefreshes records token 310func (i *TelemetryStorage) RecordTokenRefreshes() { 311 atomic.AddInt64(&i.counters.tokenRefreshes, 1) 312} 313 314// RecordStreamingEvent appends new streaming event 315func (i *TelemetryStorage) RecordStreamingEvent(event *dtos.StreamingEvent) { 316 if event == nil { 317 return 318 } 319 i.mutexStreamingEvents.Lock() 320 defer i.mutexStreamingEvents.Unlock() 321 if len(i.streamingEvents) < constants.MaxStreamingEvents { 322 i.streamingEvents = append(i.streamingEvents, *event) 323 } 324} 325 326// AddTag adds particular tag 327func (i *TelemetryStorage) AddTag(tag string) { 328 i.mutexTags.Lock() 329 defer i.mutexTags.Unlock() 330 if len(i.tags) < constants.MaxTags { 331 i.tags = append(i.tags, tag) 332 } 333} 334 335// RecordSessionLength records session length 336func (i *TelemetryStorage) RecordSessionLength(session int64) { 337 atomic.StoreInt64(&i.records.session, session) 338} 339 340// RecordNonReadyUsage records non ready usage 341func (i *TelemetryStorage) RecordNonReadyUsage() { 342 atomic.AddInt64(&i.counters.nonReadyUsages, 1) 343} 344 345// RecordBURTimeout records bur timeodout 346func (i *TelemetryStorage) RecordBURTimeout() { 347 atomic.AddInt64(&i.counters.burTimeouts, 1) 348} 349 350// TELEMETRY STORAGE CONSUMER 351 352// PopLatencies gets and clears method latencies 353func (i *TelemetryStorage) PopLatencies() dtos.MethodLatencies { 354 return dtos.MethodLatencies{ 355 Treatment: i.latencies.treatment.FetchAndClearAll(), 356 Treatments: i.latencies.treatments.FetchAndClearAll(), 357 TreatmentWithConfig: i.latencies.treatmentWithConfig.FetchAndClearAll(), 358 TreatmentsWithConfig: i.latencies.treatmentsWithConfig.FetchAndClearAll(), 359 Track: i.latencies.track.FetchAndClearAll(), 360 } 361} 362 363// PopExceptions gets and clears method exceptions 364func (i *TelemetryStorage) PopExceptions() dtos.MethodExceptions { 365 return dtos.MethodExceptions{ 366 Treatment: atomic.SwapInt64(&i.counters.treatment, 0), 367 Treatments: atomic.SwapInt64(&i.counters.treatments, 0), 368 TreatmentWithConfig: atomic.SwapInt64(&i.counters.treatmentWithConfig, 0), 369 TreatmentsWithConfig: atomic.SwapInt64(&i.counters.treatmentsWithConfig, 0), 370 Track: atomic.SwapInt64(&i.counters.track, 0), 371 } 372} 373 374// GetImpressionsStats gets impressions by type 375func (i *TelemetryStorage) GetImpressionsStats(dataType int) int64 { 376 switch dataType { 377 case constants.ImpressionsDropped: 378 return atomic.LoadInt64(&i.records.impressionsDropped) 379 case constants.ImpressionsDeduped: 380 return atomic.LoadInt64(&i.records.impressionsDeduped) 381 case constants.ImpressionsQueued: 382 return atomic.LoadInt64(&i.records.impressionsQueued) 383 } 384 return 0 385} 386 387// GetEventsStats gets events by type 388func (i *TelemetryStorage) GetEventsStats(dataType int) int64 { 389 switch dataType { 390 case constants.EventsDropped: 391 return atomic.LoadInt64(&i.records.eventsDropped) 392 case constants.EventsQueued: 393 return atomic.LoadInt64(&i.records.eventsQueued) 394 } 395 return 0 396} 397 398// GetLastSynchronization gets last synchronization stats for fetchers and recorders 399func (i *TelemetryStorage) GetLastSynchronization() dtos.LastSynchronization { 400 return dtos.LastSynchronization{ 401 Splits: atomic.LoadInt64(&i.records.splits), 402 Segments: atomic.LoadInt64(&i.records.segments), 403 Impressions: atomic.LoadInt64(&i.records.impressions), 404 ImpressionsCount: atomic.LoadInt64(&i.records.impressionsCount), 405 Events: atomic.LoadInt64(&i.records.events), 406 Telemetry: atomic.LoadInt64(&i.records.telemetry), 407 Token: atomic.LoadInt64(&i.records.token), 408 } 409} 410 411// PopHTTPErrors gets http errors 412func (i *TelemetryStorage) PopHTTPErrors() dtos.HTTPErrors { 413 i.mutexHTTPErrors.Lock() 414 defer i.mutexHTTPErrors.Unlock() 415 toReturn := i.httpErrors 416 i.httpErrors.Splits = make(map[int]int64) 417 i.httpErrors.Segments = make(map[int]int64) 418 i.httpErrors.Impressions = make(map[int]int64) 419 i.httpErrors.ImpressionsCount = make(map[int]int64) 420 i.httpErrors.Events = make(map[int]int64) 421 i.httpErrors.Telemetry = make(map[int]int64) 422 i.httpErrors.Token = make(map[int]int64) 423 return toReturn 424} 425 426// PopHTTPLatencies gets http latencies 427func (i *TelemetryStorage) PopHTTPLatencies() dtos.HTTPLatencies { 428 return dtos.HTTPLatencies{ 429 Splits: i.latencies.splits.FetchAndClearAll(), 430 Segments: i.latencies.segments.FetchAndClearAll(), 431 Impressions: i.latencies.impressions.FetchAndClearAll(), 432 ImpressionsCount: i.latencies.impressionsCount.FetchAndClearAll(), 433 Events: i.latencies.events.FetchAndClearAll(), 434 Telemetry: i.latencies.telemetry.FetchAndClearAll(), 435 Token: i.latencies.token.FetchAndClearAll(), 436 } 437} 438 439// PopAuthRejections gets total amount of auth rejections 440func (i *TelemetryStorage) PopAuthRejections() int64 { 441 return atomic.SwapInt64(&i.counters.authRejections, 0) 442} 443 444// PopTokenRefreshes gets total amount of token refreshes 445func (i *TelemetryStorage) PopTokenRefreshes() int64 { 446 return atomic.SwapInt64(&i.counters.tokenRefreshes, 0) 447} 448 449// PopStreamingEvents gets streamingEvents data 450func (i *TelemetryStorage) PopStreamingEvents() []dtos.StreamingEvent { 451 i.mutexStreamingEvents.Lock() 452 defer i.mutexStreamingEvents.Unlock() 453 toReturn := i.streamingEvents 454 i.streamingEvents = make([]dtos.StreamingEvent, 0, constants.MaxStreamingEvents) 455 return toReturn 456} 457 458// PopTags gets total amount of tags 459func (i *TelemetryStorage) PopTags() []string { 460 i.mutexTags.Lock() 461 defer i.mutexTags.Unlock() 462 toReturn := i.tags 463 i.tags = make([]string, 0, constants.MaxTags) 464 return toReturn 465} 466 467// GetSessionLength gets session duration 468func (i *TelemetryStorage) GetSessionLength() int64 { 469 return atomic.LoadInt64(&i.records.session) 470} 471 472// GetNonReadyUsages gets non usages on ready 473func (i *TelemetryStorage) GetNonReadyUsages() int64 { 474 return atomic.LoadInt64(&i.counters.nonReadyUsages) 475} 476 477// GetBURTimeouts gets timedouts data 478func (i *TelemetryStorage) GetBURTimeouts() int64 { 479 return atomic.LoadInt64(&i.counters.burTimeouts) 480} 481