1package search // import "bosun.org/cmd/bosun/search"
2
3import (
4	"fmt"
5	"math/rand"
6	"reflect"
7	"regexp"
8	"sort"
9	"strings"
10	"sync"
11	"time"
12
13	"bosun.org/cmd/bosun/database"
14	"bosun.org/collect"
15	"bosun.org/metadata"
16	"bosun.org/opentsdb"
17	"bosun.org/slog"
18)
19
20// Search is a struct to hold indexed data about OpenTSDB metric and tag data.
21// It is suited to answering questions about: available metrics for a tag set,
22// available tag keys for a metric, and available tag values for a metric and
23// tag key.
24type Search struct {
25	DataAccess database.DataAccess
26
27	// metric -> tags -> struct
28	last map[string]map[string]*database.LastInfo
29
30	indexQueue chan *opentsdb.DataPoint
31	sync.RWMutex
32}
33
34func init() {
35	metadata.AddMetricMeta("bosun.search.index_queue", metadata.Gauge, metadata.Count, "Number of datapoints queued for indexing to redis")
36	metadata.AddMetricMeta("bosun.search.dropped", metadata.Counter, metadata.Count, "Number of datapoints discarded without being saved to redis")
37}
38
39func NewSearch(data database.DataAccess, skipLast bool) *Search {
40	s := Search{
41		DataAccess: data,
42		last:       make(map[string]map[string]*database.LastInfo),
43		indexQueue: make(chan *opentsdb.DataPoint, 300000),
44	}
45	collect.Set("search.index_queue", opentsdb.TagSet{}, func() interface{} { return len(s.indexQueue) })
46	collect.Set("search.last.total", opentsdb.TagSet{}, func() interface{} {
47		s.RLock()
48		defer s.RUnlock()
49		total := 0
50		for _, v := range s.last {
51			total += len(v)
52		}
53		return total
54	})
55	collect.Set("search.last.metrics", opentsdb.TagSet{}, func() interface{} {
56		s.RLock()
57		defer s.RUnlock()
58		return len(s.last)
59	})
60	if !skipLast {
61		s.loadLast()
62		go s.redisIndex(s.indexQueue)
63		go s.backupLoop()
64	}
65	return &s
66}
67
68func (s *Search) Index(mdp opentsdb.MultiDataPoint) {
69	for _, dp := range mdp {
70		s.Lock()
71		mmap := s.last[dp.Metric]
72		if mmap == nil {
73			mmap = make(map[string]*database.LastInfo)
74			s.last[dp.Metric] = mmap
75		}
76		p := mmap[dp.Tags.String()]
77		if p == nil {
78			p = &database.LastInfo{}
79			mmap[dp.Tags.String()] = p
80		}
81		if p.Timestamp < dp.Timestamp {
82			if fv, err := getFloat(dp.Value); err == nil {
83				p.DiffFromPrev = (fv - p.LastVal) / float64(dp.Timestamp-p.Timestamp)
84				p.LastVal = fv
85			} else {
86				slog.Error(err)
87			}
88			p.Timestamp = dp.Timestamp
89		}
90		s.Unlock()
91		select {
92		case s.indexQueue <- dp:
93		default:
94			collect.Add("search.dropped", opentsdb.TagSet{}, 1)
95		}
96	}
97}
98
99func (s *Search) redisIndex(c <-chan *opentsdb.DataPoint) {
100	now := time.Now().Unix()
101	nextUpdateTimes := make(map[string]int64)
102	updateIfTime := func(key string, f func()) {
103		nextUpdate, ok := nextUpdateTimes[key]
104		if !ok || now > nextUpdate {
105			f()
106			nextUpdateTimes[key] = now + int64(30*60+rand.Intn(15*60)) //pick a random time between 30 and 45 minutes from now
107		}
108	}
109	for dp := range c {
110		now = time.Now().Unix()
111		metric := dp.Metric
112		for k, v := range dp.Tags {
113			updateIfTime(fmt.Sprintf("kvm:%s:%s:%s", k, v, metric), func() {
114				if err := s.DataAccess.Search().AddMetricForTag(k, v, metric, now); err != nil {
115					slog.Error(err)
116				}
117				if err := s.DataAccess.Search().AddTagValue(metric, k, v, now); err != nil {
118					slog.Error(err)
119				}
120			})
121			updateIfTime(fmt.Sprintf("mk:%s:%s", metric, k), func() {
122				if err := s.DataAccess.Search().AddTagKeyForMetric(metric, k, now); err != nil {
123					slog.Error(err)
124				}
125			})
126			updateIfTime(fmt.Sprintf("kv:%s:%s", k, v), func() {
127				if err := s.DataAccess.Search().AddTagValue(database.Search_All, k, v, now); err != nil {
128					slog.Error(err)
129				}
130			})
131			updateIfTime(fmt.Sprintf("m:%s", metric), func() {
132				if err := s.DataAccess.Search().AddMetric(metric, now); err != nil {
133					slog.Error(err)
134				}
135			})
136		}
137		updateIfTime(fmt.Sprintf("mts:%s:%s", metric, dp.Tags.Tags()), func() {
138			if err := s.DataAccess.Search().AddMetricTagSet(metric, dp.Tags.Tags(), now); err != nil {
139				slog.Error(err)
140			}
141		})
142	}
143}
144
145var floatType = reflect.TypeOf(float64(0))
146
147func getFloat(unk interface{}) (float64, error) {
148	v := reflect.ValueOf(unk)
149	v = reflect.Indirect(v)
150	if !v.Type().ConvertibleTo(floatType) {
151		return 0, fmt.Errorf("cannot convert %v to float64", v.Type())
152	}
153	fv := v.Convert(floatType)
154	return fv.Float(), nil
155}
156
157// Match returns all matching values against search. search is a regex, except
158// that `.` is literal, `*` can be used for `.*`, and the entire string is
159// searched (`^` and `&` added to ends of search).
160func Match(search string, values []string) ([]string, error) {
161	v := strings.Replace(search, ".", `\.`, -1)
162	v = strings.Replace(v, "*", ".*", -1)
163	v = "^" + v + "$"
164	re, err := regexp.Compile(v)
165	if err != nil {
166		return nil, err
167	}
168	var nvs []string
169	for _, nv := range values {
170		if re.MatchString(nv) {
171			nvs = append(nvs, nv)
172		}
173	}
174	return nvs, nil
175}
176
177var errNotFloat = fmt.Errorf("last: expected float64")
178
179// GetLast returns the value of the most recent data point for the given metric
180// and tag. tags should be of the form "{key=val,key2=val2}". If diff is true,
181// the value is treated as a counter. err is non nil if there is no match.
182func (s *Search) GetLast(metric, tags string, diff bool) (v float64, t int64, err error) {
183	s.RLock()
184	defer s.RUnlock()
185	m, mOk := s.last[metric]
186	if mOk {
187		p := m[tags]
188		if p != nil {
189			if diff {
190				return p.DiffFromPrev, p.Timestamp, nil
191			}
192			return p.LastVal, p.Timestamp, nil
193		}
194	}
195	return 0, 0, fmt.Errorf("no match for %s:%s", metric, tags)
196}
197
198// GetLastInt64 is like GetLast but converts the value to an int64
199func (s *Search) GetLastInt64(metric, tags string, diff bool) (int64, int64, error) {
200	v, t, err := s.GetLast(metric, tags, diff)
201	return int64(v), t, err
202}
203
204// load stored last data from redis
205func (s *Search) loadLast() {
206	s.Lock()
207	defer s.Unlock()
208	slog.Info("Loading last datapoints from redis")
209	m, err := s.DataAccess.Search().LoadLastInfos()
210	if err != nil {
211		slog.Error(err)
212	} else {
213		s.last = m
214	}
215	slog.Info("Done")
216}
217
218func (s *Search) backupLoop() {
219	for {
220		time.Sleep(2 * time.Minute)
221		slog.Info("Backing up last data to redis")
222		err := s.BackupLast()
223		if err != nil {
224			slog.Error(err)
225		}
226	}
227}
228
229func (s *Search) BackupLast() error {
230	s.RLock()
231	copyL := make(map[string]map[string]*database.LastInfo, len(s.last))
232	for m, mmap := range s.last {
233		innerCopy := make(map[string]*database.LastInfo, len(mmap))
234		copyL[m] = innerCopy
235		for ts, info := range mmap {
236			innerCopy[ts] = &database.LastInfo{
237				LastVal:      info.LastVal,
238				DiffFromPrev: info.DiffFromPrev,
239				Timestamp:    info.Timestamp,
240			}
241		}
242	}
243	s.RUnlock()
244	return s.DataAccess.Search().BackupLastInfos(copyL)
245}
246
247func (s *Search) Expand(q *opentsdb.Query) error {
248	for k, ov := range q.Tags {
249		var nvs []string
250		for _, v := range strings.Split(ov, "|") {
251			v = strings.TrimSpace(v)
252			if v == "*" || !strings.Contains(v, "*") {
253				nvs = append(nvs, v)
254			} else {
255				vs, err := s.TagValuesByMetricTagKey(q.Metric, k, 0)
256				if err != nil {
257					return err
258				}
259				ns, err := Match(v, vs)
260				if err != nil {
261					return err
262				}
263				nvs = append(nvs, ns...)
264			}
265		}
266		if len(nvs) == 0 {
267			return fmt.Errorf("expr: no tags matching %s=%s", k, ov)
268		}
269		q.Tags[k] = strings.Join(nvs, "|")
270	}
271	return nil
272}
273
274// UniqueMetrics returns a sorted slice of metrics where the
275// metric has been updated more recently than epoch
276func (s *Search) UniqueMetrics(epochFilter int64) ([]string, error) {
277	m, err := s.DataAccess.Search().GetAllMetrics()
278	if err != nil {
279		return nil, err
280	}
281	metrics := []string{}
282	for k, epoch := range m {
283		if epoch < epochFilter {
284			continue
285		}
286		metrics = append(metrics, k)
287	}
288	sort.Strings(metrics)
289	return metrics, nil
290}
291
292func (s *Search) TagValuesByTagKey(Tagk string, since time.Duration) ([]string, error) {
293	return s.TagValuesByMetricTagKey(database.Search_All, Tagk, since)
294}
295
296func (s *Search) MetricsByTagPair(tagk, tagv string, since time.Duration) ([]string, error) {
297	var t int64
298	if since > 0 {
299		t = time.Now().Add(-since).Unix()
300	}
301	metrics, err := s.DataAccess.Search().GetMetricsForTag(tagk, tagv)
302	if err != nil {
303		return nil, err
304	}
305	r := []string{}
306	for k, ts := range metrics {
307		if t <= ts {
308			r = append(r, k)
309		}
310	}
311	sort.Strings(r)
312	return r, nil
313}
314
315func (s *Search) TagKeysByMetric(metric string) ([]string, error) {
316	keys, err := s.DataAccess.Search().GetTagKeysForMetric(metric)
317	if err != nil {
318		return nil, err
319	}
320	r := []string{}
321	for k := range keys {
322		r = append(r, k)
323	}
324	sort.Strings(r)
325	return r, nil
326}
327
328func (s *Search) TagValuesByMetricTagKey(metric, tagK string, since time.Duration) ([]string, error) {
329	var t int64
330	if since > 0 {
331		t = time.Now().Add(-since).Unix()
332	}
333	vals, err := s.DataAccess.Search().GetTagValues(metric, tagK)
334	if err != nil {
335		return nil, err
336	}
337	r := []string{}
338	for k, ts := range vals {
339		if t <= ts {
340			r = append(r, k)
341		}
342	}
343	sort.Strings(r)
344	return r, nil
345}
346
347func (s *Search) FilteredTagSets(metric string, tags opentsdb.TagSet, since int64) ([]opentsdb.TagSet, error) {
348	sets, err := s.DataAccess.Search().GetMetricTagSets(metric, tags)
349	if err != nil {
350		return nil, err
351	}
352	r := []opentsdb.TagSet{}
353	for k, lastSeen := range sets {
354		ts, err := opentsdb.ParseTags(k)
355		if err != nil {
356			return nil, err
357		}
358		if lastSeen >= since {
359			r = append(r, ts)
360		}
361
362	}
363	return r, nil
364}
365