1package tsdb
2
3import (
4	"bytes"
5	"sync"
6
7	"github.com/influxdata/influxdb/models"
8	"github.com/influxdata/influxql"
9)
10
11// guard lets one match a set of points and block until they are done.
12type guard struct {
13	cond  *sync.Cond
14	done  bool
15	min   int64
16	max   int64
17	names map[string]struct{}
18	expr  *exprGuard
19}
20
21// newGuard constructs a guard that will match any points in the given min and max
22// time range, with the given set of measurement names, or the given expression.
23// The expression is optional.
24func newGuard(min, max int64, names []string, expr influxql.Expr) *guard {
25	set := make(map[string]struct{}, len(names))
26	for _, name := range names {
27		set[name] = struct{}{}
28	}
29	return &guard{
30		cond:  sync.NewCond(new(sync.Mutex)),
31		min:   min,
32		max:   max,
33		names: set,
34		expr:  newExprGuard(expr),
35	}
36}
37
38// Matches returns true if any of the points match the guard.
39func (g *guard) Matches(points []models.Point) bool {
40	if g == nil {
41		return true
42	}
43
44	for _, pt := range points {
45		if t := pt.Time().UnixNano(); t < g.min || t > g.max {
46			continue
47		}
48		if len(g.names) == 0 && g.expr.matches(pt) {
49			return true
50		} else if _, ok := g.names[string(pt.Name())]; ok && g.expr.matches(pt) {
51			return true
52		}
53	}
54	return false
55}
56
57// Wait blocks until the guard has been marked Done.
58func (g *guard) Wait() {
59	g.cond.L.Lock()
60	for !g.done {
61		g.cond.Wait()
62	}
63	g.cond.L.Unlock()
64}
65
66// Done signals to anyone waiting on the guard that they can proceed.
67func (g *guard) Done() {
68	g.cond.L.Lock()
69	g.done = true
70	g.cond.Broadcast()
71	g.cond.L.Unlock()
72}
73
74// exprGuard is a union of influxql.Expr based guards. a nil exprGuard matches
75// everything, while the zero value matches nothing.
76type exprGuard struct {
77	and        *[2]*exprGuard
78	or         *[2]*exprGuard
79	tagMatches *tagGuard
80	tagExists  map[string]struct{}
81}
82
83type tagGuard struct {
84	meas bool
85	key  []byte
86	op   func([]byte) bool
87}
88
89// empty returns true if the exprGuard is empty, meaning that it matches no points.
90func (e *exprGuard) empty() bool {
91	return e != nil && e.and == nil && e.or == nil && e.tagMatches == nil && e.tagExists == nil
92}
93
94// newExprGuard scrutinizes the expression and returns an efficient guard.
95func newExprGuard(expr influxql.Expr) *exprGuard {
96	if expr == nil {
97		return nil
98	}
99
100	switch expr := expr.(type) {
101	case *influxql.ParenExpr:
102		return newExprGuard(expr.Expr)
103
104	case *influxql.BooleanLiteral:
105		if expr.Val {
106			return nil // matches everything
107		}
108		return new(exprGuard) // matches nothing
109
110	case *influxql.BinaryExpr:
111		switch expr.Op {
112		case influxql.AND:
113			lhs, rhs := newExprGuard(expr.LHS), newExprGuard(expr.RHS)
114			if lhs == nil { // reduce
115				return rhs
116			} else if rhs == nil { // reduce
117				return lhs
118			} else if lhs.empty() || rhs.empty() { // short circuit
119				return new(exprGuard)
120			} else {
121				return &exprGuard{and: &[2]*exprGuard{lhs, rhs}}
122			}
123
124		case influxql.OR:
125			lhs, rhs := newExprGuard(expr.LHS), newExprGuard(expr.RHS)
126			if lhs.empty() { // reduce
127				return rhs
128			} else if rhs.empty() { // reduce
129				return lhs
130			} else if lhs == nil || rhs == nil { // short circuit
131				return nil
132			} else {
133				return &exprGuard{or: &[2]*exprGuard{lhs, rhs}}
134			}
135
136		default:
137			return newBinaryExprGuard(expr)
138		}
139	default:
140		// if we couldn't analyze, match everything
141		return nil
142	}
143}
144
145// newBinaryExprGuard scrutinizes the binary expression and returns an efficient guard.
146func newBinaryExprGuard(expr *influxql.BinaryExpr) *exprGuard {
147	// if it's a nested binary expression, always match.
148	if _, ok := expr.LHS.(*influxql.BinaryExpr); ok {
149		return nil
150	} else if _, ok := expr.RHS.(*influxql.BinaryExpr); ok {
151		return nil
152	}
153
154	// ensure one of the expressions is a VarRef, and make that the key.
155	key, ok := expr.LHS.(*influxql.VarRef)
156	value := expr.RHS
157	if !ok {
158		key, ok = expr.RHS.(*influxql.VarRef)
159		if !ok {
160			return nil
161		}
162		value = expr.LHS
163	}
164
165	// check the key for situations we know we can't filter.
166	if key.Val != "_name" && key.Type != influxql.Unknown && key.Type != influxql.Tag {
167		return nil
168	}
169
170	// scrutinize the value to return an efficient guard.
171	switch value := value.(type) {
172	case *influxql.StringLiteral:
173		val := []byte(value.Val)
174		g := &exprGuard{tagMatches: &tagGuard{
175			meas: key.Val == "_name",
176			key:  []byte(key.Val),
177		}}
178
179		switch expr.Op {
180		case influxql.EQ:
181			g.tagMatches.op = func(x []byte) bool { return bytes.Equal(val, x) }
182
183		case influxql.NEQ:
184			g.tagMatches.op = func(x []byte) bool { return !bytes.Equal(val, x) }
185
186		default: // any other operator isn't valid. conservatively match everything.
187			return nil
188		}
189
190		return g
191
192	case *influxql.RegexLiteral:
193		// There's a tradeoff between being precise and being fast. For example, if the
194		// delete includes a very expensive regex, we don't want to run that against every
195		// incoming point. The decision here is to match any point that has a possibly
196		// expensive match if there is any overlap on the tags. In other words, expensive
197		// matches get transformed into trivially matching everything.
198		return &exprGuard{tagExists: map[string]struct{}{key.Val: {}}}
199
200	case *influxql.VarRef:
201		// We could do a better job here by encoding the two names and checking the points
202		// against them, but I'm not quite sure how to do that. Be conservative and match
203		// any points that contain either the key or value.
204
205		// since every point has a measurement, always match if either are on the measurement.
206		if key.Val == "_name" || value.Val == "_name" {
207			return nil
208		}
209		return &exprGuard{tagExists: map[string]struct{}{
210			key.Val:   {},
211			value.Val: {},
212		}}
213
214	default: // any other value type matches everything
215		return nil
216	}
217}
218
219// matches checks if the exprGuard matches the point.
220func (g *exprGuard) matches(pt models.Point) bool {
221	switch {
222	case g == nil:
223		return true
224
225	case g.and != nil:
226		return g.and[0].matches(pt) && g.and[1].matches(pt)
227
228	case g.or != nil:
229		return g.or[0].matches(pt) || g.or[1].matches(pt)
230
231	case g.tagMatches != nil:
232		if g.tagMatches.meas {
233			return g.tagMatches.op(pt.Name())
234		}
235		for _, tag := range pt.Tags() {
236			if bytes.Equal(tag.Key, g.tagMatches.key) && g.tagMatches.op(tag.Value) {
237				return true
238			}
239		}
240		return false
241
242	case g.tagExists != nil:
243		for _, tag := range pt.Tags() {
244			if _, ok := g.tagExists[string(tag.Key)]; ok {
245				return true
246			}
247		}
248		return false
249
250	default:
251		return false
252	}
253}
254