1/* 2** Zabbix 3** Copyright (C) 2001-2021 Zabbix SIA 4** 5** This program is free software; you can redistribute it and/or modify 6** it under the terms of the GNU General Public License as published by 7** the Free Software Foundation; either version 2 of the License, or 8** (at your option) any later version. 9** 10** This program is distributed in the hope that it will be useful, 11** but WITHOUT ANY WARRANTY; without even the implied warranty of 12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13** GNU General Public License for more details. 14** 15** You should have received a copy of the GNU General Public License 16** along with this program; if not, write to the Free Software 17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 18**/ 19 20package scheduler 21 22import ( 23 "container/heap" 24 "fmt" 25 "reflect" 26 "strconv" 27 "testing" 28 "time" 29 30 "zabbix.com/internal/agent" 31 "zabbix.com/internal/agent/alias" 32 "zabbix.com/pkg/conf" 33 "zabbix.com/pkg/itemutil" 34 "zabbix.com/pkg/log" 35 "zabbix.com/pkg/plugin" 36) 37 38// getNextCheck calculates simplified nextcheck based on the specified delay string and current time 39func getNextcheck(delay string, from time.Time) (nextcheck time.Time) { 40 simple_delay, _ := strconv.ParseInt(delay, 10, 64) 41 from_seconds := from.Unix() 42 return time.Unix(from_seconds-from_seconds%simple_delay+simple_delay, 0) 43} 44 45type callTracker interface { 46 call(key string) 47 called() map[string][]time.Time 48} 49 50type mockPlugin struct { 51 calls map[string][]time.Time 52 now *time.Time 53} 54 55func (p *mockPlugin) call(key string) { 56 if p.calls == nil { 57 p.calls = make(map[string][]time.Time) 58 } 59 if p.calls[key] == nil { 60 p.calls[key] = make([]time.Time, 0, 20) 61 } 62 p.calls[key] = append(p.calls[key], *p.now) 63} 64 65func (p *mockPlugin) called() map[string][]time.Time { 66 return p.calls 67} 68 69type mockExporterPlugin struct { 70 plugin.Base 71 
mockPlugin 72} 73 74func (p *mockExporterPlugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) { 75 p.call(key) 76 return 77} 78 79type mockCollectorPlugin struct { 80 plugin.Base 81 mockPlugin 82 period int 83} 84 85func (p *mockCollectorPlugin) Collect() (err error) { 86 p.call("$collect") 87 return 88} 89 90func (p *mockCollectorPlugin) Period() (period int) { 91 return p.period 92} 93 94type mockCollectorExporterPlugin struct { 95 plugin.Base 96 mockPlugin 97 period int 98} 99 100func (p *mockCollectorExporterPlugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) { 101 p.call(key) 102 return 103} 104 105func (p *mockCollectorExporterPlugin) Collect() (err error) { 106 p.call("$collect") 107 return 108} 109 110func (p *mockCollectorExporterPlugin) Period() (period int) { 111 return p.period 112} 113 114type mockRunnerPlugin struct { 115 plugin.Base 116 mockPlugin 117} 118 119func (p *mockRunnerPlugin) Start() { 120 p.call("$start") 121} 122 123func (p *mockRunnerPlugin) Stop() { 124 p.call("$stop") 125} 126 127type mockPassiveRunnerPlugin struct { 128 plugin.Base 129 mockPlugin 130} 131 132func (p *mockPassiveRunnerPlugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) { 133 return 134} 135func (p *mockPassiveRunnerPlugin) Start() { 136 p.call("$start") 137} 138 139func (p *mockPassiveRunnerPlugin) Stop() { 140 p.call("$stop") 141} 142 143type watchTracker interface { 144 watched() []*plugin.Request 145} 146 147type mockWatcherPlugin struct { 148 plugin.Base 149 mockPlugin 150 requests []*plugin.Request 151} 152 153func (p *mockWatcherPlugin) Watch(requests []*plugin.Request, ctx plugin.ContextProvider) { 154 p.call("$watch") 155 p.requests = requests 156} 157 158func (p *mockWatcherPlugin) watched() []*plugin.Request { 159 return p.requests 160} 161 162type mockRunnerWatcherPlugin struct { 163 plugin.Base 164 
mockPlugin 165 requests []*plugin.Request 166} 167 168func (p *mockRunnerWatcherPlugin) Start() { 169 p.call("$start") 170} 171 172func (p *mockRunnerWatcherPlugin) Stop() { 173 p.call("$stop") 174} 175 176func (p *mockRunnerWatcherPlugin) Watch(requests []*plugin.Request, ctx plugin.ContextProvider) { 177 p.call("$watch") 178 p.requests = requests 179} 180 181func (p *mockRunnerWatcherPlugin) watched() []*plugin.Request { 182 return p.requests 183} 184 185type mockConfiguratorPlugin struct { 186 plugin.Base 187 mockPlugin 188 options interface{} 189} 190 191func (p *mockConfiguratorPlugin) Configure(global *plugin.GlobalOptions, options interface{}) { 192 p.call("$configure") 193} 194 195func (p *mockConfiguratorPlugin) Validate(options interface{}) (err error) { 196 return 197} 198 199type resultCacheMock struct { 200 results []*plugin.Result 201} 202 203func (c *resultCacheMock) Write(r *plugin.Result) { 204 c.results = append(c.results, r) 205} 206 207func (c *resultCacheMock) Flush() { 208} 209 210func (pc *resultCacheMock) SlotsAvailable() int { 211 return 1 212} 213 214func (pc *resultCacheMock) PersistSlotsAvailable() int { 215 return 1 216} 217 218type mockManager struct { 219 Manager 220 sink chan performer 221 now time.Time 222 startTime time.Time 223} 224 225func (m *mockManager) finishTasks() { 226 for { 227 select { 228 case p := <-m.sink: 229 m.processFinishRequest(p) 230 default: 231 return 232 } 233 } 234} 235 236func (m *mockManager) iterate(t *testing.T, iters int) { 237 for i := 0; i < iters; i++ { 238 m.now = m.now.Add(time.Second) 239 m.processQueue(m.now) 240 m.finishTasks() 241 } 242} 243 244func (m *mockManager) mockInit(t *testing.T) { 245 m.init() 246 m.aliases, _ = alias.NewManager(nil) 247 clock := time.Now().Unix() 248 m.startTime = time.Unix(clock-clock%10, 100) 249 t.Logf("starting time %s", m.startTime.Format(time.Stamp)) 250 m.now = m.startTime 251} 252 253func (m *mockManager) update(update *updateRequest) { 254 
m.processUpdateRequest(update, m.now) 255} 256 257func (m *mockManager) mockTasks() { 258 index := make(map[exporterTaskAccessor]uint64) 259 for clientid, client := range m.clients { 260 for _, task := range client.exporters { 261 index[task] = clientid 262 } 263 client.exporters = make(map[uint64]exporterTaskAccessor) 264 } 265 for _, p := range m.plugins { 266 tasks := p.tasks 267 p.tasks = make(performerHeap, 0, len(tasks)) 268 for j, task := range tasks { 269 switch t := task.(type) { 270 case *collectorTask: 271 collector := p.impl.(plugin.Collector) 272 mockTask := &mockCollectorTask{ 273 taskBase: taskBase{ 274 plugin: task.getPlugin(), 275 scheduled: getNextcheck(fmt.Sprintf("%d", collector.Period()), m.now).Add(priorityCollectorTaskNs), 276 index: -1, 277 active: task.isActive(), 278 recurring: true, 279 }, 280 sink: m.sink, 281 } 282 p.enqueueTask(mockTask) 283 case *exporterTask: 284 mockTask := &mockExporterTask{ 285 exporterTask: exporterTask{ 286 taskBase: taskBase{ 287 plugin: task.getPlugin(), 288 scheduled: getNextcheck(t.item.delay, m.now).Add(priorityExporterTaskNs), 289 index: -1, 290 active: task.isActive(), 291 recurring: true, 292 }, 293 item: t.item, 294 client: t.client, 295 meta: t.meta, 296 }, 297 sink: m.sink, 298 } 299 p.enqueueTask(mockTask) 300 m.clients[index[t]].exporters[t.item.itemid] = mockTask 301 case *directExporterTask: 302 mockTask := &mockExporterTask{ 303 exporterTask: exporterTask{ 304 taskBase: taskBase{ 305 plugin: task.getPlugin(), 306 scheduled: getNextcheck(t.item.delay, m.now).Add(priorityExporterTaskNs), 307 index: -1, 308 active: task.isActive(), 309 recurring: true, 310 }, 311 item: t.item, 312 client: t.client, 313 meta: t.meta, 314 }, 315 sink: m.sink, 316 } 317 p.enqueueTask(mockTask) 318 case *starterTask: 319 mockTask := &mockStarterTask{ 320 taskBase: taskBase{ 321 plugin: task.getPlugin(), 322 scheduled: m.now, 323 index: -1, 324 active: task.isActive(), 325 }, 326 sink: m.sink, 327 } 328 
p.enqueueTask(mockTask) 329 case *stopperTask: 330 mockTask := &mockStopperTask{ 331 taskBase: taskBase{ 332 plugin: task.getPlugin(), 333 scheduled: m.now.Add(priorityStopperTaskNs), 334 index: -1, 335 active: task.isActive(), 336 }, 337 sink: m.sink, 338 } 339 p.enqueueTask(mockTask) 340 case *watcherTask: 341 mockTask := &mockWatcherTask{ 342 taskBase: taskBase{ 343 plugin: task.getPlugin(), 344 scheduled: m.now.Add(priorityWatcherTaskNs), 345 index: -1, 346 active: task.isActive(), 347 }, 348 sink: m.sink, 349 requests: t.requests, 350 client: t.client, 351 } 352 p.enqueueTask(mockTask) 353 case *configuratorTask: 354 mockTask := &mockConfigerTask{ 355 taskBase: taskBase{ 356 plugin: task.getPlugin(), 357 scheduled: m.now.Add(priorityWatcherTaskNs), 358 index: -1, 359 active: task.isActive(), 360 }, 361 options: t.options, 362 sink: m.sink, 363 } 364 p.enqueueTask(mockTask) 365 default: 366 p.enqueueTask(task) 367 } 368 tasks[j].setIndex(-1) 369 } 370 m.pluginQueue.Update(p) 371 } 372} 373 374// checks if the times timestamps match the offsets within the specified range 375func (m *mockManager) checkTimeline(t *testing.T, name string, times []time.Time, offsets []int, iters int) { 376 start := m.now.Add(-time.Second * time.Duration(iters-1)) 377 to := int(m.now.Sub(m.startTime) / time.Second) 378 from := to - iters + 1 379 var left, right int 380 381 // find the range start in timestamps 382 if len(times) != 0 { 383 for times[left].Before(start) { 384 left++ 385 if left == len(times) { 386 break 387 } 388 } 389 } 390 391 // find the range start in offsets 392 if len(offsets) != 0 { 393 for offsets[right] < from { 394 right++ 395 if right == len(offsets) { 396 break 397 } 398 } 399 } 400 401 for left < len(times) && right < len(offsets) { 402 if times[left].After(m.now) { 403 if offsets[right] <= to { 404 t.Errorf("Plugin %s: no matching timestamp for offset %d", name, offsets[right]) 405 } 406 return 407 } 408 if offsets[right] > to { 409 t.Errorf("Plugin %s: 
no matching offset for timestamp %s", name, times[left].Format(time.Stamp)) 410 return 411 } 412 413 offsetTime := m.startTime.Add(time.Second * time.Duration(offsets[right])) 414 if !offsetTime.Equal(times[left]) { 415 t.Errorf("Plugin %s: offset %d time %s does not match timestamp %s", name, offsets[right], 416 offsetTime.Format(time.Stamp), times[left].Format(time.Stamp)) 417 return 418 } 419 left++ 420 right++ 421 } 422 if left != len(times) && !times[left].After(m.now) { 423 t.Errorf("Plugin %s: no matching offset for timestamp %s", name, times[left].Format(time.Stamp)) 424 return 425 } 426 427 if right != len(offsets) && offsets[right] <= to { 428 t.Errorf("Plugin %s: no matching timestamp for offset %d", name, offsets[right]) 429 return 430 } 431} 432 433// checks plugin call timeline within the specified range 434func (m *mockManager) checkPluginTimeline(t *testing.T, plugins []plugin.Accessor, calls []map[string][]int, iters int) { 435 for i, p := range plugins { 436 tracker := p.(callTracker).called() 437 for key, offsets := range calls[i] { 438 m.checkTimeline(t, p.Name()+":"+key, tracker[key], offsets, iters) 439 } 440 } 441} 442 443type mockExporterTask struct { 444 exporterTask 445 sink chan performer 446} 447 448func (t *mockExporterTask) perform(s Scheduler) { 449 key, params, _ := itemutil.ParseKey(t.item.key) 450 _, _ = t.plugin.impl.(plugin.Exporter).Export(key, params, t) 451 t.sink <- t 452} 453 454func (t *mockExporterTask) reschedule(now time.Time) (err error) { 455 t.scheduled = getNextcheck(t.item.delay, t.scheduled) 456 return 457} 458 459func (t *mockExporterTask) task() (task *exporterTask) { 460 return &t.exporterTask 461} 462 463// plugin.ContextProvider interface 464 465func (t *mockExporterTask) Output() (output plugin.ResultWriter) { 466 return nil 467} 468 469func (t *mockExporterTask) Meta() (meta *plugin.Meta) { 470 return &t.meta 471} 472 473func (t *mockExporterTask) GlobalRegexp() plugin.RegexpMatcher { 474 return 
t.client.GlobalRegexp() 475} 476 477type mockCollectorTask struct { 478 taskBase 479 sink chan performer 480} 481 482func (t *mockCollectorTask) perform(s Scheduler) { 483 _ = t.plugin.impl.(plugin.Collector).Collect() 484 t.sink <- t 485} 486 487func (t *mockCollectorTask) reschedule(now time.Time) (err error) { 488 t.scheduled = getNextcheck(fmt.Sprintf("%d", t.plugin.impl.(plugin.Collector).Period()), t.scheduled) 489 return 490} 491 492func (t *mockCollectorTask) getWeight() int { 493 return t.plugin.maxCapacity 494} 495 496type mockStarterTask struct { 497 taskBase 498 sink chan performer 499} 500 501func (t *mockStarterTask) perform(s Scheduler) { 502 t.plugin.impl.(plugin.Runner).Start() 503 t.sink <- t 504} 505 506func (t *mockStarterTask) reschedule(now time.Time) (err error) { 507 return 508} 509 510func (t *mockStarterTask) getWeight() int { 511 return t.plugin.maxCapacity 512} 513 514type mockStopperTask struct { 515 taskBase 516 sink chan performer 517} 518 519func (t *mockStopperTask) perform(s Scheduler) { 520 t.plugin.impl.(plugin.Runner).Stop() 521 t.sink <- t 522} 523 524func (t *mockStopperTask) reschedule(now time.Time) (err error) { 525 return 526} 527 528func (t *mockStopperTask) getWeight() int { 529 return t.plugin.maxCapacity 530} 531 532type mockWatcherTask struct { 533 taskBase 534 sink chan performer 535 resultSink plugin.ResultWriter 536 requests []*plugin.Request 537 client ClientAccessor 538} 539 540func (t *mockWatcherTask) perform(s Scheduler) { 541 log.Debugf("%s %v", t.plugin.impl.Name(), t.requests) 542 t.plugin.impl.(plugin.Watcher).Watch(t.requests, t) 543 t.sink <- t 544} 545 546func (t *mockWatcherTask) reschedule(now time.Time) (err error) { 547 return 548} 549 550func (t *mockWatcherTask) getWeight() int { 551 return t.plugin.maxCapacity 552} 553 554// plugin.ContextProvider interface 555 556func (t *mockWatcherTask) ClientID() (clientid uint64) { 557 return t.client.ID() 558} 559 560func (t *mockWatcherTask) ItemID() 
(itemid uint64) { 561 return 0 562} 563 564func (t *mockWatcherTask) Output() (output plugin.ResultWriter) { 565 return t.resultSink 566} 567 568func (t *mockWatcherTask) Meta() (meta *plugin.Meta) { 569 return nil 570} 571 572func (t *mockWatcherTask) GlobalRegexp() plugin.RegexpMatcher { 573 return t.client.GlobalRegexp() 574} 575 576type mockConfigerTask struct { 577 taskBase 578 sink chan performer 579 options *agent.AgentOptions 580} 581 582func (t *mockConfigerTask) perform(s Scheduler) { 583 t.plugin.impl.(plugin.Configurator).Configure(agent.GlobalOptions(t.options), t.options.Plugins[t.plugin.name()]) 584 t.sink <- t 585} 586 587func (t *mockConfigerTask) reschedule(now time.Time) (err error) { 588 return 589} 590 591func (t *mockConfigerTask) getWeight() int { 592 return t.plugin.maxCapacity 593} 594 595func checkExporterTasks(t *testing.T, m *Manager, clientID uint64, items []*clientItem) { 596 lastCheck := time.Time{} 597 n := 0 598 for p := m.pluginQueue.Peek(); p != nil; p = m.pluginQueue.Peek() { 599 if task := p.peekTask(); task != nil { 600 if task.getScheduled().Before(lastCheck) { 601 t.Errorf("Out of order tasks detected") 602 } 603 heap.Pop(&m.pluginQueue) 604 p.popTask() 605 n++ 606 if p.peekTask() != nil { 607 heap.Push(&m.pluginQueue, p) 608 } 609 } else { 610 heap.Pop(&m.pluginQueue) 611 } 612 } 613 if len(items) != n { 614 t.Errorf("Expected %d tasks while got %d", len(items), n) 615 } 616 617 var requestClient *client 618 var ok bool 619 if requestClient, ok = m.clients[clientID]; !ok { 620 t.Errorf("Cannot find owner of the default client") 621 return 622 } 623 624 for _, item := range items { 625 if tacc, ok := requestClient.exporters[item.itemid]; ok { 626 ti := tacc.task().item 627 if ti.delay != item.delay { 628 t.Errorf("Expected item %d delay %s while got %s", item.itemid, item.delay, ti.delay) 629 } 630 if ti.key != item.key { 631 t.Errorf("Expected item %d key %s while got %s", item.itemid, item.key, ti.key) 632 } 633 } else { 
634 t.Errorf("Item %d was not queued", item.itemid) 635 } 636 } 637 638 if len(items) != len(requestClient.exporters) { 639 t.Errorf("Expected %d queued items while got %d", len(items), len(requestClient.exporters)) 640 } 641} 642 643func TestTaskCreate(t *testing.T) { 644 _ = log.Open(log.Console, log.Debug, "", 0) 645 646 plugin.ClearRegistry() 647 plugins := make([]mockExporterPlugin, 3) 648 for i := range plugins { 649 p := &plugins[i] 650 name := fmt.Sprintf("debug%d", i+1) 651 plugin.RegisterMetrics(p, name, name, "Debug.") 652 } 653 654 manager, _ := NewManager(&agent.Options) 655 656 items := []*clientItem{ 657 &clientItem{itemid: 1, delay: "151", key: "debug1"}, 658 &clientItem{itemid: 2, delay: "103", key: "debug2"}, 659 &clientItem{itemid: 3, delay: "79", key: "debug3"}, 660 &clientItem{itemid: 4, delay: "17", key: "debug1"}, 661 &clientItem{itemid: 5, delay: "7", key: "debug2"}, 662 &clientItem{itemid: 6, delay: "1", key: "debug3"}, 663 &clientItem{itemid: 7, delay: "63", key: "debug1"}, 664 &clientItem{itemid: 8, delay: "47", key: "debug2"}, 665 &clientItem{itemid: 9, delay: "31", key: "debug3"}, 666 } 667 668 var cache resultCacheMock 669 update := updateRequest{ 670 clientID: agent.MaxBuiltinClientID + 1, 671 sink: &cache, 672 requests: make([]*plugin.Request, 0), 673 } 674 675 var lastLogsize uint64 676 var mtime int 677 for _, item := range items { 678 update.requests = append(update.requests, &plugin.Request{ 679 Itemid: item.itemid, 680 Key: item.key, 681 Delay: item.delay, 682 LastLogsize: &lastLogsize, 683 Mtime: &mtime, 684 }) 685 } 686 manager.processUpdateRequest(&update, time.Now()) 687 688 if len(manager.pluginQueue) != 3 { 689 t.Errorf("Expected %d plugins queued while got %d", 3, len(manager.pluginQueue)) 690 } 691 692 checkExporterTasks(t, manager, agent.MaxBuiltinClientID+1, items) 693} 694 695func TestTaskUpdate(t *testing.T) { 696 _ = log.Open(log.Console, log.Debug, "", 0) 697 698 plugin.ClearRegistry() 699 plugins := 
make([]mockExporterPlugin, 3) 700 for i := range plugins { 701 p := &plugins[i] 702 name := fmt.Sprintf("debug%d", i+1) 703 plugin.RegisterMetrics(p, name, name, "Debug.") 704 } 705 706 manager, _ := NewManager(&agent.Options) 707 708 items := []*clientItem{ 709 &clientItem{itemid: 1, delay: "151", key: "debug1"}, 710 &clientItem{itemid: 2, delay: "103", key: "debug2"}, 711 &clientItem{itemid: 3, delay: "79", key: "debug3"}, 712 &clientItem{itemid: 4, delay: "17", key: "debug1"}, 713 &clientItem{itemid: 5, delay: "7", key: "debug2"}, 714 &clientItem{itemid: 6, delay: "1", key: "debug3"}, 715 &clientItem{itemid: 7, delay: "63", key: "debug1"}, 716 &clientItem{itemid: 8, delay: "47", key: "debug2"}, 717 &clientItem{itemid: 9, delay: "31", key: "debug3"}, 718 } 719 720 var cache resultCacheMock 721 update := updateRequest{ 722 clientID: agent.MaxBuiltinClientID + 1, 723 sink: &cache, 724 requests: make([]*plugin.Request, 0), 725 } 726 727 var lastLogsize uint64 728 var mtime int 729 for _, item := range items { 730 update.requests = append(update.requests, &plugin.Request{ 731 Itemid: item.itemid, 732 Key: item.key, 733 Delay: item.delay, 734 LastLogsize: &lastLogsize, 735 Mtime: &mtime, 736 }) 737 } 738 manager.processUpdateRequest(&update, time.Now()) 739 740 for _, item := range items { 741 item.delay = "10" + item.delay 742 item.key = item.key + "[1]" 743 } 744 update.requests = update.requests[:0] 745 for _, item := range items { 746 update.requests = append(update.requests, &plugin.Request{ 747 Itemid: item.itemid, 748 Key: item.key, 749 Delay: item.delay, 750 LastLogsize: &lastLogsize, 751 Mtime: &mtime, 752 }) 753 } 754 manager.processUpdateRequest(&update, time.Now()) 755 756 if len(manager.pluginQueue) != 3 { 757 t.Errorf("Expected %d plugins queued while got %d", 3, len(manager.pluginQueue)) 758 } 759 760 checkExporterTasks(t, manager, agent.MaxBuiltinClientID+1, items) 761} 762 763func TestTaskUpdateInvalidInterval(t *testing.T) { 764 _ = 
log.Open(log.Console, log.Debug, "", 0) 765 766 plugin.ClearRegistry() 767 plugins := make([]mockExporterPlugin, 3) 768 for i := range plugins { 769 p := &plugins[i] 770 name := fmt.Sprintf("debug%d", i+1) 771 plugin.RegisterMetrics(p, name, name, "Debug.") 772 } 773 774 manager, _ := NewManager(&agent.Options) 775 776 items := []*clientItem{ 777 &clientItem{itemid: 1, delay: "151", key: "debug1"}, 778 &clientItem{itemid: 2, delay: "103", key: "debug2"}, 779 } 780 781 var cache resultCacheMock 782 update := updateRequest{ 783 clientID: agent.MaxBuiltinClientID + 1, 784 sink: &cache, 785 requests: make([]*plugin.Request, 0), 786 } 787 788 var lastLogsize uint64 789 var mtime int 790 for _, item := range items { 791 update.requests = append(update.requests, &plugin.Request{ 792 Itemid: item.itemid, 793 Key: item.key, 794 Delay: item.delay, 795 LastLogsize: &lastLogsize, 796 Mtime: &mtime, 797 }) 798 } 799 manager.processUpdateRequest(&update, time.Now()) 800 801 items[0].delay = "xyz" 802 update.requests = update.requests[:0] 803 for _, item := range items { 804 update.requests = append(update.requests, &plugin.Request{ 805 Itemid: item.itemid, 806 Key: item.key, 807 Delay: item.delay, 808 LastLogsize: &lastLogsize, 809 Mtime: &mtime, 810 }) 811 } 812 manager.processUpdateRequest(&update, time.Now()) 813 814 if len(manager.plugins["debug1"].tasks) != 0 { 815 t.Errorf("Expected %d tasks queued while got %d", 0, len(manager.plugins["debug1"].tasks)) 816 } 817} 818 819func TestTaskDelete(t *testing.T) { 820 _ = log.Open(log.Console, log.Debug, "", 0) 821 822 plugin.ClearRegistry() 823 plugins := make([]mockExporterPlugin, 3) 824 for i := range plugins { 825 p := &plugins[i] 826 name := fmt.Sprintf("debug%d", i+1) 827 plugin.RegisterMetrics(p, name, name, "Debug.") 828 } 829 830 manager, _ := NewManager(&agent.Options) 831 832 items := []*clientItem{ 833 &clientItem{itemid: 1, delay: "151", key: "debug1"}, 834 &clientItem{itemid: 2, delay: "103", key: "debug2"}, 835 
&clientItem{itemid: 3, delay: "79", key: "debug3"}, // remove 836 &clientItem{itemid: 4, delay: "17", key: "debug1"}, 837 &clientItem{itemid: 5, delay: "7", key: "debug2"}, 838 &clientItem{itemid: 6, delay: "1", key: "debug3"}, // remove 839 &clientItem{itemid: 7, delay: "63", key: "debug1"}, 840 &clientItem{itemid: 8, delay: "47", key: "debug2"}, // remove 841 &clientItem{itemid: 9, delay: "31", key: "debug3"}, // remove 842 } 843 844 var cache resultCacheMock 845 update := updateRequest{ 846 clientID: agent.MaxBuiltinClientID + 1, 847 sink: &cache, 848 requests: make([]*plugin.Request, 0), 849 } 850 851 var lastLogsize uint64 852 var mtime int 853 for _, item := range items { 854 update.requests = append(update.requests, &plugin.Request{ 855 Itemid: item.itemid, 856 Key: item.key, 857 Delay: item.delay, 858 LastLogsize: &lastLogsize, 859 Mtime: &mtime, 860 }) 861 } 862 manager.processUpdateRequest(&update, time.Now()) 863 864 items[2] = items[6] 865 items = items[:cap(items)-4] 866 update.requests = update.requests[:0] 867 for _, item := range items { 868 update.requests = append(update.requests, &plugin.Request{ 869 Itemid: item.itemid, 870 Key: item.key, 871 Delay: item.delay, 872 LastLogsize: &lastLogsize, 873 Mtime: &mtime, 874 }) 875 } 876 manager.processUpdateRequest(&update, time.Now()) 877 878 if len(manager.plugins["debug3"].tasks) != 0 { 879 t.Errorf("Expected %d tasks queued while got %d", 0, len(manager.plugins["debug3"].tasks)) 880 } 881 882 checkExporterTasks(t, manager, agent.MaxBuiltinClientID+1, items) 883} 884 885func TestSchedule(t *testing.T) { 886 _ = log.Open(log.Console, log.Debug, "", 0) 887 888 manager := mockManager{sink: make(chan performer, 10)} 889 plugin.ClearRegistry() 890 plugins := make([]plugin.Accessor, 3) 891 for i := range plugins { 892 plugins[i] = &mockExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} 893 name := fmt.Sprintf("debug%d", i+1) 894 plugin.RegisterMetrics(plugins[i], name, name, 
"Debug.") 895 } 896 manager.mockInit(t) 897 898 items := []*clientItem{ 899 &clientItem{itemid: 1, delay: "1", key: "debug1"}, 900 &clientItem{itemid: 2, delay: "2", key: "debug2"}, 901 &clientItem{itemid: 3, delay: "5", key: "debug3"}, 902 } 903 904 calls := []map[string][]int{ 905 map[string][]int{"debug1": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}}, 906 map[string][]int{"debug2": []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}}, 907 map[string][]int{"debug3": []int{5, 10, 15, 20}}, 908 } 909 910 var cache resultCacheMock 911 update := updateRequest{ 912 clientID: agent.MaxBuiltinClientID + 1, 913 sink: &cache, 914 requests: make([]*plugin.Request, 0), 915 } 916 917 var lastLogsize uint64 918 var mtime int 919 for _, item := range items { 920 update.requests = append(update.requests, &plugin.Request{ 921 Itemid: item.itemid, 922 Key: item.key, 923 Delay: item.delay, 924 LastLogsize: &lastLogsize, 925 Mtime: &mtime, 926 }) 927 } 928 manager.update(&update) 929 manager.mockTasks() 930 931 manager.iterate(t, 20) 932 manager.checkPluginTimeline(t, plugins, calls, 20) 933} 934 935func TestScheduleCapacity(t *testing.T) { 936 _ = log.Open(log.Console, log.Debug, "", 0) 937 938 manager := mockManager{sink: make(chan performer, 10)} 939 plugin.ClearRegistry() 940 plugins := make([]plugin.Accessor, 2) 941 for i := range plugins { 942 plugins[i] = &mockExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} 943 name := fmt.Sprintf("debug%d", i+1) 944 plugin.RegisterMetrics(plugins[i], name, name, "Debug.") 945 } 946 manager.mockInit(t) 947 948 p := manager.plugins["debug2"] 949 p.maxCapacity = 2 950 951 items := []*clientItem{ 952 &clientItem{itemid: 1, delay: "1", key: "debug1"}, 953 &clientItem{itemid: 2, delay: "2", key: "debug2"}, 954 &clientItem{itemid: 3, delay: "2", key: "debug2"}, 955 &clientItem{itemid: 4, delay: "2", key: "debug2"}, 956 } 957 958 calls := []map[string][]int{ 959 map[string][]int{"debug1": 
[]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}}, 960 map[string][]int{"debug2": []int{2, 2, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 10}}, 961 } 962 963 var cache resultCacheMock 964 update := updateRequest{ 965 clientID: agent.MaxBuiltinClientID + 1, 966 sink: &cache, 967 requests: make([]*plugin.Request, 0), 968 } 969 970 var lastLogsize uint64 971 var mtime int 972 for _, item := range items { 973 update.requests = append(update.requests, &plugin.Request{ 974 Itemid: item.itemid, 975 Key: item.key, 976 Delay: item.delay, 977 LastLogsize: &lastLogsize, 978 Mtime: &mtime, 979 }) 980 } 981 manager.update(&update) 982 manager.mockTasks() 983 984 manager.iterate(t, 10) 985 manager.checkPluginTimeline(t, plugins, calls, 10) 986} 987 988func TestScheduleUpdate(t *testing.T) { 989 _ = log.Open(log.Console, log.Debug, "", 0) 990 991 manager := mockManager{sink: make(chan performer, 10)} 992 plugin.ClearRegistry() 993 plugins := make([]plugin.Accessor, 3) 994 for i := range plugins { 995 plugins[i] = &mockExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} 996 name := fmt.Sprintf("debug%d", i+1) 997 plugin.RegisterMetrics(plugins[i], name, name, "Debug.") 998 } 999 manager.mockInit(t) 1000 1001 items := []*clientItem{ 1002 &clientItem{itemid: 1, delay: "1", key: "debug1"}, 1003 &clientItem{itemid: 2, delay: "1", key: "debug2"}, 1004 &clientItem{itemid: 3, delay: "1", key: "debug3"}, 1005 } 1006 1007 calls := []map[string][]int{ 1008 map[string][]int{"debug1": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 16, 17, 18, 19, 20}}, 1009 map[string][]int{"debug2": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 16, 17, 18, 19, 20}}, 1010 map[string][]int{"debug3": []int{1, 2, 3, 4, 5, 16, 17, 18, 19, 20}}, 1011 } 1012 1013 var cache resultCacheMock 1014 update := updateRequest{ 1015 clientID: agent.MaxBuiltinClientID + 1, 1016 sink: &cache, 1017 requests: make([]*plugin.Request, 0), 1018 } 1019 1020 var lastLogsize uint64 1021 var mtime int 1022 for _, item := range items { 1023 
update.requests = append(update.requests, &plugin.Request{ 1024 Itemid: item.itemid, 1025 Key: item.key, 1026 Delay: item.delay, 1027 LastLogsize: &lastLogsize, 1028 Mtime: &mtime, 1029 }) 1030 } 1031 manager.update(&update) 1032 manager.mockTasks() 1033 manager.iterate(t, 5) 1034 manager.checkPluginTimeline(t, plugins, calls, 5) 1035 1036 update.requests = update.requests[:2] 1037 manager.update(&update) 1038 manager.mockTasks() 1039 manager.iterate(t, 5) 1040 manager.checkPluginTimeline(t, plugins, calls, 5) 1041 1042 update.requests = update.requests[:0] 1043 manager.update(&update) 1044 manager.mockTasks() 1045 manager.iterate(t, 5) 1046 manager.checkPluginTimeline(t, plugins, calls, 5) 1047 1048 update.requests = update.requests[:3] 1049 manager.update(&update) 1050 manager.mockTasks() 1051 manager.iterate(t, 5) 1052 manager.checkPluginTimeline(t, plugins, calls, 5) 1053} 1054 1055func TestCollectorSchedule(t *testing.T) { 1056 _ = log.Open(log.Console, log.Debug, "", 0) 1057 1058 manager := mockManager{sink: make(chan performer, 10)} 1059 plugin.ClearRegistry() 1060 plugins := make([]plugin.Accessor, 1) 1061 for i := range plugins { 1062 plugins[i] = &mockCollectorPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}, period: 2} 1063 name := fmt.Sprintf("debug%d", i+1) 1064 plugin.RegisterMetrics(plugins[i], name, name, "Debug.") 1065 } 1066 manager.mockInit(t) 1067 1068 items := []*clientItem{ 1069 &clientItem{itemid: 1, delay: "1", key: "debug1"}, 1070 } 1071 1072 calls := []map[string][]int{ 1073 map[string][]int{"$collect": []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}}, 1074 } 1075 1076 var cache resultCacheMock 1077 update := updateRequest{ 1078 clientID: agent.MaxBuiltinClientID + 1, 1079 sink: &cache, 1080 requests: make([]*plugin.Request, 0), 1081 } 1082 1083 var lastLogsize uint64 1084 var mtime int 1085 for _, item := range items { 1086 update.requests = append(update.requests, &plugin.Request{ 1087 Itemid: item.itemid, 1088 Key: 
item.key,
			Delay:       item.delay,
			LastLogsize: &lastLogsize,
			Mtime:       &mtime,
		})
	}
	manager.update(&update)
	manager.mockTasks()
	manager.iterate(t, 20)
	manager.checkPluginTimeline(t, plugins, calls, 20)
}

