package inmem

import (
	"bytes"
	"fmt"
	"regexp"
	"sort"
	"sync"
	"unsafe"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/bytesutil"
	"github.com/influxdata/influxdb/pkg/radix"
	"github.com/influxdata/influxdb/query"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxql"
)

// Measurement represents a collection of time series in a database. It also
// contains in memory structures for indexing tags. Exported functions are
// goroutine safe while un-exported functions assume the caller will use the
// appropriate locks.
type measurement struct {
	Database  string
	Name      string `json:"name,omitempty"`
	NameBytes []byte // cached version of Name as []byte, to avoid repeated conversions

	mu         sync.RWMutex        // guards all mutable state below
	fieldNames map[string]struct{} // set of field names written to this measurement

	// in-memory index fields
	seriesByID          map[uint64]*series      // lookup table for series by their id
	seriesByTagKeyValue map[string]*tagKeyValue // map from tag key to value to sorted set of series ids

	// lazily created sorted series IDs
	sortedSeriesIDs seriesIDs // sorted list of series IDs in this measurement

	// Indicates whether the seriesByTagKeyValueMap needs to be rebuilt as it contains deleted series
	// that waste memory.
	dirty bool
}

// newMeasurement allocates and initializes a new Measurement.
func newMeasurement(database, name string) *measurement {
	return &measurement{
		Database:  database,
		Name:      name,
		NameBytes: []byte(name),

		fieldNames:          make(map[string]struct{}),
		seriesByID:          make(map[uint64]*series),
		seriesByTagKeyValue: make(map[string]*tagKeyValue),
	}
}

// bytes estimates the memory footprint of this measurement, in bytes.
57func (m *measurement) bytes() int { 58 var b int 59 m.mu.RLock() 60 b += int(unsafe.Sizeof(m.Database)) + len(m.Database) 61 b += int(unsafe.Sizeof(m.Name)) + len(m.Name) 62 if m.NameBytes != nil { 63 b += int(unsafe.Sizeof(m.NameBytes)) + len(m.NameBytes) 64 } 65 b += 24 // 24 bytes for m.mu RWMutex 66 b += int(unsafe.Sizeof(m.fieldNames)) 67 for fieldName := range m.fieldNames { 68 b += int(unsafe.Sizeof(fieldName)) + len(fieldName) 69 } 70 b += int(unsafe.Sizeof(m.seriesByID)) 71 for k, v := range m.seriesByID { 72 b += int(unsafe.Sizeof(k)) 73 b += int(unsafe.Sizeof(v)) 74 // Do not count footprint of each series, to avoid double-counting in Index.bytes(). 75 } 76 b += int(unsafe.Sizeof(m.seriesByTagKeyValue)) 77 for k, v := range m.seriesByTagKeyValue { 78 b += int(unsafe.Sizeof(k)) + len(k) 79 b += int(unsafe.Sizeof(v)) + v.bytes() 80 } 81 b += int(unsafe.Sizeof(m.sortedSeriesIDs)) 82 for _, seriesID := range m.sortedSeriesIDs { 83 b += int(unsafe.Sizeof(seriesID)) 84 } 85 b += int(unsafe.Sizeof(m.dirty)) 86 m.mu.RUnlock() 87 return b 88} 89 90// Authorized determines if this Measurement is authorized to be read, according 91// to the provided Authorizer. A measurement is authorized to be read if at 92// least one undeleted series from the measurement is authorized to be read. 93func (m *measurement) Authorized(auth query.Authorizer) bool { 94 // Note(edd): the cost of this check scales linearly with the number of series 95 // belonging to a measurement, which means it may become expensive when there 96 // are large numbers of series on a measurement. 97 // 98 // In the future we might want to push the set of series down into the 99 // authorizer, but that will require an API change. 
100 for _, s := range m.SeriesByIDMap() { 101 if s != nil && s.Deleted() { 102 continue 103 } 104 105 if query.AuthorizerIsOpen(auth) || auth.AuthorizeSeriesRead(m.Database, m.NameBytes, s.Tags) { 106 return true 107 } 108 } 109 return false 110} 111 112func (m *measurement) HasField(name string) bool { 113 m.mu.RLock() 114 _, hasField := m.fieldNames[name] 115 m.mu.RUnlock() 116 return hasField 117} 118 119// SeriesByID returns a series by identifier. 120func (m *measurement) SeriesByID(id uint64) *series { 121 m.mu.RLock() 122 defer m.mu.RUnlock() 123 return m.seriesByID[id] 124} 125 126// SeriesByIDMap returns the internal seriesByID map. 127func (m *measurement) SeriesByIDMap() map[uint64]*series { 128 m.mu.RLock() 129 defer m.mu.RUnlock() 130 return m.seriesByID 131} 132 133// SeriesByIDSlice returns a list of series by identifiers. 134func (m *measurement) SeriesByIDSlice(ids []uint64) []*series { 135 m.mu.RLock() 136 defer m.mu.RUnlock() 137 a := make([]*series, len(ids)) 138 for i, id := range ids { 139 a[i] = m.seriesByID[id] 140 } 141 return a 142} 143 144// AppendSeriesKeysByID appends keys for a list of series ids to a buffer. 145func (m *measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string { 146 m.mu.RLock() 147 defer m.mu.RUnlock() 148 for _, id := range ids { 149 if s := m.seriesByID[id]; s != nil && !s.Deleted() { 150 dst = append(dst, s.Key) 151 } 152 } 153 return dst 154} 155 156// SeriesKeysByID returns the a list of keys for a set of ids. 
func (m *measurement) SeriesKeysByID(ids seriesIDs) [][]byte {
	m.mu.RLock()
	defer m.mu.RUnlock()
	keys := make([][]byte, 0, len(ids))
	for _, id := range ids {
		s := m.seriesByID[id]
		if s == nil || s.Deleted() {
			continue
		}
		keys = append(keys, []byte(s.Key))
	}

	// Keys are usually appended in sorted id order; only sort if needed.
	if !bytesutil.IsSorted(keys) {
		bytesutil.Sort(keys)
	}

	return keys
}

