1package query_test 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "reflect" 8 "strings" 9 "testing" 10 "time" 11 12 "github.com/davecgh/go-spew/spew" 13 "github.com/influxdata/influxdb/pkg/deep" 14 "github.com/influxdata/influxdb/query" 15 "github.com/influxdata/influxql" 16) 17 18// Ensure that a set of iterators can be merged together, sorted by window and name/tag. 19func TestMergeIterator_Float(t *testing.T) { 20 inputs := []*FloatIterator{ 21 {Points: []query.FloatPoint{ 22 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, 23 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, 24 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, 25 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, 26 {Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}, 27 }}, 28 {Points: []query.FloatPoint{ 29 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, 30 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, 31 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, 32 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}, 33 }}, 34 {Points: []query.FloatPoint{}}, 35 {Points: []query.FloatPoint{}}, 36 } 37 38 itr := query.NewMergeIterator(FloatIterators(inputs), query.IteratorOptions{ 39 Interval: query.Interval{ 40 Duration: 10 * time.Nanosecond, 41 }, 42 Dimensions: []string{"host"}, 43 Ascending: true, 44 }) 45 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 46 t.Fatalf("unexpected error: %s", err) 47 } else if !deep.Equal(a, [][]query.Point{ 48 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}}, 49 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}}, 50 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}}, 51 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}}, 52 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}}, 53 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}}, 54 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}}, 55 {&query.FloatPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}}, 56 {&query.FloatPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}}, 57 }) { 58 t.Errorf("unexpected points: %s", spew.Sdump(a)) 59 } 60 61 for i, input := range inputs { 62 if !input.Closed { 63 t.Errorf("iterator %d not closed", i) 64 } 65 } 66} 67 68// Ensure that a set of iterators can be merged together, sorted by window and name/tag. 69func TestMergeIterator_Integer(t *testing.T) { 70 inputs := []*IntegerIterator{ 71 {Points: []query.IntegerPoint{ 72 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, 73 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, 74 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, 75 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, 76 {Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}, 77 }}, 78 {Points: []query.IntegerPoint{ 79 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, 80 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, 81 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, 82 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}, 83 }}, 84 {Points: []query.IntegerPoint{}}, 85 } 86 itr := query.NewMergeIterator(IntegerIterators(inputs), query.IteratorOptions{ 87 Interval: query.Interval{ 88 Duration: 10 * time.Nanosecond, 89 }, 90 Dimensions: []string{"host"}, 91 Ascending: true, 92 }) 93 94 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 95 t.Fatalf("unexpected error: %s", err) 96 } else if !deep.Equal(a, [][]query.Point{ 97 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}}, 98 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}}, 99 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}}, 100 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}}, 101 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}}, 102 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}}, 103 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}}, 104 {&query.IntegerPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}}, 105 {&query.IntegerPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}}, 106 }) { 107 t.Errorf("unexpected points: %s", spew.Sdump(a)) 108 } 109 110 for i, input := range inputs { 111 if !input.Closed { 112 t.Errorf("iterator %d not closed", i) 113 } 114 } 115} 116 117// Ensure that a set of iterators can be merged together, sorted by window and name/tag. 118func TestMergeIterator_Unsigned(t *testing.T) { 119 inputs := []*UnsignedIterator{ 120 {Points: []query.UnsignedPoint{ 121 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, 122 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, 123 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, 124 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, 125 {Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}, 126 }}, 127 {Points: []query.UnsignedPoint{ 128 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, 129 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, 130 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, 131 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}, 132 }}, 133 {Points: []query.UnsignedPoint{}}, 134 } 135 itr := query.NewMergeIterator(UnsignedIterators(inputs), query.IteratorOptions{ 136 Interval: query.Interval{ 137 Duration: 10 * time.Nanosecond, 138 }, 139 Dimensions: []string{"host"}, 140 Ascending: true, 141 }) 142 143 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 144 t.Fatalf("unexpected error: %s", err) 145 } else if !deep.Equal(a, [][]query.Point{ 146 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}}, 147 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}}, 148 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}}, 149 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}}, 150 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}}, 151 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}}, 152 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}}, 153 {&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}}, 154 {&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}}, 155 }) { 156 t.Errorf("unexpected points: %s", spew.Sdump(a)) 157 } 158 159 for i, input := range inputs { 160 if !input.Closed { 161 t.Errorf("iterator %d not closed", i) 162 } 163 } 164} 165 166// Ensure that a set of iterators can be merged together, sorted by window and name/tag. 167func TestMergeIterator_String(t *testing.T) { 168 inputs := []*StringIterator{ 169 {Points: []query.StringPoint{ 170 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"}, 171 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"}, 172 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"}, 173 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"}, 174 {Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: "h"}, 175 }}, 176 {Points: []query.StringPoint{ 177 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"}, 178 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"}, 179 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"}, 180 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: "i"}, 181 }}, 182 {Points: []query.StringPoint{}}, 183 } 184 itr := query.NewMergeIterator(StringIterators(inputs), query.IteratorOptions{ 185 Interval: query.Interval{ 186 Duration: 10 * time.Nanosecond, 187 }, 188 Dimensions: []string{"host"}, 189 Ascending: true, 190 }) 191 192 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 193 t.Fatalf("unexpected error: %s", err) 194 } else if !deep.Equal(a, [][]query.Point{ 195 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"}}, 196 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"}}, 197 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"}}, 198 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"}}, 199 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"}}, 200 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"}}, 201 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"}}, 202 {&query.StringPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: "i"}}, 203 {&query.StringPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: "h"}}, 204 }) { 205 t.Errorf("unexpected points: %s", spew.Sdump(a)) 206 } 207 208 for i, input := range inputs { 209 if !input.Closed { 210 t.Errorf("iterator %d not closed", i) 211 } 212 } 213} 214 215// Ensure that a set of iterators can be merged together, sorted by window and name/tag. 216func TestMergeIterator_Boolean(t *testing.T) { 217 inputs := []*BooleanIterator{ 218 {Points: []query.BooleanPoint{ 219 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true}, 220 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true}, 221 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false}, 222 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false}, 223 {Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: true}, 224 }}, 225 {Points: []query.BooleanPoint{ 226 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true}, 227 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true}, 228 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false}, 229 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: false}, 230 }}, 231 {Points: []query.BooleanPoint{}}, 232 } 233 itr := query.NewMergeIterator(BooleanIterators(inputs), query.IteratorOptions{ 234 Interval: query.Interval{ 235 Duration: 10 * time.Nanosecond, 236 }, 237 Dimensions: []string{"host"}, 238 Ascending: true, 239 }) 240 241 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 242 t.Fatalf("unexpected error: %s", err) 243 } else if !deep.Equal(a, [][]query.Point{ 244 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true}}, 245 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true}}, 246 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true}}, 247 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false}}, 248 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false}}, 249 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true}}, 250 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false}}, 251 {&query.BooleanPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: false}}, 252 {&query.BooleanPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: true}}, 253 }) { 254 t.Errorf("unexpected points: %s", spew.Sdump(a)) 255 } 256 257 for i, input := range inputs { 258 if !input.Closed { 259 t.Errorf("iterator %d not closed", i) 260 } 261 } 262} 263 264func TestMergeIterator_Nil(t *testing.T) { 265 itr := query.NewMergeIterator([]query.Iterator{nil}, query.IteratorOptions{}) 266 if itr != nil { 267 t.Fatalf("unexpected iterator: %#v", itr) 268 } 269} 270 271// Verifies that coercing will drop values that aren't the primary type. 272// It's the responsibility of the engine to return the correct type. If they don't, 273// we drop iterators that don't match. 274func TestMergeIterator_Coerce_Float(t *testing.T) { 275 inputs := []query.Iterator{ 276 &FloatIterator{Points: []query.FloatPoint{ 277 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, 278 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, 279 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, 280 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}, 281 }}, 282 &IntegerIterator{Points: []query.IntegerPoint{ 283 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, 284 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, 285 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, 286 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, 287 {Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}, 288 }}, 289 } 290 291 itr := query.NewMergeIterator(inputs, query.IteratorOptions{ 292 Interval: query.Interval{ 293 Duration: 10 * time.Nanosecond, 294 }, 295 Dimensions: []string{"host"}, 296 Ascending: true, 297 }) 298 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 299 t.Fatalf("unexpected error: %s", err) 300 } else if !deep.Equal(a, [][]query.Point{ 301 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}}, 302 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}}, 303 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}}, 304 {&query.FloatPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}}, 305 }) { 306 t.Errorf("unexpected points: %s", spew.Sdump(a)) 307 } 308 309 for i, input := range inputs { 310 switch input := input.(type) { 311 case *FloatIterator: 312 if !input.Closed { 313 t.Errorf("iterator %d not closed", i) 314 } 315 case *IntegerIterator: 316 if !input.Closed { 317 t.Errorf("iterator %d not closed", i) 318 } 319 case *UnsignedIterator: 320 if !input.Closed { 321 t.Errorf("iterator %d not closed", i) 322 } 323 } 324 } 325} 326 327// Ensure that a set of iterators can be merged together, sorted by name/tag. 328func TestSortedMergeIterator_Float(t *testing.T) { 329 inputs := []*FloatIterator{ 330 {Points: []query.FloatPoint{ 331 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, 332 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, 333 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, 334 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, 335 {Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}, 336 }}, 337 {Points: []query.FloatPoint{ 338 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, 339 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, 340 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, 341 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}, 342 }}, 343 {Points: []query.FloatPoint{}}, 344 } 345 itr := query.NewSortedMergeIterator(FloatIterators(inputs), query.IteratorOptions{ 346 Interval: query.Interval{ 347 Duration: 10 * time.Nanosecond, 348 }, 349 Dimensions: []string{"host"}, 350 Ascending: true, 351 }) 352 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 353 t.Fatalf("unexpected error: %s", err) 354 } else if !deep.Equal(a, [][]query.Point{ 355 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}}, 356 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}}, 357 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}}, 358 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}}, 359 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}}, 360 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}}, 361 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}}, 362 {&query.FloatPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}}, 363 {&query.FloatPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}}, 364 }) { 365 t.Errorf("unexpected points: %s", spew.Sdump(a)) 366 } 367 368 for i, input := range inputs { 369 if !input.Closed { 370 t.Errorf("iterator %d not closed", i) 371 } 372 } 373} 374 375// Ensure that a set of iterators can be merged together, sorted by name/tag. 376func TestSortedMergeIterator_Integer(t *testing.T) { 377 inputs := []*IntegerIterator{ 378 {Points: []query.IntegerPoint{ 379 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, 380 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, 381 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, 382 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, 383 {Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}, 384 }}, 385 {Points: []query.IntegerPoint{ 386 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, 387 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, 388 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, 389 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}, 390 }}, 391 {Points: []query.IntegerPoint{}}, 392 } 393 itr := query.NewSortedMergeIterator(IntegerIterators(inputs), query.IteratorOptions{ 394 Interval: query.Interval{ 395 Duration: 10 * time.Nanosecond, 396 }, 397 Dimensions: []string{"host"}, 398 Ascending: true, 399 }) 400 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 401 t.Fatalf("unexpected error: %s", err) 402 } else if !deep.Equal(a, [][]query.Point{ 403 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}}, 404 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}}, 405 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}}, 406 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}}, 407 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}}, 408 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}}, 409 {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}}, 410 {&query.IntegerPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}}, 411 {&query.IntegerPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}}, 412 }) { 413 t.Errorf("unexpected points: %s", spew.Sdump(a)) 414 } 415 416 for i, input := range inputs { 417 if !input.Closed { 418 t.Errorf("iterator %d not closed", i) 419 } 420 } 421} 422 423// Ensure that a set of iterators can be merged together, sorted by name/tag. 424func TestSortedMergeIterator_Unsigned(t *testing.T) { 425 inputs := []*UnsignedIterator{ 426 {Points: []query.UnsignedPoint{ 427 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, 428 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, 429 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, 430 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, 431 {Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}, 432 }}, 433 {Points: []query.UnsignedPoint{ 434 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, 435 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, 436 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, 437 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}, 438 }}, 439 {Points: []query.UnsignedPoint{}}, 440 } 441 itr := query.NewSortedMergeIterator(UnsignedIterators(inputs), query.IteratorOptions{ 442 Interval: query.Interval{ 443 Duration: 10 * time.Nanosecond, 444 }, 445 Dimensions: []string{"host"}, 446 Ascending: true, 447 }) 448 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 449 t.Fatalf("unexpected error: %s", err) 450 } else if !deep.Equal(a, [][]query.Point{ 451 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}}, 452 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}}, 453 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}}, 454 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}}, 455 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}}, 456 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}}, 457 {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}}, 458 {&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}}, 459 {&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}}, 460 }) { 461 t.Errorf("unexpected points: %s", spew.Sdump(a)) 462 } 463 464 for i, input := range inputs { 465 if !input.Closed { 466 t.Errorf("iterator %d not closed", i) 467 } 468 } 469} 470 471// Ensure that a set of iterators can be merged together, sorted by name/tag. 472func TestSortedMergeIterator_String(t *testing.T) { 473 inputs := []*StringIterator{ 474 {Points: []query.StringPoint{ 475 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"}, 476 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"}, 477 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"}, 478 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"}, 479 {Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: "h"}, 480 }}, 481 {Points: []query.StringPoint{ 482 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"}, 483 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"}, 484 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"}, 485 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: "i"}, 486 }}, 487 {Points: []query.StringPoint{}}, 488 } 489 itr := query.NewSortedMergeIterator(StringIterators(inputs), query.IteratorOptions{ 490 Interval: query.Interval{ 491 Duration: 10 * time.Nanosecond, 492 }, 493 Dimensions: []string{"host"}, 494 Ascending: true, 495 }) 496 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 497 t.Fatalf("unexpected error: %s", err) 498 } else if !deep.Equal(a, [][]query.Point{ 499 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"}}, 500 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"}}, 501 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"}}, 502 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"}}, 503 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"}}, 504 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"}}, 505 {&query.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"}}, 506 {&query.StringPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: "i"}}, 507 {&query.StringPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: "h"}}, 508 }) { 509 t.Errorf("unexpected points: %s", spew.Sdump(a)) 510 } 511 512 for i, input := range inputs { 513 if !input.Closed { 514 t.Errorf("iterator %d not closed", i) 515 } 516 } 517} 518 519// Ensure that a set of iterators can be merged together, sorted by name/tag. 520func TestSortedMergeIterator_Boolean(t *testing.T) { 521 inputs := []*BooleanIterator{ 522 {Points: []query.BooleanPoint{ 523 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true}, 524 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true}, 525 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false}, 526 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false}, 527 {Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: true}, 528 }}, 529 {Points: []query.BooleanPoint{ 530 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true}, 531 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true}, 532 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false}, 533 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: true}, 534 }}, 535 {Points: []query.BooleanPoint{}}, 536 } 537 itr := query.NewSortedMergeIterator(BooleanIterators(inputs), query.IteratorOptions{ 538 Interval: query.Interval{ 539 Duration: 10 * time.Nanosecond, 540 }, 541 Dimensions: []string{"host"}, 542 Ascending: true, 543 }) 544 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 545 t.Fatalf("unexpected error: %s", err) 546 } else if !deep.Equal(a, [][]query.Point{ 547 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true}}, 548 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true}}, 549 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true}}, 550 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false}}, 551 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false}}, 552 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true}}, 553 {&query.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false}}, 554 {&query.BooleanPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: true}}, 555 {&query.BooleanPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: true}}, 556 }) { 557 t.Errorf("unexpected points: %s", spew.Sdump(a)) 558 } 559 560 for i, input := range inputs { 561 if !input.Closed { 562 t.Errorf("iterator %d not closed", i) 563 } 564 } 565} 566 567func TestSortedMergeIterator_Nil(t *testing.T) { 568 itr := query.NewSortedMergeIterator([]query.Iterator{nil}, query.IteratorOptions{}) 569 if itr != nil { 570 t.Fatalf("unexpected iterator: %#v", itr) 571 } 572} 573 574func TestSortedMergeIterator_Coerce_Float(t *testing.T) { 575 inputs := []query.Iterator{ 576 &FloatIterator{Points: []query.FloatPoint{ 577 {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, 578 {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, 579 {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, 580 {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}, 581 }}, 582 &IntegerIterator{Points: []query.IntegerPoint{ 583 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, 584 {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, 585 {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, 586 {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, 587 {Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}, 588 }}, 589 } 590 591 itr := query.NewSortedMergeIterator(inputs, query.IteratorOptions{ 592 Interval: query.Interval{ 593 Duration: 10 * time.Nanosecond, 594 }, 595 Dimensions: []string{"host"}, 596 Ascending: true, 597 }) 598 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 599 t.Fatalf("unexpected error: %s", err) 600 } else if !deep.Equal(a, [][]query.Point{ 601 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}}, 602 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}}, 603 {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}}, 604 {&query.FloatPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}}, 605 }) { 606 t.Errorf("unexpected points: %s", spew.Sdump(a)) 607 } 608 609 for i, input := range inputs { 610 switch input := input.(type) { 611 case *FloatIterator: 612 if !input.Closed { 613 t.Errorf("iterator %d not closed", i) 614 } 615 case *IntegerIterator: 616 if !input.Closed { 617 t.Errorf("iterator %d not closed", i) 618 } 619 case *UnsignedIterator: 620 if !input.Closed { 621 t.Errorf("iterator %d not closed", i) 622 } 623 } 624 } 625} 626 627// Ensure limit iterators work with limit and offset. 628func TestLimitIterator_Float(t *testing.T) { 629 input := &FloatIterator{Points: []query.FloatPoint{ 630 {Name: "cpu", Time: 0, Value: 1}, 631 {Name: "cpu", Time: 5, Value: 3}, 632 {Name: "cpu", Time: 10, Value: 5}, 633 {Name: "mem", Time: 5, Value: 3}, 634 {Name: "mem", Time: 7, Value: 8}, 635 }} 636 637 itr := query.NewLimitIterator(input, query.IteratorOptions{ 638 Limit: 1, 639 Offset: 1, 640 }) 641 642 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 643 t.Fatalf("unexpected error: %s", err) 644 } else if !deep.Equal(a, [][]query.Point{ 645 {&query.FloatPoint{Name: "cpu", Time: 5, Value: 3}}, 646 {&query.FloatPoint{Name: "mem", Time: 7, Value: 8}}, 647 }) { 648 t.Fatalf("unexpected points: %s", spew.Sdump(a)) 649 } 650 651 if !input.Closed { 652 t.Error("iterator not closed") 653 } 654} 655 656// Ensure limit iterators work with limit and offset. 657func TestLimitIterator_Integer(t *testing.T) { 658 input := &IntegerIterator{Points: []query.IntegerPoint{ 659 {Name: "cpu", Time: 0, Value: 1}, 660 {Name: "cpu", Time: 5, Value: 3}, 661 {Name: "cpu", Time: 10, Value: 5}, 662 {Name: "mem", Time: 5, Value: 3}, 663 {Name: "mem", Time: 7, Value: 8}, 664 }} 665 666 itr := query.NewLimitIterator(input, query.IteratorOptions{ 667 Limit: 1, 668 Offset: 1, 669 }) 670 671 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 672 t.Fatalf("unexpected error: %s", err) 673 } else if !deep.Equal(a, [][]query.Point{ 674 {&query.IntegerPoint{Name: "cpu", Time: 5, Value: 3}}, 675 {&query.IntegerPoint{Name: "mem", Time: 7, Value: 8}}, 676 }) { 677 t.Fatalf("unexpected points: %s", spew.Sdump(a)) 678 } 679 680 if !input.Closed { 681 t.Error("iterator not closed") 682 } 683} 684 685// Ensure limit iterators work with limit and offset. 686func TestLimitIterator_Unsigned(t *testing.T) { 687 input := &UnsignedIterator{Points: []query.UnsignedPoint{ 688 {Name: "cpu", Time: 0, Value: 1}, 689 {Name: "cpu", Time: 5, Value: 3}, 690 {Name: "cpu", Time: 10, Value: 5}, 691 {Name: "mem", Time: 5, Value: 3}, 692 {Name: "mem", Time: 7, Value: 8}, 693 }} 694 695 itr := query.NewLimitIterator(input, query.IteratorOptions{ 696 Limit: 1, 697 Offset: 1, 698 }) 699 700 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 701 t.Fatalf("unexpected error: %s", err) 702 } else if !deep.Equal(a, [][]query.Point{ 703 {&query.UnsignedPoint{Name: "cpu", Time: 5, Value: 3}}, 704 {&query.UnsignedPoint{Name: "mem", Time: 7, Value: 8}}, 705 }) { 706 t.Fatalf("unexpected points: %s", spew.Sdump(a)) 707 } 708 709 if !input.Closed { 710 t.Error("iterator not closed") 711 } 712} 713 714// Ensure limit iterators work with limit and offset. 715func TestLimitIterator_String(t *testing.T) { 716 input := &StringIterator{Points: []query.StringPoint{ 717 {Name: "cpu", Time: 0, Value: "a"}, 718 {Name: "cpu", Time: 5, Value: "b"}, 719 {Name: "cpu", Time: 10, Value: "c"}, 720 {Name: "mem", Time: 5, Value: "d"}, 721 {Name: "mem", Time: 7, Value: "e"}, 722 }} 723 724 itr := query.NewLimitIterator(input, query.IteratorOptions{ 725 Limit: 1, 726 Offset: 1, 727 }) 728 729 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 730 t.Fatalf("unexpected error: %s", err) 731 } else if !deep.Equal(a, [][]query.Point{ 732 {&query.StringPoint{Name: "cpu", Time: 5, Value: "b"}}, 733 {&query.StringPoint{Name: "mem", Time: 7, Value: "e"}}, 734 }) { 735 t.Fatalf("unexpected points: %s", spew.Sdump(a)) 736 } 737 738 if !input.Closed { 739 t.Error("iterator not closed") 740 } 741} 742 743// Ensure limit iterators work with limit and offset. 744func TestLimitIterator_Boolean(t *testing.T) { 745 input := &BooleanIterator{Points: []query.BooleanPoint{ 746 {Name: "cpu", Time: 0, Value: true}, 747 {Name: "cpu", Time: 5, Value: false}, 748 {Name: "cpu", Time: 10, Value: true}, 749 {Name: "mem", Time: 5, Value: false}, 750 {Name: "mem", Time: 7, Value: true}, 751 }} 752 753 itr := query.NewLimitIterator(input, query.IteratorOptions{ 754 Limit: 1, 755 Offset: 1, 756 }) 757 758 if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { 759 t.Fatalf("unexpected error: %s", err) 760 } else if !deep.Equal(a, [][]query.Point{ 761 {&query.BooleanPoint{Name: "cpu", Time: 5, Value: false}}, 762 {&query.BooleanPoint{Name: "mem", Time: 7, Value: true}}, 763 }) { 764 t.Fatalf("unexpected points: %s", spew.Sdump(a)) 765 } 766 767 if !input.Closed { 768 t.Error("iterator not closed") 769 } 770} 771 772// Ensure limit iterator returns a subset of points. 773func TestLimitIterator(t *testing.T) { 774 itr := query.NewLimitIterator( 775 &FloatIterator{Points: []query.FloatPoint{ 776 {Time: 0, Value: 0}, 777 {Time: 1, Value: 1}, 778 {Time: 2, Value: 2}, 779 {Time: 3, Value: 3}, 780 }}, 781 query.IteratorOptions{ 782 Limit: 2, 783 Offset: 1, 784 StartTime: influxql.MinTime, 785 EndTime: influxql.MaxTime, 786 }, 787 ) 788 789 if a, err := (Iterators{itr}).ReadAll(); err != nil { 790 t.Fatalf("unexpected error: %s", err) 791 } else if !deep.Equal(a, [][]query.Point{ 792 {&query.FloatPoint{Time: 1, Value: 1}}, 793 {&query.FloatPoint{Time: 2, Value: 2}}, 794 }) { 795 t.Fatalf("unexpected points: %s", spew.Sdump(a)) 796 } 797} 798 799func TestFillIterator_ImplicitStartTime(t *testing.T) { 800 opt := query.IteratorOptions{ 801 StartTime: influxql.MinTime, 802 EndTime: mustParseTime("2000-01-01T01:00:00Z").UnixNano() - 1, 803 Interval: query.Interval{ 804 Duration: 20 * time.Minute, 805 }, 806 Ascending: true, 807 } 808 start := mustParseTime("2000-01-01T00:00:00Z").UnixNano() 809 itr := query.NewFillIterator( 810 &FloatIterator{Points: []query.FloatPoint{ 811 {Time: start, Value: 0}, 812 }}, 813 nil, 814 opt, 815 ) 816 817 if a, err := (Iterators{itr}).ReadAll(); err != nil { 818 t.Fatalf("unexpected error: %s", err) 819 } else if !deep.Equal(a, [][]query.Point{ 820 {&query.FloatPoint{Time: start, Value: 0}}, 821 {&query.FloatPoint{Time: start + int64(20*time.Minute), Nil: true}}, 822 {&query.FloatPoint{Time: start + int64(40*time.Minute), Nil: true}}, 823 }) { 824 t.Fatalf("unexpected points: %s", spew.Sdump(a)) 825 } 826} 827 828func TestFillIterator_DST(t *testing.T) { 829 for _, tt := range []struct { 830 name string 831 start, end time.Time 832 points []time.Duration 833 opt query.IteratorOptions 834 }{ 835 { 836 name: "Start_GroupByDay_Ascending", 837 start: mustParseTime("2000-04-01T00:00:00-08:00"), 838 end: mustParseTime("2000-04-05T00:00:00-07:00"), 839 points: []time.Duration{ 840 24 * time.Hour, 841 47 * time.Hour, 842 71 * time.Hour, 843 }, 844 opt: query.IteratorOptions{ 845 Interval: query.Interval{ 846 Duration: 24 * time.Hour, 847 }, 848 Location: LosAngeles, 849 Ascending: true, 850 }, 851 }, 852 { 853 name: "Start_GroupByDay_Descending", 854 start: mustParseTime("2000-04-01T00:00:00-08:00"), 855 end: mustParseTime("2000-04-05T00:00:00-07:00"), 856 points: []time.Duration{ 857 71 * time.Hour, 858 47 * time.Hour, 859 24 * time.Hour, 860 }, 861 opt: query.IteratorOptions{ 862 Interval: query.Interval{ 863 Duration: 24 * time.Hour, 864 }, 865 Location: LosAngeles, 866 Ascending: false, 867 }, 868 }, 869 { 870 name: "Start_GroupByHour_Ascending", 871 start: mustParseTime("2000-04-02T00:00:00-08:00"), 872 end: mustParseTime("2000-04-02T05:00:00-07:00"), 873 points: []time.Duration{ 874 1 * time.Hour, 875 2 * time.Hour, 876 3 * time.Hour, 877 }, 878 opt: query.IteratorOptions{ 879 Interval: query.Interval{ 880 Duration: 1 * time.Hour, 881 }, 882 Location: LosAngeles, 883 Ascending: true, 884 }, 885 }, 886 { 887 name: "Start_GroupByHour_Descending", 888 start: mustParseTime("2000-04-02T00:00:00-08:00"), 889 end: mustParseTime("2000-04-02T05:00:00-07:00"), 890 points: []time.Duration{ 891 3 * time.Hour, 892 2 * time.Hour, 893 1 * time.Hour, 894 }, 895 opt: query.IteratorOptions{ 896 Interval: query.Interval{ 897 Duration: 1 * time.Hour, 898 }, 899 Location: LosAngeles, 900 Ascending: false, 901 }, 902 }, 903 { 904 name: "Start_GroupBy2Hour_Ascending", 905 start: mustParseTime("2000-04-02T00:00:00-08:00"), 906 end: mustParseTime("2000-04-02T07:00:00-07:00"), 907 points: []time.Duration{ 908 2 * time.Hour, 909 3 * time.Hour, 910 5 * time.Hour, 911 }, 912 opt: query.IteratorOptions{ 913 Interval: query.Interval{ 914 Duration: 2 * time.Hour, 915 }, 916 Location: LosAngeles, 917 Ascending: true, 918 }, 919 }, 920 { 921 name: "Start_GroupBy2Hour_Descending", 922 start: mustParseTime("2000-04-02T00:00:00-08:00"), 923 end: mustParseTime("2000-04-02T07:00:00-07:00"), 924 points: []time.Duration{ 925 5 * time.Hour, 926 3 * time.Hour, 927 2 * time.Hour, 928 }, 929 opt: query.IteratorOptions{ 930 Interval: query.Interval{ 931 Duration: 2 * time.Hour, 932 }, 933 Location: LosAngeles, 934 Ascending: false, 935 }, 936 }, 937 { 938 name: "End_GroupByDay_Ascending", 939 start: mustParseTime("2000-10-28T00:00:00-07:00"), 940 end: mustParseTime("2000-11-01T00:00:00-08:00"), 941 points: []time.Duration{ 942 24 * time.Hour, 943 49 * time.Hour, 944 73 * time.Hour, 945 }, 946 opt: query.IteratorOptions{ 947 Interval: query.Interval{ 948 Duration: 24 * time.Hour, 949 }, 950 Location: LosAngeles, 951 Ascending: true, 952 }, 953 }, 954 { 955 name: "End_GroupByDay_Descending", 956 start: mustParseTime("2000-10-28T00:00:00-07:00"), 957 end: mustParseTime("2000-11-01T00:00:00-08:00"), 958 points: []time.Duration{ 959 73 * time.Hour, 960 49 * time.Hour, 961 24 * time.Hour, 962 }, 963 opt: query.IteratorOptions{ 964 Interval: query.Interval{ 965 Duration: 24 * time.Hour, 966 }, 967 Location: LosAngeles, 968 Ascending: false, 969 }, 970 }, 971 { 972 name: "End_GroupByHour_Ascending", 973 start: mustParseTime("2000-10-29T00:00:00-07:00"), 974 end: mustParseTime("2000-10-29T03:00:00-08:00"), 975 points: []time.Duration{ 976 1 * time.Hour, 977 2 * time.Hour, 978 3 * time.Hour, 979 }, 980 opt: query.IteratorOptions{ 981 Interval: query.Interval{ 982 Duration: 1 * time.Hour, 983 }, 984 Location: LosAngeles, 985 Ascending: true, 986 }, 987 }, 988 { 989 name: "End_GroupByHour_Descending", 990 start: mustParseTime("2000-10-29T00:00:00-07:00"), 991 end: mustParseTime("2000-10-29T03:00:00-08:00"), 992 points: []time.Duration{ 993 3 * time.Hour, 994 2 * time.Hour, 995 1 * time.Hour, 996 }, 997 opt: query.IteratorOptions{ 998 Interval: query.Interval{ 999 Duration: 1 * time.Hour, 1000 }, 1001 Location: LosAngeles, 1002 Ascending: false, 1003 }, 1004 }, 1005 } { 1006 t.Run(tt.name, func(t *testing.T) { 1007 opt := tt.opt 1008 opt.StartTime = tt.start.UnixNano() 1009 opt.EndTime = tt.end.UnixNano() - 1 1010 1011 points := make([][]query.Point, 0, len(tt.points)+1) 1012 if opt.Ascending { 1013 points = append(points, []query.Point{ 1014 &query.FloatPoint{ 1015 Time: tt.start.UnixNano(), 1016 }, 1017 }) 1018 } 1019 for _, d := range tt.points { 1020 points = append(points, []query.Point{ 1021 &query.FloatPoint{ 1022 Time: tt.start.Add(d).UnixNano(), 1023 Nil: true, 1024 }, 1025 }) 1026 } 1027 if !opt.Ascending { 1028 points = append(points, []query.Point{ 1029 &query.FloatPoint{ 1030 Time: tt.start.UnixNano(), 1031 }, 1032 }) 1033 } 1034 itr := query.NewFillIterator( 1035 &FloatIterator{Points: []query.FloatPoint{{Time: tt.start.UnixNano(), Value: 0}}}, 1036 nil, 1037 opt, 1038 ) 1039 1040 if a, err := (Iterators{itr}).ReadAll(); err != nil { 1041 t.Fatalf("unexpected error: %s", err) 1042 } else if !deep.Equal(a, points) { 1043 t.Fatalf("unexpected points: %s", spew.Sdump(a)) 1044 } 1045 }) 1046 } 1047} 1048 1049// Iterators is a test wrapper for iterators. 1050type Iterators []query.Iterator 1051 1052// Next returns the next value from each iterator. 1053// Returns nil if any iterator returns a nil. 1054func (itrs Iterators) Next() ([]query.Point, error) { 1055 a := make([]query.Point, len(itrs)) 1056 for i, itr := range itrs { 1057 switch itr := itr.(type) { 1058 case query.FloatIterator: 1059 fp, err := itr.Next() 1060 if fp == nil || err != nil { 1061 return nil, err 1062 } 1063 a[i] = fp 1064 case query.IntegerIterator: 1065 ip, err := itr.Next() 1066 if ip == nil || err != nil { 1067 return nil, err 1068 } 1069 a[i] = ip 1070 case query.UnsignedIterator: 1071 up, err := itr.Next() 1072 if up == nil || err != nil { 1073 return nil, err 1074 } 1075 a[i] = up 1076 case query.StringIterator: 1077 sp, err := itr.Next() 1078 if sp == nil || err != nil { 1079 return nil, err 1080 } 1081 a[i] = sp 1082 case query.BooleanIterator: 1083 bp, err := itr.Next() 1084 if bp == nil || err != nil { 1085 return nil, err 1086 } 1087 a[i] = bp 1088 default: 1089 panic(fmt.Sprintf("iterator type not supported: %T", itr)) 1090 } 1091 } 1092 return a, nil 1093} 1094 1095// ReadAll reads all points from all iterators. 1096func (itrs Iterators) ReadAll() ([][]query.Point, error) { 1097 var a [][]query.Point 1098 1099 // Read from every iterator until a nil is encountered. 1100 for { 1101 points, err := itrs.Next() 1102 if err != nil { 1103 return nil, err 1104 } else if points == nil { 1105 break 1106 } 1107 a = append(a, query.Points(points).Clone()) 1108 } 1109 1110 // Close all iterators. 1111 query.Iterators(itrs).Close() 1112 1113 return a, nil 1114} 1115 1116func TestIteratorOptions_Window_Interval(t *testing.T) { 1117 opt := query.IteratorOptions{ 1118 Interval: query.Interval{ 1119 Duration: 10, 1120 }, 1121 } 1122 1123 start, end := opt.Window(4) 1124 if start != 0 { 1125 t.Errorf("expected start to be 0, got %d", start) 1126 } 1127 if end != 10 { 1128 t.Errorf("expected end to be 10, got %d", end) 1129 } 1130} 1131 1132func TestIteratorOptions_Window_Offset(t *testing.T) { 1133 opt := query.IteratorOptions{ 1134 Interval: query.Interval{ 1135 Duration: 10, 1136 Offset: 8, 1137 }, 1138 } 1139 1140 start, end := opt.Window(14) 1141 if start != 8 { 1142 t.Errorf("expected start to be 8, got %d", start) 1143 } 1144 if end != 18 { 1145 t.Errorf("expected end to be 18, got %d", end) 1146 } 1147} 1148 1149func TestIteratorOptions_Window_Default(t *testing.T) { 1150 opt := query.IteratorOptions{ 1151 StartTime: 0, 1152 EndTime: 60, 1153 } 1154 1155 start, end := opt.Window(34) 1156 if start != 0 { 1157 t.Errorf("expected start to be 0, got %d", start) 1158 } 1159 if end != 61 { 1160 t.Errorf("expected end to be 61, got %d", end) 1161 } 1162} 1163 1164func TestIteratorOptions_Window_Location(t *testing.T) { 1165 for _, tt := range []struct { 1166 now time.Time 1167 start, end time.Time 1168 interval time.Duration 1169 }{ 1170 { 1171 now: mustParseTime("2000-04-02T12:14:15-07:00"), 1172 start: mustParseTime("2000-04-02T00:00:00-08:00"), 1173 end: mustParseTime("2000-04-03T00:00:00-07:00"), 1174 interval: 24 * time.Hour, 1175 }, 1176 { 1177 now: mustParseTime("2000-04-02T01:17:12-08:00"), 1178 start: mustParseTime("2000-04-02T00:00:00-08:00"), 1179 end: mustParseTime("2000-04-03T00:00:00-07:00"), 1180 interval: 24 * time.Hour, 1181 }, 1182 { 1183 now: mustParseTime("2000-04-02T01:14:15-08:00"), 1184 start: mustParseTime("2000-04-02T00:00:00-08:00"), 1185 end: mustParseTime("2000-04-02T03:00:00-07:00"), 1186 interval: 2 * time.Hour, 1187 }, 1188 { 1189 now: mustParseTime("2000-04-02T03:17:12-07:00"), 1190 start: mustParseTime("2000-04-02T03:00:00-07:00"), 1191 end: mustParseTime("2000-04-02T04:00:00-07:00"), 1192 interval: 2 * time.Hour, 1193 }, 1194 { 1195 now: mustParseTime("2000-04-02T01:14:15-08:00"), 1196 start: mustParseTime("2000-04-02T01:00:00-08:00"), 1197 end: mustParseTime("2000-04-02T03:00:00-07:00"), 1198 interval: 1 * time.Hour, 1199 }, 1200 { 1201 now: mustParseTime("2000-04-02T03:17:12-07:00"), 1202 start: mustParseTime("2000-04-02T03:00:00-07:00"), 1203 end: mustParseTime("2000-04-02T04:00:00-07:00"), 1204 interval: 1 * time.Hour, 1205 }, 1206 { 1207 now: mustParseTime("2000-10-29T12:14:15-08:00"), 1208 start: mustParseTime("2000-10-29T00:00:00-07:00"), 1209 end: mustParseTime("2000-10-30T00:00:00-08:00"), 1210 interval: 24 * time.Hour, 1211 }, 1212 { 1213 now: mustParseTime("2000-10-29T01:17:12-07:00"), 1214 start: mustParseTime("2000-10-29T00:00:00-07:00"), 1215 end: mustParseTime("2000-10-30T00:00:00-08:00"), 1216 interval: 24 * time.Hour, 1217 }, 1218 { 1219 now: mustParseTime("2000-10-29T01:14:15-07:00"), 1220 start: mustParseTime("2000-10-29T00:00:00-07:00"), 1221 end: mustParseTime("2000-10-29T02:00:00-08:00"), 1222 interval: 2 * time.Hour, 1223 }, 1224 { 1225 now: mustParseTime("2000-10-29T03:17:12-08:00"), 1226 start: mustParseTime("2000-10-29T02:00:00-08:00"), 1227 end: mustParseTime("2000-10-29T04:00:00-08:00"), 1228 interval: 2 * time.Hour, 1229 }, 1230 { 1231 now: mustParseTime("2000-10-29T01:14:15-07:00"), 1232 start: mustParseTime("2000-10-29T01:00:00-07:00"), 1233 end: mustParseTime("2000-10-29T01:00:00-08:00"), 1234 interval: 1 * time.Hour, 1235 }, 1236 { 1237 now: mustParseTime("2000-10-29T02:17:12-07:00"), 1238 start: mustParseTime("2000-10-29T02:00:00-07:00"), 1239 end: mustParseTime("2000-10-29T03:00:00-07:00"), 1240 interval: 1 * time.Hour, 1241 }, 1242 } { 1243 t.Run(fmt.Sprintf("%s/%s", tt.now, tt.interval), func(t *testing.T) { 1244 opt := query.IteratorOptions{ 1245 Location: LosAngeles, 1246 Interval: query.Interval{ 1247 Duration: tt.interval, 1248 }, 1249 } 1250 start, end := opt.Window(tt.now.UnixNano()) 1251 if have, want := time.Unix(0, start).In(LosAngeles), tt.start; !have.Equal(want) { 1252 t.Errorf("unexpected start time: %s != %s", have, want) 1253 } 1254 if have, want := time.Unix(0, end).In(LosAngeles), tt.end; !have.Equal(want) { 1255 t.Errorf("unexpected end time: %s != %s", have, want) 1256 } 1257 }) 1258 } 1259} 1260 1261func TestIteratorOptions_Window_MinTime(t *testing.T) { 1262 opt := query.IteratorOptions{ 1263 StartTime: influxql.MinTime, 1264 EndTime: influxql.MaxTime, 1265 Interval: query.Interval{ 1266 Duration: time.Hour, 1267 }, 1268 } 1269 expected := time.Unix(0, influxql.MinTime).Add(time.Hour).Truncate(time.Hour) 1270 1271 start, end := opt.Window(influxql.MinTime) 1272 if start != influxql.MinTime { 1273 t.Errorf("expected start to be %d, got %d", influxql.MinTime, start) 1274 } 1275 if have, want := end, expected.UnixNano(); have != want { 1276 t.Errorf("expected end to be %d, got %d", want, have) 1277 } 1278} 1279 1280func TestIteratorOptions_Window_MaxTime(t *testing.T) { 1281 opt := query.IteratorOptions{ 1282 StartTime: influxql.MinTime, 1283 EndTime: influxql.MaxTime, 1284 Interval: query.Interval{ 1285 Duration: time.Hour, 1286 }, 1287 } 1288 expected := time.Unix(0, influxql.MaxTime).Truncate(time.Hour) 1289 1290 start, end := opt.Window(influxql.MaxTime) 1291 if have, want := start, expected.UnixNano(); have != want { 1292 t.Errorf("expected start to be %d, got %d", want, have) 1293 } 1294 if end != influxql.MaxTime { 1295 t.Errorf("expected end to be %d, got %d", influxql.MaxTime, end) 1296 } 1297} 1298 1299func TestIteratorOptions_SeekTime_Ascending(t *testing.T) { 1300 opt := query.IteratorOptions{ 1301 StartTime: 30, 1302 EndTime: 60, 1303 Ascending: true, 1304 } 1305 1306 time := opt.SeekTime() 1307 if time != 30 { 1308 t.Errorf("expected time to be 30, got %d", time) 1309 } 1310} 1311 1312func TestIteratorOptions_SeekTime_Descending(t *testing.T) { 1313 opt := query.IteratorOptions{ 1314 StartTime: 30, 1315 EndTime: 60, 1316 Ascending: false, 1317 } 1318 1319 time := opt.SeekTime() 1320 if time != 60 { 1321 t.Errorf("expected time to be 60, got %d", time) 1322 } 1323} 1324 1325func TestIteratorOptions_DerivativeInterval_Default(t *testing.T) { 1326 opt := query.IteratorOptions{} 1327 expected := query.Interval{Duration: time.Second} 1328 actual := opt.DerivativeInterval() 1329 if actual != expected { 1330 t.Errorf("expected derivative interval to be %v, got %v", expected, actual) 1331 } 1332} 1333 1334func TestIteratorOptions_DerivativeInterval_GroupBy(t *testing.T) { 1335 opt := query.IteratorOptions{ 1336 Interval: query.Interval{ 1337 Duration: 10, 1338 Offset: 2, 1339 }, 1340 } 1341 expected := query.Interval{Duration: 10} 1342 actual := opt.DerivativeInterval() 1343 if actual != expected { 1344 t.Errorf("expected derivative interval to be %v, got %v", expected, actual) 1345 } 1346} 1347 1348func TestIteratorOptions_DerivativeInterval_Call(t *testing.T) { 1349 opt := query.IteratorOptions{ 1350 Expr: &influxql.Call{ 1351 Name: "mean", 1352 Args: []influxql.Expr{ 1353 &influxql.VarRef{Val: "value"}, 1354 &influxql.DurationLiteral{Val: 2 * time.Second}, 1355 }, 1356 }, 1357 Interval: query.Interval{ 1358 Duration: 10, 1359 Offset: 2, 1360 }, 1361 } 1362 expected := query.Interval{Duration: 2 * time.Second} 1363 actual := opt.DerivativeInterval() 1364 if actual != expected { 1365 t.Errorf("expected derivative interval to be %v, got %v", expected, actual) 1366 } 1367} 1368 1369func TestIteratorOptions_ElapsedInterval_Default(t *testing.T) { 1370 opt := query.IteratorOptions{} 1371 expected := query.Interval{Duration: time.Nanosecond} 1372 actual := opt.ElapsedInterval() 1373 if actual != expected { 1374 t.Errorf("expected elapsed interval to be %v, got %v", expected, actual) 1375 } 1376} 1377 1378func TestIteratorOptions_ElapsedInterval_GroupBy(t *testing.T) { 1379 opt := query.IteratorOptions{ 1380 Interval: query.Interval{ 1381 Duration: 10, 1382 Offset: 2, 1383 }, 1384 } 1385 expected := query.Interval{Duration: time.Nanosecond} 1386 actual := opt.ElapsedInterval() 1387 if actual != expected { 1388 t.Errorf("expected elapsed interval to be %v, got %v", expected, actual) 1389 } 1390} 1391 1392func TestIteratorOptions_ElapsedInterval_Call(t *testing.T) { 1393 opt := query.IteratorOptions{ 1394 Expr: &influxql.Call{ 1395 Name: "mean", 1396 Args: []influxql.Expr{ 1397 &influxql.VarRef{Val: "value"}, 1398 &influxql.DurationLiteral{Val: 2 * time.Second}, 1399 }, 1400 }, 1401 Interval: query.Interval{ 1402 Duration: 10, 1403 Offset: 2, 1404 }, 1405 } 1406 expected := query.Interval{Duration: 2 * time.Second} 1407 actual := opt.ElapsedInterval() 1408 if actual != expected { 1409 t.Errorf("expected elapsed interval to be %v, got %v", expected, actual) 1410 } 1411} 1412 1413func TestIteratorOptions_IntegralInterval_Default(t *testing.T) { 1414 opt := query.IteratorOptions{} 1415 expected := query.Interval{Duration: time.Second} 1416 actual := opt.IntegralInterval() 1417 if actual != expected { 1418 t.Errorf("expected default integral interval to be %v, got %v", expected, actual) 1419 } 1420} 1421 1422// Ensure iterator options can be marshaled to and from a binary format. 1423func TestIteratorOptions_MarshalBinary(t *testing.T) { 1424 opt := &query.IteratorOptions{ 1425 Expr: MustParseExpr("count(value)"), 1426 Aux: []influxql.VarRef{{Val: "a"}, {Val: "b"}, {Val: "c"}}, 1427 Interval: query.Interval{ 1428 Duration: 1 * time.Hour, 1429 Offset: 20 * time.Minute, 1430 }, 1431 Dimensions: []string{"region", "host"}, 1432 GroupBy: map[string]struct{}{ 1433 "region": {}, 1434 "host": {}, 1435 "cluster": {}, 1436 }, 1437 Fill: influxql.NumberFill, 1438 FillValue: float64(100), 1439 Condition: MustParseExpr(`foo = 'bar'`), 1440 StartTime: 1000, 1441 EndTime: 2000, 1442 Ascending: true, 1443 Limit: 100, 1444 Offset: 200, 1445 SLimit: 300, 1446 SOffset: 400, 1447 StripName: true, 1448 Dedupe: true, 1449 } 1450 1451 // Marshal to binary. 1452 buf, err := opt.MarshalBinary() 1453 if err != nil { 1454 t.Fatal(err) 1455 } 1456 1457 // Unmarshal back to an object. 1458 var other query.IteratorOptions 1459 if err := other.UnmarshalBinary(buf); err != nil { 1460 t.Fatal(err) 1461 } else if !reflect.DeepEqual(&other, opt) { 1462 t.Fatalf("unexpected options: %s", spew.Sdump(other)) 1463 } 1464} 1465 1466// Ensure iterator can be encoded and decoded over a byte stream. 1467func TestIterator_EncodeDecode(t *testing.T) { 1468 var buf bytes.Buffer 1469 1470 // Create an iterator with several points & stats. 1471 itr := &FloatIterator{ 1472 Points: []query.FloatPoint{ 1473 {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 0}, 1474 {Name: "mem", Tags: ParseTags("host=B"), Time: 1, Value: 10}, 1475 }, 1476 stats: query.IteratorStats{ 1477 SeriesN: 2, 1478 PointN: 0, 1479 }, 1480 } 1481 1482 // Encode to the buffer. 1483 enc := query.NewIteratorEncoder(&buf) 1484 enc.StatsInterval = 100 * time.Millisecond 1485 if err := enc.EncodeIterator(itr); err != nil { 1486 t.Fatal(err) 1487 } 1488 1489 // Decode from the buffer. 1490 dec := query.NewReaderIterator(context.Background(), &buf, influxql.Float, itr.Stats()) 1491 1492 // Initial stats should exist immediately. 1493 fdec := dec.(query.FloatIterator) 1494 if stats := fdec.Stats(); !reflect.DeepEqual(stats, query.IteratorStats{SeriesN: 2, PointN: 0}) { 1495 t.Fatalf("unexpected stats(initial): %#v", stats) 1496 } 1497 1498 // Read both points. 1499 if p, err := fdec.Next(); err != nil { 1500 t.Fatalf("unexpected error(0): %#v", err) 1501 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 0}) { 1502 t.Fatalf("unexpected point(0); %#v", p) 1503 } 1504 if p, err := fdec.Next(); err != nil { 1505 t.Fatalf("unexpected error(1): %#v", err) 1506 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 1, Value: 10}) { 1507 t.Fatalf("unexpected point(1); %#v", p) 1508 } 1509 if p, err := fdec.Next(); err != nil { 1510 t.Fatalf("unexpected error(eof): %#v", err) 1511 } else if p != nil { 1512 t.Fatalf("unexpected point(eof); %#v", p) 1513 } 1514} 1515 1516// Test implementation of influxql.FloatIterator 1517type FloatIterator struct { 1518 Context context.Context 1519 Points []query.FloatPoint 1520 Closed bool 1521 Delay time.Duration 1522 stats query.IteratorStats 1523 point query.FloatPoint 1524} 1525 1526func (itr *FloatIterator) Stats() query.IteratorStats { return itr.stats } 1527func (itr *FloatIterator) Close() error { itr.Closed = true; return nil } 1528 1529// Next returns the next value and shifts it off the beginning of the points slice. 1530func (itr *FloatIterator) Next() (*query.FloatPoint, error) { 1531 if len(itr.Points) == 0 || itr.Closed { 1532 return nil, nil 1533 } 1534 1535 // If we have asked for a delay, then delay the returning of the point 1536 // until either an (optional) context is done or the time has passed. 1537 if itr.Delay > 0 { 1538 var done <-chan struct{} 1539 if itr.Context != nil { 1540 done = itr.Context.Done() 1541 } 1542 1543 timer := time.NewTimer(itr.Delay) 1544 select { 1545 case <-timer.C: 1546 case <-done: 1547 timer.Stop() 1548 return nil, itr.Context.Err() 1549 } 1550 } 1551 v := &itr.Points[0] 1552 itr.Points = itr.Points[1:] 1553 1554 // Copy the returned point into a static point that we return. 1555 // This actual storage engine returns a point from the same memory location 1556 // so we need to test that the query engine does not misuse this memory. 1557 itr.point.Name = v.Name 1558 itr.point.Tags = v.Tags 1559 itr.point.Time = v.Time 1560 itr.point.Value = v.Value 1561 itr.point.Nil = v.Nil 1562 if len(itr.point.Aux) != len(v.Aux) { 1563 itr.point.Aux = make([]interface{}, len(v.Aux)) 1564 } 1565 copy(itr.point.Aux, v.Aux) 1566 return &itr.point, nil 1567} 1568 1569func FloatIterators(inputs []*FloatIterator) []query.Iterator { 1570 itrs := make([]query.Iterator, len(inputs)) 1571 for i := range itrs { 1572 itrs[i] = query.Iterator(inputs[i]) 1573 } 1574 return itrs 1575} 1576 1577// Test implementation of query.IntegerIterator 1578type IntegerIterator struct { 1579 Points []query.IntegerPoint 1580 Closed bool 1581 stats query.IteratorStats 1582 point query.IntegerPoint 1583} 1584 1585func (itr *IntegerIterator) Stats() query.IteratorStats { return itr.stats } 1586func (itr *IntegerIterator) Close() error { itr.Closed = true; return nil } 1587 1588// Next returns the next value and shifts it off the beginning of the points slice. 1589func (itr *IntegerIterator) Next() (*query.IntegerPoint, error) { 1590 if len(itr.Points) == 0 || itr.Closed { 1591 return nil, nil 1592 } 1593 1594 v := &itr.Points[0] 1595 itr.Points = itr.Points[1:] 1596 1597 // Copy the returned point into a static point that we return. 1598 // This actual storage engine returns a point from the same memory location 1599 // so we need to test that the query engine does not misuse this memory. 1600 itr.point.Name = v.Name 1601 itr.point.Tags = v.Tags 1602 itr.point.Time = v.Time 1603 itr.point.Value = v.Value 1604 itr.point.Nil = v.Nil 1605 if len(itr.point.Aux) != len(v.Aux) { 1606 itr.point.Aux = make([]interface{}, len(v.Aux)) 1607 } 1608 copy(itr.point.Aux, v.Aux) 1609 return &itr.point, nil 1610} 1611 1612func IntegerIterators(inputs []*IntegerIterator) []query.Iterator { 1613 itrs := make([]query.Iterator, len(inputs)) 1614 for i := range itrs { 1615 itrs[i] = query.Iterator(inputs[i]) 1616 } 1617 return itrs 1618} 1619 1620// Test implementation of query.UnsignedIterator 1621type UnsignedIterator struct { 1622 Points []query.UnsignedPoint 1623 Closed bool 1624 stats query.IteratorStats 1625 point query.UnsignedPoint 1626} 1627 1628func (itr *UnsignedIterator) Stats() query.IteratorStats { return itr.stats } 1629func (itr *UnsignedIterator) Close() error { itr.Closed = true; return nil } 1630 1631// Next returns the next value and shifts it off the beginning of the points slice. 1632func (itr *UnsignedIterator) Next() (*query.UnsignedPoint, error) { 1633 if len(itr.Points) == 0 || itr.Closed { 1634 return nil, nil 1635 } 1636 1637 v := &itr.Points[0] 1638 itr.Points = itr.Points[1:] 1639 1640 // Copy the returned point into a static point that we return. 1641 // This actual storage engine returns a point from the same memory location 1642 // so we need to test that the query engine does not misuse this memory. 1643 itr.point.Name = v.Name 1644 itr.point.Tags = v.Tags 1645 itr.point.Time = v.Time 1646 itr.point.Value = v.Value 1647 itr.point.Nil = v.Nil 1648 if len(itr.point.Aux) != len(v.Aux) { 1649 itr.point.Aux = make([]interface{}, len(v.Aux)) 1650 } 1651 copy(itr.point.Aux, v.Aux) 1652 return &itr.point, nil 1653} 1654 1655func UnsignedIterators(inputs []*UnsignedIterator) []query.Iterator { 1656 itrs := make([]query.Iterator, len(inputs)) 1657 for i := range itrs { 1658 itrs[i] = query.Iterator(inputs[i]) 1659 } 1660 return itrs 1661} 1662 1663// Test implementation of query.StringIterator 1664type StringIterator struct { 1665 Points []query.StringPoint 1666 Closed bool 1667 stats query.IteratorStats 1668 point query.StringPoint 1669} 1670 1671func (itr *StringIterator) Stats() query.IteratorStats { return itr.stats } 1672func (itr *StringIterator) Close() error { itr.Closed = true; return nil } 1673 1674// Next returns the next value and shifts it off the beginning of the points slice. 1675func (itr *StringIterator) Next() (*query.StringPoint, error) { 1676 if len(itr.Points) == 0 || itr.Closed { 1677 return nil, nil 1678 } 1679 1680 v := &itr.Points[0] 1681 itr.Points = itr.Points[1:] 1682 1683 // Copy the returned point into a static point that we return. 1684 // This actual storage engine returns a point from the same memory location 1685 // so we need to test that the query engine does not misuse this memory. 1686 itr.point.Name = v.Name 1687 itr.point.Tags = v.Tags 1688 itr.point.Time = v.Time 1689 itr.point.Value = v.Value 1690 itr.point.Nil = v.Nil 1691 if len(itr.point.Aux) != len(v.Aux) { 1692 itr.point.Aux = make([]interface{}, len(v.Aux)) 1693 } 1694 copy(itr.point.Aux, v.Aux) 1695 return &itr.point, nil 1696} 1697 1698func StringIterators(inputs []*StringIterator) []query.Iterator { 1699 itrs := make([]query.Iterator, len(inputs)) 1700 for i := range itrs { 1701 itrs[i] = query.Iterator(inputs[i]) 1702 } 1703 return itrs 1704} 1705 1706// Test implementation of query.BooleanIterator 1707type BooleanIterator struct { 1708 Points []query.BooleanPoint 1709 Closed bool 1710 stats query.IteratorStats 1711 point query.BooleanPoint 1712} 1713 1714func (itr *BooleanIterator) Stats() query.IteratorStats { return itr.stats } 1715func (itr *BooleanIterator) Close() error { itr.Closed = true; return nil } 1716 1717// Next returns the next value and shifts it off the beginning of the points slice. 1718func (itr *BooleanIterator) Next() (*query.BooleanPoint, error) { 1719 if len(itr.Points) == 0 || itr.Closed { 1720 return nil, nil 1721 } 1722 1723 v := &itr.Points[0] 1724 itr.Points = itr.Points[1:] 1725 1726 // Copy the returned point into a static point that we return. 1727 // This actual storage engine returns a point from the same memory location 1728 // so we need to test that the query engine does not misuse this memory. 1729 itr.point.Name = v.Name 1730 itr.point.Tags = v.Tags 1731 itr.point.Time = v.Time 1732 itr.point.Value = v.Value 1733 itr.point.Nil = v.Nil 1734 if len(itr.point.Aux) != len(v.Aux) { 1735 itr.point.Aux = make([]interface{}, len(v.Aux)) 1736 } 1737 copy(itr.point.Aux, v.Aux) 1738 return &itr.point, nil 1739} 1740 1741func BooleanIterators(inputs []*BooleanIterator) []query.Iterator { 1742 itrs := make([]query.Iterator, len(inputs)) 1743 for i := range itrs { 1744 itrs[i] = query.Iterator(inputs[i]) 1745 } 1746 return itrs 1747} 1748 1749// MustParseSelectStatement parses a select statement. Panic on error. 1750func MustParseSelectStatement(s string) *influxql.SelectStatement { 1751 stmt, err := influxql.NewParser(strings.NewReader(s)).ParseStatement() 1752 if err != nil { 1753 panic(err) 1754 } 1755 return stmt.(*influxql.SelectStatement) 1756} 1757 1758// MustParseExpr parses an expression. Panic on error. 1759func MustParseExpr(s string) influxql.Expr { 1760 expr, err := influxql.NewParser(strings.NewReader(s)).ParseExpr() 1761 if err != nil { 1762 panic(err) 1763 } 1764 return expr 1765} 1766 1767// mustParseTime parses an IS0-8601 string. Panic on error. 1768func mustParseTime(s string) time.Time { 1769 t, err := time.Parse(time.RFC3339, s) 1770 if err != nil { 1771 panic(err.Error()) 1772 } 1773 return t 1774} 1775 1776func mustLoadLocation(s string) *time.Location { 1777 l, err := time.LoadLocation(s) 1778 if err != nil { 1779 panic(err) 1780 } 1781 return l 1782} 1783 1784var LosAngeles = mustLoadLocation("America/Los_Angeles") 1785