// TestCollectorScheduleUpdate registers three mock collector plugins (period 2),
// submits one item per plugin and then replays a series of updates that shrink
// and regrow the request list. After every update the manager is advanced by 5
// iterations and the recorded "$collect" calls are compared with the expected
// timeline.
func TestCollectorScheduleUpdate(t *testing.T) {
	_ = log.Open(log.Console, log.Debug, "", 0)

	manager := mockManager{sink: make(chan performer, 10)}
	plugin.ClearRegistry()

	plugins := make([]plugin.Accessor, 3)
	for idx := range plugins {
		metric := fmt.Sprintf("debug%d", idx+1)
		plugins[idx] = &mockCollectorPlugin{
			Base:       plugin.Base{},
			mockPlugin: mockPlugin{now: &manager.now},
			period:     2,
		}
		plugin.RegisterMetrics(plugins[idx], metric, metric, "Debug.")
	}
	manager.mockInit(t)

	items := []*clientItem{
		{itemid: 1, delay: "5", key: "debug1"},
		{itemid: 2, delay: "5", key: "debug2"},
		{itemid: 3, delay: "5", key: "debug3"},
	}

	// Expected call times per plugin, in the same order as the plugins slice.
	calls := []map[string][]int{
		{"$collect": {2, 4, 6, 8, 10, 12, 14}},
		{"$collect": {2, 4, 6, 8, 10, 22, 24}},
		{"$collect": {2, 4, 22, 24}},
	}

	var (
		cache       resultCacheMock
		lastLogsize uint64
		mtime       int
	)
	requests := make([]*plugin.Request, 0)
	for _, it := range items {
		requests = append(requests, &plugin.Request{
			Itemid:      it.itemid,
			Key:         it.key,
			Delay:       it.delay,
			LastLogsize: &lastLogsize,
			Mtime:       &mtime,
		})
	}
	update := updateRequest{
		clientID: agent.MaxBuiltinClientID + 1,
		sink:     &cache,
		requests: requests,
	}

	// advance applies the current update, runs the manager for the given number
	// of iterations and verifies the plugin call timeline.
	advance := func(ticks int) {
		manager.update(&update)
		manager.mockTasks()
		manager.iterate(t, ticks)
		manager.checkPluginTimeline(t, plugins, calls, ticks)
	}

	advance(5)

	update.requests = update.requests[:2]
	advance(5)

	update.requests = update.requests[:1]
	advance(5)

	update.requests = update.requests[:0]
	advance(5)

	// Reslicing past len (but within cap) brings the last two requests back.
	update.requests = update.requests[1:3]
	advance(5)
}