// SeriesKeys returns the keys of every series in this measurement
func (m *measurement) SeriesKeys() [][]byte {
	m.mu.RLock()
	defer m.mu.RUnlock()
	keys := make([][]byte, 0, len(m.seriesByID))
	for _, s := range m.seriesByID {
		if s.Deleted() {
			continue
		}
		keys = append(keys, []byte(s.Key))
	}

	// Map iteration order is random; sort for a deterministic result.
	if !bytesutil.IsSorted(keys) {
		bytesutil.Sort(keys)
	}

	return keys
}

// SeriesIDs returns the sorted set of series IDs for the measurement, using a
// lazily maintained cache. It uses double-checked locking: a cheap read-locked
// check first, then a re-check under the write lock before rebuilding.
func (m *measurement) SeriesIDs() seriesIDs {
	m.mu.RLock()
	// Fast path: the cache is complete (same cardinality as the map).
	if len(m.sortedSeriesIDs) == len(m.seriesByID) {
		s := m.sortedSeriesIDs
		m.mu.RUnlock()
		return s
	}
	m.mu.RUnlock()

	m.mu.Lock()
	// Re-check: another goroutine may have rebuilt the cache while we were
	// waiting for the write lock.
	if len(m.sortedSeriesIDs) == len(m.seriesByID) {
		s := m.sortedSeriesIDs
		m.mu.Unlock()
		return s
	}

	// Rebuild the cache, reusing the existing backing array when possible.
	m.sortedSeriesIDs = m.sortedSeriesIDs[:0]
	if cap(m.sortedSeriesIDs) < len(m.seriesByID) {
		m.sortedSeriesIDs = make(seriesIDs, 0, len(m.seriesByID))
	}

	for k, v := range m.seriesByID {
		if v.Deleted() {
			continue
		}
		m.sortedSeriesIDs = append(m.sortedSeriesIDs, k)
	}
	sort.Sort(m.sortedSeriesIDs)
	s := m.sortedSeriesIDs
	m.mu.Unlock()
	return s
}

// HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (m *measurement) HasTagKey(k string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	_, hasTag := m.seriesByTagKeyValue[k]
	return hasTag
}

// HasTagKeyValue reports whether any series has the given tag key/value pair.
// Contains is nil-safe, so a missing tag key simply yields false.
func (m *measurement) HasTagKeyValue(k, v []byte) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.seriesByTagKeyValue[string(k)].Contains(string(v))
}

// HasSeries returns true if there is at least 1 series under this measurement.
func (m *measurement) HasSeries() bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.seriesByID) > 0
}

// CardinalityBytes returns the number of values associated with the given tag key.
func (m *measurement) CardinalityBytes(key []byte) int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// Cardinality is nil-safe; a missing tag key yields 0.
	return m.seriesByTagKeyValue[string(key)].Cardinality()
}

// AddSeries adds a series to the measurement's index.
// It returns true if the series was added successfully or false if the series was already present.
func (m *measurement) AddSeries(s *series) bool {
	if s == nil {
		return false
	}

	// Fast path: check for existence under the read lock first.
	m.mu.RLock()
	if m.seriesByID[s.ID] != nil {
		m.mu.RUnlock()
		return false
	}
	m.mu.RUnlock()

	m.mu.Lock()
	defer m.mu.Unlock()

	// Re-check under the write lock; another goroutine may have added the
	// series between the RUnlock and Lock above.
	if m.seriesByID[s.ID] != nil {
		return false
	}

	m.seriesByID[s.ID] = s

	// Keep the lazily sorted ID cache valid when it is cheap to do so: either
	// this is the first series, or the cache was complete and the new ID is
	// larger than the current maximum (so appending preserves sort order).
	if len(m.seriesByID) == 1 || (len(m.sortedSeriesIDs) == len(m.seriesByID)-1 && s.ID > m.sortedSeriesIDs[len(m.sortedSeriesIDs)-1]) {
		m.sortedSeriesIDs = append(m.sortedSeriesIDs, s.ID)
	}

	// add this series id to the tag index on the measurement
	for _, t := range s.Tags {
		valueMap := m.seriesByTagKeyValue[string(t.Key)]
		if valueMap == nil {
			valueMap = newTagKeyValue()
			m.seriesByTagKeyValue[string(t.Key)] = valueMap
		}
		valueMap.InsertSeriesIDByte(t.Value, s.ID)
	}

	return true
}

// DropSeries removes a series from the measurement's index.
func (m *measurement) DropSeries(series *series) {
	seriesID := series.ID
	m.mu.Lock()
	defer m.mu.Unlock()

	// Existence check before delete here to clean up the caching/indexing only when needed
	if _, ok := m.seriesByID[seriesID]; !ok {
		return
	}
	delete(m.seriesByID, seriesID)

	// clear our lazily sorted set of ids
	m.sortedSeriesIDs = m.sortedSeriesIDs[:0]

	// Mark that this measurements tagValue map has stale entries that need to be rebuilt.
	m.dirty = true
}

// Rebuild returns a measurement with any stale (deleted) series expunged from
// its tag indexes. If the measurement is not dirty, the receiver is returned
// unchanged; otherwise a freshly indexed copy is built and returned.
func (m *measurement) Rebuild() *measurement {
	m.mu.RLock()

	// Nothing needs to be rebuilt.
	if !m.dirty {
		m.mu.RUnlock()
		return m
	}

	// Create a new measurement from the state of the existing measurement
	nm := newMeasurement(m.Database, string(m.NameBytes))
	nm.fieldNames = m.fieldNames
	m.mu.RUnlock()

	// Re-add each series to allow the measurement indexes to get re-created. If there were
	// deletes, the existing measurement may have references to deleted series that need to be
	// expunged. Note: we're NOT using SeriesIDs which returns the series in sorted order because
	// we need to do this under a write lock to prevent races. The series are added in sorted
	// order to prevent resorting them again after they are all re-added.
	m.mu.Lock()
	defer m.mu.Unlock()

	for k, v := range m.seriesByID {
		if v.Deleted() {
			continue
		}
		m.sortedSeriesIDs = append(m.sortedSeriesIDs, k)
	}
	sort.Sort(m.sortedSeriesIDs)

	for _, id := range m.sortedSeriesIDs {
		if s := m.seriesByID[id]; s != nil {
			// Explicitly set the new measurement on the series.
			s.Measurement = nm
			nm.AddSeries(s)
		}
	}
	return nm
}

