1package cloudwatch 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "regexp" 8 "time" 9 10 "github.com/aws/aws-sdk-go/aws/client" 11 "github.com/aws/aws-sdk-go/aws/request" 12 "github.com/aws/aws-sdk-go/aws/session" 13 "github.com/aws/aws-sdk-go/service/cloudwatch" 14 "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" 15 "github.com/aws/aws-sdk-go/service/cloudwatchlogs" 16 "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface" 17 "github.com/aws/aws-sdk-go/service/ec2" 18 "github.com/aws/aws-sdk-go/service/ec2/ec2iface" 19 "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi" 20 "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface" 21 "github.com/grafana/grafana-aws-sdk/pkg/awsds" 22 "github.com/grafana/grafana-plugin-sdk-go/backend" 23 "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" 24 "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" 25 "github.com/grafana/grafana-plugin-sdk-go/data" 26 "github.com/grafana/grafana/pkg/components/simplejson" 27 "github.com/grafana/grafana/pkg/infra/log" 28 "github.com/grafana/grafana/pkg/plugins" 29 "github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin" 30 "github.com/grafana/grafana/pkg/setting" 31) 32 33type datasourceInfo struct { 34 profile string 35 region string 36 authType awsds.AuthType 37 assumeRoleARN string 38 externalID string 39 namespace string 40 endpoint string 41 42 accessKey string 43 secretKey string 44 45 datasourceID int64 46} 47 48const cloudWatchTSFormat = "2006-01-02 15:04:05.000" 49const defaultRegion = "default" 50 51// Constants also defined in datasource/cloudwatch/datasource.ts 52const logIdentifierInternal = "__log__grafana_internal__" 53const logStreamIdentifierInternal = "__logstream__grafana_internal__" 54 55var plog = log.New("tsdb.cloudwatch") 56var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`) 57 58func ProvideService(cfg *setting.Cfg, logsService *LogsService, registrar plugins.CoreBackendRegistrar) (*CloudWatchService, error) { 59 plog.Debug("initing") 60 61 executor := newExecutor(logsService, datasource.NewInstanceManager(NewInstanceSettings()), cfg, awsds.NewSessionCache()) 62 factory := coreplugin.New(backend.ServeOpts{ 63 QueryDataHandler: executor, 64 }) 65 66 if err := registrar.LoadAndRegister("cloudwatch", factory); err != nil { 67 plog.Error("Failed to register plugin", "error", err) 68 return nil, err 69 } 70 71 return &CloudWatchService{ 72 LogsService: logsService, 73 Cfg: cfg, 74 Executor: executor, 75 }, nil 76} 77 78type CloudWatchService struct { 79 LogsService *LogsService 80 Cfg *setting.Cfg 81 Executor *cloudWatchExecutor 82} 83 84type SessionCache interface { 85 GetSession(region string, s awsds.AWSDatasourceSettings) (*session.Session, error) 86} 87 88func newExecutor(logsService *LogsService, im instancemgmt.InstanceManager, cfg *setting.Cfg, sessions SessionCache) *cloudWatchExecutor { 89 return &cloudWatchExecutor{ 90 logsService: logsService, 91 im: im, 92 cfg: cfg, 93 sessions: sessions, 94 } 95} 96 97func NewInstanceSettings() datasource.InstanceFactoryFunc { 98 return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { 99 jsonData := struct { 100 Profile string `json:"profile"` 101 Region string `json:"defaultRegion"` 102 AssumeRoleARN string `json:"assumeRoleArn"` 103 ExternalID string `json:"externalId"` 104 Endpoint string `json:"endpoint"` 105 Namespace string `json:"customMetricsNamespaces"` 106 AuthType string `json:"authType"` 107 }{} 108 109 err := json.Unmarshal(settings.JSONData, &jsonData) 110 if err != nil { 111 return nil, fmt.Errorf("error reading settings: %w", err) 112 } 113 114 model := datasourceInfo{ 115 profile: jsonData.Profile, 116 region: jsonData.Region, 117 assumeRoleARN: jsonData.AssumeRoleARN, 118 externalID: jsonData.ExternalID, 119 endpoint: jsonData.Endpoint, 120 namespace: jsonData.Namespace, 121 datasourceID: settings.ID, 122 } 123 124 at := awsds.AuthTypeDefault 125 switch jsonData.AuthType { 126 case "credentials": 127 at = awsds.AuthTypeSharedCreds 128 case "keys": 129 at = awsds.AuthTypeKeys 130 case "default": 131 at = awsds.AuthTypeDefault 132 case "ec2_iam_role": 133 at = awsds.AuthTypeEC2IAMRole 134 case "arn": 135 at = awsds.AuthTypeDefault 136 plog.Warn("Authentication type \"arn\" is deprecated, falling back to default") 137 default: 138 plog.Warn("Unrecognized AWS authentication type", "type", jsonData.AuthType) 139 } 140 141 model.authType = at 142 143 if model.profile == "" { 144 model.profile = settings.Database // legacy support 145 } 146 147 model.accessKey = settings.DecryptedSecureJSONData["accessKey"] 148 model.secretKey = settings.DecryptedSecureJSONData["secretKey"] 149 150 return model, nil 151 } 152} 153 154// cloudWatchExecutor executes CloudWatch requests. 155type cloudWatchExecutor struct { 156 logsService *LogsService 157 im instancemgmt.InstanceManager 158 cfg *setting.Cfg 159 sessions SessionCache 160} 161 162func (e *cloudWatchExecutor) newSession(region string, pluginCtx backend.PluginContext) (*session.Session, error) { 163 dsInfo, err := e.getDSInfo(pluginCtx) 164 if err != nil { 165 return nil, err 166 } 167 168 if region == defaultRegion { 169 region = dsInfo.region 170 } 171 172 return e.sessions.GetSession(region, awsds.AWSDatasourceSettings{ 173 Profile: dsInfo.profile, 174 Region: region, 175 AuthType: dsInfo.authType, 176 AssumeRoleARN: dsInfo.assumeRoleARN, 177 ExternalID: dsInfo.externalID, 178 Endpoint: dsInfo.endpoint, 179 DefaultRegion: dsInfo.region, 180 AccessKey: dsInfo.accessKey, 181 SecretKey: dsInfo.secretKey, 182 }) 183} 184 185func (e *cloudWatchExecutor) getCWClient(region string, pluginCtx backend.PluginContext) (cloudwatchiface.CloudWatchAPI, error) { 186 sess, err := e.newSession(region, pluginCtx) 187 if err != nil { 188 return nil, err 189 } 190 return NewCWClient(sess), nil 191} 192 193func (e *cloudWatchExecutor) getCWLogsClient(region string, pluginCtx backend.PluginContext) (cloudwatchlogsiface.CloudWatchLogsAPI, error) { 194 sess, err := e.newSession(region, pluginCtx) 195 if err != nil { 196 return nil, err 197 } 198 199 logsClient := NewCWLogsClient(sess) 200 201 return logsClient, nil 202} 203 204func (e *cloudWatchExecutor) getEC2Client(region string, pluginCtx backend.PluginContext) (ec2iface.EC2API, error) { 205 sess, err := e.newSession(region, pluginCtx) 206 if err != nil { 207 return nil, err 208 } 209 210 return newEC2Client(sess), nil 211} 212 213func (e *cloudWatchExecutor) getRGTAClient(region string, pluginCtx backend.PluginContext) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI, 214 error) { 215 sess, err := e.newSession(region, pluginCtx) 216 if err != nil { 217 return nil, err 218 } 219 220 return newRGTAClient(sess), nil 221} 222 223func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, 224 queryContext backend.DataQuery, model *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) { 225 const maxAttempts = 8 226 const pollPeriod = 1000 * time.Millisecond 227 228 startQueryOutput, err := e.executeStartQuery(ctx, logsClient, model, queryContext.TimeRange) 229 if err != nil { 230 return nil, err 231 } 232 233 requestParams := simplejson.NewFromAny(map[string]interface{}{ 234 "region": model.Get("region").MustString(""), 235 "queryId": *startQueryOutput.QueryId, 236 }) 237 238 ticker := time.NewTicker(pollPeriod) 239 defer ticker.Stop() 240 241 attemptCount := 1 242 for range ticker.C { 243 res, err := e.executeGetQueryResults(ctx, logsClient, requestParams) 244 if err != nil { 245 return nil, err 246 } 247 if isTerminated(*res.Status) { 248 return res, err 249 } 250 if attemptCount >= maxAttempts { 251 return res, fmt.Errorf("fetching of query results exceeded max number of attempts") 252 } 253 254 attemptCount++ 255 } 256 257 return nil, nil 258} 259 260func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { 261 /* 262 Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response 263 to the query, but rather an ID is first returned. Following this, a client is expected to send requests along 264 with the ID until the status of the query is complete, receiving (possibly partial) results each time. For 265 queries made via dashboards and Explore, the logic of making these repeated queries is handled on the 266 frontend, but because alerts are executed on the backend the logic needs to be reimplemented here. 267 */ 268 q := req.Queries[0] 269 model, err := simplejson.NewJson(q.JSON) 270 if err != nil { 271 return nil, err 272 } 273 _, fromAlert := req.Headers["FromAlert"] 274 isLogAlertQuery := fromAlert && model.Get("queryMode").MustString("") == "Logs" 275 276 if isLogAlertQuery { 277 return e.executeLogAlertQuery(ctx, req) 278 } 279 280 queryType := model.Get("type").MustString("") 281 282 var result *backend.QueryDataResponse 283 switch queryType { 284 case "metricFindQuery": 285 result, err = e.executeMetricFindQuery(ctx, model, q, req.PluginContext) 286 case "annotationQuery": 287 result, err = e.executeAnnotationQuery(ctx, model, q, req.PluginContext) 288 case "logAction": 289 result, err = e.executeLogActions(ctx, req) 290 case "liveLogAction": 291 result, err = e.executeLiveLogQuery(ctx, req) 292 case "timeSeriesQuery": 293 fallthrough 294 default: 295 result, err = e.executeTimeSeriesQuery(ctx, req) 296 } 297 298 return result, err 299} 300 301func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { 302 resp := backend.NewQueryDataResponse() 303 304 for _, q := range req.Queries { 305 model, err := simplejson.NewJson(q.JSON) 306 if err != nil { 307 continue 308 } 309 310 model.Set("subtype", "StartQuery") 311 model.Set("queryString", model.Get("expression").MustString("")) 312 313 region := model.Get("region").MustString(defaultRegion) 314 if region == defaultRegion { 315 dsInfo, err := e.getDSInfo(req.PluginContext) 316 if err != nil { 317 return nil, err 318 } 319 model.Set("region", dsInfo.region) 320 } 321 322 logsClient, err := e.getCWLogsClient(region, req.PluginContext) 323 if err != nil { 324 return nil, err 325 } 326 327 result, err := e.executeStartQuery(ctx, logsClient, model, q.TimeRange) 328 if err != nil { 329 return nil, err 330 } 331 332 model.Set("queryId", *result.QueryId) 333 334 getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, model) 335 if err != nil { 336 return nil, err 337 } 338 339 dataframe, err := logsResultsToDataframes(getQueryResultsOutput) 340 if err != nil { 341 return nil, err 342 } 343 344 var frames []*data.Frame 345 346 statsGroups := model.Get("statsGroups").MustStringArray() 347 if len(statsGroups) > 0 && len(dataframe.Fields) > 0 { 348 frames, err = groupResults(dataframe, statsGroups) 349 if err != nil { 350 return nil, err 351 } 352 } else { 353 frames = data.Frames{dataframe} 354 } 355 356 respD := resp.Responses["A"] 357 respD.Frames = frames 358 resp.Responses["A"] = respD 359 } 360 361 return resp, nil 362} 363 364func (e *cloudWatchExecutor) getDSInfo(pluginCtx backend.PluginContext) (*datasourceInfo, error) { 365 i, err := e.im.Get(pluginCtx) 366 if err != nil { 367 return nil, err 368 } 369 370 instance := i.(datasourceInfo) 371 372 return &instance, nil 373} 374 375func isTerminated(queryStatus string) bool { 376 return queryStatus == "Complete" || queryStatus == "Cancelled" || queryStatus == "Failed" || queryStatus == "Timeout" 377} 378 379// NewCWClient is a CloudWatch client factory. 380// 381// Stubbable by tests. 382var NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI { 383 client := cloudwatch.New(sess) 384 client.Handlers.Send.PushFront(func(r *request.Request) { 385 r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion)) 386 }) 387 388 return client 389} 390 391// NewCWLogsClient is a CloudWatch logs client factory. 392// 393// Stubbable by tests. 394var NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI { 395 client := cloudwatchlogs.New(sess) 396 client.Handlers.Send.PushFront(func(r *request.Request) { 397 r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion)) 398 }) 399 400 return client 401} 402 403// EC2 client factory. 404// 405// Stubbable by tests. 406var newEC2Client = func(provider client.ConfigProvider) ec2iface.EC2API { 407 return ec2.New(provider) 408} 409 410// RGTA client factory. 411// 412// Stubbable by tests. 413var newRGTAClient = func(provider client.ConfigProvider) resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI { 414 return resourcegroupstaggingapi.New(provider) 415} 416