1package notifications
2
3import (
4	"expvar"
5	"fmt"
6	"net/http"
7	"sync"
8)
9
// EndpointMetrics tracks various actions taken by the endpoint, typically by
// number of events. The goal of this is to export it via expvar, but we may
// find some other future solution to be better.
type EndpointMetrics struct {
	Pending   int            // events currently pending in the queue (ingress minus egress)
	Events    int            // total events incoming
	Successes int            // total events written successfully
	Failures  int            // total events reported as failed together with an HTTP status
	Errors    int            // total events reported with an error value instead of a status
	Statuses  map[string]int // events per HTTP status, keyed as "<code> <status text>"
}
21
// safeMetrics pairs EndpointMetrics with a mutex. The listeners in this file
// take the lock before touching any counter or the Statuses map.
type safeMetrics struct {
	EndpointMetrics
	sync.Mutex // protects the embedded EndpointMetrics (counters and Statuses map)
}
28
29// newSafeMetrics returns safeMetrics with map allocated.
30func newSafeMetrics() *safeMetrics {
31	var sm safeMetrics
32	sm.Statuses = make(map[string]int)
33	return &sm
34}
35
36// httpStatusListener returns the listener for the http sink that updates the
37// relevant counters.
38func (sm *safeMetrics) httpStatusListener() httpStatusListener {
39	return &endpointMetricsHTTPStatusListener{
40		safeMetrics: sm,
41	}
42}
43
44// eventQueueListener returns a listener that maintains queue related counters.
45func (sm *safeMetrics) eventQueueListener() eventQueueListener {
46	return &endpointMetricsEventQueueListener{
47		safeMetrics: sm,
48	}
49}
50
// endpointMetricsHTTPStatusListener increments counters related to http sinks
// for the relevant events. It embeds *safeMetrics, so its methods update the
// wrapped metrics and use that value's mutex.
type endpointMetricsHTTPStatusListener struct {
	*safeMetrics
}
56
57var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
58
59func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
60	emsl.safeMetrics.Lock()
61	defer emsl.safeMetrics.Unlock()
62	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
63	emsl.Successes += len(events)
64}
65
66func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
67	emsl.safeMetrics.Lock()
68	defer emsl.safeMetrics.Unlock()
69	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
70	emsl.Failures += len(events)
71}
72
73func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
74	emsl.safeMetrics.Lock()
75	defer emsl.safeMetrics.Unlock()
76	emsl.Errors += len(events)
77}
78
79// endpointMetricsEventQueueListener maintains the incoming events counter and
80// the queues pending count.
81type endpointMetricsEventQueueListener struct {
82	*safeMetrics
83}
84
85func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
86	eqc.Lock()
87	defer eqc.Unlock()
88	eqc.Events += len(events)
89	eqc.Pending += len(events)
90}
91
92func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
93	eqc.Lock()
94	defer eqc.Unlock()
95	eqc.Pending -= len(events)
96}
97
// endpoints is the global registry of endpoints used to report metrics to
// expvar.
var endpoints struct {
	registered []*Endpoint // endpoints added through register, in call order
	mu         sync.Mutex  // guards registered
}
103
104// register places the endpoint into expvar so that stats are tracked.
105func register(e *Endpoint) {
106	endpoints.mu.Lock()
107	defer endpoints.mu.Unlock()
108
109	endpoints.registered = append(endpoints.registered, e)
110}
111
112func init() {
113	// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
114	// Ideally, we do more metrics through logging but we need some nice
115	// realtime metrics for queue state for now.
116
117	registry := expvar.Get("registry")
118
119	if registry == nil {
120		registry = expvar.NewMap("registry")
121	}
122
123	var notifications expvar.Map
124	notifications.Init()
125	notifications.Set("endpoints", expvar.Func(func() interface{} {
126		endpoints.mu.Lock()
127		defer endpoints.mu.Unlock()
128
129		var names []interface{}
130		for _, v := range endpoints.registered {
131			var epjson struct {
132				Name string `json:"name"`
133				URL  string `json:"url"`
134				EndpointConfig
135
136				Metrics EndpointMetrics
137			}
138
139			epjson.Name = v.Name()
140			epjson.URL = v.URL()
141			epjson.EndpointConfig = v.EndpointConfig
142
143			v.ReadMetrics(&epjson.Metrics)
144
145			names = append(names, epjson)
146		}
147
148		return names
149	}))
150
151	registry.(*expvar.Map).Set("notifications", &notifications)
152}
153