// filters walks the where clause of a select statement and returns a map with all series ids
// matching the where clause and any filter expression that should be applied to each
func (m *measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) {
	if condition == nil {
		// No condition: every series matches and no per-series filter applies.
		return m.SeriesIDs(), nil, nil
	}
	return m.WalkWhereForSeriesIds(condition)
}

// TagSets returns the unique tag sets that exist for the given tag keys. This is used to determine
// what composite series will be created by a group by. i.e. "group by region" should return:
// {"region":"uswest"}, {"region":"useast"}
// or region, service returns
// {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc...
// This will also populate the TagSet objects with the series IDs that match each tagset and any
// influx filter expression that goes with the series
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (m *measurement) TagSets(shardSeriesIDs *tsdb.SeriesIDSet, opt query.IteratorOptions) ([]*query.TagSet, error) {
	// get the unique set of series ids and the filters that should be applied to each
	ids, filters, err := m.filters(opt.Condition)
	if err != nil {
		return nil, err
	}

	// Copy and sort the requested dimensions so the tag-set key is canonical.
	var dims []string
	if len(opt.Dimensions) > 0 {
		dims = make([]string, len(opt.Dimensions))
		copy(dims, opt.Dimensions)
		sort.Strings(dims)
	}

	m.mu.RLock()
	// For every series, get the tag values for the requested tag keys i.e. dimensions. This is the
	// TagSet for that series. Series with the same TagSet are then grouped together, because for the
	// purpose of GROUP BY they are part of the same composite series.
	tagSets := make(map[string]*query.TagSet, 64)
	var seriesN int
	for _, id := range ids {
		// Abort if the query was killed
		select {
		case <-opt.InterruptCh:
			m.mu.RUnlock()
			return nil, query.ErrQueryInterrupted
		default:
		}

		if opt.MaxSeriesN > 0 && seriesN > opt.MaxSeriesN {
			m.mu.RUnlock()
			return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
		}

		// Skip series that are deleted or not present in this shard.
		s := m.seriesByID[id]
		if s == nil || s.Deleted() || !shardSeriesIDs.Contains(id) {
			continue
		}

		if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(m.Database, m.NameBytes, s.Tags) {
			continue
		}

		var tagsAsKey []byte
		if len(dims) > 0 {
			tagsAsKey = tsdb.MakeTagsKey(dims, s.Tags)
		}

		tagSet := tagSets[string(tagsAsKey)]
		if tagSet == nil {
			// This TagSet is new, create a new entry for it.
			tagSet = &query.TagSet{
				Tags: nil,
				Key:  tagsAsKey,
			}
			tagSets[string(tagsAsKey)] = tagSet
		}
		// Associate the series and filter with the Tagset.
		tagSet.AddFilter(s.Key, filters[id])
		seriesN++
	}
	// Release the lock while we sort all the tags
	m.mu.RUnlock()

	// Sort the series in each tag set.
	for _, t := range tagSets {
		// Abort if the query was killed
		select {
		case <-opt.InterruptCh:
			return nil, query.ErrQueryInterrupted
		default:
		}

		sort.Sort(t)
	}

	// The TagSets have been created, as a map of TagSets. Just send
	// the values back as a slice, sorting for consistency.
	sortedTagsSets := make([]*query.TagSet, 0, len(tagSets))
	for _, v := range tagSets {
		sortedTagsSets = append(sortedTagsSets, v)
	}
	sort.Sort(byTagKey(sortedTagsSets))

	return sortedTagsSets, nil
}

