1package tsdb 2 3import ( 4 "bytes" 5 "sync" 6 7 "github.com/influxdata/influxdb/models" 8 "github.com/influxdata/influxql" 9) 10 11// guard lets one match a set of points and block until they are done. 12type guard struct { 13 cond *sync.Cond 14 done bool 15 min int64 16 max int64 17 names map[string]struct{} 18 expr *exprGuard 19} 20 21// newGuard constructs a guard that will match any points in the given min and max 22// time range, with the given set of measurement names, or the given expression. 23// The expression is optional. 24func newGuard(min, max int64, names []string, expr influxql.Expr) *guard { 25 set := make(map[string]struct{}, len(names)) 26 for _, name := range names { 27 set[name] = struct{}{} 28 } 29 return &guard{ 30 cond: sync.NewCond(new(sync.Mutex)), 31 min: min, 32 max: max, 33 names: set, 34 expr: newExprGuard(expr), 35 } 36} 37 38// Matches returns true if any of the points match the guard. 39func (g *guard) Matches(points []models.Point) bool { 40 if g == nil { 41 return true 42 } 43 44 for _, pt := range points { 45 if t := pt.Time().UnixNano(); t < g.min || t > g.max { 46 continue 47 } 48 if len(g.names) == 0 && g.expr.matches(pt) { 49 return true 50 } else if _, ok := g.names[string(pt.Name())]; ok && g.expr.matches(pt) { 51 return true 52 } 53 } 54 return false 55} 56 57// Wait blocks until the guard has been marked Done. 58func (g *guard) Wait() { 59 g.cond.L.Lock() 60 for !g.done { 61 g.cond.Wait() 62 } 63 g.cond.L.Unlock() 64} 65 66// Done signals to anyone waiting on the guard that they can proceed. 67func (g *guard) Done() { 68 g.cond.L.Lock() 69 g.done = true 70 g.cond.Broadcast() 71 g.cond.L.Unlock() 72} 73 74// exprGuard is a union of influxql.Expr based guards. a nil exprGuard matches 75// everything, while the zero value matches nothing. 76type exprGuard struct { 77 and *[2]*exprGuard 78 or *[2]*exprGuard 79 tagMatches *tagGuard 80 tagExists map[string]struct{} 81} 82 83type tagGuard struct { 84 meas bool 85 key []byte 86 op func([]byte) bool 87} 88 89// empty returns true if the exprGuard is empty, meaning that it matches no points. 90func (e *exprGuard) empty() bool { 91 return e != nil && e.and == nil && e.or == nil && e.tagMatches == nil && e.tagExists == nil 92} 93 94// newExprGuard scrutinizes the expression and returns an efficient guard. 95func newExprGuard(expr influxql.Expr) *exprGuard { 96 if expr == nil { 97 return nil 98 } 99 100 switch expr := expr.(type) { 101 case *influxql.ParenExpr: 102 return newExprGuard(expr.Expr) 103 104 case *influxql.BooleanLiteral: 105 if expr.Val { 106 return nil // matches everything 107 } 108 return new(exprGuard) // matches nothing 109 110 case *influxql.BinaryExpr: 111 switch expr.Op { 112 case influxql.AND: 113 lhs, rhs := newExprGuard(expr.LHS), newExprGuard(expr.RHS) 114 if lhs == nil { // reduce 115 return rhs 116 } else if rhs == nil { // reduce 117 return lhs 118 } else if lhs.empty() || rhs.empty() { // short circuit 119 return new(exprGuard) 120 } else { 121 return &exprGuard{and: &[2]*exprGuard{lhs, rhs}} 122 } 123 124 case influxql.OR: 125 lhs, rhs := newExprGuard(expr.LHS), newExprGuard(expr.RHS) 126 if lhs.empty() { // reduce 127 return rhs 128 } else if rhs.empty() { // reduce 129 return lhs 130 } else if lhs == nil || rhs == nil { // short circuit 131 return nil 132 } else { 133 return &exprGuard{or: &[2]*exprGuard{lhs, rhs}} 134 } 135 136 default: 137 return newBinaryExprGuard(expr) 138 } 139 default: 140 // if we couldn't analyze, match everything 141 return nil 142 } 143} 144 145// newBinaryExprGuard scrutinizes the binary expression and returns an efficient guard. 146func newBinaryExprGuard(expr *influxql.BinaryExpr) *exprGuard { 147 // if it's a nested binary expression, always match. 148 if _, ok := expr.LHS.(*influxql.BinaryExpr); ok { 149 return nil 150 } else if _, ok := expr.RHS.(*influxql.BinaryExpr); ok { 151 return nil 152 } 153 154 // ensure one of the expressions is a VarRef, and make that the key. 155 key, ok := expr.LHS.(*influxql.VarRef) 156 value := expr.RHS 157 if !ok { 158 key, ok = expr.RHS.(*influxql.VarRef) 159 if !ok { 160 return nil 161 } 162 value = expr.LHS 163 } 164 165 // check the key for situations we know we can't filter. 166 if key.Val != "_name" && key.Type != influxql.Unknown && key.Type != influxql.Tag { 167 return nil 168 } 169 170 // scrutinize the value to return an efficient guard. 171 switch value := value.(type) { 172 case *influxql.StringLiteral: 173 val := []byte(value.Val) 174 g := &exprGuard{tagMatches: &tagGuard{ 175 meas: key.Val == "_name", 176 key: []byte(key.Val), 177 }} 178 179 switch expr.Op { 180 case influxql.EQ: 181 g.tagMatches.op = func(x []byte) bool { return bytes.Equal(val, x) } 182 183 case influxql.NEQ: 184 g.tagMatches.op = func(x []byte) bool { return !bytes.Equal(val, x) } 185 186 default: // any other operator isn't valid. conservatively match everything. 187 return nil 188 } 189 190 return g 191 192 case *influxql.RegexLiteral: 193 // There's a tradeoff between being precise and being fast. For example, if the 194 // delete includes a very expensive regex, we don't want to run that against every 195 // incoming point. The decision here is to match any point that has a possibly 196 // expensive match if there is any overlap on the tags. In other words, expensive 197 // matches get transformed into trivially matching everything. 198 return &exprGuard{tagExists: map[string]struct{}{key.Val: {}}} 199 200 case *influxql.VarRef: 201 // We could do a better job here by encoding the two names and checking the points 202 // against them, but I'm not quite sure how to do that. Be conservative and match 203 // any points that contain either the key or value. 204 205 // since every point has a measurement, always match if either are on the measurement. 206 if key.Val == "_name" || value.Val == "_name" { 207 return nil 208 } 209 return &exprGuard{tagExists: map[string]struct{}{ 210 key.Val: {}, 211 value.Val: {}, 212 }} 213 214 default: // any other value type matches everything 215 return nil 216 } 217} 218 219// matches checks if the exprGuard matches the point. 220func (g *exprGuard) matches(pt models.Point) bool { 221 switch { 222 case g == nil: 223 return true 224 225 case g.and != nil: 226 return g.and[0].matches(pt) && g.and[1].matches(pt) 227 228 case g.or != nil: 229 return g.or[0].matches(pt) || g.or[1].matches(pt) 230 231 case g.tagMatches != nil: 232 if g.tagMatches.meas { 233 return g.tagMatches.op(pt.Name()) 234 } 235 for _, tag := range pt.Tags() { 236 if bytes.Equal(tag.Key, g.tagMatches.key) && g.tagMatches.op(tag.Value) { 237 return true 238 } 239 } 240 return false 241 242 case g.tagExists != nil: 243 for _, tag := range pt.Tags() { 244 if _, ok := g.tagExists[string(tag.Key)]; ok { 245 return true 246 } 247 } 248 return false 249 250 default: 251 return false 252 } 253} 254