1package expr
2
3import (
4	"errors"
5	"fmt"
6	"sync"
7	"time"
8
9	"github.com/jinzhu/now"
10
11	elastic6 "github.com/olivere/elastic"
12	elastic7 "github.com/olivere/elastic/v7"
13	elastic2 "gopkg.in/olivere/elastic.v3"
14	elastic5 "gopkg.in/olivere/elastic.v5"
15)
16
17const (
18	ESV2 ESVersion = "v2"
19	ESV5 ESVersion = "v5"
20	ESV6 ESVersion = "v6"
21	ESV7 ESVersion = "v7"
22
23	// 2016-09-22T22:26:14.679270711Z
24	elasticRFC3339 = "date_optional_time"
25)
26
27type ESVersion string
28
29type ESQuery struct {
30	Query func(ver ESVersion) interface{}
31}
32
33// Map of prefixes to corresponding clients
34// TODO: switch to sync.Map
35var esClients struct {
36	sync.Mutex
37	m map[string]interface{}
38}
39
40func init() {
41	esClients.m = make(map[string]interface{})
42}
43
44func ESAll(e *State) (*Results, error) {
45	var r Results
46	q := ESQuery{
47		Query: func(ver ESVersion) interface{} {
48			switch ver {
49			case ESV2:
50				return elastic2.NewMatchAllQuery()
51			case ESV5:
52				return elastic5.NewMatchAllQuery()
53			case ESV6:
54				return elastic6.NewMatchAllQuery()
55			case ESV7:
56				return elastic7.NewMatchAllQuery()
57			}
58			return nil
59		},
60	}
61	r.Results = append(r.Results, &Result{Value: q})
62	return &r, nil
63}
64
65func ESAnd(e *State, esqueries ...ESQuery) (*Results, error) {
66	var r Results
67	q := ESQuery{
68		Query: func(ver ESVersion) interface{} {
69			switch ver {
70			case ESV2:
71				queries := make([]elastic2.Query, len(esqueries))
72				for i, q := range esqueries {
73					queries[i] = q.Query(ver).(elastic2.Query)
74				}
75				return elastic2.NewBoolQuery().Must(queries...)
76			case ESV5:
77				queries := make([]elastic5.Query, len(esqueries))
78				for i, q := range esqueries {
79					queries[i] = q.Query(ver).(elastic5.Query)
80				}
81				return elastic5.NewBoolQuery().Must(queries...)
82			case ESV6:
83				queries := make([]elastic6.Query, len(esqueries))
84				for i, q := range esqueries {
85					queries[i] = q.Query(ver).(elastic6.Query)
86				}
87				return elastic6.NewBoolQuery().Must(queries...)
88			case ESV7:
89				queries := make([]elastic7.Query, len(esqueries))
90				for i, q := range esqueries {
91					queries[i] = q.Query(ver).(elastic7.Query)
92				}
93				return elastic7.NewBoolQuery().Must(queries...)
94			}
95			return nil
96		},
97	}
98	r.Results = append(r.Results, &Result{Value: q})
99	return &r, nil
100}
101
102func ESNot(e *State, query ESQuery) (*Results, error) {
103	var r Results
104	q := ESQuery{
105		Query: func(ver ESVersion) interface{} {
106			switch ver {
107			case ESV2:
108				return elastic2.NewBoolQuery().MustNot(query.Query(ver).(elastic2.Query))
109			case ESV5:
110				return elastic5.NewBoolQuery().MustNot(query.Query(ver).(elastic5.Query))
111			case ESV6:
112				return elastic6.NewBoolQuery().MustNot(query.Query(ver).(elastic6.Query))
113			case ESV7:
114				return elastic7.NewBoolQuery().MustNot(query.Query(ver).(elastic7.Query))
115			}
116			return nil
117		},
118	}
119	r.Results = append(r.Results, &Result{Value: q})
120	return &r, nil
121}
122
123func ESOr(e *State, esqueries ...ESQuery) (*Results, error) {
124	var r Results
125	q := ESQuery{
126		Query: func(ver ESVersion) interface{} {
127			switch ver {
128			case ESV2:
129				queries := make([]elastic2.Query, len(esqueries))
130				for i, q := range esqueries {
131					queries[i] = q.Query(ver).(elastic2.Query)
132				}
133				return elastic2.NewBoolQuery().Should(queries...).MinimumNumberShouldMatch(1)
134			case ESV5:
135				queries := make([]elastic5.Query, len(esqueries))
136				for i, q := range esqueries {
137					queries[i] = q.Query(ver).(elastic5.Query)
138				}
139				return elastic5.NewBoolQuery().Should(queries...).MinimumNumberShouldMatch(1)
140			case ESV6:
141				queries := make([]elastic6.Query, len(esqueries))
142				for i, q := range esqueries {
143					queries[i] = q.Query(ver).(elastic6.Query)
144				}
145				return elastic6.NewBoolQuery().Should(queries...).MinimumNumberShouldMatch(1)
146			case ESV7:
147				queries := make([]elastic7.Query, len(esqueries))
148				for i, q := range esqueries {
149					queries[i] = q.Query(ver).(elastic7.Query)
150				}
151				return elastic7.NewBoolQuery().Should(queries...).MinimumNumberShouldMatch(1)
152			}
153			return nil
154		},
155	}
156	r.Results = append(r.Results, &Result{Value: q})
157	return &r, nil
158}
159
160func ESRegexp(e *State, key string, regex string) (*Results, error) {
161	var r Results
162	q := ESQuery{
163		Query: func(ver ESVersion) interface{} {
164			switch ver {
165			case ESV2:
166				return elastic2.NewRegexpQuery(key, regex)
167			case ESV5:
168				return elastic5.NewRegexpQuery(key, regex)
169			case ESV6:
170				return elastic6.NewRegexpQuery(key, regex)
171			case ESV7:
172				return elastic7.NewRegexpQuery(key, regex)
173			}
174			return nil
175		},
176	}
177	r.Results = append(r.Results, &Result{Value: q})
178	return &r, nil
179}
180
181func ESQueryString(e *State, key string, query string) (*Results, error) {
182	var r Results
183	q := ESQuery{
184		// Query: qs
185		Query: func(ver ESVersion) interface{} {
186			switch ver {
187			case ESV2:
188				qs := elastic2.NewQueryStringQuery(query)
189				if key != "" {
190					qs.Field(key)
191				}
192				return qs
193			case ESV5:
194				qs := elastic5.NewQueryStringQuery(query)
195				if key != "" {
196					qs.Field(key)
197				}
198				return qs
199			case ESV6:
200				qs := elastic6.NewQueryStringQuery(query)
201				if key != "" {
202					qs.Field(key)
203				}
204				return qs
205			case ESV7:
206				qs := elastic7.NewQueryStringQuery(query)
207				if key != "" {
208					qs.Field(key)
209				}
210				return qs
211			}
212			return nil
213		},
214	}
215	r.Results = append(r.Results, &Result{Value: q})
216	return &r, nil
217}
218
219func ESExists(e *State, field string) (*Results, error) {
220	var r Results
221	q := ESQuery{
222		Query: func(ver ESVersion) interface{} {
223			switch ver {
224			case ESV2:
225				return elastic2.NewExistsQuery(field)
226			case ESV5:
227				return elastic5.NewExistsQuery(field)
228			case ESV6:
229				return elastic6.NewExistsQuery(field)
230			case ESV7:
231				return elastic7.NewExistsQuery(field)
232			}
233			return nil
234		},
235	}
236	r.Results = append(r.Results, &Result{Value: q})
237	return &r, nil
238}
239
240func ESGT(e *State, key string, gt float64) (*Results, error) {
241	var r Results
242	q := ESQuery{
243		Query: func(ver ESVersion) interface{} {
244			switch ver {
245			case ESV2:
246				return elastic2.NewRangeQuery(key).Gt(gt)
247			case ESV5:
248				return elastic5.NewRangeQuery(key).Gt(gt)
249			case ESV6:
250				return elastic6.NewRangeQuery(key).Gt(gt)
251			case ESV7:
252				return elastic7.NewRangeQuery(key).Gt(gt)
253			}
254			return nil
255		},
256	}
257	r.Results = append(r.Results, &Result{Value: q})
258	return &r, nil
259}
260
261func ESGTE(e *State, key string, gte float64) (*Results, error) {
262	var r Results
263	q := ESQuery{
264		Query: func(ver ESVersion) interface{} {
265			switch ver {
266			case ESV2:
267				return elastic2.NewRangeQuery(key).Gte(gte)
268			case ESV5:
269				return elastic5.NewRangeQuery(key).Gte(gte)
270			case ESV6:
271				return elastic6.NewRangeQuery(key).Gte(gte)
272			case ESV7:
273				return elastic7.NewRangeQuery(key).Gte(gte)
274			}
275			return nil
276		},
277	}
278	r.Results = append(r.Results, &Result{Value: q})
279	return &r, nil
280}
281
282func ESLT(e *State, key string, lt float64) (*Results, error) {
283	var r Results
284	q := ESQuery{
285		Query: func(ver ESVersion) interface{} {
286			switch ver {
287			case ESV2:
288				return elastic2.NewRangeQuery(key).Lt(lt)
289			case ESV5:
290				return elastic5.NewRangeQuery(key).Lt(lt)
291			case ESV6:
292				return elastic6.NewRangeQuery(key).Lt(lt)
293			case ESV7:
294				return elastic7.NewRangeQuery(key).Lt(lt)
295			}
296			return nil
297		},
298	}
299	r.Results = append(r.Results, &Result{Value: q})
300	return &r, nil
301}
302
303func ESLTE(e *State, key string, lte float64) (*Results, error) {
304	var r Results
305	q := ESQuery{
306		Query: func(ver ESVersion) interface{} {
307			switch ver {
308			case ESV2:
309				return elastic2.NewRangeQuery(key).Lte(lte)
310			case ESV5:
311				return elastic5.NewRangeQuery(key).Lte(lte)
312			case ESV6:
313				return elastic6.NewRangeQuery(key).Lte(lte)
314			case ESV7:
315				return elastic7.NewRangeQuery(key).Lte(lte)
316			}
317			return nil
318		},
319	}
320	r.Results = append(r.Results, &Result{Value: q})
321	return &r, nil
322}
323
324// ElasticHosts is an array of Logstash hosts and exists as a type for something to attach
325// methods to.  The elasticsearch library will use the listed to hosts to discover all
326// of the hosts in the config
327// type ElasticHosts []string
328type ElasticHosts struct {
329	Hosts map[string]ElasticConfig
330}
331
332type ElasticConfig struct {
333	Hosts             []string
334	Version           ESVersion
335	SimpleClient      bool
336	ClientOptionFuncs interface{}
337}
338
339// InitClient sets up the elastic client. If the client has already been
340// initialized it is a noop
341func (e ElasticHosts) InitClient(prefix string) error {
342	if _, ok := e.Hosts[prefix]; !ok {
343		prefixes := make([]string, len(e.Hosts))
344		i := 0
345		for k := range e.Hosts {
346			prefixes[i] = k
347			i++
348		}
349		return fmt.Errorf("prefix %v not defined, available prefixes are: %v", prefix, prefixes)
350	}
351	if c := esClients.m[prefix]; c != nil {
352		// client already initialized
353		return nil
354	}
355	var err error
356	if e.Hosts[prefix].SimpleClient {
357		// simple client enabled
358		err = createVersionedSimpleESClient(prefix, e.Hosts[prefix])
359	} else {
360		// default behavior
361		err = createVersionedESClient(prefix, e.Hosts[prefix])
362	}
363	if err != nil {
364		return err
365	}
366	return nil
367}
368
369func createVersionedSimpleESClient(prefix string, cfg ElasticConfig) error {
370	var err error
371	switch cfg.Version {
372	case ESV2:
373		esClients.m[prefix], err = elastic2.NewSimpleClient(elastic2.SetURL(cfg.Hosts...), elastic2.SetMaxRetries(10))
374	case ESV5:
375		esClients.m[prefix], err = elastic5.NewSimpleClient(elastic5.SetURL(cfg.Hosts...), elastic5.SetMaxRetries(10))
376	case ESV6:
377		esClients.m[prefix], err = elastic6.NewSimpleClient(elastic6.SetURL(cfg.Hosts...), elastic6.SetMaxRetries(10))
378	case ESV7:
379		esClients.m[prefix], err = elastic7.NewSimpleClient(elastic7.SetURL(cfg.Hosts...), elastic7.SetMaxRetries(10))
380	}
381	return err
382}
383
384func createVersionedESClient(prefix string, cfg ElasticConfig) error {
385	var err error
386	switch cfg.Version {
387	case ESV2:
388		if len(cfg.Hosts) == 0 {
389			// client option enabled
390			esClients.m[prefix], err = elastic2.NewClient(cfg.ClientOptionFuncs.([]elastic2.ClientOptionFunc)...)
391		} else {
392			// default behavior
393			esClients.m[prefix], err = elastic2.NewClient(elastic2.SetURL(cfg.Hosts...), elastic2.SetMaxRetries(10))
394		}
395	case ESV5:
396		if len(cfg.Hosts) == 0 {
397			// client option enabled
398			esClients.m[prefix], err = elastic5.NewClient(cfg.ClientOptionFuncs.([]elastic5.ClientOptionFunc)...)
399		} else {
400			// default behavior
401			esClients.m[prefix], err = elastic5.NewClient(elastic5.SetURL(cfg.Hosts...), elastic5.SetMaxRetries(10))
402		}
403	case ESV6:
404		if len(cfg.Hosts) == 0 {
405			// client option enabled
406			esClients.m[prefix], err = elastic6.NewClient(cfg.ClientOptionFuncs.([]elastic6.ClientOptionFunc)...)
407		} else {
408			// default behavior
409			esClients.m[prefix], err = elastic6.NewClient(elastic6.SetURL(cfg.Hosts...), elastic6.SetMaxRetries(10))
410		}
411	case ESV7:
412		if len(cfg.Hosts) == 0 {
413			// client option enabled
414			esClients.m[prefix], err = elastic7.NewClient(cfg.ClientOptionFuncs.([]elastic7.ClientOptionFunc)...)
415		} else {
416			// default behavior
417			esClients.m[prefix], err = elastic7.NewClient(elastic7.SetURL(cfg.Hosts...), elastic7.SetMaxRetries(10))
418		}
419	}
420	return err
421}
422
423func ESIndicies(e *State, timeField string, literalIndices ...string) *Results {
424	var r Results
425	indexer := ESIndexer{}
426	// Don't check for existing indexes in this case, just pass through and let elastic return
427	// an error at query time if the index does not exist
428	indexer.Generate = func(start, end *time.Time) []string {
429		return literalIndices
430	}
431	indexer.TimeField = timeField
432	r.Results = append(r.Results, &Result{Value: indexer})
433	return &r
434}
435
436func ESLS(e *State, indexRoot string) (*Results, error) {
437	return ESDaily(e, "@timestamp", indexRoot+"-", "2006.01.02")
438}
439
440func ESDaily(e *State, timeField, indexRoot, layout string) (*Results, error) {
441	var r Results
442	indexer := ESIndexer{}
443	indexer.TimeField = timeField
444	indexer.Generate = func(start, end *time.Time) []string {
445		var indices []string
446		truncStart := now.New(*start).BeginningOfDay()
447		truncEnd := now.New(*end).BeginningOfDay()
448		for d := truncStart; !d.After(truncEnd); d = d.AddDate(0, 0, 1) {
449			indices = append(indices, fmt.Sprintf("%v%v", indexRoot, d.Format(layout)))
450		}
451		return indices
452	}
453	r.Results = append(r.Results, &Result{Value: indexer})
454	return &r, nil
455}
456
457func ESMonthly(e *State, timeField, indexRoot, layout string) (*Results, error) {
458	var r Results
459	indexer := ESIndexer{}
460	indexer.TimeField = timeField
461	indexer.Generate = func(start, end *time.Time) []string {
462		var indices []string
463		truncStart := now.New(*start).BeginningOfMonth()
464		truncEnd := now.New(*end).BeginningOfMonth()
465		for d := truncStart; !d.After(truncEnd); d = d.AddDate(0, 1, 0) {
466			indices = append(indices, fmt.Sprintf("%v%v", indexRoot, d.Format(layout)))
467		}
468		return indices
469	}
470	r.Results = append(r.Results, &Result{Value: indexer})
471	return &r, nil
472}
473
474func ESCount(prefix string, e *State, indexer ESIndexer, keystring string, filter ESQuery, interval, sduration, eduration string) (r *Results, err error) {
475	switch ver := e.ElasticHosts.Hosts[prefix].Version; ver {
476	case ESV2:
477		return ESDateHistogram2(prefix, e, indexer, keystring, filter.Query(ver).(elastic2.Query), interval, sduration, eduration, "", "", 0)
478	case ESV5:
479		return ESDateHistogram5(prefix, e, indexer, keystring, filter.Query(ver).(elastic5.Query), interval, sduration, eduration, "", "", 0)
480	case ESV6:
481		return ESDateHistogram6(prefix, e, indexer, keystring, filter.Query(ver).(elastic6.Query), interval, sduration, eduration, "", "", 0)
482	case ESV7:
483		return ESDateHistogram7(prefix, e, indexer, keystring, filter.Query(ver).(elastic7.Query), interval, sduration, eduration, "", "", 0)
484	}
485	return nil, errors.New("unknown version")
486}
487
488// ESStat returns a bucketed statistical reduction for the specified field.
489func ESStat(prefix string, e *State, indexer ESIndexer, keystring string, filter ESQuery, field, rstat, interval, sduration, eduration string) (r *Results, err error) {
490	switch ver := e.ElasticHosts.Hosts[prefix].Version; ver {
491	case ESV2:
492		return ESDateHistogram2(prefix, e, indexer, keystring, filter.Query(ver).(elastic2.Query), interval, sduration, eduration, field, rstat, 0)
493	case ESV5:
494		return ESDateHistogram5(prefix, e, indexer, keystring, filter.Query(ver).(elastic5.Query), interval, sduration, eduration, field, rstat, 0)
495	case ESV6:
496		return ESDateHistogram6(prefix, e, indexer, keystring, filter.Query(ver).(elastic6.Query), interval, sduration, eduration, field, rstat, 0)
497	case ESV7:
498		return ESDateHistogram7(prefix, e, indexer, keystring, filter.Query(ver).(elastic7.Query), interval, sduration, eduration, field, rstat, 0)
499	}
500	return nil, errors.New("unknown version")
501}
502