// intersectSeriesFilters performs an intersection for two sets of ids and filter expressions.
460func intersectSeriesFilters(lids, rids seriesIDs, lfilters, rfilters FilterExprs) (seriesIDs, FilterExprs) { 461 // We only want to allocate a slice and map of the smaller size. 462 var ids []uint64 463 if len(lids) > len(rids) { 464 ids = make([]uint64, 0, len(rids)) 465 } else { 466 ids = make([]uint64, 0, len(lids)) 467 } 468 469 var filters FilterExprs 470 if len(lfilters) > len(rfilters) { 471 filters = make(FilterExprs, len(rfilters)) 472 } else { 473 filters = make(FilterExprs, len(lfilters)) 474 } 475 476 // They're in sorted order so advance the counter as needed. 477 // This is, don't run comparisons against lower values that we've already passed. 478 for len(lids) > 0 && len(rids) > 0 { 479 lid, rid := lids[0], rids[0] 480 if lid == rid { 481 ids = append(ids, lid) 482 483 var expr influxql.Expr 484 lfilter := lfilters[lid] 485 rfilter := rfilters[rid] 486 487 if lfilter != nil && rfilter != nil { 488 be := &influxql.BinaryExpr{ 489 Op: influxql.AND, 490 LHS: lfilter, 491 RHS: rfilter, 492 } 493 expr = influxql.Reduce(be, nil) 494 } else if lfilter != nil { 495 expr = lfilter 496 } else if rfilter != nil { 497 expr = rfilter 498 } 499 500 if expr != nil { 501 filters[lid] = expr 502 } 503 lids, rids = lids[1:], rids[1:] 504 } else if lid < rid { 505 lids = lids[1:] 506 } else { 507 rids = rids[1:] 508 } 509 } 510 return ids, filters 511} 512 513// unionSeriesFilters performs a union for two sets of ids and filter expressions. 514func unionSeriesFilters(lids, rids seriesIDs, lfilters, rfilters FilterExprs) (seriesIDs, FilterExprs) { 515 ids := make([]uint64, 0, len(lids)+len(rids)) 516 517 // Setup the filters with the smallest size since we will discard filters 518 // that do not have a match on the other side. 
519 var filters FilterExprs 520 if len(lfilters) < len(rfilters) { 521 filters = make(FilterExprs, len(lfilters)) 522 } else { 523 filters = make(FilterExprs, len(rfilters)) 524 } 525 526 for len(lids) > 0 && len(rids) > 0 { 527 lid, rid := lids[0], rids[0] 528 if lid == rid { 529 ids = append(ids, lid) 530 531 // If one side does not have a filter, then the series has been 532 // included on one side of the OR with no condition. Eliminate the 533 // filter in this case. 534 var expr influxql.Expr 535 lfilter := lfilters[lid] 536 rfilter := rfilters[rid] 537 if lfilter != nil && rfilter != nil { 538 be := &influxql.BinaryExpr{ 539 Op: influxql.OR, 540 LHS: lfilter, 541 RHS: rfilter, 542 } 543 expr = influxql.Reduce(be, nil) 544 } 545 546 if expr != nil { 547 filters[lid] = expr 548 } 549 lids, rids = lids[1:], rids[1:] 550 } else if lid < rid { 551 ids = append(ids, lid) 552 553 filter := lfilters[lid] 554 if filter != nil { 555 filters[lid] = filter 556 } 557 lids = lids[1:] 558 } else { 559 ids = append(ids, rid) 560 561 filter := rfilters[rid] 562 if filter != nil { 563 filters[rid] = filter 564 } 565 rids = rids[1:] 566 } 567 } 568 569 // Now append the remainder. 570 if len(lids) > 0 { 571 for i := 0; i < len(lids); i++ { 572 ids = append(ids, lids[i]) 573 574 filter := lfilters[lids[i]] 575 if filter != nil { 576 filters[lids[i]] = filter 577 } 578 } 579 } else if len(rids) > 0 { 580 for i := 0; i < len(rids); i++ { 581 ids = append(ids, rids[i]) 582 583 filter := rfilters[rids[i]] 584 if filter != nil { 585 filters[rids[i]] = filter 586 } 587 } 588 } 589 return ids, filters 590} 591 592// SeriesIDsByTagKey returns a list of all series for a tag key. 593func (m *measurement) SeriesIDsByTagKey(key []byte) seriesIDs { 594 tagVals := m.seriesByTagKeyValue[string(key)] 595 if tagVals == nil { 596 return nil 597 } 598 599 var ids seriesIDs 600 tagVals.RangeAll(func(_ string, a seriesIDs) { 601 ids = append(ids, a...) 
602 }) 603 sort.Sort(ids) 604 return ids 605} 606 607// SeriesIDsByTagValue returns a list of all series for a tag value. 608func (m *measurement) SeriesIDsByTagValue(key, value []byte) seriesIDs { 609 tagVals := m.seriesByTagKeyValue[string(key)] 610 if tagVals == nil { 611 return nil 612 } 613 return tagVals.Load(string(value)) 614} 615 616// IDsForExpr returns the series IDs that are candidates to match the given expression. 617func (m *measurement) IDsForExpr(n *influxql.BinaryExpr) seriesIDs { 618 ids, _, _ := m.idsForExpr(n) 619 return ids 620} 621 622// idsForExpr returns a collection of series ids and a filter expression that should 623// be used to filter points from those series. 624func (m *measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, influxql.Expr, error) { 625 // If this binary expression has another binary expression, then this 626 // is some expression math and we should just pass it to the underlying query. 627 if _, ok := n.LHS.(*influxql.BinaryExpr); ok { 628 return m.SeriesIDs(), n, nil 629 } else if _, ok := n.RHS.(*influxql.BinaryExpr); ok { 630 return m.SeriesIDs(), n, nil 631 } 632 633 // Retrieve the variable reference from the correct side of the expression. 634 name, ok := n.LHS.(*influxql.VarRef) 635 value := n.RHS 636 if !ok { 637 name, ok = n.RHS.(*influxql.VarRef) 638 if !ok { 639 // This is an expression we do not know how to evaluate. Let the 640 // query engine take care of this. 641 return m.SeriesIDs(), n, nil 642 } 643 value = n.LHS 644 } 645 646 // For fields, return all series IDs from this measurement and return 647 // the expression passed in, as the filter. 648 if name.Val != "_name" && ((name.Type == influxql.Unknown && m.HasField(name.Val)) || name.Type == influxql.AnyField || (name.Type != influxql.Tag && name.Type != influxql.Unknown)) { 649 return m.SeriesIDs(), n, nil 650 } else if value, ok := value.(*influxql.VarRef); ok { 651 // Check if the RHS is a variable and if it is a field. 
652 if value.Val != "_name" && ((value.Type == influxql.Unknown && m.HasField(value.Val)) || name.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) { 653 return m.SeriesIDs(), n, nil 654 } 655 } 656 657 // Retrieve list of series with this tag key. 658 tagVals := m.seriesByTagKeyValue[name.Val] 659 660 // if we're looking for series with a specific tag value 661 if str, ok := value.(*influxql.StringLiteral); ok { 662 var ids seriesIDs 663 664 // Special handling for "_name" to match measurement name. 665 if name.Val == "_name" { 666 if (n.Op == influxql.EQ && str.Val == m.Name) || (n.Op == influxql.NEQ && str.Val != m.Name) { 667 return m.SeriesIDs(), nil, nil 668 } 669 return nil, nil, nil 670 } 671 672 if n.Op == influxql.EQ { 673 if str.Val != "" { 674 // return series that have a tag of specific value. 675 ids = tagVals.Load(str.Val) 676 } else { 677 // Make a copy of all series ids and mark the ones we need to evict. 678 sIDs := newEvictSeriesIDs(m.SeriesIDs()) 679 680 // Go through each slice and mark the values we find as zero so 681 // they can be removed later. 682 tagVals.RangeAll(func(_ string, a seriesIDs) { 683 sIDs.mark(a) 684 }) 685 686 // Make a new slice with only the remaining ids. 687 ids = sIDs.evict() 688 } 689 } else if n.Op == influxql.NEQ { 690 if str.Val != "" { 691 ids = m.SeriesIDs() 692 if vals := tagVals.Load(str.Val); len(vals) > 0 { 693 ids = ids.Reject(vals) 694 } 695 } else { 696 tagVals.RangeAll(func(_ string, a seriesIDs) { 697 ids = append(ids, a...) 698 }) 699 sort.Sort(ids) 700 } 701 } 702 return ids, nil, nil 703 } 704 705 // if we're looking for series with a tag value that matches a regex 706 if re, ok := value.(*influxql.RegexLiteral); ok { 707 var ids seriesIDs 708 709 // Special handling for "_name" to match measurement name. 
710 if name.Val == "_name" { 711 match := re.Val.MatchString(m.Name) 712 if (n.Op == influxql.EQREGEX && match) || (n.Op == influxql.NEQREGEX && !match) { 713 return m.SeriesIDs(), &influxql.BooleanLiteral{Val: true}, nil 714 } 715 return nil, nil, nil 716 } 717 718 // Check if we match the empty string to see if we should include series 719 // that are missing the tag. 720 empty := re.Val.MatchString("") 721 722 // Gather the series that match the regex. If we should include the empty string, 723 // start with the list of all series and reject series that don't match our condition. 724 // If we should not include the empty string, include series that match our condition. 725 if empty && n.Op == influxql.EQREGEX { 726 // See comments above for EQ with a StringLiteral. 727 sIDs := newEvictSeriesIDs(m.SeriesIDs()) 728 tagVals.RangeAll(func(k string, a seriesIDs) { 729 if !re.Val.MatchString(k) { 730 sIDs.mark(a) 731 } 732 }) 733 ids = sIDs.evict() 734 } else if empty && n.Op == influxql.NEQREGEX { 735 ids = make(seriesIDs, 0, len(m.SeriesIDs())) 736 tagVals.RangeAll(func(k string, a seriesIDs) { 737 if !re.Val.MatchString(k) { 738 ids = append(ids, a...) 739 } 740 }) 741 sort.Sort(ids) 742 } else if !empty && n.Op == influxql.EQREGEX { 743 ids = make(seriesIDs, 0, len(m.SeriesIDs())) 744 tagVals.RangeAll(func(k string, a seriesIDs) { 745 if re.Val.MatchString(k) { 746 ids = append(ids, a...) 747 } 748 }) 749 sort.Sort(ids) 750 } else if !empty && n.Op == influxql.NEQREGEX { 751 // See comments above for EQ with a StringLiteral. 
752 sIDs := newEvictSeriesIDs(m.SeriesIDs()) 753 tagVals.RangeAll(func(k string, a seriesIDs) { 754 if re.Val.MatchString(k) { 755 sIDs.mark(a) 756 } 757 }) 758 ids = sIDs.evict() 759 } 760 return ids, nil, nil 761 } 762 763 // compare tag values 764 if ref, ok := value.(*influxql.VarRef); ok { 765 var ids seriesIDs 766 767 if n.Op == influxql.NEQ { 768 ids = m.SeriesIDs() 769 } 770 771 rhsTagVals := m.seriesByTagKeyValue[ref.Val] 772 tagVals.RangeAll(func(k string, a seriesIDs) { 773 tags := a.Intersect(rhsTagVals.Load(k)) 774 if n.Op == influxql.EQ { 775 ids = ids.Union(tags) 776 } else if n.Op == influxql.NEQ { 777 ids = ids.Reject(tags) 778 } 779 }) 780 return ids, nil, nil 781 } 782 783 // We do not know how to evaluate this expression so pass it 784 // on to the query engine. 785 return m.SeriesIDs(), n, nil 786} 787 788// FilterExprs represents a map of series IDs to filter expressions. 789type FilterExprs map[uint64]influxql.Expr 790 791// DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true. 792func (fe FilterExprs) DeleteBoolLiteralTrues() { 793 for id, expr := range fe { 794 if e, ok := expr.(*influxql.BooleanLiteral); ok && e.Val { 795 delete(fe, id) 796 } 797 } 798} 799 800// Len returns the number of elements. 801func (fe FilterExprs) Len() int { 802 if fe == nil { 803 return 0 804 } 805 return len(fe) 806} 807 808// WalkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and 809// a map from those series IDs to filter expressions that should be used to limit points returned in 810// the final query result. 
811func (m *measurement) WalkWhereForSeriesIds(expr influxql.Expr) (seriesIDs, FilterExprs, error) { 812 switch n := expr.(type) { 813 case *influxql.BinaryExpr: 814 switch n.Op { 815 case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX: 816 // Get the series IDs and filter expression for the tag or field comparison. 817 ids, expr, err := m.idsForExpr(n) 818 if err != nil { 819 return nil, nil, err 820 } 821 822 if len(ids) == 0 { 823 return ids, nil, nil 824 } 825 826 // If the expression is a boolean literal that is true, ignore it. 827 if b, ok := expr.(*influxql.BooleanLiteral); ok && b.Val { 828 expr = nil 829 } 830 831 var filters FilterExprs 832 if expr != nil { 833 filters = make(FilterExprs, len(ids)) 834 for _, id := range ids { 835 filters[id] = expr 836 } 837 } 838 839 return ids, filters, nil 840 case influxql.AND, influxql.OR: 841 // Get the series IDs and filter expressions for the LHS. 842 lids, lfilters, err := m.WalkWhereForSeriesIds(n.LHS) 843 if err != nil { 844 return nil, nil, err 845 } 846 847 // Get the series IDs and filter expressions for the RHS. 848 rids, rfilters, err := m.WalkWhereForSeriesIds(n.RHS) 849 if err != nil { 850 return nil, nil, err 851 } 852 853 // Combine the series IDs from the LHS and RHS. 
854 if n.Op == influxql.AND { 855 ids, filters := intersectSeriesFilters(lids, rids, lfilters, rfilters) 856 return ids, filters, nil 857 } else { 858 ids, filters := unionSeriesFilters(lids, rids, lfilters, rfilters) 859 return ids, filters, nil 860 } 861 } 862 863 ids, _, err := m.idsForExpr(n) 864 return ids, nil, err 865 case *influxql.ParenExpr: 866 // walk down the tree 867 return m.WalkWhereForSeriesIds(n.Expr) 868 case *influxql.BooleanLiteral: 869 if n.Val { 870 return m.SeriesIDs(), nil, nil 871 } 872 return nil, nil, nil 873 default: 874 return nil, nil, nil 875 } 876} 877 878// SeriesIDsAllOrByExpr walks an expressions for matching series IDs 879// or, if no expressions is given, returns all series IDs for the measurement. 880func (m *measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (seriesIDs, error) { 881 // If no expression given or the measurement has no series, 882 // we can take just return the ids or nil accordingly. 883 if expr == nil { 884 return m.SeriesIDs(), nil 885 } 886 887 m.mu.RLock() 888 l := len(m.seriesByID) 889 m.mu.RUnlock() 890 if l == 0 { 891 return nil, nil 892 } 893 894 // Get series IDs that match the WHERE clause. 895 ids, _, err := m.WalkWhereForSeriesIds(expr) 896 if err != nil { 897 return nil, err 898 } 899 900 return ids, nil 901} 902 903// tagKeysByExpr extracts the tag keys wanted by the expression. 
904func (m *measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error) { 905 if expr == nil { 906 set := make(map[string]struct{}) 907 for _, key := range m.TagKeys() { 908 set[key] = struct{}{} 909 } 910 return set, nil 911 } 912 913 switch e := expr.(type) { 914 case *influxql.BinaryExpr: 915 switch e.Op { 916 case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: 917 tag, ok := e.LHS.(*influxql.VarRef) 918 if !ok { 919 return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) 920 } else if tag.Val != "_tagKey" { 921 return nil, nil 922 } 923 924 if influxql.IsRegexOp(e.Op) { 925 re, ok := e.RHS.(*influxql.RegexLiteral) 926 if !ok { 927 return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) 928 } 929 return m.tagKeysByFilter(e.Op, "", re.Val), nil 930 } 931 932 s, ok := e.RHS.(*influxql.StringLiteral) 933 if !ok { 934 return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) 935 } 936 return m.tagKeysByFilter(e.Op, s.Val, nil), nil 937 938 case influxql.AND, influxql.OR: 939 lhs, err := m.TagKeysByExpr(e.LHS) 940 if err != nil { 941 return nil, err 942 } 943 944 rhs, err := m.TagKeysByExpr(e.RHS) 945 if err != nil { 946 return nil, err 947 } 948 949 if lhs != nil && rhs != nil { 950 if e.Op == influxql.OR { 951 return stringSet(lhs).union(rhs), nil 952 } 953 return stringSet(lhs).intersect(rhs), nil 954 } else if lhs != nil { 955 return lhs, nil 956 } else if rhs != nil { 957 return rhs, nil 958 } 959 return nil, nil 960 default: 961 return nil, fmt.Errorf("invalid operator") 962 } 963 964 case *influxql.ParenExpr: 965 return m.TagKeysByExpr(e.Expr) 966 } 967 968 return nil, fmt.Errorf("%#v", expr) 969} 970 971// tagKeysByFilter will filter the tag keys for the measurement. 
func (m *measurement) tagKeysByFilter(op influxql.Token, val string, regex *regexp.Regexp) stringSet {
	ss := newStringSet()
	for _, key := range m.TagKeys() {
		// Decide whether this key passes the requested comparison. Only the
		// regex operators use regex; the others compare against val.
		var matched bool
		switch op {
		case influxql.EQ:
			matched = key == val
		case influxql.NEQ:
			matched = key != val
		case influxql.EQREGEX:
			matched = regex.MatchString(key)
		case influxql.NEQREGEX:
			matched = !regex.MatchString(key)
		}

		if !matched {
			continue
		}
		ss.add(key)
	}
	return ss
}

