1package profiler 2 3import ( 4 "context" 5 "sync" 6 "time" 7 8 "github.com/percona/percona-toolkit/src/go/mongolib/proto" 9 "github.com/percona/percona-toolkit/src/go/mongolib/stats" 10 "github.com/percona/percona-toolkit/src/go/pt-mongodb-query-digest/filter" 11 "go.mongodb.org/mongo-driver/mongo" 12) 13 14// DocsBufferSize is the buffer size to store documents from the MongoDB profiler 15var DocsBufferSize = 100 16 17// Profiler interface 18type Profiler interface { 19 GetLastError() error 20 QueriesChan() chan stats.Queries 21 TimeoutsChan() <-chan time.Time 22 FlushQueries() 23 Start(context.Context) 24 Stop() 25} 26 27// Profile has unexported variables for the profiler 28type Profile struct { 29 // dependencies 30 cursor *mongo.Cursor 31 filters []filter.Filter 32 ticker <-chan time.Time 33 stats Stats 34 35 // internal 36 queriesChan chan stats.Queries 37 stopChan chan bool 38 docsChan chan proto.SystemProfile 39 timeoutsChan chan time.Time 40 // For the moment ProcessDoc is exportable to it could be called from the "outside" 41 // For that reason, we need a mutex to make it thread safe. In the future this func 42 // will be unexported 43 countersMapLock sync.Mutex 44 keyFilters []string 45 lock sync.Mutex 46 running bool 47 lastError error 48 stopWaitGroup sync.WaitGroup 49} 50 51// NewProfiler returns a new instance of the profiler interface 52func NewProfiler(cursor *mongo.Cursor, filters []filter.Filter, ticker <-chan time.Time, stats Stats) Profiler { 53 return &Profile{ 54 cursor: cursor, 55 filters: filters, 56 ticker: ticker, 57 stats: stats, 58 59 // internal 60 docsChan: make(chan proto.SystemProfile, DocsBufferSize), 61 timeoutsChan: make(chan time.Time), 62 keyFilters: []string{"^shardVersion$", "^\\$"}, 63 } 64} 65 66// GetLastError return the latest error 67func (p *Profile) GetLastError() error { 68 return p.lastError 69} 70 71// QueriesChan returns the channels used to read the queries from the profiler 72func (p *Profile) QueriesChan() chan stats.Queries { 73 return p.queriesChan 74} 75 76// Start the profiler 77func (p *Profile) Start(ctx context.Context) { 78 p.lock.Lock() 79 defer p.lock.Unlock() 80 if !p.running { 81 p.running = true 82 p.queriesChan = make(chan stats.Queries) 83 p.stopChan = make(chan bool) 84 go p.getData(ctx) 85 } 86} 87 88// Stop the profiler 89func (p *Profile) Stop() { 90 p.lock.Lock() 91 defer p.lock.Unlock() 92 if p.running { 93 select { 94 case p.stopChan <- true: 95 default: 96 } 97 // Wait for getData to receive the stop signal 98 p.stopWaitGroup.Wait() 99 } 100} 101 102// TimeoutsChan returns the channels to receive timeout signals 103func (p *Profile) TimeoutsChan() <-chan time.Time { 104 p.lock.Lock() 105 defer p.lock.Unlock() 106 return p.timeoutsChan 107} 108 109func (p *Profile) getData(ctx context.Context) { 110 go p.getDocs(ctx) 111 p.stopWaitGroup.Add(1) 112 defer p.stopWaitGroup.Done() 113 114 for { 115 select { 116 case <-p.ticker: 117 p.FlushQueries() 118 case <-p.stopChan: 119 // Close the iterator to break the loop on getDocs 120 p.lastError = p.cursor.Close(ctx) 121 return 122 } 123 } 124} 125 126func (p *Profile) getDocs(ctx context.Context) { 127 defer p.Stop() 128 defer p.FlushQueries() 129 130 var doc proto.SystemProfile 131 132 for p.cursor.Next(ctx) { 133 if err := p.cursor.Decode(&doc); err != nil { 134 p.lastError = err 135 return 136 } 137 valid := true 138 for _, filter := range p.filters { 139 if !filter(doc) { 140 valid = false 141 return 142 } 143 } 144 if !valid { 145 continue 146 } 147 p.lastError = p.stats.Add(doc) 148 } 149} 150 151// FlushQueries clean all the queries from the queries chan 152func (p *Profile) FlushQueries() { 153 p.queriesChan <- p.stats.Queries() 154 p.stats.Reset() 155} 156