1package notifications 2 3import ( 4 "expvar" 5 "fmt" 6 "net/http" 7 "sync" 8) 9 10// EndpointMetrics track various actions taken by the endpoint, typically by 11// number of events. The goal of this to export it via expvar but we may find 12// some other future solution to be better. 13type EndpointMetrics struct { 14 Pending int // events pending in queue 15 Events int // total events incoming 16 Successes int // total events written successfully 17 Failures int // total events failed 18 Errors int // total events errored 19 Statuses map[string]int // status code histogram, per call event 20} 21 22// safeMetrics guards the metrics implementation with a lock and provides a 23// safe update function. 24type safeMetrics struct { 25 EndpointMetrics 26 sync.Mutex // protects statuses map 27} 28 29// newSafeMetrics returns safeMetrics with map allocated. 30func newSafeMetrics() *safeMetrics { 31 var sm safeMetrics 32 sm.Statuses = make(map[string]int) 33 return &sm 34} 35 36// httpStatusListener returns the listener for the http sink that updates the 37// relevant counters. 38func (sm *safeMetrics) httpStatusListener() httpStatusListener { 39 return &endpointMetricsHTTPStatusListener{ 40 safeMetrics: sm, 41 } 42} 43 44// eventQueueListener returns a listener that maintains queue related counters. 45func (sm *safeMetrics) eventQueueListener() eventQueueListener { 46 return &endpointMetricsEventQueueListener{ 47 safeMetrics: sm, 48 } 49} 50 51// endpointMetricsHTTPStatusListener increments counters related to http sinks 52// for the relevant events. 53type endpointMetricsHTTPStatusListener struct { 54 *safeMetrics 55} 56 57var _ httpStatusListener = &endpointMetricsHTTPStatusListener{} 58 59func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) { 60 emsl.safeMetrics.Lock() 61 defer emsl.safeMetrics.Unlock() 62 emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) 63 emsl.Successes += len(events) 64} 65 66func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { 67 emsl.safeMetrics.Lock() 68 defer emsl.safeMetrics.Unlock() 69 emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) 70 emsl.Failures += len(events) 71} 72 73func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { 74 emsl.safeMetrics.Lock() 75 defer emsl.safeMetrics.Unlock() 76 emsl.Errors += len(events) 77} 78 79// endpointMetricsEventQueueListener maintains the incoming events counter and 80// the queues pending count. 81type endpointMetricsEventQueueListener struct { 82 *safeMetrics 83} 84 85func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { 86 eqc.Lock() 87 defer eqc.Unlock() 88 eqc.Events += len(events) 89 eqc.Pending += len(events) 90} 91 92func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { 93 eqc.Lock() 94 defer eqc.Unlock() 95 eqc.Pending -= len(events) 96} 97 98// endpoints is global registry of endpoints used to report metrics to expvar 99var endpoints struct { 100 registered []*Endpoint 101 mu sync.Mutex 102} 103 104// register places the endpoint into expvar so that stats are tracked. 105func register(e *Endpoint) { 106 endpoints.mu.Lock() 107 defer endpoints.mu.Unlock() 108 109 endpoints.registered = append(endpoints.registered, e) 110} 111 112func init() { 113 // NOTE(stevvooe): Setup registry metrics structure to report to expvar. 114 // Ideally, we do more metrics through logging but we need some nice 115 // realtime metrics for queue state for now. 116 117 registry := expvar.Get("registry") 118 119 if registry == nil { 120 registry = expvar.NewMap("registry") 121 } 122 123 var notifications expvar.Map 124 notifications.Init() 125 notifications.Set("endpoints", expvar.Func(func() interface{} { 126 endpoints.mu.Lock() 127 defer endpoints.mu.Unlock() 128 129 var names []interface{} 130 for _, v := range endpoints.registered { 131 var epjson struct { 132 Name string `json:"name"` 133 URL string `json:"url"` 134 EndpointConfig 135 136 Metrics EndpointMetrics 137 } 138 139 epjson.Name = v.Name() 140 epjson.URL = v.URL() 141 epjson.EndpointConfig = v.EndpointConfig 142 143 v.ReadMetrics(&epjson.Metrics) 144 145 names = append(names, epjson) 146 } 147 148 return names 149 })) 150 151 registry.(*expvar.Map).Set("notifications", ¬ifications) 152} 153