// Measurements represents a list of *Measurement.
type measurements []*measurement

// Len implements sort.Interface.
func (a measurements) Len() int { return len(a) }

// Less implements sort.Interface.
func (a measurements) Less(i, j int) bool { return a[i].Name < a[j].Name }

// Swap implements sort.Interface.
func (a measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// series belong to a Measurement and represent unique time series in a database.
type series struct {
	mu      sync.RWMutex // guards deleted
	deleted bool         // set by Delete; deleted series are skipped by queries

	// immutable
	ID          uint64
	Measurement *measurement
	Key         string
	Tags        models.Tags
}

// newSeries returns an initialized series struct
func newSeries(id uint64, m *measurement, key string, tags models.Tags) *series {
	return &series{
		ID:          id,
		Measurement: m,
		Key:         key,
		Tags:        tags,
	}
}

// bytes estimates the memory footprint of this series, in bytes.
func (s *series) bytes() int {
	var b int
	s.mu.RLock()
	b += 24 // RWMutex uses 24 bytes
	b += int(unsafe.Sizeof(s.deleted) + unsafe.Sizeof(s.ID))
	// Do not count s.Measurement to prevent double-counting in Index.Bytes.
	b += int(unsafe.Sizeof(s.Key)) + len(s.Key)
	for _, tag := range s.Tags {
		b += int(unsafe.Sizeof(tag)) + len(tag.Key) + len(tag.Value)
	}
	b += int(unsafe.Sizeof(s.Tags))
	s.mu.RUnlock()
	return b
}

// Delete marks this series as deleted. A deleted series should not be returned for queries.
func (s *series) Delete() {
	s.mu.Lock()
	s.deleted = true
	s.mu.Unlock()
}

// Deleted indicates if this was previously deleted.
func (s *series) Deleted() bool {
	s.mu.RLock()
	v := s.deleted
	s.mu.RUnlock()
	return v
}

// TagKeyValue provides goroutine-safe concurrent access to the set of series
// ids mapping to a set of tag values.
type tagKeyValue struct {
	mu      sync.RWMutex
	entries map[string]*tagKeyValueEntry // tag value -> series id set
}

// bytes estimates the memory footprint of this tagKeyValue, in bytes.
func (t *tagKeyValue) bytes() int {
	var b int
	t.mu.RLock()
	b += 24 // RWMutex is 24 bytes
	b += int(unsafe.Sizeof(t.entries))
	for k, v := range t.entries {
		b += int(unsafe.Sizeof(k)) + len(k)
		b += len(v.m) * 8 // uint64
		b += len(v.a) * 8 // uint64
		b += int(unsafe.Sizeof(v) + unsafe.Sizeof(*v))
	}
	t.mu.RUnlock()
	return b
}

// NewTagKeyValue initialises a new TagKeyValue.
func newTagKeyValue() *tagKeyValue {
	return &tagKeyValue{entries: make(map[string]*tagKeyValueEntry)}
}

// Cardinality returns the number of values in the TagKeyValue.
// Safe to call on a nil receiver, which has cardinality 0.
func (t *tagKeyValue) Cardinality() int {
	if t == nil {
		return 0
	}

	t.mu.RLock()
	defer t.mu.RUnlock()
	return len(t.entries)
}

// Contains returns true if the TagKeyValue contains value.
func (t *tagKeyValue) Contains(value string) bool {
	if t == nil {
		return false
	}

	t.mu.RLock()
	defer t.mu.RUnlock()
	_, ok := t.entries[value]
	return ok
}

// InsertSeriesIDByte adds a series id to the set of ids associated with the
// provided tag value, creating the entry on first sight of the value.
func (t *tagKeyValue) InsertSeriesIDByte(value []byte, id uint64) {
	t.mu.Lock()
	entry := t.entries[string(value)]
	if entry == nil {
		entry = newTagKeyValueEntry()
		t.entries[string(value)] = entry
	}
	entry.m[id] = struct{}{}
	t.mu.Unlock()
}

// Load returns the sorted SeriesIDs for the provided tag value. The sorted
// slice is built lazily from the entry's id set and cached on the entry.
func (t *tagKeyValue) Load(value string) seriesIDs {
	if t == nil {
		return nil
	}

	t.mu.RLock()
	entry := t.entries[value]
	ids, changed := entry.ids()
	t.mu.RUnlock()

	// Cache the freshly sorted ids under the write lock. The read lock was
	// released above, so the entry may have gained ids in the gap; that is
	// benign because setIDs only stores a cache that ids() recomputes
	// whenever it is stale.
	if changed {
		t.mu.Lock()
		entry.setIDs(ids)
		t.mu.Unlock()
	}
	return ids
}

// Range calls f sequentially for each tag value and its sorted series ids.
// A call to Range on a nil TagKeyValue is a no-op.
//
// If f returns false then iteration over any remaining keys or values will cease.
func (t *tagKeyValue) Range(f func(tagValue string, a seriesIDs) bool) {
	if t == nil {
		return
	}

	t.mu.RLock()
	for tagValue, entry := range t.entries {
		ids, changed := entry.ids()
		if changed {
			// Briefly upgrade to a write lock to cache the sorted ids.
			// NOTE(review): the map iteration continues after the read lock is
			// dropped here; this assumes no concurrent writer mutates
			// t.entries during a Range — confirm against callers.
			t.mu.RUnlock()
			t.mu.Lock()
			entry.setIDs(ids)
			t.mu.Unlock()
			t.mu.RLock()
		}

		if !f(tagValue, ids) {
			t.mu.RUnlock()
			return
		}
	}
	t.mu.RUnlock()
}

// RangeAll calls f sequentially on each key and value. A call to RangeAll on a
// nil TagKeyValue is a no-op.
func (t *tagKeyValue) RangeAll(f func(k string, a seriesIDs)) {
	t.Range(func(k string, a seriesIDs) bool {
		f(k, a)
		return true
	})
}

// tagKeyValueEntry stores the series ids for a single tag value, both as a
// set for fast membership/insert and as a lazily sorted slice for iteration.
type tagKeyValueEntry struct {
	m map[uint64]struct{} // series id set
	a seriesIDs // lazily sorted list of series.
}

// newTagKeyValueEntry returns an entry with an initialized, empty id set.
func newTagKeyValueEntry() *tagKeyValueEntry {
	return &tagKeyValueEntry{m: make(map[uint64]struct{})}
}

// ids returns the sorted series ids for the entry. When the cached slice e.a
// is current it is returned with changed == false; otherwise a newly sorted
// slice is built from the set and returned with changed == true so the caller
// can cache it via setIDs. A call on a nil entry returns (nil, false).
func (e *tagKeyValueEntry) ids() (_ seriesIDs, changed bool) {
	if e == nil {
		return nil, false
	} else if len(e.a) == len(e.m) {
		// Equal lengths are taken to mean the cached slice is current
		// (presumably ids are insert-only — confirm elsewhere in the file).
		return e.a, false
	}

	a := make(seriesIDs, 0, len(e.m))
	for id := range e.m {
		a = append(a, id)
	}
	radix.SortUint64s(a)

	return a, true
}

// setIDs caches the sorted series ids on the entry. A call on a nil entry is
// a no-op.
func (e *tagKeyValueEntry) setIDs(a seriesIDs) {
	if e == nil {
		return
	}
	e.a = a
}

// SeriesIDs is a convenience type for sorting, checking equality, and doing
// union and intersection of collections of series ids.
type seriesIDs []uint64

// Len implements sort.Interface.
func (a seriesIDs) Len() int { return len(a) }

// Less implements sort.Interface.
func (a seriesIDs) Less(i, j int) bool { return a[i] < a[j] }

// Swap implements sort.Interface.
func (a seriesIDs) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Equals assumes that both are sorted.
func (a seriesIDs) Equals(other seriesIDs) bool {
	if len(a) != len(other) {
		return false
	}
	for i, s := range other {
		if a[i] != s {
			return false
		}
	}
	return true
}

// Intersect returns a new collection of series ids in sorted order that is the intersection of the two.
// The two collections must already be sorted.
func (a seriesIDs) Intersect(other seriesIDs) seriesIDs {
	l := a
	r := other

	// we want to iterate through the shortest one and stop
	if len(other) < len(a) {
		l = other
		r = a
	}

	// they're in sorted order so advance the counter as needed.
	// That is, don't run comparisons against lower values that we've already passed
	var i, j int

	ids := make([]uint64, 0, len(l))
	for i < len(l) && j < len(r) {
		if l[i] == r[j] {
			ids = append(ids, l[i])
			i++
			j++
		} else if l[i] < r[j] {
			i++
		} else {
			j++
		}
	}

	return seriesIDs(ids)
}

// Union returns a new collection of series ids in sorted order that is the union of the two.
// The two collections must already be sorted.
func (a seriesIDs) Union(other seriesIDs) seriesIDs {
	l := a
	r := other
	ids := make([]uint64, 0, len(l)+len(r))
	var i, j int
	// Standard sorted-merge; equal ids are emitted once.
	for i < len(l) && j < len(r) {
		if l[i] == r[j] {
			ids = append(ids, l[i])
			i++
			j++
		} else if l[i] < r[j] {
			ids = append(ids, l[i])
			i++
		} else {
			ids = append(ids, r[j])
			j++
		}
	}

	// now append the remainder
	if i < len(l) {
		ids = append(ids, l[i:]...)
	} else if j < len(r) {
		ids = append(ids, r[j:]...)
	}

	return ids
}

// Reject returns a new collection of series ids in sorted order with the passed in set removed from the original.
// This is useful for the NOT operator. The two collections must already be sorted.
func (a seriesIDs) Reject(other seriesIDs) seriesIDs {
	l := a
	r := other
	var i, j int

	ids := make([]uint64, 0, len(l))
	for i < len(l) && j < len(r) {
		if l[i] == r[j] {
			// Present in both: drop it.
			i++
			j++
		} else if l[i] < r[j] {
			ids = append(ids, l[i])
			i++
		} else {
			j++
		}
	}

	// Append the remainder
	if i < len(l) {
		ids = append(ids, l[i:]...)
	}

	return seriesIDs(ids)
}

// seriesID is a series id that may or may not have been evicted from the
// current id list.
1329type seriesID struct { 1330 val uint64 1331 evict bool 1332} 1333 1334// evictSeriesIDs is a slice of SeriesIDs with an extra field to mark if the 1335// field should be evicted or not. 1336type evictSeriesIDs struct { 1337 ids []seriesID 1338 sz int 1339} 1340 1341// newEvictSeriesIDs copies the ids into a new slice that can be used for 1342// evicting series from the slice. 1343func newEvictSeriesIDs(ids []uint64) evictSeriesIDs { 1344 a := make([]seriesID, len(ids)) 1345 for i, id := range ids { 1346 a[i].val = id 1347 } 1348 return evictSeriesIDs{ 1349 ids: a, 1350 sz: len(a), 1351 } 1352} 1353 1354// mark marks all of the ids in the sorted slice to be evicted from the list of 1355// series ids. If an id to be evicted does not exist, it just gets ignored. 1356func (a *evictSeriesIDs) mark(ids []uint64) { 1357 sIDs := a.ids 1358 for _, id := range ids { 1359 if len(sIDs) == 0 { 1360 break 1361 } 1362 1363 // Perform a binary search of the remaining slice if 1364 // the first element does not match the value we're 1365 // looking for. 1366 i := 0 1367 if sIDs[0].val < id { 1368 i = sort.Search(len(sIDs), func(i int) bool { 1369 return sIDs[i].val >= id 1370 }) 1371 } 1372 1373 if i >= len(sIDs) { 1374 break 1375 } else if sIDs[i].val == id { 1376 if !sIDs[i].evict { 1377 sIDs[i].evict = true 1378 a.sz-- 1379 } 1380 // Skip over this series since it has been evicted and won't be 1381 // encountered again. 1382 i++ 1383 } 1384 sIDs = sIDs[i:] 1385 } 1386} 1387 1388// evict creates a new slice with only the series that have not been evicted. 1389func (a *evictSeriesIDs) evict() (ids seriesIDs) { 1390 if a.sz == 0 { 1391 return ids 1392 } 1393 1394 // Make a new slice with only the remaining ids. 1395 ids = make([]uint64, 0, a.sz) 1396 for _, id := range a.ids { 1397 if id.evict { 1398 continue 1399 } 1400 ids = append(ids, id.val) 1401 } 1402 return ids 1403} 1404 1405// TagFilter represents a tag filter when looking up other tags or measurements. 
1406type TagFilter struct { 1407 Op influxql.Token 1408 Key string 1409 Value string 1410 Regex *regexp.Regexp 1411} 1412 1413// TagKeys returns a list of the measurement's tag names, in sorted order. 1414func (m *measurement) TagKeys() []string { 1415 m.mu.RLock() 1416 keys := make([]string, 0, len(m.seriesByTagKeyValue)) 1417 for k := range m.seriesByTagKeyValue { 1418 keys = append(keys, k) 1419 } 1420 m.mu.RUnlock() 1421 sort.Strings(keys) 1422 return keys 1423} 1424 1425// TagValues returns all the values for the given tag key, in an arbitrary order. 1426func (m *measurement) TagValues(auth query.Authorizer, key string) []string { 1427 m.mu.RLock() 1428 defer m.mu.RUnlock() 1429 values := make([]string, 0, m.seriesByTagKeyValue[key].Cardinality()) 1430 1431 m.seriesByTagKeyValue[key].RangeAll(func(k string, a seriesIDs) { 1432 if query.AuthorizerIsOpen(auth) { 1433 values = append(values, k) 1434 } else { 1435 for _, sid := range a { 1436 s := m.seriesByID[sid] 1437 if s == nil { 1438 continue 1439 } 1440 if auth.AuthorizeSeriesRead(m.Database, m.NameBytes, s.Tags) { 1441 values = append(values, k) 1442 return 1443 } 1444 } 1445 } 1446 }) 1447 return values 1448} 1449 1450// SetFieldName adds the field name to the measurement. 1451func (m *measurement) SetFieldName(name string) { 1452 m.mu.RLock() 1453 _, ok := m.fieldNames[name] 1454 m.mu.RUnlock() 1455 1456 if ok { 1457 return 1458 } 1459 1460 m.mu.Lock() 1461 m.fieldNames[name] = struct{}{} 1462 m.mu.Unlock() 1463} 1464 1465// SeriesByTagKeyValue returns the TagKeyValue for the provided tag key. 1466func (m *measurement) SeriesByTagKeyValue(key string) *tagKeyValue { 1467 m.mu.RLock() 1468 defer m.mu.RUnlock() 1469 return m.seriesByTagKeyValue[key] 1470} 1471 1472// stringSet represents a set of strings. 1473type stringSet map[string]struct{} 1474 1475// newStringSet returns an empty stringSet. 
1476func newStringSet() stringSet { 1477 return make(map[string]struct{}) 1478} 1479 1480// add adds strings to the set. 1481func (s stringSet) add(ss ...string) { 1482 for _, n := range ss { 1483 s[n] = struct{}{} 1484 } 1485} 1486 1487// list returns the current elements in the set, in sorted order. 1488func (s stringSet) list() []string { 1489 l := make([]string, 0, len(s)) 1490 for k := range s { 1491 l = append(l, k) 1492 } 1493 sort.Strings(l) 1494 return l 1495} 1496 1497// union returns the union of this set and another. 1498func (s stringSet) union(o stringSet) stringSet { 1499 ns := newStringSet() 1500 for k := range s { 1501 ns[k] = struct{}{} 1502 } 1503 for k := range o { 1504 ns[k] = struct{}{} 1505 } 1506 return ns 1507} 1508 1509// intersect returns the intersection of this set and another. 1510func (s stringSet) intersect(o stringSet) stringSet { 1511 shorter, longer := s, o 1512 if len(longer) < len(shorter) { 1513 shorter, longer = longer, shorter 1514 } 1515 1516 ns := newStringSet() 1517 for k := range shorter { 1518 if _, ok := longer[k]; ok { 1519 ns[k] = struct{}{} 1520 } 1521 } 1522 return ns 1523} 1524 1525type byTagKey []*query.TagSet 1526 1527func (t byTagKey) Len() int { return len(t) } 1528func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 } 1529func (t byTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] } 1530