1package csm 2 3import ( 4 "encoding/json" 5 "net" 6 "time" 7 8 "github.com/aws/aws-sdk-go/aws" 9 "github.com/aws/aws-sdk-go/aws/awserr" 10 "github.com/aws/aws-sdk-go/aws/request" 11) 12 13// Reporter will gather metrics of API requests made and 14// send those metrics to the CSM endpoint. 15type Reporter struct { 16 clientID string 17 url string 18 conn net.Conn 19 metricsCh metricChan 20 done chan struct{} 21} 22 23var ( 24 sender *Reporter 25) 26 27func connect(url string) error { 28 const network = "udp" 29 if err := sender.connect(network, url); err != nil { 30 return err 31 } 32 33 if sender.done == nil { 34 sender.done = make(chan struct{}) 35 go sender.start() 36 } 37 38 return nil 39} 40 41func newReporter(clientID, url string) *Reporter { 42 return &Reporter{ 43 clientID: clientID, 44 url: url, 45 metricsCh: newMetricChan(MetricsChannelSize), 46 } 47} 48 49func (rep *Reporter) sendAPICallAttemptMetric(r *request.Request) { 50 if rep == nil { 51 return 52 } 53 54 now := time.Now() 55 creds, _ := r.Config.Credentials.Get() 56 57 m := metric{ 58 ClientID: aws.String(rep.clientID), 59 API: aws.String(r.Operation.Name), 60 Service: aws.String(r.ClientInfo.ServiceID), 61 Timestamp: (*metricTime)(&now), 62 UserAgent: aws.String(r.HTTPRequest.Header.Get("User-Agent")), 63 Region: r.Config.Region, 64 Type: aws.String("ApiCallAttempt"), 65 Version: aws.Int(1), 66 67 XAmzRequestID: aws.String(r.RequestID), 68 69 AttemptLatency: aws.Int(int(now.Sub(r.AttemptTime).Nanoseconds() / int64(time.Millisecond))), 70 AccessKey: aws.String(creds.AccessKeyID), 71 } 72 73 if r.HTTPResponse != nil { 74 m.HTTPStatusCode = aws.Int(r.HTTPResponse.StatusCode) 75 } 76 77 if r.Error != nil { 78 if awserr, ok := r.Error.(awserr.Error); ok { 79 m.SetException(getMetricException(awserr)) 80 } 81 } 82 83 m.TruncateFields() 84 rep.metricsCh.Push(m) 85} 86 87func getMetricException(err awserr.Error) metricException { 88 msg := err.Error() 89 code := err.Code() 90 91 switch code { 92 case "RequestError", 93 request.ErrCodeSerialization, 94 request.CanceledErrorCode: 95 return sdkException{ 96 requestException{exception: code, message: msg}, 97 } 98 default: 99 return awsException{ 100 requestException{exception: code, message: msg}, 101 } 102 } 103} 104 105func (rep *Reporter) sendAPICallMetric(r *request.Request) { 106 if rep == nil { 107 return 108 } 109 110 now := time.Now() 111 m := metric{ 112 ClientID: aws.String(rep.clientID), 113 API: aws.String(r.Operation.Name), 114 Service: aws.String(r.ClientInfo.ServiceID), 115 Timestamp: (*metricTime)(&now), 116 UserAgent: aws.String(r.HTTPRequest.Header.Get("User-Agent")), 117 Type: aws.String("ApiCall"), 118 AttemptCount: aws.Int(r.RetryCount + 1), 119 Region: r.Config.Region, 120 Latency: aws.Int(int(time.Since(r.Time) / time.Millisecond)), 121 XAmzRequestID: aws.String(r.RequestID), 122 MaxRetriesExceeded: aws.Int(boolIntValue(r.RetryCount >= r.MaxRetries())), 123 } 124 125 if r.HTTPResponse != nil { 126 m.FinalHTTPStatusCode = aws.Int(r.HTTPResponse.StatusCode) 127 } 128 129 if r.Error != nil { 130 if awserr, ok := r.Error.(awserr.Error); ok { 131 m.SetFinalException(getMetricException(awserr)) 132 } 133 } 134 135 m.TruncateFields() 136 137 // TODO: Probably want to figure something out for logging dropped 138 // metrics 139 rep.metricsCh.Push(m) 140} 141 142func (rep *Reporter) connect(network, url string) error { 143 if rep.conn != nil { 144 rep.conn.Close() 145 } 146 147 conn, err := net.Dial(network, url) 148 if err != nil { 149 return awserr.New("UDPError", "Could not connect", err) 150 } 151 152 rep.conn = conn 153 154 return nil 155} 156 157func (rep *Reporter) close() { 158 if rep.done != nil { 159 close(rep.done) 160 } 161 162 rep.metricsCh.Pause() 163} 164 165func (rep *Reporter) start() { 166 defer func() { 167 rep.metricsCh.Pause() 168 }() 169 170 for { 171 select { 172 case <-rep.done: 173 rep.done = nil 174 return 175 case m := <-rep.metricsCh.ch: 176 // TODO: What to do with this error? Probably should just log 177 b, err := json.Marshal(m) 178 if err != nil { 179 continue 180 } 181 182 rep.conn.Write(b) 183 } 184 } 185} 186 187// Pause will pause the metric channel preventing any new metrics from being 188// added. It is safe to call concurrently with other calls to Pause, but if 189// called concurently with Continue can lead to unexpected state. 190func (rep *Reporter) Pause() { 191 lock.Lock() 192 defer lock.Unlock() 193 194 if rep == nil { 195 return 196 } 197 198 rep.close() 199} 200 201// Continue will reopen the metric channel and allow for monitoring to be 202// resumed. It is safe to call concurrently with other calls to Continue, but 203// if called concurently with Pause can lead to unexpected state. 204func (rep *Reporter) Continue() { 205 lock.Lock() 206 defer lock.Unlock() 207 if rep == nil { 208 return 209 } 210 211 if !rep.metricsCh.IsPaused() { 212 return 213 } 214 215 rep.metricsCh.Continue() 216} 217 218// Client side metric handler names 219const ( 220 APICallMetricHandlerName = "awscsm.SendAPICallMetric" 221 APICallAttemptMetricHandlerName = "awscsm.SendAPICallAttemptMetric" 222) 223 224// InjectHandlers will will enable client side metrics and inject the proper 225// handlers to handle how metrics are sent. 226// 227// InjectHandlers is NOT safe to call concurrently. Calling InjectHandlers 228// multiple times may lead to unexpected behavior, (e.g. duplicate metrics). 229// 230// // Start must be called in order to inject the correct handlers 231// r, err := csm.Start("clientID", "127.0.0.1:8094") 232// if err != nil { 233// panic(fmt.Errorf("expected no error, but received %v", err)) 234// } 235// 236// sess := session.NewSession() 237// r.InjectHandlers(&sess.Handlers) 238// 239// // create a new service client with our client side metric session 240// svc := s3.New(sess) 241func (rep *Reporter) InjectHandlers(handlers *request.Handlers) { 242 if rep == nil { 243 return 244 } 245 246 handlers.Complete.PushFrontNamed(request.NamedHandler{ 247 Name: APICallMetricHandlerName, 248 Fn: rep.sendAPICallMetric, 249 }) 250 251 handlers.CompleteAttempt.PushFrontNamed(request.NamedHandler{ 252 Name: APICallAttemptMetricHandlerName, 253 Fn: rep.sendAPICallAttemptMetric, 254 }) 255} 256 257// boolIntValue return 1 for true and 0 for false. 258func boolIntValue(b bool) int { 259 if b { 260 return 1 261 } 262 263 return 0 264} 265