// TestRunner registers three mock runner plugins and checks the recorded
// "$start"/"$stop" calls while requests are removed from and returned to the
// update, one manager iteration per step.
func TestRunner(t *testing.T) {
	_ = log.Open(log.Console, log.Debug, "", 0)

	manager := mockManager{sink: make(chan performer, 10)}
	plugin.ClearRegistry()

	plugins := make([]plugin.Accessor, 3)
	for idx := range plugins {
		metric := fmt.Sprintf("debug%d", idx+1)
		plugins[idx] = &mockRunnerPlugin{
			Base:       plugin.Base{},
			mockPlugin: mockPlugin{now: &manager.now},
		}
		plugin.RegisterMetrics(plugins[idx], metric, metric, "Debug.")
	}
	manager.mockInit(t)

	items := []*clientItem{
		{itemid: 1, delay: "5", key: "debug1"},
		{itemid: 2, delay: "5", key: "debug2"},
		{itemid: 3, delay: "5", key: "debug3"},
	}

	calls := []map[string][]int{
		{"$start": {1, 5}, "$stop": {4, 6}},
		{"$start": {1, 5, 7}, "$stop": {3, 6, 8}},
		{"$start": {1, 5, 8}, "$stop": {2, 6}},
	}

	var (
		cache       resultCacheMock
		lastLogsize uint64
		mtime       int
	)
	requests := make([]*plugin.Request, 0)
	for _, it := range items {
		requests = append(requests, &plugin.Request{
			Itemid:      it.itemid,
			Key:         it.key,
			Delay:       it.delay,
			LastLogsize: &lastLogsize,
			Mtime:       &mtime,
		})
	}
	update := updateRequest{
		clientID: agent.MaxBuiltinClientID + 1,
		sink:     &cache,
		requests: requests,
	}

	// advance applies the current update, runs the manager for the given number
	// of iterations and verifies the plugin call timeline.
	advance := func(ticks int) {
		manager.update(&update)
		manager.mockTasks()
		manager.iterate(t, ticks)
		manager.checkPluginTimeline(t, plugins, calls, ticks)
	}

	advance(1)

	update.requests = update.requests[:2]
	advance(1)

	update.requests = update.requests[:1]
	advance(1)

	update.requests = update.requests[:0]
	advance(1)

	update.requests = update.requests[:3]
	advance(1)

	update.requests = update.requests[:0]
	advance(1)

	// Each [1:2] reslice shifts the window one element further along the
	// backing array: first the second request, then the third.
	update.requests = update.requests[1:2]
	advance(1)

	update.requests = update.requests[1:2]
	advance(1)
}

