1package expr
2
import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"time"

	"bosun.org/opentsdb"
	elastic "github.com/olivere/elastic"
)
13
14// InitClient sets up the elastic client. If the client has already been
15// initialized it is a noop
16func (e ElasticHosts) InitClient6(prefix string) error {
17	if _, ok := e.Hosts[prefix]; !ok {
18		prefixes := make([]string, len(e.Hosts))
19		i := 0
20		for k := range e.Hosts {
21			prefixes[i] = k
22			i++
23		}
24		return fmt.Errorf("prefix %v not defined, available prefixes are: %v", prefix, prefixes)
25	}
26	if c := esClients.m[prefix]; c != nil {
27		// client already initialized
28		return nil
29	}
30	// esClients.Lock()
31	var err error
32	if e.Hosts[prefix].SimpleClient {
33		// simple client enabled
34		esClients.m[prefix], err = elastic.NewSimpleClient(elastic.SetURL(e.Hosts[prefix].Hosts...), elastic.SetMaxRetries(10))
35	} else if len(e.Hosts[prefix].Hosts) == 0 {
36		// client option enabled
37		esClients.m[prefix], err = elastic.NewClient(e.Hosts[prefix].ClientOptionFuncs.([]elastic.ClientOptionFunc)...)
38	} else {
39		// default behavior
40		esClients.m[prefix], err = elastic.NewClient(elastic.SetURL(e.Hosts[prefix].Hosts...), elastic.SetMaxRetries(10))
41	}
42	// esClients.Unlock()
43	if err != nil {
44		return err
45	}
46	return nil
47}
48
49// getService returns an elasticsearch service based on the global client
50func (e *ElasticHosts) getService6(prefix string) (*elastic.SearchService, error) {
51	esClients.Lock()
52	defer esClients.Unlock()
53
54	err := e.InitClient(prefix)
55	if err != nil {
56		return nil, err
57	}
58	return esClients.m[prefix].(*elastic.Client).Search(), nil
59}
60
61// Query takes a Logstash request, applies it a search service, and then queries
62// elasticsearch.
63func (e ElasticHosts) Query6(r *ElasticRequest6) (*elastic.SearchResult, error) {
64	s, err := e.getService6(r.HostKey)
65	if err != nil {
66		return nil, err
67	}
68
69	s.Index(r.Indices...)
70
71	// With IgnoreUnavailable there can be gaps in the indices (i.e. missing days) and we will not error
72	// If no indices match than there will be no successful shards and and error is returned in that case
73	s.IgnoreUnavailable(true)
74	res, err := s.SearchSource(r.Source).Do(context.Background())
75	if err != nil {
76		return nil, err
77	}
78	if res.Shards == nil {
79		return nil, fmt.Errorf("no shard info in reply, should not be here please file issue")
80	}
81	if res.Shards.Successful == 0 {
82		return nil, fmt.Errorf("no successful shards in result, perhaps the index does exist, total shards: %v, failed shards: %v", res.Shards.Total, res.Shards.Failed)
83	}
84	return res, nil
85}
86
// ElasticRequest6 is a container for the information needed to query
// elasticsearch, for example for a date histogram.
//
// Indices are the indices searched, HostKey is the prefix used to look up
// the configured elastic host, and Start/End are the bounds of the queried
// time range.
type ElasticRequest6 struct {
	Indices []string
	HostKey string
	Start   *time.Time
	End     *time.Time
	Source  *elastic.SearchSource // the object the query is built in
}
96
97// CacheKey returns the text of the elastic query. That text is the indentifer for
98// the query in the cache. It is a combination of the host key, indices queries and the json query content
99func (r *ElasticRequest6) CacheKey() (string, error) {
100	s, err := r.Source.Source()
101	if err != nil {
102		return "", err
103	}
104	b, err := json.Marshal(s)
105	if err != nil {
106		return "", fmt.Errorf("failed to generate json representation of search source for cache key: %s", s)
107	}
108
109	return fmt.Sprintf("%s:%v\n%s", r.HostKey, r.Indices, b), nil
110}
111
112// timeESRequest execute the elasticsearch query (which may set or hit cache) and returns
113// the search results.
114func timeESRequest6(e *State, req *ElasticRequest6) (resp *elastic.SearchResult, err error) {
115	var source interface{}
116	source, err = req.Source.Source()
117	if err != nil {
118		return resp, fmt.Errorf("failed to get source of request while timing elastic request: %s", err)
119	}
120	b, err := json.MarshalIndent(source, "", "  ")
121	if err != nil {
122		return resp, err
123	}
124	key, err := req.CacheKey()
125	if err != nil {
126		return nil, err
127	}
128	e.Timer.StepCustomTiming("elastic", "query", fmt.Sprintf("%s:%v\n%s", req.HostKey, req.Indices, b), func() {
129		getFn := func() (interface{}, error) {
130			return e.ElasticHosts.Query6(req)
131		}
132		var val interface{}
133		var hit bool
134		val, err, hit = e.Cache.Get(key, getFn)
135		collectCacheHit(e.Cache, "elastic", hit)
136		resp = val.(*elastic.SearchResult)
137	})
138	return
139}
140
141func ESDateHistogram6(prefix string, e *State, indexer ESIndexer, keystring string, filter elastic.Query, interval, sduration, eduration, stat_field, rstat string, size int) (r *Results, err error) {
142	r = new(Results)
143	req, err := ESBaseQuery6(e.now, indexer, filter, sduration, eduration, size, prefix)
144	if err != nil {
145		return nil, err
146	}
147	// Extended bounds and min doc count are required to get values back when the bucket value is 0
148	ts := elastic.NewDateHistogramAggregation().Field(indexer.TimeField).Interval(strings.Replace(interval, "M", "n", -1)).MinDocCount(0).ExtendedBoundsMin(req.Start).ExtendedBoundsMax(req.End).Format(elasticRFC3339)
149	if stat_field != "" {
150		ts = ts.SubAggregation("stats", elastic.NewExtendedStatsAggregation().Field(stat_field))
151		switch rstat {
152		case "avg", "min", "max", "sum", "sum_of_squares", "variance", "std_deviation":
153		default:
154			return r, fmt.Errorf("stat function %v not a valid option", rstat)
155		}
156	}
157	if keystring == "" {
158		req.Source = req.Source.Aggregation("ts", ts)
159		result, err := timeESRequest6(e, req)
160		if err != nil {
161			return nil, err
162		}
163		ts, found := result.Aggregations.DateHistogram("ts")
164		if !found {
165			return nil, fmt.Errorf("expected time series not found in elastic reply")
166		}
167		series := make(Series)
168		for _, v := range ts.Buckets {
169			val := processESBucketItem6(v, rstat)
170			if val != nil {
171				series[time.Unix(int64(v.Key)/1000, 0).UTC()] = *val
172			}
173		}
174		if len(series) == 0 {
175			return r, nil
176		}
177		r.Results = append(r.Results, &Result{
178			Value: series,
179			Group: make(opentsdb.TagSet),
180		})
181		return r, nil
182	}
183	keys := strings.Split(keystring, ",")
184	aggregation := elastic.NewTermsAggregation().Field(keys[len(keys)-1])
185	aggregation = aggregation.SubAggregation("ts", ts)
186	for i := len(keys) - 2; i > -1; i-- {
187		aggregation = elastic.NewTermsAggregation().Field(keys[i]).SubAggregation("g_"+keys[i+1], aggregation)
188	}
189	req.Source = req.Source.Aggregation("g_"+keys[0], aggregation)
190	result, err := timeESRequest6(e, req)
191	if err != nil {
192		return nil, err
193	}
194	top, ok := result.Aggregations.Terms("g_" + keys[0])
195	if !ok {
196		return nil, fmt.Errorf("top key g_%v not found in result", keys[0])
197	}
198	var desc func(*elastic.AggregationBucketKeyItem, opentsdb.TagSet, []string) error
199	desc = func(b *elastic.AggregationBucketKeyItem, tags opentsdb.TagSet, keys []string) error {
200		if ts, found := b.DateHistogram("ts"); found {
201			if e.Squelched(tags) {
202				return nil
203			}
204			series := make(Series)
205			for _, v := range ts.Buckets {
206				val := processESBucketItem6(v, rstat)
207				if val != nil {
208					series[time.Unix(int64(v.Key)/1000, 0).UTC()] = *val
209				}
210			}
211			if len(series) == 0 {
212				return nil
213			}
214			r.Results = append(r.Results, &Result{
215				Value: series,
216				Group: tags.Copy(),
217			})
218			return nil
219		}
220		if len(keys) < 1 {
221			return nil
222		}
223		n, _ := b.Aggregations.Terms("g_" + keys[0])
224		for _, item := range n.Buckets {
225			key := fmt.Sprint(item.Key)
226			tags[keys[0]] = key
227			if err := desc(item, tags.Copy(), keys[1:]); err != nil {
228				return err
229			}
230		}
231		return nil
232	}
233	for _, b := range top.Buckets {
234		tags := make(opentsdb.TagSet)
235		key := fmt.Sprint(b.Key)
236		tags[keys[0]] = key
237		if err := desc(b, tags, keys[1:]); err != nil {
238			return nil, err
239		}
240	}
241	return r, nil
242}
243
244// ESBaseQuery builds the base query that both ESCount and ESStat share
245func ESBaseQuery6(now time.Time, indexer ESIndexer, filter elastic.Query, sduration, eduration string, size int, prefix string) (*ElasticRequest6, error) {
246	start, err := opentsdb.ParseDuration(sduration)
247	if err != nil {
248		return nil, err
249	}
250	var end opentsdb.Duration
251	if eduration != "" {
252		end, err = opentsdb.ParseDuration(eduration)
253		if err != nil {
254			return nil, err
255		}
256	}
257	st := now.Add(time.Duration(-start))
258	en := now.Add(time.Duration(-end))
259	indices := indexer.Generate(&st, &en)
260	r := ElasticRequest6{
261		Indices: indices,
262		HostKey: prefix,
263		Start:   &st,
264		End:     &en,
265		Source:  elastic.NewSearchSource().Size(size),
266	}
267	var q elastic.Query
268	q = elastic.NewRangeQuery(indexer.TimeField).Gte(st).Lte(en).Format(elasticRFC3339)
269	r.Source = r.Source.Query(elastic.NewBoolQuery().Must(q, filter))
270	return &r, nil
271}
272
273func ScopeES6(ts opentsdb.TagSet, q elastic.Query) elastic.Query {
274	var filters []elastic.Query
275	for tagKey, tagValue := range ts {
276		filters = append(filters, elastic.NewTermQuery(tagKey, tagValue))
277	}
278	filters = append(filters, q)
279	b := elastic.NewBoolQuery().Must(filters...)
280	return b
281}
282
283func processESBucketItem6(b *elastic.AggregationBucketHistogramItem, rstat string) *float64 {
284	if stats, found := b.ExtendedStats("stats"); found {
285		var val *float64
286		switch rstat {
287		case "avg":
288			val = stats.Avg
289		case "min":
290			val = stats.Min
291		case "max":
292			val = stats.Max
293		case "sum":
294			val = stats.Sum
295		case "sum_of_squares":
296			val = stats.SumOfSquares
297		case "variance":
298			val = stats.Variance
299		case "std_deviation":
300			val = stats.StdDeviation
301		}
302		return val
303	}
304	v := float64(b.DocCount)
305	return &v
306}
307