1package cloudwatch 2 3import ( 4 "context" 5 "encoding/json" 6 "testing" 7 "time" 8 9 "github.com/aws/aws-sdk-go/aws" 10 "github.com/aws/aws-sdk-go/aws/session" 11 "github.com/aws/aws-sdk-go/service/cloudwatchlogs" 12 "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface" 13 "github.com/grafana/grafana-plugin-sdk-go/backend" 14 "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" 15 "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" 16 "github.com/grafana/grafana-plugin-sdk-go/data" 17 18 "github.com/stretchr/testify/assert" 19 "github.com/stretchr/testify/require" 20) 21 22func TestQuery_DescribeLogGroups(t *testing.T) { 23 origNewCWLogsClient := NewCWLogsClient 24 t.Cleanup(func() { 25 NewCWLogsClient = origNewCWLogsClient 26 }) 27 28 var cli FakeCWLogsClient 29 30 NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI { 31 return cli 32 } 33 34 t.Run("Empty log group name prefix", func(t *testing.T) { 35 cli = FakeCWLogsClient{ 36 logGroups: cloudwatchlogs.DescribeLogGroupsOutput{ 37 LogGroups: []*cloudwatchlogs.LogGroup{ 38 { 39 LogGroupName: aws.String("group_a"), 40 }, 41 { 42 LogGroupName: aws.String("group_b"), 43 }, 44 { 45 LogGroupName: aws.String("group_c"), 46 }, 47 }, 48 }, 49 } 50 51 im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { 52 return datasourceInfo{}, nil 53 }) 54 55 executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) 56 resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ 57 PluginContext: backend.PluginContext{ 58 DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, 59 }, 60 Queries: []backend.DataQuery{ 61 { 62 JSON: json.RawMessage(`{ 63 "type": "logAction", 64 "subtype": "DescribeLogGroups", 65 "limit": 50 66 }`), 67 }, 68 }, 69 }) 70 require.NoError(t, err) 71 require.NotNil(t, resp) 72 73 assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ 74 "": backend.DataResponse{ 75 Frames: data.Frames{ 76 &data.Frame{ 77 Name: "logGroups", 78 Fields: []*data.Field{ 79 data.NewField("logGroupName", nil, []*string{ 80 aws.String("group_a"), aws.String("group_b"), aws.String("group_c"), 81 }), 82 }, 83 }, 84 }, 85 }, 86 }, 87 }, resp) 88 }) 89 90 t.Run("Non-empty log group name prefix", func(t *testing.T) { 91 cli = FakeCWLogsClient{ 92 logGroups: cloudwatchlogs.DescribeLogGroupsOutput{ 93 LogGroups: []*cloudwatchlogs.LogGroup{ 94 { 95 LogGroupName: aws.String("group_a"), 96 }, 97 { 98 LogGroupName: aws.String("group_b"), 99 }, 100 { 101 LogGroupName: aws.String("group_c"), 102 }, 103 }, 104 }, 105 } 106 107 im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { 108 return datasourceInfo{}, nil 109 }) 110 111 executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) 112 resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ 113 PluginContext: backend.PluginContext{ 114 DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, 115 }, 116 Queries: []backend.DataQuery{ 117 { 118 JSON: json.RawMessage(`{ 119 "type": "logAction", 120 "subtype": "DescribeLogGroups", 121 "limit": 50, 122 "region": "default", 123 "logGroupNamePrefix": "g" 124 }`), 125 }, 126 }, 127 }) 128 require.NoError(t, err) 129 require.NotNil(t, resp) 130 131 assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ 132 "": backend.DataResponse{ 133 Frames: data.Frames{ 134 &data.Frame{ 135 Name: "logGroups", 136 Fields: []*data.Field{ 137 data.NewField("logGroupName", nil, []*string{ 138 aws.String("group_a"), aws.String("group_b"), aws.String("group_c"), 139 }), 140 }, 141 }, 142 }, 143 }, 144 }, 145 }, resp) 146 }) 147} 148 149func TestQuery_GetLogGroupFields(t *testing.T) { 150 origNewCWLogsClient := NewCWLogsClient 151 t.Cleanup(func() { 152 NewCWLogsClient = origNewCWLogsClient 153 }) 154 155 var cli FakeCWLogsClient 156 157 NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI { 158 return cli 159 } 160 161 cli = FakeCWLogsClient{ 162 logGroupFields: cloudwatchlogs.GetLogGroupFieldsOutput{ 163 LogGroupFields: []*cloudwatchlogs.LogGroupField{ 164 { 165 Name: aws.String("field_a"), 166 Percent: aws.Int64(100), 167 }, 168 { 169 Name: aws.String("field_b"), 170 Percent: aws.Int64(30), 171 }, 172 { 173 Name: aws.String("field_c"), 174 Percent: aws.Int64(55), 175 }, 176 }, 177 }, 178 } 179 180 const refID = "A" 181 182 im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { 183 return datasourceInfo{}, nil 184 }) 185 186 executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) 187 resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ 188 PluginContext: backend.PluginContext{ 189 DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, 190 }, 191 Queries: []backend.DataQuery{ 192 { 193 RefID: refID, 194 JSON: json.RawMessage(`{ 195 "type": "logAction", 196 "subtype": "GetLogGroupFields", 197 "logGroupName": "group_a", 198 "limit": 50 199 }`), 200 }, 201 }, 202 }) 203 require.NoError(t, err) 204 require.NotNil(t, resp) 205 206 expFrame := &data.Frame{ 207 Name: refID, 208 Fields: []*data.Field{ 209 data.NewField("name", nil, []*string{ 210 aws.String("field_a"), aws.String("field_b"), aws.String("field_c"), 211 }), 212 data.NewField("percent", nil, []*int64{ 213 aws.Int64(100), aws.Int64(30), aws.Int64(55), 214 }), 215 }, 216 } 217 expFrame.RefID = refID 218 assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ 219 refID: backend.DataResponse{ 220 Frames: data.Frames{expFrame}, 221 }, 222 }, 223 }, resp) 224} 225 226func TestQuery_StartQuery(t *testing.T) { 227 origNewCWLogsClient := NewCWLogsClient 228 t.Cleanup(func() { 229 NewCWLogsClient = origNewCWLogsClient 230 }) 231 232 var cli FakeCWLogsClient 233 234 NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI { 235 return cli 236 } 237 238 t.Run("invalid time range", func(t *testing.T) { 239 cli = FakeCWLogsClient{ 240 logGroupFields: cloudwatchlogs.GetLogGroupFieldsOutput{ 241 LogGroupFields: []*cloudwatchlogs.LogGroupField{ 242 { 243 Name: aws.String("field_a"), 244 Percent: aws.Int64(100), 245 }, 246 { 247 Name: aws.String("field_b"), 248 Percent: aws.Int64(30), 249 }, 250 { 251 Name: aws.String("field_c"), 252 Percent: aws.Int64(55), 253 }, 254 }, 255 }, 256 } 257 258 timeRange := backend.TimeRange{ 259 From: time.Unix(1584873443, 0), 260 To: time.Unix(1584700643, 0), 261 } 262 263 im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { 264 return datasourceInfo{}, nil 265 }) 266 267 executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) 268 _, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ 269 PluginContext: backend.PluginContext{ 270 DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, 271 }, 272 Queries: []backend.DataQuery{ 273 { 274 TimeRange: timeRange, 275 JSON: json.RawMessage(`{ 276 "type": "logAction", 277 "subtype": "StartQuery", 278 "limit": 50, 279 "region": "default", 280 "queryString": "fields @message" 281 }`), 282 }, 283 }, 284 }) 285 require.Error(t, err) 286 287 assert.Contains(t, err.Error(), "invalid time range: start time must be before end time") 288 }) 289 290 t.Run("valid time range", func(t *testing.T) { 291 const refID = "A" 292 cli = FakeCWLogsClient{ 293 logGroupFields: cloudwatchlogs.GetLogGroupFieldsOutput{ 294 LogGroupFields: []*cloudwatchlogs.LogGroupField{ 295 { 296 Name: aws.String("field_a"), 297 Percent: aws.Int64(100), 298 }, 299 { 300 Name: aws.String("field_b"), 301 Percent: aws.Int64(30), 302 }, 303 { 304 Name: aws.String("field_c"), 305 Percent: aws.Int64(55), 306 }, 307 }, 308 }, 309 } 310 311 timeRange := backend.TimeRange{ 312 From: time.Unix(1584700643000, 0), 313 To: time.Unix(1584873443000, 0), 314 } 315 316 im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { 317 return datasourceInfo{}, nil 318 }) 319 320 executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) 321 resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ 322 PluginContext: backend.PluginContext{ 323 DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, 324 }, 325 Queries: []backend.DataQuery{ 326 { 327 RefID: refID, 328 TimeRange: timeRange, 329 JSON: json.RawMessage(`{ 330 "type": "logAction", 331 "subtype": "StartQuery", 332 "limit": 50, 333 "region": "default", 334 "queryString": "fields @message" 335 }`), 336 }, 337 }, 338 }) 339 require.NoError(t, err) 340 341 expFrame := data.NewFrame( 342 refID, 343 data.NewField("queryId", nil, []string{"abcd-efgh-ijkl-mnop"}), 344 ) 345 expFrame.RefID = refID 346 expFrame.Meta = &data.FrameMeta{ 347 Custom: map[string]interface{}{ 348 "Region": "default", 349 }, 350 } 351 assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ 352 refID: { 353 Frames: data.Frames{expFrame}, 354 }, 355 }, 356 }, resp) 357 }) 358} 359 360func TestQuery_StopQuery(t *testing.T) { 361 origNewCWLogsClient := NewCWLogsClient 362 t.Cleanup(func() { 363 NewCWLogsClient = origNewCWLogsClient 364 }) 365 366 var cli FakeCWLogsClient 367 368 NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI { 369 return cli 370 } 371 372 cli = FakeCWLogsClient{ 373 logGroupFields: cloudwatchlogs.GetLogGroupFieldsOutput{ 374 LogGroupFields: []*cloudwatchlogs.LogGroupField{ 375 { 376 Name: aws.String("field_a"), 377 Percent: aws.Int64(100), 378 }, 379 { 380 Name: aws.String("field_b"), 381 Percent: aws.Int64(30), 382 }, 383 { 384 Name: aws.String("field_c"), 385 Percent: aws.Int64(55), 386 }, 387 }, 388 }, 389 } 390 391 im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { 392 return datasourceInfo{}, nil 393 }) 394 395 timeRange := backend.TimeRange{ 396 From: time.Unix(1584873443, 0), 397 To: time.Unix(1584700643, 0), 398 } 399 400 executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) 401 resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ 402 PluginContext: backend.PluginContext{ 403 DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, 404 }, 405 Queries: []backend.DataQuery{ 406 { 407 TimeRange: timeRange, 408 JSON: json.RawMessage(`{ 409 "type": "logAction", 410 "subtype": "StopQuery", 411 "queryId": "abcd-efgh-ijkl-mnop" 412 }`), 413 }, 414 }, 415 }) 416 require.NoError(t, err) 417 418 expFrame := &data.Frame{ 419 Name: "StopQueryResponse", 420 Fields: []*data.Field{ 421 data.NewField("success", nil, []bool{true}), 422 }, 423 } 424 assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ 425 "": { 426 Frames: data.Frames{expFrame}, 427 }, 428 }, 429 }, resp) 430} 431 432func TestQuery_GetQueryResults(t *testing.T) { 433 origNewCWLogsClient := NewCWLogsClient 434 t.Cleanup(func() { 435 NewCWLogsClient = origNewCWLogsClient 436 }) 437 438 var cli FakeCWLogsClient 439 440 NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI { 441 return cli 442 } 443 444 const refID = "A" 445 cli = FakeCWLogsClient{ 446 queryResults: cloudwatchlogs.GetQueryResultsOutput{ 447 Results: [][]*cloudwatchlogs.ResultField{ 448 { 449 { 450 Field: aws.String("@timestamp"), 451 Value: aws.String("2020-03-20 10:37:23.000"), 452 }, 453 { 454 Field: aws.String("field_b"), 455 Value: aws.String("b_1"), 456 }, 457 { 458 Field: aws.String("@ptr"), 459 Value: aws.String("abcdefg"), 460 }, 461 }, 462 { 463 { 464 Field: aws.String("@timestamp"), 465 Value: aws.String("2020-03-20 10:40:43.000"), 466 }, 467 { 468 Field: aws.String("field_b"), 469 Value: aws.String("b_2"), 470 }, 471 { 472 Field: aws.String("@ptr"), 473 Value: aws.String("hijklmnop"), 474 }, 475 }, 476 }, 477 Statistics: &cloudwatchlogs.QueryStatistics{ 478 BytesScanned: aws.Float64(512), 479 RecordsMatched: aws.Float64(256), 480 RecordsScanned: aws.Float64(1024), 481 }, 482 Status: aws.String("Complete"), 483 }, 484 } 485 486 im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { 487 return datasourceInfo{}, nil 488 }) 489 490 executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) 491 resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ 492 PluginContext: backend.PluginContext{ 493 DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, 494 }, 495 Queries: []backend.DataQuery{ 496 { 497 RefID: refID, 498 JSON: json.RawMessage(`{ 499 "type": "logAction", 500 "subtype": "GetQueryResults", 501 "queryId": "abcd-efgh-ijkl-mnop" 502 }`), 503 }, 504 }, 505 }) 506 require.NoError(t, err) 507 508 time1, err := time.Parse("2006-01-02 15:04:05.000", "2020-03-20 10:37:23.000") 509 require.NoError(t, err) 510 time2, err := time.Parse("2006-01-02 15:04:05.000", "2020-03-20 10:40:43.000") 511 require.NoError(t, err) 512 expField1 := data.NewField("@timestamp", nil, []*time.Time{ 513 aws.Time(time1), aws.Time(time2), 514 }) 515 expField1.SetConfig(&data.FieldConfig{DisplayName: "Time"}) 516 expField2 := data.NewField("field_b", nil, []*string{ 517 aws.String("b_1"), aws.String("b_2"), 518 }) 519 expFrame := data.NewFrame(refID, expField1, expField2) 520 expFrame.RefID = refID 521 expFrame.Meta = &data.FrameMeta{ 522 Custom: map[string]interface{}{ 523 "Status": "Complete", 524 }, 525 Stats: []data.QueryStat{ 526 { 527 FieldConfig: data.FieldConfig{DisplayName: "Bytes scanned"}, 528 Value: 512, 529 }, 530 { 531 FieldConfig: data.FieldConfig{DisplayName: "Records scanned"}, 532 Value: 1024, 533 }, 534 { 535 FieldConfig: data.FieldConfig{DisplayName: "Records matched"}, 536 Value: 256, 537 }, 538 }, 539 PreferredVisualization: "logs", 540 } 541 542 assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ 543 refID: { 544 Frames: data.Frames{expFrame}, 545 }, 546 }, 547 }, resp) 548} 549