// checkWatchRequests reports a test error when the requests recorded by the
// plugin's watchTracker are not deeply equal to the expected requests. Both
// lists are rendered with %+v in the failure message.
func checkWatchRequests(t *testing.T, p plugin.Accessor, requests []*plugin.Request) {
	tracker := p.(watchTracker)
	if reflect.DeepEqual(tracker.watched(), requests) {
		return
	}

	// dump renders every request followed by a comma.
	dump := func(list []*plugin.Request) string {
		text := ""
		for _, req := range list {
			text += fmt.Sprintf("%+v,", *req)
		}
		return text
	}

	expected := dump(requests)
	returned := dump(tracker.watched())
	t.Errorf("Expected watch requests %s while got %s", expected, returned)
}

// TestWatcher registers three mock watcher plugins, spreads six items over
// them (1, 2 and 3 items respectively) and verifies both the "$watch" call
// timeline and the request lists handed to each plugin as the update shrinks
// and is restored.
func TestWatcher(t *testing.T) {
	_ = log.Open(log.Console, log.Debug, "", 0)

	manager := mockManager{sink: make(chan performer, 10)}
	plugin.ClearRegistry()

	plugins := make([]plugin.Accessor, 3)
	for idx := range plugins {
		metric := fmt.Sprintf("debug%d", idx+1)
		plugins[idx] = &mockWatcherPlugin{
			Base:       plugin.Base{},
			mockPlugin: mockPlugin{now: &manager.now},
		}
		plugin.RegisterMetrics(plugins[idx], metric, metric, "Debug.")
	}
	manager.mockInit(t)

	items := []*clientItem{
		{itemid: 1, delay: "5", key: "debug1"},
		{itemid: 2, delay: "5", key: "debug2[1]"},
		{itemid: 3, delay: "5", key: "debug2[2]"},
		{itemid: 4, delay: "5", key: "debug3[1]"},
		{itemid: 5, delay: "5", key: "debug3[2]"},
		{itemid: 6, delay: "5", key: "debug3[3]"},
	}

	calls := []map[string][]int{
		{"$watch": {1, 2, 3, 4, 5}},
		{"$watch": {1, 2, 3, 4, 5}},
		{"$watch": {1, 2, 3, 5}},
	}

	var (
		cache       resultCacheMock
		lastLogsize uint64
		mtime       int
	)
	requests := make([]*plugin.Request, 0)
	for _, it := range items {
		requests = append(requests, &plugin.Request{
			Itemid:      it.itemid,
			Key:         it.key,
			Delay:       it.delay,
			LastLogsize: &lastLogsize,
			Mtime:       &mtime,
		})
	}
	update := updateRequest{
		clientID: agent.MaxBuiltinClientID + 1,
		sink:     &cache,
		requests: requests,
	}

	// advance applies the current update, runs the manager for the given number
	// of iterations and verifies the plugin call timeline.
	advance := func(ticks int) {
		manager.update(&update)
		manager.mockTasks()
		manager.iterate(t, ticks)
		manager.checkPluginTimeline(t, plugins, calls, ticks)
	}

	advance(1)
	checkWatchRequests(t, plugins[0], update.requests[0:1])
	checkWatchRequests(t, plugins[1], update.requests[1:3])
	checkWatchRequests(t, plugins[2], update.requests[3:6])

	update.requests = update.requests[:5]
	advance(1)
	checkWatchRequests(t, plugins[0], update.requests[0:1])
	checkWatchRequests(t, plugins[1], update.requests[1:3])
	checkWatchRequests(t, plugins[2], update.requests[3:5])

	update.requests = update.requests[:3]
	advance(1)
	checkWatchRequests(t, plugins[0], update.requests[0:1])
	checkWatchRequests(t, plugins[1], update.requests[1:3])

	update.requests = update.requests[:2]
	advance(1)
	checkWatchRequests(t, plugins[0], update.requests[0:1])
	checkWatchRequests(t, plugins[1], update.requests[1:2])

	update.requests = update.requests[:6]
	advance(1)
	checkWatchRequests(t, plugins[0], update.requests[0:1])
	checkWatchRequests(t, plugins[1], update.requests[1:3])
	checkWatchRequests(t, plugins[2], update.requests[3:6])
}

