1package csm
2
3import (
4	"encoding/json"
5	"net"
6	"time"
7
8	"github.com/aws/aws-sdk-go/aws"
9	"github.com/aws/aws-sdk-go/aws/awserr"
10	"github.com/aws/aws-sdk-go/aws/request"
11)
12
13// Reporter will gather metrics of API requests made and
14// send those metrics to the CSM endpoint.
15type Reporter struct {
16	clientID  string
17	url       string
18	conn      net.Conn
19	metricsCh metricChan
20	done      chan struct{}
21}
22
23var (
24	sender *Reporter
25)
26
27func connect(url string) error {
28	const network = "udp"
29	if err := sender.connect(network, url); err != nil {
30		return err
31	}
32
33	if sender.done == nil {
34		sender.done = make(chan struct{})
35		go sender.start()
36	}
37
38	return nil
39}
40
41func newReporter(clientID, url string) *Reporter {
42	return &Reporter{
43		clientID:  clientID,
44		url:       url,
45		metricsCh: newMetricChan(MetricsChannelSize),
46	}
47}
48
49func (rep *Reporter) sendAPICallAttemptMetric(r *request.Request) {
50	if rep == nil {
51		return
52	}
53
54	now := time.Now()
55	creds, _ := r.Config.Credentials.Get()
56
57	m := metric{
58		ClientID:  aws.String(rep.clientID),
59		API:       aws.String(r.Operation.Name),
60		Service:   aws.String(r.ClientInfo.ServiceID),
61		Timestamp: (*metricTime)(&now),
62		UserAgent: aws.String(r.HTTPRequest.Header.Get("User-Agent")),
63		Region:    r.Config.Region,
64		Type:      aws.String("ApiCallAttempt"),
65		Version:   aws.Int(1),
66
67		XAmzRequestID: aws.String(r.RequestID),
68
69		AttemptLatency: aws.Int(int(now.Sub(r.AttemptTime).Nanoseconds() / int64(time.Millisecond))),
70		AccessKey:      aws.String(creds.AccessKeyID),
71	}
72
73	if r.HTTPResponse != nil {
74		m.HTTPStatusCode = aws.Int(r.HTTPResponse.StatusCode)
75	}
76
77	if r.Error != nil {
78		if awserr, ok := r.Error.(awserr.Error); ok {
79			m.SetException(getMetricException(awserr))
80		}
81	}
82
83	m.TruncateFields()
84	rep.metricsCh.Push(m)
85}
86
87func getMetricException(err awserr.Error) metricException {
88	msg := err.Error()
89	code := err.Code()
90
91	switch code {
92	case "RequestError",
93		request.ErrCodeSerialization,
94		request.CanceledErrorCode:
95		return sdkException{
96			requestException{exception: code, message: msg},
97		}
98	default:
99		return awsException{
100			requestException{exception: code, message: msg},
101		}
102	}
103}
104
105func (rep *Reporter) sendAPICallMetric(r *request.Request) {
106	if rep == nil {
107		return
108	}
109
110	now := time.Now()
111	m := metric{
112		ClientID:           aws.String(rep.clientID),
113		API:                aws.String(r.Operation.Name),
114		Service:            aws.String(r.ClientInfo.ServiceID),
115		Timestamp:          (*metricTime)(&now),
116		UserAgent:          aws.String(r.HTTPRequest.Header.Get("User-Agent")),
117		Type:               aws.String("ApiCall"),
118		AttemptCount:       aws.Int(r.RetryCount + 1),
119		Region:             r.Config.Region,
120		Latency:            aws.Int(int(time.Since(r.Time) / time.Millisecond)),
121		XAmzRequestID:      aws.String(r.RequestID),
122		MaxRetriesExceeded: aws.Int(boolIntValue(r.RetryCount >= r.MaxRetries())),
123	}
124
125	if r.HTTPResponse != nil {
126		m.FinalHTTPStatusCode = aws.Int(r.HTTPResponse.StatusCode)
127	}
128
129	if r.Error != nil {
130		if awserr, ok := r.Error.(awserr.Error); ok {
131			m.SetFinalException(getMetricException(awserr))
132		}
133	}
134
135	m.TruncateFields()
136
137	// TODO: Probably want to figure something out for logging dropped
138	// metrics
139	rep.metricsCh.Push(m)
140}
141
142func (rep *Reporter) connect(network, url string) error {
143	if rep.conn != nil {
144		rep.conn.Close()
145	}
146
147	conn, err := net.Dial(network, url)
148	if err != nil {
149		return awserr.New("UDPError", "Could not connect", err)
150	}
151
152	rep.conn = conn
153
154	return nil
155}
156
157func (rep *Reporter) close() {
158	if rep.done != nil {
159		close(rep.done)
160	}
161
162	rep.metricsCh.Pause()
163}
164
165func (rep *Reporter) start() {
166	defer func() {
167		rep.metricsCh.Pause()
168	}()
169
170	for {
171		select {
172		case <-rep.done:
173			rep.done = nil
174			return
175		case m := <-rep.metricsCh.ch:
176			// TODO: What to do with this error? Probably should just log
177			b, err := json.Marshal(m)
178			if err != nil {
179				continue
180			}
181
182			rep.conn.Write(b)
183		}
184	}
185}
186
187// Pause will pause the metric channel preventing any new metrics from being
188// added. It is safe to call concurrently with other calls to Pause, but if
189// called concurently with Continue can lead to unexpected state.
190func (rep *Reporter) Pause() {
191	lock.Lock()
192	defer lock.Unlock()
193
194	if rep == nil {
195		return
196	}
197
198	rep.close()
199}
200
201// Continue will reopen the metric channel and allow for monitoring to be
202// resumed. It is safe to call concurrently with other calls to Continue, but
203// if called concurently with Pause can lead to unexpected state.
204func (rep *Reporter) Continue() {
205	lock.Lock()
206	defer lock.Unlock()
207	if rep == nil {
208		return
209	}
210
211	if !rep.metricsCh.IsPaused() {
212		return
213	}
214
215	rep.metricsCh.Continue()
216}
217
// Client side metric handler names
const (
	// APICallMetricHandlerName is the name under which InjectHandlers
	// registers the handler that reports API call metrics.
	APICallMetricHandlerName = "awscsm.SendAPICallMetric"

	// APICallAttemptMetricHandlerName is the name under which InjectHandlers
	// registers the handler that reports API call attempt metrics.
	APICallAttemptMetricHandlerName = "awscsm.SendAPICallAttemptMetric"
)
223
224// InjectHandlers will will enable client side metrics and inject the proper
225// handlers to handle how metrics are sent.
226//
227// InjectHandlers is NOT safe to call concurrently. Calling InjectHandlers
228// multiple times may lead to unexpected behavior, (e.g. duplicate metrics).
229//
230//		// Start must be called in order to inject the correct handlers
231//		r, err := csm.Start("clientID", "127.0.0.1:8094")
232//		if err != nil {
233//			panic(fmt.Errorf("expected no error, but received %v", err))
234//		}
235//
236//		sess := session.NewSession()
237//		r.InjectHandlers(&sess.Handlers)
238//
239//		// create a new service client with our client side metric session
240//		svc := s3.New(sess)
241func (rep *Reporter) InjectHandlers(handlers *request.Handlers) {
242	if rep == nil {
243		return
244	}
245
246	handlers.Complete.PushFrontNamed(request.NamedHandler{
247		Name: APICallMetricHandlerName,
248		Fn:   rep.sendAPICallMetric,
249	})
250
251	handlers.CompleteAttempt.PushFrontNamed(request.NamedHandler{
252		Name: APICallAttemptMetricHandlerName,
253		Fn:   rep.sendAPICallAttemptMetric,
254	})
255}
256
// boolIntValue converts b to an int: 1 when b is true, 0 when it is false.
func boolIntValue(b bool) int {
	var v int
	if b {
		v = 1
	}

	return v
}
265