1package profiler
2
3import (
4	"context"
5	"sync"
6	"time"
7
8	"github.com/percona/percona-toolkit/src/go/mongolib/proto"
9	"github.com/percona/percona-toolkit/src/go/mongolib/stats"
10	"github.com/percona/percona-toolkit/src/go/pt-mongodb-query-digest/filter"
11	"go.mongodb.org/mongo-driver/mongo"
12)
13
14// DocsBufferSize is the buffer size to store documents from the MongoDB profiler
15var DocsBufferSize = 100
16
17// Profiler interface
18type Profiler interface {
19	GetLastError() error
20	QueriesChan() chan stats.Queries
21	TimeoutsChan() <-chan time.Time
22	FlushQueries()
23	Start(context.Context)
24	Stop()
25}
26
27// Profile has unexported variables for the profiler
28type Profile struct {
29	// dependencies
30	cursor  *mongo.Cursor
31	filters []filter.Filter
32	ticker  <-chan time.Time
33	stats   Stats
34
35	// internal
36	queriesChan  chan stats.Queries
37	stopChan     chan bool
38	docsChan     chan proto.SystemProfile
39	timeoutsChan chan time.Time
40	// For the moment ProcessDoc is exportable to it could be called from the "outside"
41	// For that reason, we need a mutex to make it thread safe. In the future this func
42	// will be unexported
43	countersMapLock sync.Mutex
44	keyFilters      []string
45	lock            sync.Mutex
46	running         bool
47	lastError       error
48	stopWaitGroup   sync.WaitGroup
49}
50
51// NewProfiler returns a new instance of the profiler interface
52func NewProfiler(cursor *mongo.Cursor, filters []filter.Filter, ticker <-chan time.Time, stats Stats) Profiler {
53	return &Profile{
54		cursor:  cursor,
55		filters: filters,
56		ticker:  ticker,
57		stats:   stats,
58
59		// internal
60		docsChan:     make(chan proto.SystemProfile, DocsBufferSize),
61		timeoutsChan: make(chan time.Time),
62		keyFilters:   []string{"^shardVersion$", "^\\$"},
63	}
64}
65
66// GetLastError return the latest error
67func (p *Profile) GetLastError() error {
68	return p.lastError
69}
70
71// QueriesChan returns the channels used to read the queries from the profiler
72func (p *Profile) QueriesChan() chan stats.Queries {
73	return p.queriesChan
74}
75
76// Start the profiler
77func (p *Profile) Start(ctx context.Context) {
78	p.lock.Lock()
79	defer p.lock.Unlock()
80	if !p.running {
81		p.running = true
82		p.queriesChan = make(chan stats.Queries)
83		p.stopChan = make(chan bool)
84		go p.getData(ctx)
85	}
86}
87
88// Stop the profiler
89func (p *Profile) Stop() {
90	p.lock.Lock()
91	defer p.lock.Unlock()
92	if p.running {
93		select {
94		case p.stopChan <- true:
95		default:
96		}
97		// Wait for getData to receive the stop signal
98		p.stopWaitGroup.Wait()
99	}
100}
101
102// TimeoutsChan returns the channels to receive timeout signals
103func (p *Profile) TimeoutsChan() <-chan time.Time {
104	p.lock.Lock()
105	defer p.lock.Unlock()
106	return p.timeoutsChan
107}
108
109func (p *Profile) getData(ctx context.Context) {
110	go p.getDocs(ctx)
111	p.stopWaitGroup.Add(1)
112	defer p.stopWaitGroup.Done()
113
114	for {
115		select {
116		case <-p.ticker:
117			p.FlushQueries()
118		case <-p.stopChan:
119			// Close the iterator to break the loop on getDocs
120			p.lastError = p.cursor.Close(ctx)
121			return
122		}
123	}
124}
125
126func (p *Profile) getDocs(ctx context.Context) {
127	defer p.Stop()
128	defer p.FlushQueries()
129
130	var doc proto.SystemProfile
131
132	for p.cursor.Next(ctx) {
133		if err := p.cursor.Decode(&doc); err != nil {
134			p.lastError = err
135			return
136		}
137		valid := true
138		for _, filter := range p.filters {
139			if !filter(doc) {
140				valid = false
141				return
142			}
143		}
144		if !valid {
145			continue
146		}
147		p.lastError = p.stats.Add(doc)
148	}
149}
150
151// FlushQueries clean all the queries from the queries chan
152func (p *Profile) FlushQueries() {
153	p.queriesChan <- p.stats.Queries()
154	p.stats.Reset()
155}
156