// TestCollectorExporterSchedule registers a single mock plugin of type
// mockCollectorExporterPlugin (period 2) with three items and verifies the
// "debug" and "$collect" call timeline over 10 manager iterations.
func TestCollectorExporterSchedule(t *testing.T) {
	_ = log.Open(log.Console, log.Debug, "", 0)

	manager := mockManager{sink: make(chan performer, 10)}
	plugin.ClearRegistry()

	plugins := []plugin.Accessor{
		&mockCollectorExporterPlugin{
			Base:       plugin.Base{},
			mockPlugin: mockPlugin{now: &manager.now},
			period:     2,
		},
	}
	plugin.RegisterMetrics(plugins[0], "debug", "debug", "Debug.")
	manager.mockInit(t)

	items := []*clientItem{
		{itemid: 1, delay: "2", key: "debug[1]"},
		{itemid: 2, delay: "2", key: "debug[2]"},
		{itemid: 3, delay: "2", key: "debug[3]"},
	}

	calls := []map[string][]int{
		{
			"debug":    {3, 3, 3, 5, 5, 5, 7, 7, 7, 9, 9, 9},
			"$collect": {2, 4, 6, 8, 10},
		},
	}

	var (
		cache       resultCacheMock
		lastLogsize uint64
		mtime       int
	)
	requests := make([]*plugin.Request, 0)
	for _, it := range items {
		requests = append(requests, &plugin.Request{
			Itemid:      it.itemid,
			Key:         it.key,
			Delay:       it.delay,
			LastLogsize: &lastLogsize,
			Mtime:       &mtime,
		})
	}
	update := updateRequest{
		clientID: agent.MaxBuiltinClientID + 1,
		sink:     &cache,
		requests: requests,
	}

	manager.update(&update)
	manager.mockTasks()
	manager.iterate(t, 10)

	manager.checkPluginTimeline(t, plugins, calls, 10)
}

