1/*
2** Zabbix
3** Copyright (C) 2001-2021 Zabbix SIA
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18**/
19
20package scheduler
21
22import (
23	"container/heap"
24	"fmt"
25	"reflect"
26	"strconv"
27	"testing"
28	"time"
29
30	"zabbix.com/internal/agent"
31	"zabbix.com/internal/agent/alias"
32	"zabbix.com/pkg/conf"
33	"zabbix.com/pkg/itemutil"
34	"zabbix.com/pkg/log"
35	"zabbix.com/pkg/plugin"
36)
37
// getNextcheck calculates a simplified nextcheck based on the specified delay string and
// the from time: from is rounded down to a multiple of the delay (in whole seconds) and
// one full delay period is added. The result always has a zero nanosecond part.
//
// The delay is expected to be a positive decimal number of seconds. A delay that cannot be
// parsed or is not positive falls back to one second, because using it in the modulo below
// would otherwise panic with an integer divide by zero.
func getNextcheck(delay string, from time.Time) (nextcheck time.Time) {
	seconds, err := strconv.ParseInt(delay, 10, 64)
	if err != nil || seconds <= 0 {
		seconds = 1
	}
	fromSeconds := from.Unix()
	// step back to the period boundary, then move one period ahead
	return time.Unix(fromSeconds-fromSeconds%seconds+seconds, 0)
}
44
// callTracker is the call recording interface of the mock plugins. Tests use it to read
// back the timestamps at which a plugin was invoked.
type callTracker interface {
	// call records one invocation under the specified key.
	call(key string)
	// called returns the recorded invocation timestamps grouped by key.
	called() map[string][]time.Time
}
49
// mockPlugin is the call recording core embedded into the mock plugins. Every recorded
// call is stamped with the time value that now points to at the moment of the call.
type mockPlugin struct {
	calls map[string][]time.Time
	now   *time.Time
}

// call appends the current value of *p.now to the timestamps recorded for key. The map and
// the per-key slice (with room for 20 entries) are created lazily on first use.
func (p *mockPlugin) call(key string) {
	if p.calls == nil {
		p.calls = make(map[string][]time.Time)
	}
	stamps := p.calls[key]
	if stamps == nil {
		stamps = make([]time.Time, 0, 20)
	}
	p.calls[key] = append(stamps, *p.now)
}

// called returns the recorded timestamps grouped by key; it is nil until the first call.
func (p *mockPlugin) called() map[string][]time.Time {
	return p.calls
}
68
69type mockExporterPlugin struct {
70	plugin.Base
71	mockPlugin
72}
73
74func (p *mockExporterPlugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) {
75	p.call(key)
76	return
77}
78
79type mockCollectorPlugin struct {
80	plugin.Base
81	mockPlugin
82	period int
83}
84
85func (p *mockCollectorPlugin) Collect() (err error) {
86	p.call("$collect")
87	return
88}
89
90func (p *mockCollectorPlugin) Period() (period int) {
91	return p.period
92}
93
94type mockCollectorExporterPlugin struct {
95	plugin.Base
96	mockPlugin
97	period int
98}
99
100func (p *mockCollectorExporterPlugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) {
101	p.call(key)
102	return
103}
104
105func (p *mockCollectorExporterPlugin) Collect() (err error) {
106	p.call("$collect")
107	return
108}
109
110func (p *mockCollectorExporterPlugin) Period() (period int) {
111	return p.period
112}
113
// mockRunnerPlugin is a mock plugin providing the Start and Stop methods; the calls are
// recorded under the "$start" and "$stop" keys.
type mockRunnerPlugin struct {
	plugin.Base
	mockPlugin
}

// Start records the call under the "$start" key.
func (p *mockRunnerPlugin) Start() {
	p.call("$start")
}

// Stop records the call under the "$stop" key.
func (p *mockRunnerPlugin) Stop() {
	p.call("$stop")
}
126
127type mockPassiveRunnerPlugin struct {
128	plugin.Base
129	mockPlugin
130}
131
132func (p *mockPassiveRunnerPlugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) {
133	return
134}
135func (p *mockPassiveRunnerPlugin) Start() {
136	p.call("$start")
137}
138
139func (p *mockPassiveRunnerPlugin) Stop() {
140	p.call("$stop")
141}
142
// watchTracker is implemented by the mock watcher plugins to expose the requests they
// were last asked to watch.
type watchTracker interface {
	// watched returns the requests passed to the most recent Watch call.
	watched() []*plugin.Request
}
146
// mockWatcherPlugin is a mock plugin providing the Watch method. Every Watch call is
// recorded under the "$watch" key and the passed requests are kept for inspection.
type mockWatcherPlugin struct {
	plugin.Base
	mockPlugin
	requests []*plugin.Request // requests passed to the most recent Watch call
}

// Watch records the call under the "$watch" key and stores the requests, replacing the
// ones stored by a previous call.
func (p *mockWatcherPlugin) Watch(requests []*plugin.Request, ctx plugin.ContextProvider) {
	p.call("$watch")
	p.requests = requests
}

// watched returns the requests passed to the most recent Watch call.
func (p *mockWatcherPlugin) watched() []*plugin.Request {
	return p.requests
}
161
// mockRunnerWatcherPlugin is a mock plugin providing the Start, Stop and Watch methods.
// The calls are recorded under the "$start", "$stop" and "$watch" keys and the requests
// passed to Watch are kept for inspection.
type mockRunnerWatcherPlugin struct {
	plugin.Base
	mockPlugin
	requests []*plugin.Request // requests passed to the most recent Watch call
}

// Start records the call under the "$start" key.
func (p *mockRunnerWatcherPlugin) Start() {
	p.call("$start")
}

// Stop records the call under the "$stop" key.
func (p *mockRunnerWatcherPlugin) Stop() {
	p.call("$stop")
}

// Watch records the call under the "$watch" key and stores the requests, replacing the
// ones stored by a previous call.
func (p *mockRunnerWatcherPlugin) Watch(requests []*plugin.Request, ctx plugin.ContextProvider) {
	p.call("$watch")
	p.requests = requests
}

