1package cloudwatch
2
3import (
4	"context"
5	"encoding/json"
6	"fmt"
7	"regexp"
8	"time"
9
10	"github.com/aws/aws-sdk-go/aws/client"
11	"github.com/aws/aws-sdk-go/aws/request"
12	"github.com/aws/aws-sdk-go/aws/session"
13	"github.com/aws/aws-sdk-go/service/cloudwatch"
14	"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
15	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
16	"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
17	"github.com/aws/aws-sdk-go/service/ec2"
18	"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
19	"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
20	"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
21	"github.com/grafana/grafana-aws-sdk/pkg/awsds"
22	"github.com/grafana/grafana-plugin-sdk-go/backend"
23	"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
24	"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
25	"github.com/grafana/grafana-plugin-sdk-go/data"
26	"github.com/grafana/grafana/pkg/components/simplejson"
27	"github.com/grafana/grafana/pkg/infra/log"
28	"github.com/grafana/grafana/pkg/plugins"
29	"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
30	"github.com/grafana/grafana/pkg/setting"
31)
32
// datasourceInfo is the per-datasource configuration produced by the instance
// factory returned from NewInstanceSettings. It is stored by value in the
// instance manager and read back through getDSInfo.
type datasourceInfo struct {
	// profile is taken from the "profile" JSON setting; when that is empty the
	// legacy Database setting is used instead.
	profile       string
	// region is read from the "defaultRegion" JSON setting.
	region        string
	// authType is derived from the "authType" JSON setting.
	authType      awsds.AuthType
	// assumeRoleARN is read from the "assumeRoleArn" JSON setting.
	assumeRoleARN string
	// externalID is read from the "externalId" JSON setting.
	externalID    string
	// namespace is read from the "customMetricsNamespaces" JSON setting.
	namespace     string
	// endpoint is read from the "endpoint" JSON setting.
	endpoint      string

	// accessKey and secretKey come from the decrypted secure JSON data.
	accessKey string
	secretKey string

	// datasourceID is the ID of the datasource instance settings.
	datasourceID int64
}
47
const (
	// cloudWatchTSFormat is a timestamp layout in Go reference-time syntax
	// with millisecond precision.
	cloudWatchTSFormat = "2006-01-02 15:04:05.000"
	// defaultRegion is the placeholder region name that stands for the
	// datasource's own configured region.
	defaultRegion = "default"

	// Constants also defined in datasource/cloudwatch/datasource.ts
	logIdentifierInternal       = "__log__grafana_internal__"
	logStreamIdentifierInternal = "__logstream__grafana_internal__"
)
54
55var plog = log.New("tsdb.cloudwatch")
56var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
57
58func ProvideService(cfg *setting.Cfg, logsService *LogsService, registrar plugins.CoreBackendRegistrar) (*CloudWatchService, error) {
59	plog.Debug("initing")
60
61	executor := newExecutor(logsService, datasource.NewInstanceManager(NewInstanceSettings()), cfg, awsds.NewSessionCache())
62	factory := coreplugin.New(backend.ServeOpts{
63		QueryDataHandler: executor,
64	})
65
66	if err := registrar.LoadAndRegister("cloudwatch", factory); err != nil {
67		plog.Error("Failed to register plugin", "error", err)
68		return nil, err
69	}
70
71	return &CloudWatchService{
72		LogsService: logsService,
73		Cfg:         cfg,
74		Executor:    executor,
75	}, nil
76}
77
// CloudWatchService bundles what ProvideService assembles: the logs service
// and configuration it was given, plus the executor it registered as the
// plugin's query data handler.
type CloudWatchService struct {
	LogsService *LogsService
	Cfg         *setting.Cfg
	Executor    *cloudWatchExecutor
}
83
// SessionCache is the source of AWS sessions used by the executor.
// ProvideService supplies awsds.NewSessionCache(); the interface lets other
// implementations be passed to newExecutor.
type SessionCache interface {
	// GetSession returns a session for the given region and datasource settings.
	GetSession(region string, s awsds.AWSDatasourceSettings) (*session.Session, error)
}
87
88func newExecutor(logsService *LogsService, im instancemgmt.InstanceManager, cfg *setting.Cfg, sessions SessionCache) *cloudWatchExecutor {
89	return &cloudWatchExecutor{
90		logsService: logsService,
91		im:          im,
92		cfg:         cfg,
93		sessions:    sessions,
94	}
95}
96
97func NewInstanceSettings() datasource.InstanceFactoryFunc {
98	return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
99		jsonData := struct {
100			Profile       string `json:"profile"`
101			Region        string `json:"defaultRegion"`
102			AssumeRoleARN string `json:"assumeRoleArn"`
103			ExternalID    string `json:"externalId"`
104			Endpoint      string `json:"endpoint"`
105			Namespace     string `json:"customMetricsNamespaces"`
106			AuthType      string `json:"authType"`
107		}{}
108
109		err := json.Unmarshal(settings.JSONData, &jsonData)
110		if err != nil {
111			return nil, fmt.Errorf("error reading settings: %w", err)
112		}
113
114		model := datasourceInfo{
115			profile:       jsonData.Profile,
116			region:        jsonData.Region,
117			assumeRoleARN: jsonData.AssumeRoleARN,
118			externalID:    jsonData.ExternalID,
119			endpoint:      jsonData.Endpoint,
120			namespace:     jsonData.Namespace,
121			datasourceID:  settings.ID,
122		}
123
124		at := awsds.AuthTypeDefault
125		switch jsonData.AuthType {
126		case "credentials":
127			at = awsds.AuthTypeSharedCreds
128		case "keys":
129			at = awsds.AuthTypeKeys
130		case "default":
131			at = awsds.AuthTypeDefault
132		case "ec2_iam_role":
133			at = awsds.AuthTypeEC2IAMRole
134		case "arn":
135			at = awsds.AuthTypeDefault
136			plog.Warn("Authentication type \"arn\" is deprecated, falling back to default")
137		default:
138			plog.Warn("Unrecognized AWS authentication type", "type", jsonData.AuthType)
139		}
140
141		model.authType = at
142
143		if model.profile == "" {
144			model.profile = settings.Database // legacy support
145		}
146
147		model.accessKey = settings.DecryptedSecureJSONData["accessKey"]
148		model.secretKey = settings.DecryptedSecureJSONData["secretKey"]
149
150		return model, nil
151	}
152}
153
// cloudWatchExecutor executes CloudWatch requests.
//
// It is constructed by newExecutor and registered by ProvideService as the
// plugin's query data handler.
type cloudWatchExecutor struct {
	logsService *LogsService
	// im resolves the datasourceInfo for a plugin context (see getDSInfo).
	im          instancemgmt.InstanceManager
	cfg         *setting.Cfg
	// sessions provides the AWS sessions used to build service clients.
	sessions    SessionCache
}
161
162func (e *cloudWatchExecutor) newSession(region string, pluginCtx backend.PluginContext) (*session.Session, error) {
163	dsInfo, err := e.getDSInfo(pluginCtx)
164	if err != nil {
165		return nil, err
166	}
167
168	if region == defaultRegion {
169		region = dsInfo.region
170	}
171
172	return e.sessions.GetSession(region, awsds.AWSDatasourceSettings{
173		Profile:       dsInfo.profile,
174		Region:        region,
175		AuthType:      dsInfo.authType,
176		AssumeRoleARN: dsInfo.assumeRoleARN,
177		ExternalID:    dsInfo.externalID,
178		Endpoint:      dsInfo.endpoint,
179		DefaultRegion: dsInfo.region,
180		AccessKey:     dsInfo.accessKey,
181		SecretKey:     dsInfo.secretKey,
182	})
183}
184
185func (e *cloudWatchExecutor) getCWClient(region string, pluginCtx backend.PluginContext) (cloudwatchiface.CloudWatchAPI, error) {
186	sess, err := e.newSession(region, pluginCtx)
187	if err != nil {
188		return nil, err
189	}
190	return NewCWClient(sess), nil
191}
192
193func (e *cloudWatchExecutor) getCWLogsClient(region string, pluginCtx backend.PluginContext) (cloudwatchlogsiface.CloudWatchLogsAPI, error) {
194	sess, err := e.newSession(region, pluginCtx)
195	if err != nil {
196		return nil, err
197	}
198
199	logsClient := NewCWLogsClient(sess)
200
201	return logsClient, nil
202}
203
204func (e *cloudWatchExecutor) getEC2Client(region string, pluginCtx backend.PluginContext) (ec2iface.EC2API, error) {
205	sess, err := e.newSession(region, pluginCtx)
206	if err != nil {
207		return nil, err
208	}
209
210	return newEC2Client(sess), nil
211}
212
213func (e *cloudWatchExecutor) getRGTAClient(region string, pluginCtx backend.PluginContext) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI,
214	error) {
215	sess, err := e.newSession(region, pluginCtx)
216	if err != nil {
217		return nil, err
218	}
219
220	return newRGTAClient(sess), nil
221}
222
223func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
224	queryContext backend.DataQuery, model *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) {
225	const maxAttempts = 8
226	const pollPeriod = 1000 * time.Millisecond
227
228	startQueryOutput, err := e.executeStartQuery(ctx, logsClient, model, queryContext.TimeRange)
229	if err != nil {
230		return nil, err
231	}
232
233	requestParams := simplejson.NewFromAny(map[string]interface{}{
234		"region":  model.Get("region").MustString(""),
235		"queryId": *startQueryOutput.QueryId,
236	})
237
238	ticker := time.NewTicker(pollPeriod)
239	defer ticker.Stop()
240
241	attemptCount := 1
242	for range ticker.C {
243		res, err := e.executeGetQueryResults(ctx, logsClient, requestParams)
244		if err != nil {
245			return nil, err
246		}
247		if isTerminated(*res.Status) {
248			return res, err
249		}
250		if attemptCount >= maxAttempts {
251			return res, fmt.Errorf("fetching of query results exceeded max number of attempts")
252		}
253
254		attemptCount++
255	}
256
257	return nil, nil
258}
259
260func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
261	/*
262		Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response
263		to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
264		with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
265		queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
266		frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
267	*/
268	q := req.Queries[0]
269	model, err := simplejson.NewJson(q.JSON)
270	if err != nil {
271		return nil, err
272	}
273	_, fromAlert := req.Headers["FromAlert"]
274	isLogAlertQuery := fromAlert && model.Get("queryMode").MustString("") == "Logs"
275
276	if isLogAlertQuery {
277		return e.executeLogAlertQuery(ctx, req)
278	}
279
280	queryType := model.Get("type").MustString("")
281
282	var result *backend.QueryDataResponse
283	switch queryType {
284	case "metricFindQuery":
285		result, err = e.executeMetricFindQuery(ctx, model, q, req.PluginContext)
286	case "annotationQuery":
287		result, err = e.executeAnnotationQuery(ctx, model, q, req.PluginContext)
288	case "logAction":
289		result, err = e.executeLogActions(ctx, req)
290	case "liveLogAction":
291		result, err = e.executeLiveLogQuery(ctx, req)
292	case "timeSeriesQuery":
293		fallthrough
294	default:
295		result, err = e.executeTimeSeriesQuery(ctx, req)
296	}
297
298	return result, err
299}
300
301func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
302	resp := backend.NewQueryDataResponse()
303
304	for _, q := range req.Queries {
305		model, err := simplejson.NewJson(q.JSON)
306		if err != nil {
307			continue
308		}
309
310		model.Set("subtype", "StartQuery")
311		model.Set("queryString", model.Get("expression").MustString(""))
312
313		region := model.Get("region").MustString(defaultRegion)
314		if region == defaultRegion {
315			dsInfo, err := e.getDSInfo(req.PluginContext)
316			if err != nil {
317				return nil, err
318			}
319			model.Set("region", dsInfo.region)
320		}
321
322		logsClient, err := e.getCWLogsClient(region, req.PluginContext)
323		if err != nil {
324			return nil, err
325		}
326
327		result, err := e.executeStartQuery(ctx, logsClient, model, q.TimeRange)
328		if err != nil {
329			return nil, err
330		}
331
332		model.Set("queryId", *result.QueryId)
333
334		getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, model)
335		if err != nil {
336			return nil, err
337		}
338
339		dataframe, err := logsResultsToDataframes(getQueryResultsOutput)
340		if err != nil {
341			return nil, err
342		}
343
344		var frames []*data.Frame
345
346		statsGroups := model.Get("statsGroups").MustStringArray()
347		if len(statsGroups) > 0 && len(dataframe.Fields) > 0 {
348			frames, err = groupResults(dataframe, statsGroups)
349			if err != nil {
350				return nil, err
351			}
352		} else {
353			frames = data.Frames{dataframe}
354		}
355
356		respD := resp.Responses["A"]
357		respD.Frames = frames
358		resp.Responses["A"] = respD
359	}
360
361	return resp, nil
362}
363
364func (e *cloudWatchExecutor) getDSInfo(pluginCtx backend.PluginContext) (*datasourceInfo, error) {
365	i, err := e.im.Get(pluginCtx)
366	if err != nil {
367		return nil, err
368	}
369
370	instance := i.(datasourceInfo)
371
372	return &instance, nil
373}
374
// isTerminated reports whether queryStatus is one of the terminal query
// statuses: "Complete", "Cancelled", "Failed" or "Timeout". The comparison is
// case-sensitive.
func isTerminated(queryStatus string) bool {
	switch queryStatus {
	case "Complete", "Cancelled", "Failed", "Timeout":
		return true
	default:
		return false
	}
}
378
379// NewCWClient is a CloudWatch client factory.
380//
381// Stubbable by tests.
382var NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI {
383	client := cloudwatch.New(sess)
384	client.Handlers.Send.PushFront(func(r *request.Request) {
385		r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
386	})
387
388	return client
389}
390
391// NewCWLogsClient is a CloudWatch logs client factory.
392//
393// Stubbable by tests.
394var NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI {
395	client := cloudwatchlogs.New(sess)
396	client.Handlers.Send.PushFront(func(r *request.Request) {
397		r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
398	})
399
400	return client
401}
402
403// EC2 client factory.
404//
405// Stubbable by tests.
406var newEC2Client = func(provider client.ConfigProvider) ec2iface.EC2API {
407	return ec2.New(provider)
408}
409
410// RGTA client factory.
411//
412// Stubbable by tests.
413var newRGTAClient = func(provider client.ConfigProvider) resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI {
414	return resourcegroupstaggingapi.New(provider)
415}
416