// TestRunnerWatcher registers three mock plugins of type
// mockRunnerWatcherPlugin, spreads six items over them and verifies the
// "$watch"/"$start"/"$stop" call timeline together with the watched request
// lists while the update is shrunk, emptied and refilled with different
// windows of the original requests (5 iterations per step).
func TestRunnerWatcher(t *testing.T) {
	_ = log.Open(log.Console, log.Debug, "", 0)

	manager := mockManager{sink: make(chan performer, 10)}
	plugin.ClearRegistry()

	plugins := make([]plugin.Accessor, 3)
	for idx := range plugins {
		metric := fmt.Sprintf("debug%d", idx+1)
		plugins[idx] = &mockRunnerWatcherPlugin{
			Base:       plugin.Base{},
			mockPlugin: mockPlugin{now: &manager.now},
		}
		plugin.RegisterMetrics(plugins[idx], metric, metric, "Debug.")
	}
	manager.mockInit(t)

	items := []*clientItem{
		{itemid: 1, delay: "5", key: "debug1"},
		{itemid: 2, delay: "5", key: "debug2[1]"},
		{itemid: 3, delay: "5", key: "debug2[2]"},
		{itemid: 4, delay: "5", key: "debug3[1]"},
		{itemid: 5, delay: "5", key: "debug3[2]"},
		{itemid: 6, delay: "5", key: "debug3[3]"},
	}

	calls := []map[string][]int{
		{"$watch": {2, 6, 11, 16}, "$start": {1}, "$stop": {17}},
		{"$watch": {2, 6, 11, 22, 26}, "$start": {1, 21}, "$stop": {12, 27}},
		{"$watch": {2, 6, 27}, "$start": {1, 26}, "$stop": {7}},
	}

	var (
		cache       resultCacheMock
		lastLogsize uint64
		mtime       int
	)
	requests := make([]*plugin.Request, 0)
	for _, it := range items {
		requests = append(requests, &plugin.Request{
			Itemid:      it.itemid,
			Key:         it.key,
			Delay:       it.delay,
			LastLogsize: &lastLogsize,
			Mtime:       &mtime,
		})
	}
	update := updateRequest{
		clientID: agent.MaxBuiltinClientID + 1,
		sink:     &cache,
		requests: requests,
	}

	// advance applies the current update, runs the manager for the given number
	// of iterations and verifies the plugin call timeline.
	advance := func(ticks int) {
		manager.update(&update)
		manager.mockTasks()
		manager.iterate(t, ticks)
		manager.checkPluginTimeline(t, plugins, calls, ticks)
	}

	advance(5)
	checkWatchRequests(t, plugins[0], update.requests[0:1])
	checkWatchRequests(t, plugins[1], update.requests[1:3])
	checkWatchRequests(t, plugins[2], update.requests[3:6])

	update.requests = update.requests[:3]
	advance(5)
	checkWatchRequests(t, plugins[0], update.requests[0:1])
	checkWatchRequests(t, plugins[1], update.requests[1:3])

	update.requests = update.requests[:1]
	advance(5)
	checkWatchRequests(t, plugins[0], update.requests[0:1])

	update.requests = update.requests[:0]
	advance(5)

	// Window over the second and third original requests (both "debug2" keys).
	update.requests = update.requests[1:3]
	advance(5)
	checkWatchRequests(t, plugins[1], update.requests[:2])

	// Relative to the previous window this selects the three "debug3" requests.
	update.requests = update.requests[2:5]
	advance(5)
	checkWatchRequests(t, plugins[2], update.requests[0:3])
}