// watched returns the requests passed to the most recent Watch call.
func (p *mockRunnerWatcherPlugin) watched() []*plugin.Request {
	return p.requests
}
184
185type mockConfiguratorPlugin struct {
186	plugin.Base
187	mockPlugin
188	options interface{}
189}
190
191func (p *mockConfiguratorPlugin) Configure(global *plugin.GlobalOptions, options interface{}) {
192	p.call("$configure")
193}
194
195func (p *mockConfiguratorPlugin) Validate(options interface{}) (err error) {
196	return
197}
198
199type resultCacheMock struct {
200	results []*plugin.Result
201}
202
203func (c *resultCacheMock) Write(r *plugin.Result) {
204	c.results = append(c.results, r)
205}
206
207func (c *resultCacheMock) Flush() {
208}
209
210func (pc *resultCacheMock) SlotsAvailable() int {
211	return 1
212}
213
214func (pc *resultCacheMock) PersistSlotsAvailable() int {
215	return 1
216}
217
218type mockManager struct {
219	Manager
220	sink      chan performer
221	now       time.Time
222	startTime time.Time
223}
224
225func (m *mockManager) finishTasks() {
226	for {
227		select {
228		case p := <-m.sink:
229			m.processFinishRequest(p)
230		default:
231			return
232		}
233	}
234}
235
236func (m *mockManager) iterate(t *testing.T, iters int) {
237	for i := 0; i < iters; i++ {
238		m.now = m.now.Add(time.Second)
239		m.processQueue(m.now)
240		m.finishTasks()
241	}
242}
243
244func (m *mockManager) mockInit(t *testing.T) {
245	m.init()
246	m.aliases, _ = alias.NewManager(nil)
247	clock := time.Now().Unix()
248	m.startTime = time.Unix(clock-clock%10, 100)
249	t.Logf("starting time %s", m.startTime.Format(time.Stamp))
250	m.now = m.startTime
251}
252
253func (m *mockManager) update(update *updateRequest) {
254	m.processUpdateRequest(update, m.now)
255}
256
// mockTasks replaces the tasks queued for every plugin with mock counterparts, which send
// themselves to m.sink after being performed.
//
// Collector and exporter mocks are scheduled with getNextcheck relative to m.now and marked
// as recurring. Starter, stopper, watcher and configurator mocks are scheduled at m.now
// (plus a priority offset for all but the starter). Tasks of any other type, which includes
// mocks created by an earlier mockTasks call, are queued again unchanged.
//
// The exporter maps of all clients are reset; only the mocks created for *exporterTask
// entries are registered back in them.
func (m *mockManager) mockTasks() {
	// remember the owning client of every exporter task before the exporter maps are reset
	index := make(map[exporterTaskAccessor]uint64)
	for clientid, client := range m.clients {
		for _, task := range client.exporters {
			index[task] = clientid
		}
		client.exporters = make(map[uint64]exporterTaskAccessor)
	}
	for _, p := range m.plugins {
		// rebuild the plugin task queue from scratch, iterating over the old one
		tasks := p.tasks
		p.tasks = make(performerHeap, 0, len(tasks))
		for j, task := range tasks {
			switch t := task.(type) {
			case *collectorTask:
				collector := p.impl.(plugin.Collector)
				mockTask := &mockCollectorTask{
					taskBase: taskBase{
						plugin:    task.getPlugin(),
						scheduled: getNextcheck(fmt.Sprintf("%d", collector.Period()), m.now).Add(priorityCollectorTaskNs),
						index:     -1,
						active:    task.isActive(),
						recurring: true,
					},
					sink: m.sink,
				}
				p.enqueueTask(mockTask)
			case *exporterTask:
				mockTask := &mockExporterTask{
					exporterTask: exporterTask{
						taskBase: taskBase{
							plugin:    task.getPlugin(),
							scheduled: getNextcheck(t.item.delay, m.now).Add(priorityExporterTaskNs),
							index:     -1,
							active:    task.isActive(),
							recurring: true,
						},
						item:   t.item,
						client: t.client,
						meta:   t.meta,
					},
					sink: m.sink,
				}
				p.enqueueTask(mockTask)
				// register the mock as the exporter of the item for the owning client
				m.clients[index[t]].exporters[t.item.itemid] = mockTask
			case *directExporterTask:
				// same mock as for *exporterTask, but not registered in the client exporter map
				mockTask := &mockExporterTask{
					exporterTask: exporterTask{
						taskBase: taskBase{
							plugin:    task.getPlugin(),
							scheduled: getNextcheck(t.item.delay, m.now).Add(priorityExporterTaskNs),
							index:     -1,
							active:    task.isActive(),
							recurring: true,
						},
						item:   t.item,
						client: t.client,
						meta:   t.meta,
					},
					sink: m.sink,
				}
				p.enqueueTask(mockTask)
			case *starterTask:
				mockTask := &mockStarterTask{
					taskBase: taskBase{
						plugin:    task.getPlugin(),
						scheduled: m.now,
						index:     -1,
						active:    task.isActive(),
					},
					sink: m.sink,
				}
				p.enqueueTask(mockTask)
			case *stopperTask:
				mockTask := &mockStopperTask{
					taskBase: taskBase{
						plugin:    task.getPlugin(),
						scheduled: m.now.Add(priorityStopperTaskNs),
						index:     -1,
						active:    task.isActive(),
					},
					sink: m.sink,
				}
				p.enqueueTask(mockTask)
			case *watcherTask:
				mockTask := &mockWatcherTask{
					taskBase: taskBase{
						plugin:    task.getPlugin(),
						scheduled: m.now.Add(priorityWatcherTaskNs),
						index:     -1,
						active:    task.isActive(),
					},
					sink:     m.sink,
					requests: t.requests,
					client:   t.client,
				}
				p.enqueueTask(mockTask)
			case *configuratorTask:
				// NOTE(review): scheduled with the watcher priority offset - confirm whether a
				// configurator specific offset was intended
				mockTask := &mockConfigerTask{
					taskBase: taskBase{
						plugin:    task.getPlugin(),
						scheduled: m.now.Add(priorityWatcherTaskNs),
						index:     -1,
						active:    task.isActive(),
					},
					options: t.options,
					sink:    m.sink,
				}
				p.enqueueTask(mockTask)
			default:
				// not a task type that gets mocked - keep it as is
				p.enqueueTask(task)
			}
			// reset the index stored in the entry of the old queue
			tasks[j].setIndex(-1)
		}
		// the plugin tasks have changed, so update the plugin in the plugin queue
		m.pluginQueue.Update(p)
	}
}
373
374// checks if the times timestamps match the offsets within the specified range
375func (m *mockManager) checkTimeline(t *testing.T, name string, times []time.Time, offsets []int, iters int) {
376	start := m.now.Add(-time.Second * time.Duration(iters-1))
377	to := int(m.now.Sub(m.startTime) / time.Second)
378	from := to - iters + 1
379	var left, right int
380
381	// find the range start in timestamps
382	if len(times) != 0 {
383		for times[left].Before(start) {
384			left++
385			if left == len(times) {
386				break
387			}
388		}
389	}
390
391	// find the range start in offsets
392	if len(offsets) != 0 {
393		for offsets[right] < from {
394			right++
395			if right == len(offsets) {
396				break
397			}
398		}
399	}
400
401	for left < len(times) && right < len(offsets) {
402		if times[left].After(m.now) {
403			if offsets[right] <= to {
404				t.Errorf("Plugin %s: no matching timestamp for offset %d", name, offsets[right])
405			}
406			return
407		}
408		if offsets[right] > to {
409			t.Errorf("Plugin %s: no matching offset for timestamp %s", name, times[left].Format(time.Stamp))
410			return
411		}
412
413		offsetTime := m.startTime.Add(time.Second * time.Duration(offsets[right]))
414		if !offsetTime.Equal(times[left]) {
415			t.Errorf("Plugin %s: offset %d time %s does not match timestamp %s", name, offsets[right],
416				offsetTime.Format(time.Stamp), times[left].Format(time.Stamp))
417			return
418		}
419		left++
420		right++
421	}
422	if left != len(times) && !times[left].After(m.now) {
423		t.Errorf("Plugin %s: no matching offset for timestamp %s", name, times[left].Format(time.Stamp))
424		return
425	}
426
427	if right != len(offsets) && offsets[right] <= to {
428		t.Errorf("Plugin %s: no matching timestamp for offset %d", name, offsets[right])
429		return
430	}
431}
432
433// checks plugin call timeline within the specified range
434func (m *mockManager) checkPluginTimeline(t *testing.T, plugins []plugin.Accessor, calls []map[string][]int, iters int) {
435	for i, p := range plugins {
436		tracker := p.(callTracker).called()
437		for key, offsets := range calls[i] {
438			m.checkTimeline(t, p.Name()+":"+key, tracker[key], offsets, iters)
439		}
440	}
441}
442
443type mockExporterTask struct {
444	exporterTask
445	sink chan performer
446}
447
448func (t *mockExporterTask) perform(s Scheduler) {
449	key, params, _ := itemutil.ParseKey(t.item.key)
450	_, _ = t.plugin.impl.(plugin.Exporter).Export(key, params, t)
451	t.sink <- t
452}
453
454func (t *mockExporterTask) reschedule(now time.Time) (err error) {
455	t.scheduled = getNextcheck(t.item.delay, t.scheduled)
456	return
457}
458
459func (t *mockExporterTask) task() (task *exporterTask) {
460	return &t.exporterTask
461}
462
463// plugin.ContextProvider interface
464
465func (t *mockExporterTask) Output() (output plugin.ResultWriter) {
466	return nil
467}
468
469func (t *mockExporterTask) Meta() (meta *plugin.Meta) {
470	return &t.meta
471}
472
473func (t *mockExporterTask) GlobalRegexp() plugin.RegexpMatcher {
474	return t.client.GlobalRegexp()
475}
476
477type mockCollectorTask struct {
478	taskBase
479	sink chan performer
480}
481
482func (t *mockCollectorTask) perform(s Scheduler) {
483	_ = t.plugin.impl.(plugin.Collector).Collect()
484	t.sink <- t
485}
486
487func (t *mockCollectorTask) reschedule(now time.Time) (err error) {
488	t.scheduled = getNextcheck(fmt.Sprintf("%d", t.plugin.impl.(plugin.Collector).Period()), t.scheduled)
489	return
490}
491
492func (t *mockCollectorTask) getWeight() int {
493	return t.plugin.maxCapacity
494}
495
496type mockStarterTask struct {
497	taskBase
498	sink chan performer
499}
500
501func (t *mockStarterTask) perform(s Scheduler) {
502	t.plugin.impl.(plugin.Runner).Start()
503	t.sink <- t
504}
505
506func (t *mockStarterTask) reschedule(now time.Time) (err error) {
507	return
508}
509
510func (t *mockStarterTask) getWeight() int {
511	return t.plugin.maxCapacity
512}
513
514type mockStopperTask struct {
515	taskBase
516	sink chan performer
517}
518
519func (t *mockStopperTask) perform(s Scheduler) {
520	t.plugin.impl.(plugin.Runner).Stop()
521	t.sink <- t
522}
523
524func (t *mockStopperTask) reschedule(now time.Time) (err error) {
525	return
526}
527
528func (t *mockStopperTask) getWeight() int {
529	return t.plugin.maxCapacity
530}
531
532type mockWatcherTask struct {
533	taskBase
534	sink       chan performer
535	resultSink plugin.ResultWriter
536	requests   []*plugin.Request
537	client     ClientAccessor
538}
539
540func (t *mockWatcherTask) perform(s Scheduler) {
541	log.Debugf("%s %v", t.plugin.impl.Name(), t.requests)
542	t.plugin.impl.(plugin.Watcher).Watch(t.requests, t)
543	t.sink <- t
544}
545
546func (t *mockWatcherTask) reschedule(now time.Time) (err error) {
547	return
548}
549
550func (t *mockWatcherTask) getWeight() int {
551	return t.plugin.maxCapacity
552}
553
554// plugin.ContextProvider interface
555
556func (t *mockWatcherTask) ClientID() (clientid uint64) {
557	return t.client.ID()
558}
559
560func (t *mockWatcherTask) ItemID() (itemid uint64) {
561	return 0
562}
563
564func (t *mockWatcherTask) Output() (output plugin.ResultWriter) {
565	return t.resultSink
566}
567
568func (t *mockWatcherTask) Meta() (meta *plugin.Meta) {
569	return nil
570}
571
572func (t *mockWatcherTask) GlobalRegexp() plugin.RegexpMatcher {
573	return t.client.GlobalRegexp()
574}
575
576type mockConfigerTask struct {
577	taskBase
578	sink    chan performer
579	options *agent.AgentOptions
580}
581
582func (t *mockConfigerTask) perform(s Scheduler) {
583	t.plugin.impl.(plugin.Configurator).Configure(agent.GlobalOptions(t.options), t.options.Plugins[t.plugin.name()])
584	t.sink <- t
585}
586
587func (t *mockConfigerTask) reschedule(now time.Time) (err error) {
588	return
589}
590
591func (t *mockConfigerTask) getWeight() int {
592	return t.plugin.maxCapacity
593}
594
595func checkExporterTasks(t *testing.T, m *Manager, clientID uint64, items []*clientItem) {
596	lastCheck := time.Time{}
597	n := 0
598	for p := m.pluginQueue.Peek(); p != nil; p = m.pluginQueue.Peek() {
599		if task := p.peekTask(); task != nil {
600			if task.getScheduled().Before(lastCheck) {
601				t.Errorf("Out of order tasks detected")
602			}
603			heap.Pop(&m.pluginQueue)
604			p.popTask()
605			n++
606			if p.peekTask() != nil {
607				heap.Push(&m.pluginQueue, p)
608			}
609		} else {
610			heap.Pop(&m.pluginQueue)
611		}
612	}
613	if len(items) != n {
614		t.Errorf("Expected %d tasks while got %d", len(items), n)
615	}
616
617	var requestClient *client
618	var ok bool
619	if requestClient, ok = m.clients[clientID]; !ok {
620		t.Errorf("Cannot find owner of the default client")
621		return
622	}
623
624	for _, item := range items {
625		if tacc, ok := requestClient.exporters[item.itemid]; ok {
626			ti := tacc.task().item
627			if ti.delay != item.delay {
628				t.Errorf("Expected item %d delay %s while got %s", item.itemid, item.delay, ti.delay)
629			}
630			if ti.key != item.key {
631				t.Errorf("Expected item %d key %s while got %s", item.itemid, item.key, ti.key)
632			}
633		} else {
634			t.Errorf("Item %d was not queued", item.itemid)
635		}
636	}
637
638	if len(items) != len(requestClient.exporters) {
639		t.Errorf("Expected %d queued items while got %d", len(items), len(requestClient.exporters))
640	}
641}
642
643func TestTaskCreate(t *testing.T) {
644	_ = log.Open(log.Console, log.Debug, "", 0)
645
646	plugin.ClearRegistry()
647	plugins := make([]mockExporterPlugin, 3)
648	for i := range plugins {
649		p := &plugins[i]
650		name := fmt.Sprintf("debug%d", i+1)
651		plugin.RegisterMetrics(p, name, name, "Debug.")
652	}
653
654	manager, _ := NewManager(&agent.Options)
655
656	items := []*clientItem{
657		&clientItem{itemid: 1, delay: "151", key: "debug1"},
658		&clientItem{itemid: 2, delay: "103", key: "debug2"},
659		&clientItem{itemid: 3, delay: "79", key: "debug3"},
660		&clientItem{itemid: 4, delay: "17", key: "debug1"},
661		&clientItem{itemid: 5, delay: "7", key: "debug2"},
662		&clientItem{itemid: 6, delay: "1", key: "debug3"},
663		&clientItem{itemid: 7, delay: "63", key: "debug1"},
664		&clientItem{itemid: 8, delay: "47", key: "debug2"},
665		&clientItem{itemid: 9, delay: "31", key: "debug3"},
666	}
667
668	var cache resultCacheMock
669	update := updateRequest{
670		clientID: agent.MaxBuiltinClientID + 1,
671		sink:     &cache,
672		requests: make([]*plugin.Request, 0),
673	}
674
675	var lastLogsize uint64
676	var mtime int
677	for _, item := range items {
678		update.requests = append(update.requests, &plugin.Request{
679			Itemid:      item.itemid,
680			Key:         item.key,
681			Delay:       item.delay,
682			LastLogsize: &lastLogsize,
683			Mtime:       &mtime,
684		})
685	}
686	manager.processUpdateRequest(&update, time.Now())
687
688	if len(manager.pluginQueue) != 3 {
689		t.Errorf("Expected %d plugins queued while got %d", 3, len(manager.pluginQueue))
690	}
691
692	checkExporterTasks(t, manager, agent.MaxBuiltinClientID+1, items)
693}
694
695func TestTaskUpdate(t *testing.T) {
696	_ = log.Open(log.Console, log.Debug, "", 0)
697
698	plugin.ClearRegistry()
699	plugins := make([]mockExporterPlugin, 3)
700	for i := range plugins {
701		p := &plugins[i]
702		name := fmt.Sprintf("debug%d", i+1)
703		plugin.RegisterMetrics(p, name, name, "Debug.")
704	}
705
706	manager, _ := NewManager(&agent.Options)
707
708	items := []*clientItem{
709		&clientItem{itemid: 1, delay: "151", key: "debug1"},
710		&clientItem{itemid: 2, delay: "103", key: "debug2"},
711		&clientItem{itemid: 3, delay: "79", key: "debug3"},
712		&clientItem{itemid: 4, delay: "17", key: "debug1"},
713		&clientItem{itemid: 5, delay: "7", key: "debug2"},
714		&clientItem{itemid: 6, delay: "1", key: "debug3"},
715		&clientItem{itemid: 7, delay: "63", key: "debug1"},
716		&clientItem{itemid: 8, delay: "47", key: "debug2"},
717		&clientItem{itemid: 9, delay: "31", key: "debug3"},
718	}
719
720	var cache resultCacheMock
721	update := updateRequest{
722		clientID: agent.MaxBuiltinClientID + 1,
723		sink:     &cache,
724		requests: make([]*plugin.Request, 0),
725	}
726
727	var lastLogsize uint64
728	var mtime int
729	for _, item := range items {
730		update.requests = append(update.requests, &plugin.Request{
731			Itemid:      item.itemid,
732			Key:         item.key,
733			Delay:       item.delay,
734			LastLogsize: &lastLogsize,
735			Mtime:       &mtime,
736		})
737	}
738	manager.processUpdateRequest(&update, time.Now())
739
740	for _, item := range items {
741		item.delay = "10" + item.delay
742		item.key = item.key + "[1]"
743	}
744	update.requests = update.requests[:0]
745	for _, item := range items {
746		update.requests = append(update.requests, &plugin.Request{
747			Itemid:      item.itemid,
748			Key:         item.key,
749			Delay:       item.delay,
750			LastLogsize: &lastLogsize,
751			Mtime:       &mtime,
752		})
753	}
754	manager.processUpdateRequest(&update, time.Now())
755
756	if len(manager.pluginQueue) != 3 {
757		t.Errorf("Expected %d plugins queued while got %d", 3, len(manager.pluginQueue))
758	}
759
760	checkExporterTasks(t, manager, agent.MaxBuiltinClientID+1, items)
761}
762
763func TestTaskUpdateInvalidInterval(t *testing.T) {
764	_ = log.Open(log.Console, log.Debug, "", 0)
765
766	plugin.ClearRegistry()
767	plugins := make([]mockExporterPlugin, 3)
768	for i := range plugins {
769		p := &plugins[i]
770		name := fmt.Sprintf("debug%d", i+1)
771		plugin.RegisterMetrics(p, name, name, "Debug.")
772	}
773
774	manager, _ := NewManager(&agent.Options)
775
776	items := []*clientItem{
777		&clientItem{itemid: 1, delay: "151", key: "debug1"},
778		&clientItem{itemid: 2, delay: "103", key: "debug2"},
779	}
780
781	var cache resultCacheMock
782	update := updateRequest{
783		clientID: agent.MaxBuiltinClientID + 1,
784		sink:     &cache,
785		requests: make([]*plugin.Request, 0),
786	}
787
788	var lastLogsize uint64
789	var mtime int
790	for _, item := range items {
791		update.requests = append(update.requests, &plugin.Request{
792			Itemid:      item.itemid,
793			Key:         item.key,
794			Delay:       item.delay,
795			LastLogsize: &lastLogsize,
796			Mtime:       &mtime,
797		})
798	}
799	manager.processUpdateRequest(&update, time.Now())
800
801	items[0].delay = "xyz"
802	update.requests = update.requests[:0]
803	for _, item := range items {
804		update.requests = append(update.requests, &plugin.Request{
805			Itemid:      item.itemid,
806			Key:         item.key,
807			Delay:       item.delay,
808			LastLogsize: &lastLogsize,
809			Mtime:       &mtime,
810		})
811	}
812	manager.processUpdateRequest(&update, time.Now())
813
814	if len(manager.plugins["debug1"].tasks) != 0 {
815		t.Errorf("Expected %d tasks queued while got %d", 0, len(manager.plugins["debug1"].tasks))
816	}
817}
818
819func TestTaskDelete(t *testing.T) {
820	_ = log.Open(log.Console, log.Debug, "", 0)
821
822	plugin.ClearRegistry()
823	plugins := make([]mockExporterPlugin, 3)
824	for i := range plugins {
825		p := &plugins[i]
826		name := fmt.Sprintf("debug%d", i+1)
827		plugin.RegisterMetrics(p, name, name, "Debug.")
828	}
829
830	manager, _ := NewManager(&agent.Options)
831
832	items := []*clientItem{
833		&clientItem{itemid: 1, delay: "151", key: "debug1"},
834		&clientItem{itemid: 2, delay: "103", key: "debug2"},
835		&clientItem{itemid: 3, delay: "79", key: "debug3"}, // remove
836		&clientItem{itemid: 4, delay: "17", key: "debug1"},
837		&clientItem{itemid: 5, delay: "7", key: "debug2"},
838		&clientItem{itemid: 6, delay: "1", key: "debug3"}, // remove
839		&clientItem{itemid: 7, delay: "63", key: "debug1"},
840		&clientItem{itemid: 8, delay: "47", key: "debug2"}, // remove
841		&clientItem{itemid: 9, delay: "31", key: "debug3"}, // remove
842	}
843
844	var cache resultCacheMock
845	update := updateRequest{
846		clientID: agent.MaxBuiltinClientID + 1,
847		sink:     &cache,
848		requests: make([]*plugin.Request, 0),
849	}
850
851	var lastLogsize uint64
852	var mtime int
853	for _, item := range items {
854		update.requests = append(update.requests, &plugin.Request{
855			Itemid:      item.itemid,
856			Key:         item.key,
857			Delay:       item.delay,
858			LastLogsize: &lastLogsize,
859			Mtime:       &mtime,
860		})
861	}
862	manager.processUpdateRequest(&update, time.Now())
863
864	items[2] = items[6]
865	items = items[:cap(items)-4]
866	update.requests = update.requests[:0]
867	for _, item := range items {
868		update.requests = append(update.requests, &plugin.Request{
869			Itemid:      item.itemid,
870			Key:         item.key,
871			Delay:       item.delay,
872			LastLogsize: &lastLogsize,
873			Mtime:       &mtime,
874		})
875	}
876	manager.processUpdateRequest(&update, time.Now())
877
878	if len(manager.plugins["debug3"].tasks) != 0 {
879		t.Errorf("Expected %d tasks queued while got %d", 0, len(manager.plugins["debug3"].tasks))
880	}
881
882	checkExporterTasks(t, manager, agent.MaxBuiltinClientID+1, items)
883}
884
885func TestSchedule(t *testing.T) {
886	_ = log.Open(log.Console, log.Debug, "", 0)
887
888	manager := mockManager{sink: make(chan performer, 10)}
889	plugin.ClearRegistry()
890	plugins := make([]plugin.Accessor, 3)
891	for i := range plugins {
892		plugins[i] = &mockExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}}
893		name := fmt.Sprintf("debug%d", i+1)
894		plugin.RegisterMetrics(plugins[i], name, name, "Debug.")
895	}
896	manager.mockInit(t)
897
898	items := []*clientItem{
899		&clientItem{itemid: 1, delay: "1", key: "debug1"},
900		&clientItem{itemid: 2, delay: "2", key: "debug2"},
901		&clientItem{itemid: 3, delay: "5", key: "debug3"},
902	}
903
904	calls := []map[string][]int{
905		map[string][]int{"debug1": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}},
906		map[string][]int{"debug2": []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}},
907		map[string][]int{"debug3": []int{5, 10, 15, 20}},
908	}
909
910	var cache resultCacheMock
911	update := updateRequest{
912		clientID: agent.MaxBuiltinClientID + 1,
913		sink:     &cache,
914		requests: make([]*plugin.Request, 0),
915	}
916
917	var lastLogsize uint64
918	var mtime int
919	for _, item := range items {
920		update.requests = append(update.requests, &plugin.Request{
921			Itemid:      item.itemid,
922			Key:         item.key,
923			Delay:       item.delay,
924			LastLogsize: &lastLogsize,
925			Mtime:       &mtime,
926		})
927	}
928	manager.update(&update)
929	manager.mockTasks()
930
931	manager.iterate(t, 20)
932	manager.checkPluginTimeline(t, plugins, calls, 20)
933}
934
935func TestScheduleCapacity(t *testing.T) {
936	_ = log.Open(log.Console, log.Debug, "", 0)
937
938	manager := mockManager{sink: make(chan performer, 10)}
939	plugin.ClearRegistry()
940	plugins := make([]plugin.Accessor, 2)
941	for i := range plugins {
942		plugins[i] = &mockExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}}
943		name := fmt.Sprintf("debug%d", i+1)
944		plugin.RegisterMetrics(plugins[i], name, name, "Debug.")
945	}
946	manager.mockInit(t)
947
948	p := manager.plugins["debug2"]
949	p.maxCapacity = 2
950
951	items := []*clientItem{
952		&clientItem{itemid: 1, delay: "1", key: "debug1"},
953		&clientItem{itemid: 2, delay: "2", key: "debug2"},
954		&clientItem{itemid: 3, delay: "2", key: "debug2"},
955		&clientItem{itemid: 4, delay: "2", key: "debug2"},
956	}
957
958	calls := []map[string][]int{
959		map[string][]int{"debug1": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}},
960		map[string][]int{"debug2": []int{2, 2, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 10}},
961	}
962
963	var cache resultCacheMock
964	update := updateRequest{
965		clientID: agent.MaxBuiltinClientID + 1,
966		sink:     &cache,
967		requests: make([]*plugin.Request, 0),
968	}
969
970	var lastLogsize uint64
971	var mtime int
972	for _, item := range items {
973		update.requests = append(update.requests, &plugin.Request{
974			Itemid:      item.itemid,
975			Key:         item.key,
976			Delay:       item.delay,
977			LastLogsize: &lastLogsize,
978			Mtime:       &mtime,
979		})
980	}
981	manager.update(&update)
982	manager.mockTasks()
983
984	manager.iterate(t, 10)
985	manager.checkPluginTimeline(t, plugins, calls, 10)
986}
987
988func TestScheduleUpdate(t *testing.T) {
989	_ = log.Open(log.Console, log.Debug, "", 0)
990
991	manager := mockManager{sink: make(chan performer, 10)}
992	plugin.ClearRegistry()
993	plugins := make([]plugin.Accessor, 3)
994	for i := range plugins {
995		plugins[i] = &mockExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}}
996		name := fmt.Sprintf("debug%d", i+1)
997		plugin.RegisterMetrics(plugins[i], name, name, "Debug.")
998	}
999	manager.mockInit(t)
1000
1001	items := []*clientItem{
1002		&clientItem{itemid: 1, delay: "1", key: "debug1"},
1003		&clientItem{itemid: 2, delay: "1", key: "debug2"},
1004		&clientItem{itemid: 3, delay: "1", key: "debug3"},
1005	}
1006
1007	calls := []map[string][]int{
1008		map[string][]int{"debug1": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 16, 17, 18, 19, 20}},
1009		map[string][]int{"debug2": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 16, 17, 18, 19, 20}},
1010		map[string][]int{"debug3": []int{1, 2, 3, 4, 5, 16, 17, 18, 19, 20}},
1011	}
1012
1013	var cache resultCacheMock
1014	update := updateRequest{
1015		clientID: agent.MaxBuiltinClientID + 1,
1016		sink:     &cache,
1017		requests: make([]*plugin.Request, 0),
1018	}
1019
1020	var lastLogsize uint64
1021	var mtime int
1022	for _, item := range items {
1023		update.requests = append(update.requests, &plugin.Request{
1024			Itemid:      item.itemid,
1025			Key:         item.key,
1026			Delay:       item.delay,
1027			LastLogsize: &lastLogsize,
1028			Mtime:       &mtime,
1029		})
1030	}
1031	manager.update(&update)
1032	manager.mockTasks()
1033	manager.iterate(t, 5)
1034	manager.checkPluginTimeline(t, plugins, calls, 5)
1035
1036	update.requests = update.requests[:2]
1037	manager.update(&update)
1038	manager.mockTasks()
1039	manager.iterate(t, 5)
1040	manager.checkPluginTimeline(t, plugins, calls, 5)
1041
1042	update.requests = update.requests[:0]
1043	manager.update(&update)
1044	manager.mockTasks()
1045	manager.iterate(t, 5)
1046	manager.checkPluginTimeline(t, plugins, calls, 5)
1047
1048	update.requests = update.requests[:3]
1049	manager.update(&update)
1050	manager.mockTasks()
1051	manager.iterate(t, 5)
1052	manager.checkPluginTimeline(t, plugins, calls, 5)
1053}
1054
1055func TestCollectorSchedule(t *testing.T) {
1056	_ = log.Open(log.Console, log.Debug, "", 0)
1057
1058	manager := mockManager{sink: make(chan performer, 10)}
1059	plugin.ClearRegistry()
1060	plugins := make([]plugin.Accessor, 1)
1061	for i := range plugins {
1062		plugins[i] = &mockCollectorPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}, period: 2}
1063		name := fmt.Sprintf("debug%d", i+1)
1064		plugin.RegisterMetrics(plugins[i], name, name, "Debug.")
1065	}
1066	manager.mockInit(t)
1067
1068	items := []*clientItem{
1069		&clientItem{itemid: 1, delay: "1", key: "debug1"},
1070	}
1071
1072	calls := []map[string][]int{
1073		map[string][]int{"$collect": []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}},
1074	}
1075
1076	var cache resultCacheMock
1077	update := updateRequest{
1078		clientID: agent.MaxBuiltinClientID + 1,
1079		sink:     &cache,
1080		requests: make([]*plugin.Request, 0),
1081	}
1082
1083	var lastLogsize uint64
1084	var mtime int
1085	for _, item := range items {
1086		update.requests = append(update.requests, &plugin.Request{
1087			Itemid:      item.itemid,
1088			Key:         item.key,
1089			Delay:       item.delay,
1090			LastLogsize: &lastLogsize,
1091			Mtime:       &mtime,
1092		})
1093	}
1094	manager.update(&update)
1095	manager.mockTasks()
1096	manager.iterate(t, 20)
1097	manager.checkPluginTimeline(t, plugins, calls, 20)
1098}
1099
1100func TestCollectorScheduleUpdate(t *testing.T) {
1101	_ = log.Open(log.Console, log.Debug, "", 0)
1102
1103	manager := mockManager{sink: make(chan performer, 10)}
1104	plugin.ClearRegistry()
1105	plugins := make([]plugin.Accessor, 3)
1106	for i := range plugins {
1107		plugins[i] = &mockCollectorPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}, period: 2}
1108		name := fmt.Sprintf("debug%d", i+1)
1109		plugin.RegisterMetrics(plugins[i], name, name, "Debug.")
1110	}
1111	manager.mockInit(t)
1112
1113	items := []*clientItem{
1114		&clientItem{itemid: 1, delay: "5", key: "debug1"},
1115		&clientItem{itemid: 2, delay: "5", key: "debug2"},
1116		&clientItem{itemid: 3, delay: "5", key: "debug3"},
1117	}
1118
1119	calls := []map[string][]int{
1120		map[string][]int{"$collect": []int{2, 4, 6, 8, 10, 12, 14}},
1121		map[string][]int{"$collect": []int{2, 4, 6, 8, 10, 22, 24}},
1122		map[string][]int{"$collect": []int{2, 4, 22, 24}},
1123	}
1124
1125	var cache resultCacheMock
1126	update := updateRequest{
1127		clientID: agent.MaxBuiltinClientID + 1,
1128		sink:     &cache,
1129		requests: make([]*plugin.Request, 0),
1130	}
1131
1132	var lastLogsize uint64
1133	var mtime int
1134	for _, item := range items {
1135		update.requests = append(update.requests, &plugin.Request{
1136			Itemid:      item.itemid,
1137			Key:         item.key,
1138			Delay:       item.delay,
1139			LastLogsize: &lastLogsize,
1140			Mtime:       &mtime,
1141		})
1142	}
1143	manager.update(&update)
1144	manager.mockTasks()
1145	manager.iterate(t, 5)
1146	manager.checkPluginTimeline(t, plugins, calls, 5)
1147
1148	update.requests = update.requests[:2]
1149	manager.update(&update)
1150	manager.mockTasks()
1151	manager.iterate(t, 5)
1152	manager.checkPluginTimeline(t, plugins, calls, 5)
1153
1154	update.requests = update.requests[:1]
1155	manager.update(&update)
1156	manager.mockTasks()
1157	manager.iterate(t, 5)
1158	manager.checkPluginTimeline(t, plugins, calls, 5)
1159
1160	update.requests = update.requests[:0]
1161	manager.update(&update)
1162	manager.mockTasks()
1163	manager.iterate(t, 5)
1164	manager.checkPluginTimeline(t, plugins, calls, 5)
1165
1166	update.requests = update.requests[1:3]
1167	manager.update(&update)
1168	manager.mockTasks()
1169	manager.iterate(t, 5)
1170	manager.checkPluginTimeline(t, plugins, calls, 5)
1171}
1172
1173func TestRunner(t *testing.T) {
1174	_ = log.Open(log.Console, log.Debug, "", 0)
1175
1176	manager := mockManager{sink: make(chan performer, 10)}
1177	plugin.ClearRegistry()
1178	plugins := make([]plugin.Accessor, 3)
1179	for i := range plugins {
1180		plugins[i] = &mockRunnerPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}}
1181		name := fmt.Sprintf("debug%d", i+1)
1182		plugin.RegisterMetrics(plugins[i], name, name, "Debug.")
1183	}
1184	manager.mockInit(t)
1185
1186	items := []*clientItem{
1187		&clientItem{itemid: 1, delay: "5", key: "debug1"},
1188		&clientItem{itemid: 2, delay: "5", key: "debug2"},
1189		&clientItem{itemid: 3, delay: "5", key: "debug3"},
1190	}
1191
1192	calls := []map[string][]int{
1193		map[string][]int{"$start": []int{1, 5}, "$stop": []int{4, 6}},
1194		map[string][]int{"$start": []int{1, 5, 7}, "$stop": []int{3, 6, 8}},
1195		map[string][]int{"$start": []int{1, 5, 8}, "$stop": []int{2, 6}},
1196	}
1197
1198	var cache resultCacheMock
1199	update := updateRequest{
1200		clientID: agent.MaxBuiltinClientID + 1,
1201		sink:     &cache,
1202		requests: make([]*plugin.Request, 0),
1203	}
1204
1205	var lastLogsize uint64
1206	var mtime int
1207	for _, item := range items {
1208		update.requests = append(update.requests, &plugin.Request{
1209			Itemid:      item.itemid,
1210			Key:         item.key,
1211			Delay:       item.delay,
1212			LastLogsize: &lastLogsize,
1213			Mtime:       &mtime,
1214		})
1215	}
1216	manager.update(&update)
1217	manager.mockTasks()
1218	manager.iterate(t, 1)
1219	manager.checkPluginTimeline(t, plugins, calls, 1)
1220
1221	update.requests = update.requests[:2]
1222	manager.update(&update)
1223	manager.mockTasks()
1224	manager.iterate(t, 1)
1225	manager.checkPluginTimeline(t, plugins, calls, 1)
1226
1227	update.requests = update.requests[:1]
1228	manager.update(&update)
1229	manager.mockTasks()
1230	manager.iterate(t, 1)
1231	manager.checkPluginTimeline(t, plugins, calls, 1)
1232
1233	update.requests = update.requests[:0]
1234	manager.update(&update)
1235	manager.mockTasks()
1236	manager.iterate(t, 1)
1237	manager.checkPluginTimeline(t, plugins, calls, 1)
1238
1239	update.requests = update.requests[:3]
1240	manager.update(&update)
1241	manager.mockTasks()
1242	manager.iterate(t, 1)
1243	manager.checkPluginTimeline(t, plugins, calls, 1)
1244
1245	update.requests = update.requests[:0]
1246	manager.update(&update)
1247	manager.mockTasks()
1248	manager.iterate(t, 1)
1249	manager.checkPluginTimeline(t, plugins, calls, 1)
1250
1251	update.requests = update.requests[1:2]
1252	manager.update(&update)
1253	manager.mockTasks()
1254	manager.iterate(t, 1)
1255	manager.checkPluginTimeline(t, plugins, calls, 1)
1256
1257	update.requests = update.requests[1:2]
1258	manager.update(&update)
1259	manager.mockTasks()
1260	manager.iterate(t, 1)
1261	manager.checkPluginTimeline(t, plugins, calls, 1)
1262}
1263
1264func checkWatchRequests(t *testing.T, p plugin.Accessor, requests []*plugin.Request) {
1265	tracker := p.(watchTracker)
1266	if !reflect.DeepEqual(tracker.watched(), requests) {
1267		expected := ""
1268		for _, r := range requests {
1269			expected += fmt.Sprintf("%+v,", *r)
1270		}
1271		returned := ""
1272		for _, r := range tracker.watched() {
1273			returned += fmt.Sprintf("%+v,", *r)
1274		}
1275		t.Errorf("Expected watch requests %s while got %s", expected, returned)
1276	}
1277}
1278
1279func TestWatcher(t *testing.T) {
1280	_ = log.Open(log.Console, log.Debug, "", 0)
1281
1282	manager := mockManager{sink: make(chan performer, 10)}
1283	plugin.ClearRegistry()
1284	plugins := make([]plugin.Accessor, 3)
1285	for i := range plugins {
1286		plugins[i] = &mockWatcherPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}}
1287		name := fmt.Sprintf("debug%d", i+1)
1288		plugin.RegisterMetrics(plugins[i], name, name, "Debug.")
1289	}
1290	manager.mockInit(t)
1291
1292	items := []*clientItem{
1293		&clientItem{itemid: 1, delay: "5", key: "debug1"},
1294		&clientItem{itemid: 2, delay: "5", key: "debug2[1]"},
1295		&clientItem{itemid: 3, delay: "5", key: "debug2[2]"},
1296		&clientItem{itemid: 4, delay: "5", key: "debug3[1]"},
1297		&clientItem{itemid: 5, delay: "5", key: "debug3[2]"},
1298		&clientItem{itemid: 6, delay: "5", key: "debug3[3]"},
1299	}
1300
1301	calls := []map[string][]int{
1302		map[string][]int{"$watch": []int{1, 2, 3, 4, 5}},
1303		map[string][]int{"$watch": []int{1, 2, 3, 4, 5}},
1304		map[string][]int{"$watch": []int{1, 2, 3, 5}},
1305	}
1306
1307	var cache resultCacheMock
1308	update := updateRequest{
1309		clientID: agent.MaxBuiltinClientID + 1,
1310		sink:     &cache,
1311		requests: make([]*plugin.Request, 0),
1312	}
1313
1314	var lastLogsize uint64
1315	var mtime int
1316	for _, item := range items {
1317		update.requests = append(update.requests, &plugin.Request{
1318			Itemid:      item.itemid,
1319			Key:         item.key,
1320			Delay:       item.delay,
1321			LastLogsize: &lastLogsize,
1322			Mtime:       &mtime,
1323		})
1324	}
1325	manager.update(&update)
1326	manager.mockTasks()
1327	manager.iterate(t, 1)
1328	manager.checkPluginTimeline(t, plugins, calls, 1)
1329
1330	checkWatchRequests(t, plugins[0], update.requests[0:1])
1331	checkWatchRequests(t, plugins[1], update.requests[1:3])
1332	checkWatchRequests(t, plugins[2], update.requests[3:6])
1333
1334	update.requests = update.requests[:5]
1335	manager.update(&update)
1336	manager.mockTasks()
1337	manager.iterate(t, 1)
1338	manager.checkPluginTimeline(t, plugins, calls, 1)
1339
1340	checkWatchRequests(t, plugins[0], update.requests[0:1])
1341	checkWatchRequests(t, plugins[1], update.requests[1:3])
1342	checkWatchRequests(t, plugins[2], update.requests[3:5])
1343
1344	update.requests = update.requests[:3]
1345	manager.update(&update)
1346	manager.mockTasks()
1347	manager.iterate(t, 1)
1348	manager.checkPluginTimeline(t, plugins, calls, 1)
1349
1350	checkWatchRequests(t, plugins[0], update.requests[0:1])
1351	checkWatchRequests(t, plugins[1], update.requests[1:3])
1352
1353	update.requests = update.requests[:2]
1354	manager.update(&update)
1355	manager.mockTasks()
1356	manager.iterate(t, 1)
1357	manager.checkPluginTimeline(t, plugins, calls, 1)
1358
1359	checkWatchRequests(t, plugins[0], update.requests[0:1])
1360	checkWatchRequests(t, plugins[1], update.requests[1:2])
1361
1362	update.requests = update.requests[:6]
1363	manager.update(&update)
1364	manager.mockTasks()
1365	manager.iterate(t, 1)
1366	manager.checkPluginTimeline(t, plugins, calls, 1)
1367
1368	checkWatchRequests(t, plugins[0], update.requests[0:1])
1369	checkWatchRequests(t, plugins[1], update.requests[1:3])
1370	checkWatchRequests(t, plugins[2], update.requests[3:6])
1371}
1372
1373func TestCollectorExporterSchedule(t *testing.T) {
1374	_ = log.Open(log.Console, log.Debug, "", 0)
1375
1376	manager := mockManager{sink: make(chan performer, 10)}
1377	plugin.ClearRegistry()
1378	plugins := make([]plugin.Accessor, 1)
1379	for i := range plugins {
1380		plugins[i] = &mockCollectorExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}, period: 2}
1381		plugin.RegisterMetrics(plugins[i], "debug", "debug", "Debug.")
1382	}
1383	manager.mockInit(t)
1384
1385	items := []*clientItem{
1386		&clientItem{itemid: 1, delay: "2", key: "debug[1]"},
1387		&clientItem{itemid: 2, delay: "2", key: "debug[2]"},
1388		&clientItem{itemid: 3, delay: "2", key: "debug[3]"},
1389	}
1390
1391	calls := []map[string][]int{
1392		map[string][]int{"debug": []int{3, 3, 3, 5, 5, 5, 7, 7, 7, 9, 9, 9}, "$collect": []int{2, 4, 6, 8, 10}},
1393	}
1394
1395	var cache resultCacheMock
1396	update := updateRequest{
1397		clientID: agent.MaxBuiltinClientID + 1,
1398		sink:     &cache,
1399		requests: make([]*plugin.Request, 0),
1400	}
1401
1402	var lastLogsize uint64
1403	var mtime int
1404	for _, item := range items {
1405		update.requests = append(update.requests, &plugin.Request{
1406			Itemid:      item.itemid,
1407			Key:         item.key,
1408			Delay:       item.delay,
1409			LastLogsize: &lastLogsize,
1410			Mtime:       &mtime,
1411		})
1412	}
1413	manager.update(&update)
1414	manager.mockTasks()
1415	manager.iterate(t, 10)
1416
1417	manager.checkPluginTimeline(t, plugins, calls, 10)
1418}
1419
1420func TestRunnerWatcher(t *testing.T) {
1421	_ = log.Open(log.Console, log.Debug, "", 0)
1422
1423	manager := mockManager{sink: make(chan performer, 10)}
1424	plugin.ClearRegistry()
1425	plugins := make([]plugin.Accessor, 3)
1426	for i := range plugins {
1427		plugins[i] = &mockRunnerWatcherPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}}
1428		name := fmt.Sprintf("debug%d", i+1)
1429		plugin.RegisterMetrics(plugins[i], name, name, "Debug.")
1430	}
1431	manager.mockInit(t)
1432
1433	items := []*clientItem{
1434		&clientItem{itemid: 1, delay: "5", key: "debug1"},
1435		&clientItem{itemid: 2, delay: "5", key: "debug2[1]"},
1436		&clientItem{itemid: 3, delay: "5", key: "debug2[2]"},
1437		&clientItem{itemid: 4, delay: "5", key: "debug3[1]"},
1438		&clientItem{itemid: 5, delay: "5", key: "debug3[2]"},
1439		&clientItem{itemid: 6, delay: "5", key: "debug3[3]"},
1440	}
1441
1442	calls := []map[string][]int{
1443		map[string][]int{"$watch": []int{2, 6, 11, 16}, "$start": []int{1}, "$stop": []int{17}},
1444		map[string][]int{"$watch": []int{2, 6, 11, 22, 26}, "$start": []int{1, 21}, "$stop": []int{12, 27}},
1445		map[string][]int{"$watch": []int{2, 6, 27}, "$start": []int{1, 26}, "$stop": []int{7}},
1446	}
1447
1448	var cache resultCacheMock
1449	update := updateRequest{
1450		clientID: agent.MaxBuiltinClientID + 1,
1451		sink:     &cache,
1452		requests: make([]*plugin.Request, 0),
1453	}
1454
1455	var lastLogsize uint64
1456	var mtime int
1457	for _, item := range items {
1458		update.requests = append(update.requests, &plugin.Request{
1459			Itemid:      item.itemid,
1460			Key:         item.key,
1461			Delay:       item.delay,
1462			LastLogsize: &lastLogsize,
1463			Mtime:       &mtime,
1464		})
1465	}
1466	manager.update(&update)
1467	manager.mockTasks()
1468	manager.iterate(t, 5)
1469	manager.checkPluginTimeline(t, plugins, calls, 5)
1470
1471	checkWatchRequests(t, plugins[0], update.requests[0:1])
1472	checkWatchRequests(t, plugins[1], update.requests[1:3])
1473	checkWatchRequests(t, plugins[2], update.requests[3:6])
1474
1475	update.requests = update.requests[:3]
1476	manager.update(&update)
1477	manager.mockTasks()
1478	manager.iterate(t, 5)
1479	manager.checkPluginTimeline(t, plugins, calls, 5)
1480
1481	checkWatchRequests(t, plugins[0], update.requests[0:1])
1482	checkWatchRequests(t, plugins[1], update.requests[1:3])
1483
1484	update.requests = update.requests[:1]
1485	manager.update(&update)
1486	manager.mockTasks()
1487	manager.iterate(t, 5)
1488	manager.checkPluginTimeline(t, plugins, calls, 5)
1489
1490	checkWatchRequests(t, plugins[0], update.requests[0:1])
1491
1492	update.requests = update.requests[:0]
1493	manager.update(&update)
1494	manager.mockTasks()
1495	manager.iterate(t, 5)
1496	manager.checkPluginTimeline(t, plugins, calls, 5)
1497
1498	update.requests = update.requests[1:3]
1499	manager.update(&update)
1500	manager.mockTasks()
1501	manager.iterate(t, 5)
1502	manager.checkPluginTimeline(t, plugins, calls, 5)
1503
1504	checkWatchRequests(t, plugins[1], update.requests[:2])
1505
1506	update.requests = update.requests[2:5]
1507	manager.update(&update)
1508	manager.mockTasks()
1509	manager.iterate(t, 5)
1510	manager.checkPluginTimeline(t, plugins, calls, 5)
1511
1512	checkWatchRequests(t, plugins[2], update.requests[0:3])
1513}
1514
1515func TestMultiCollectorExporterSchedule(t *testing.T) {
1516	_ = log.Open(log.Console, log.Debug, "", 0)
1517
1518	manager := mockManager{sink: make(chan performer, 10)}
1519	plugin.ClearRegistry()
1520	plugins := make([]plugin.Accessor, 1)
1521	for i := range plugins {
1522		plugins[i] = &mockCollectorExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}, period: 2}
1523		plugin.RegisterMetrics(plugins[i], "debug", "debug", "Debug.")
1524	}
1525	manager.mockInit(t)
1526
1527	items := []*clientItem{
1528		&clientItem{itemid: 1, delay: "2", key: "debug[1]"},
1529	}
1530
1531	calls := []map[string][]int{
1532		map[string][]int{"debug": []int{3, 3, 5, 5, 7, 9}, "$collect": []int{2, 4, 6, 8, 10}},
1533	}
1534
1535	var cache resultCacheMock
1536	update := updateRequest{
1537		clientID: agent.MaxBuiltinClientID + 1,
1538		sink:     &cache,
1539		requests: make([]*plugin.Request, 0),
1540	}
1541
1542	var lastLogsize uint64
1543	var mtime int
1544	for _, item := range items {
1545		update.requests = append(update.requests, &plugin.Request{
1546			Itemid:      item.itemid,
1547			Key:         item.key,
1548			Delay:       item.delay,
1549			LastLogsize: &lastLogsize,
1550			Mtime:       &mtime,
1551		})
1552	}
1553	manager.update(&update)
1554	update.clientID = agent.MaxBuiltinClientID + 2
1555	manager.update(&update)
1556	manager.mockTasks()
1557	manager.iterate(t, 5)
1558	manager.checkPluginTimeline(t, plugins, calls, 5)
1559
1560	update.requests = update.requests[:0]
1561	manager.update(&update)
1562	manager.mockTasks()
1563	manager.iterate(t, 5)
1564	manager.checkPluginTimeline(t, plugins, calls, 5)
1565
1566	update.clientID = agent.MaxBuiltinClientID + 1
1567	manager.update(&update)
1568	manager.mockTasks()
1569	manager.iterate(t, 5)
1570	manager.checkPluginTimeline(t, plugins, calls, 5)
1571}
1572
1573func TestMultiRunnerWatcher(t *testing.T) {
1574	_ = log.Open(log.Console, log.Debug, "", 0)
1575
1576	manager := mockManager{sink: make(chan performer, 10)}
1577	plugin.ClearRegistry()
1578	plugins := make([]plugin.Accessor, 1)
1579	for i := range plugins {
1580		plugins[i] = &mockRunnerWatcherPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}}
1581		plugin.RegisterMetrics(plugins[i], "debug", "debug", "Debug.")
1582	}
1583	manager.mockInit(t)
1584
1585	items := []*clientItem{
1586		&clientItem{itemid: 1, delay: "5", key: "debug[1]"},
1587		&clientItem{itemid: 2, delay: "5", key: "debug[2]"},
1588		&clientItem{itemid: 3, delay: "5", key: "debug[3]"},
1589	}
1590
1591	calls := []map[string][]int{
1592		map[string][]int{"$watch": []int{2, 3, 6, 7, 11, 17, 21}, "$start": []int{1, 16}, "$stop": []int{12}},
1593	}
1594
1595	var cache resultCacheMock
1596	update := updateRequest{
1597		clientID: agent.MaxBuiltinClientID + 1,
1598		sink:     &cache,
1599		requests: make([]*plugin.Request, 0),
1600	}
1601
1602	var lastLogsize uint64
1603	var mtime int
1604	for _, item := range items {
1605		update.requests = append(update.requests, &plugin.Request{
1606			Itemid:      item.itemid,
1607			Key:         item.key,
1608			Delay:       item.delay,
1609			LastLogsize: &lastLogsize,
1610			Mtime:       &mtime,
1611		})
1612	}
1613	manager.update(&update)
1614	update.clientID = agent.MaxBuiltinClientID + 2
1615	manager.update(&update)
1616	manager.mockTasks()
1617	manager.iterate(t, 5)
1618	manager.checkPluginTimeline(t, plugins, calls, 5)
1619
1620	update.clientID = agent.MaxBuiltinClientID + 1
1621	manager.update(&update)
1622	update.clientID = agent.MaxBuiltinClientID + 2
1623	update.requests = update.requests[:0]
1624	manager.update(&update)
1625	manager.mockTasks()
1626	manager.iterate(t, 5)
1627	manager.checkPluginTimeline(t, plugins, calls, 5)
1628
1629	update.clientID = agent.MaxBuiltinClientID + 1
1630	manager.update(&update)
1631	manager.mockTasks()
1632	manager.iterate(t, 5)
1633	manager.checkPluginTimeline(t, plugins, calls, 5)
1634
1635	update.requests = update.requests[:1]
1636	update.clientID = agent.MaxBuiltinClientID + 2
1637	manager.update(&update)
1638	manager.mockTasks()
1639	manager.iterate(t, 5)
1640	manager.checkPluginTimeline(t, plugins, calls, 5)
1641
1642	update.clientID = agent.MaxBuiltinClientID + 1
1643	manager.update(&update)
1644	manager.mockTasks()
1645	manager.iterate(t, 5)
1646	manager.checkPluginTimeline(t, plugins, calls, 5)
1647}
1648
1649func TestPassiveRunner(t *testing.T) {
1650	_ = log.Open(log.Console, log.Debug, "", 0)
1651
1652	manager := mockManager{sink: make(chan performer, 10)}
1653	plugin.ClearRegistry()
1654	plugins := make([]plugin.Accessor, 3)
1655	for i := range plugins {
1656		plugins[i] = &mockPassiveRunnerPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}}
1657		name := fmt.Sprintf("debug%d", i+1)
1658		plugin.RegisterMetrics(plugins[i], name, name, "Debug.")
1659	}
1660	manager.mockInit(t)
1661
1662	items := []*clientItem{
1663		&clientItem{itemid: 1, delay: "5", key: "debug1"},
1664		&clientItem{itemid: 2, delay: "5", key: "debug2"},
1665		&clientItem{itemid: 3, delay: "5", key: "debug3"},
1666	}
1667
1668	calls := []map[string][]int{
1669		map[string][]int{"$start": []int{1}, "$stop": []int{}},
1670		map[string][]int{"$start": []int{1}, "$stop": []int{3600*51 + 1}},
1671		map[string][]int{"$start": []int{1}, "$stop": []int{3600*26 + 1}},
1672	}
1673
1674	var cache resultCacheMock
1675	update := updateRequest{
1676		clientID: agent.PassiveChecksClientID,
1677		sink:     &cache,
1678		requests: make([]*plugin.Request, 0),
1679	}
1680
1681	var lastLogsize uint64
1682	var mtime int
1683	for _, item := range items {
1684		update.requests = append(update.requests, &plugin.Request{
1685			Itemid:      item.itemid,
1686			Key:         item.key,
1687			Delay:       item.delay,
1688			LastLogsize: &lastLogsize,
1689			Mtime:       &mtime,
1690		})
1691	}
1692	manager.update(&update)
1693	manager.mockTasks()
1694	manager.iterate(t, 3600)
1695	manager.checkPluginTimeline(t, plugins, calls, 3600)
1696
1697	update.requests = update.requests[:0]
1698	manager.update(&update)
1699	manager.mockTasks()
1700	manager.iterate(t, 3600)
1701	manager.checkPluginTimeline(t, plugins, calls, 3600)
1702
1703	update.requests = update.requests[:2]
1704	manager.update(&update)
1705	manager.mockTasks()
1706	manager.iterate(t, 3600*24)
1707	manager.checkPluginTimeline(t, plugins, calls, 3600*24)
1708
1709	update.requests = update.requests[:1]
1710	manager.update(&update)
1711	manager.mockTasks()
1712	manager.iterate(t, 3600*25)
1713	manager.checkPluginTimeline(t, plugins, calls, 3600*25)
1714
1715	update.requests = update.requests[:1]
1716	manager.update(&update)
1717	manager.mockTasks()
1718	manager.iterate(t, 1)
1719	manager.checkPluginTimeline(t, plugins, calls, 1)
1720}
1721
// configuratorOption is the structure TestConfigurator fills with
// conf.Unmarshal; the resulting Params values are stored in
// agent.Options.Plugins.
type configuratorOption struct {
	// NOTE(review): the "optional" tag presumably lets conf.Unmarshal accept
	// input without this parameter - confirm against the conf package
	Params interface{} `conf:"optional"`
}
1725
1726func TestConfigurator(t *testing.T) {
1727	_ = log.Open(log.Console, log.Debug, "", 0)
1728
1729	var opt1, opt2, opt3 configuratorOption
1730	_ = conf.Unmarshal([]byte("Delay=5"), &opt1)
1731	_ = conf.Unmarshal([]byte("Delay=30"), &opt2)
1732	_ = conf.Unmarshal([]byte("Delay=60"), &opt3)
1733
1734	agent.Options.Plugins = map[string]interface{}{
1735		"Debug1": opt1.Params,
1736		"Debug2": opt2.Params,
1737		"Debug3": opt3.Params,
1738	}
1739
1740	manager := mockManager{sink: make(chan performer, 10)}
1741	plugin.ClearRegistry()
1742	plugins := make([]plugin.Accessor, 3)
1743	for i := range plugins {
1744		name := fmt.Sprintf("debug%d", i+1)
1745		plugins[i] = &mockConfiguratorPlugin{
1746			Base:       plugin.Base{},
1747			mockPlugin: mockPlugin{now: &manager.now},
1748			options:    agent.Options.Plugins[name]}
1749		plugin.RegisterMetrics(plugins[i], name, name, "Debug.")
1750	}
1751	manager.mockInit(t)
1752
1753	items := []*clientItem{
1754		&clientItem{itemid: 1, delay: "5", key: "debug1"},
1755		&clientItem{itemid: 2, delay: "5", key: "debug2"},
1756		&clientItem{itemid: 3, delay: "5", key: "debug3"},
1757	}
1758
1759	calls := []map[string][]int{
1760		map[string][]int{"$configure": []int{1}},
1761		map[string][]int{"$configure": []int{6}},
1762		map[string][]int{"$configure": []int{11}},
1763	}
1764
1765	var cache resultCacheMock
1766	update := updateRequest{
1767		clientID: agent.MaxBuiltinClientID + 1,
1768		sink:     &cache,
1769		requests: make([]*plugin.Request, 0),
1770	}
1771
1772	var lastLogsize uint64
1773	var mtime int
1774	for _, item := range items {
1775		update.requests = append(update.requests, &plugin.Request{
1776			Itemid:      item.itemid,
1777			Key:         item.key,
1778			Delay:       item.delay,
1779			LastLogsize: &lastLogsize,
1780			Mtime:       &mtime,
1781		})
1782	}
1783	update.requests = update.requests[:1]
1784	manager.update(&update)
1785	manager.mockTasks()
1786	manager.iterate(t, 5)
1787	manager.checkPluginTimeline(t, plugins, calls, 5)
1788
1789	update.requests = update.requests[:2]
1790	manager.update(&update)
1791	manager.mockTasks()
1792	manager.iterate(t, 5)
1793	manager.checkPluginTimeline(t, plugins, calls, 5)
1794
1795	update.requests = update.requests[:3]
1796	manager.update(&update)
1797	manager.mockTasks()
1798	manager.iterate(t, 5)
1799	manager.checkPluginTimeline(t, plugins, calls, 5)
1800}
1801