// TestMultiCollectorExporterSchedule submits the same single-item update for
// two client IDs against one mock plugin of type mockCollectorExporterPlugin,
// then empties the request list for the second and finally for the first
// client, checking the "debug"/"$collect" timeline after each 5-iteration step.
func TestMultiCollectorExporterSchedule(t *testing.T) {
	_ = log.Open(log.Console, log.Debug, "", 0)

	manager := mockManager{sink: make(chan performer, 10)}
	plugin.ClearRegistry()

	plugins := []plugin.Accessor{
		&mockCollectorExporterPlugin{
			Base:       plugin.Base{},
			mockPlugin: mockPlugin{now: &manager.now},
			period:     2,
		},
	}
	plugin.RegisterMetrics(plugins[0], "debug", "debug", "Debug.")
	manager.mockInit(t)

	items := []*clientItem{
		{itemid: 1, delay: "2", key: "debug[1]"},
	}

	calls := []map[string][]int{
		{"debug": {3, 3, 5, 5, 7, 9}, "$collect": {2, 4, 6, 8, 10}},
	}

	var (
		cache       resultCacheMock
		lastLogsize uint64
		mtime       int
	)
	requests := make([]*plugin.Request, 0)
	for _, it := range items {
		requests = append(requests, &plugin.Request{
			Itemid:      it.itemid,
			Key:         it.key,
			Delay:       it.delay,
			LastLogsize: &lastLogsize,
			Mtime:       &mtime,
		})
	}
	update := updateRequest{
		clientID: agent.MaxBuiltinClientID + 1,
		sink:     &cache,
		requests: requests,
	}

	// advance applies the current update, runs the manager for the given number
	// of iterations and verifies the plugin call timeline.
	advance := func(ticks int) {
		manager.update(&update)
		manager.mockTasks()
		manager.iterate(t, ticks)
		manager.checkPluginTimeline(t, plugins, calls, ticks)
	}

	// Same requests for the first client, then (inside advance) for the second.
	manager.update(&update)
	update.clientID = agent.MaxBuiltinClientID + 2
	advance(5)

	// Empty request list for the second client.
	update.requests = update.requests[:0]
	advance(5)

	// Empty request list for the first client as well.
	update.clientID = agent.MaxBuiltinClientID + 1
	advance(5)
}

// TestMultiRunnerWatcher drives one mock plugin of type
// mockRunnerWatcherPlugin with updates from two client IDs, alternating
// between full, empty and single-item request lists, and checks the
// "$watch"/"$start"/"$stop" timeline after each 5-iteration step.
func TestMultiRunnerWatcher(t *testing.T) {
	_ = log.Open(log.Console, log.Debug, "", 0)

	manager := mockManager{sink: make(chan performer, 10)}
	plugin.ClearRegistry()

	plugins := []plugin.Accessor{
		&mockRunnerWatcherPlugin{
			Base:       plugin.Base{},
			mockPlugin: mockPlugin{now: &manager.now},
		},
	}
	plugin.RegisterMetrics(plugins[0], "debug", "debug", "Debug.")
	manager.mockInit(t)

	items := []*clientItem{
		{itemid: 1, delay: "5", key: "debug[1]"},
		{itemid: 2, delay: "5", key: "debug[2]"},
		{itemid: 3, delay: "5", key: "debug[3]"},
	}

	calls := []map[string][]int{
		{"$watch": {2, 3, 6, 7, 11, 17, 21}, "$start": {1, 16}, "$stop": {12}},
	}

	var (
		cache       resultCacheMock
		lastLogsize uint64
		mtime       int
	)
	requests := make([]*plugin.Request, 0)
	for _, it := range items {
		requests = append(requests, &plugin.Request{
			Itemid:      it.itemid,
			Key:         it.key,
			Delay:       it.delay,
			LastLogsize: &lastLogsize,
			Mtime:       &mtime,
		})
	}
	update := updateRequest{
		clientID: agent.MaxBuiltinClientID + 1,
		sink:     &cache,
		requests: requests,
	}

	// advance applies the current update, runs the manager for the given number
	// of iterations and verifies the plugin call timeline.
	advance := func(ticks int) {
		manager.update(&update)
		manager.mockTasks()
		manager.iterate(t, ticks)
		manager.checkPluginTimeline(t, plugins, calls, ticks)
	}

	// Full request list for both clients.
	manager.update(&update)
	update.clientID = agent.MaxBuiltinClientID + 2
	advance(5)

	// First client keeps the full list, second client gets an empty one.
	update.clientID = agent.MaxBuiltinClientID + 1
	manager.update(&update)
	update.clientID = agent.MaxBuiltinClientID + 2
	update.requests = update.requests[:0]
	advance(5)

	// Empty list for the first client too.
	update.clientID = agent.MaxBuiltinClientID + 1
	advance(5)

	// Single request for the second client.
	update.requests = update.requests[:1]
	update.clientID = agent.MaxBuiltinClientID + 2
	advance(5)

	// Single request for the first client.
	update.clientID = agent.MaxBuiltinClientID + 1
	advance(5)
}

// TestPassiveRunner registers three mock plugins of type
// mockPassiveRunnerPlugin and submits updates under
// agent.PassiveChecksClientID, advancing the manager by 3600, 3600, 3600*24,
// 3600*25 and 1 iterations while verifying the "$start"/"$stop" timeline.
func TestPassiveRunner(t *testing.T) {
	_ = log.Open(log.Console, log.Debug, "", 0)

	manager := mockManager{sink: make(chan performer, 10)}
	plugin.ClearRegistry()

	plugins := make([]plugin.Accessor, 3)
	for idx := range plugins {
		metric := fmt.Sprintf("debug%d", idx+1)
		plugins[idx] = &mockPassiveRunnerPlugin{
			Base:       plugin.Base{},
			mockPlugin: mockPlugin{now: &manager.now},
		}
		plugin.RegisterMetrics(plugins[idx], metric, metric, "Debug.")
	}
	manager.mockInit(t)

	items := []*clientItem{
		{itemid: 1, delay: "5", key: "debug1"},
		{itemid: 2, delay: "5", key: "debug2"},
		{itemid: 3, delay: "5", key: "debug3"},
	}

	calls := []map[string][]int{
		{"$start": {1}, "$stop": {}},
		{"$start": {1}, "$stop": {3600*51 + 1}},
		{"$start": {1}, "$stop": {3600*26 + 1}},
	}

	var (
		cache       resultCacheMock
		lastLogsize uint64
		mtime       int
	)
	requests := make([]*plugin.Request, 0)
	for _, it := range items {
		requests = append(requests, &plugin.Request{
			Itemid:      it.itemid,
			Key:         it.key,
			Delay:       it.delay,
			LastLogsize: &lastLogsize,
			Mtime:       &mtime,
		})
	}
	update := updateRequest{
		clientID: agent.PassiveChecksClientID,
		sink:     &cache,
		requests: requests,
	}

	// advance applies the current update, runs the manager for the given number
	// of iterations and verifies the plugin call timeline.
	advance := func(ticks int) {
		manager.update(&update)
		manager.mockTasks()
		manager.iterate(t, ticks)
		manager.checkPluginTimeline(t, plugins, calls, ticks)
	}

	advance(3600)

	update.requests = update.requests[:0]
	advance(3600)

	update.requests = update.requests[:2]
	advance(3600 * 24)

	update.requests = update.requests[:1]
	advance(3600 * 25)

	update.requests = update.requests[:1]
	advance(1)
}

// configuratorOption is the target structure for conf.Unmarshal in
// TestConfigurator; its Params field is tagged as optional.
type configuratorOption struct {
	Params interface{} `conf:"optional"`
}

// TestConfigurator stores three unmarshalled option sets in
// agent.Options.Plugins, registers three mock plugins of type
// mockConfiguratorPlugin and grows the request list one item per step,
// checking the "$configure" call timeline after each 5-iteration step.
func TestConfigurator(t *testing.T) {
	_ = log.Open(log.Console, log.Debug, "", 0)

	var opt1, opt2, opt3 configuratorOption
	_ = conf.Unmarshal([]byte("Delay=5"), &opt1)
	_ = conf.Unmarshal([]byte("Delay=30"), &opt2)
	_ = conf.Unmarshal([]byte("Delay=60"), &opt3)

	agent.Options.Plugins = map[string]interface{}{
		"Debug1": opt1.Params,
		"Debug2": opt2.Params,
		"Debug3": opt3.Params,
	}

	manager := mockManager{sink: make(chan performer, 10)}
	plugin.ClearRegistry()

	plugins := make([]plugin.Accessor, 3)
	for idx := range plugins {
		metric := fmt.Sprintf("debug%d", idx+1)
		plugins[idx] = &mockConfiguratorPlugin{
			Base:       plugin.Base{},
			mockPlugin: mockPlugin{now: &manager.now},
			options:    agent.Options.Plugins[metric],
		}
		plugin.RegisterMetrics(plugins[idx], metric, metric, "Debug.")
	}
	manager.mockInit(t)

	items := []*clientItem{
		{itemid: 1, delay: "5", key: "debug1"},
		{itemid: 2, delay: "5", key: "debug2"},
		{itemid: 3, delay: "5", key: "debug3"},
	}

	calls := []map[string][]int{
		{"$configure": {1}},
		{"$configure": {6}},
		{"$configure": {11}},
	}

	var (
		cache       resultCacheMock
		lastLogsize uint64
		mtime       int
	)
	requests := make([]*plugin.Request, 0)
	for _, it := range items {
		requests = append(requests, &plugin.Request{
			Itemid:      it.itemid,
			Key:         it.key,
			Delay:       it.delay,
			LastLogsize: &lastLogsize,
			Mtime:       &mtime,
		})
	}
	update := updateRequest{
		clientID: agent.MaxBuiltinClientID + 1,
		sink:     &cache,
		requests: requests,
	}

	// advance applies the current update, runs the manager for the given number
	// of iterations and verifies the plugin call timeline.
	advance := func(ticks int) {
		manager.update(&update)
		manager.mockTasks()
		manager.iterate(t, ticks)
		manager.checkPluginTimeline(t, plugins, calls, ticks)
	}

	// Start with the first request only and add one more on every step.
	update.requests = update.requests[:1]
	advance(5)

	update.requests = update.requests[:2]
	advance(5)

	update.requests = update.requests[:3]
	advance(5)
}