1package tsi1_test 2 3import ( 4 "fmt" 5 "reflect" 6 "sort" 7 "testing" 8 9 "github.com/influxdata/influxdb/models" 10 "github.com/influxdata/influxdb/tsdb" 11) 12 13// Ensure fileset can return an iterator over all series in the index. 14func TestFileSet_SeriesIDIterator(t *testing.T) { 15 idx := MustOpenIndex(1) 16 defer idx.Close() 17 18 // Create initial set of series. 19 if err := idx.CreateSeriesSliceIfNotExists([]Series{ 20 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, 21 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, 22 {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, 23 }); err != nil { 24 t.Fatal(err) 25 } 26 27 // Verify initial set of series. 28 idx.Run(t, func(t *testing.T) { 29 fs, err := idx.PartitionAt(0).RetainFileSet() 30 if err != nil { 31 t.Fatal(err) 32 } 33 defer fs.Release() 34 35 itr := fs.SeriesFile().SeriesIDIterator() 36 if itr == nil { 37 t.Fatal("expected iterator") 38 } 39 if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{ 40 "cpu,[{region east}]", 41 "cpu,[{region west}]", 42 "mem,[{region east}]", 43 }) { 44 t.Fatalf("unexpected keys: %s", result) 45 } 46 }) 47 48 // Add more series. 49 if err := idx.CreateSeriesSliceIfNotExists([]Series{ 50 {Name: []byte("disk")}, 51 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})}, 52 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, 53 }); err != nil { 54 t.Fatal(err) 55 } 56 57 // Verify additional series. 58 idx.Run(t, func(t *testing.T) { 59 fs, err := idx.PartitionAt(0).RetainFileSet() 60 if err != nil { 61 t.Fatal(err) 62 } 63 defer fs.Release() 64 65 itr := fs.SeriesFile().SeriesIDIterator() 66 if itr == nil { 67 t.Fatal("expected iterator") 68 } 69 70 if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{ 71 "cpu,[{region east}]", 72 "cpu,[{region north}]", 73 "cpu,[{region west}]", 74 "disk,[]", 75 "mem,[{region east}]", 76 }) { 77 t.Fatalf("unexpected keys: %s", result) 78 } 79 }) 80} 81 82// Ensure fileset can return an iterator over all series for one measurement. 83func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) { 84 idx := MustOpenIndex(1) 85 defer idx.Close() 86 87 // Create initial set of series. 88 if err := idx.CreateSeriesSliceIfNotExists([]Series{ 89 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, 90 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, 91 {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, 92 }); err != nil { 93 t.Fatal(err) 94 } 95 96 // Verify initial set of series. 97 idx.Run(t, func(t *testing.T) { 98 fs, err := idx.PartitionAt(0).RetainFileSet() 99 if err != nil { 100 t.Fatal(err) 101 } 102 defer fs.Release() 103 104 itr := fs.MeasurementSeriesIDIterator([]byte("cpu")) 105 if itr == nil { 106 t.Fatal("expected iterator") 107 } 108 109 if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{ 110 "cpu,[{region east}]", 111 "cpu,[{region west}]", 112 }) { 113 t.Fatalf("unexpected keys: %s", result) 114 } 115 }) 116 117 // Add more series. 118 if err := idx.CreateSeriesSliceIfNotExists([]Series{ 119 {Name: []byte("disk")}, 120 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})}, 121 }); err != nil { 122 t.Fatal(err) 123 } 124 125 // Verify additional series. 126 idx.Run(t, func(t *testing.T) { 127 fs, err := idx.PartitionAt(0).RetainFileSet() 128 if err != nil { 129 t.Fatal(err) 130 } 131 defer fs.Release() 132 133 itr := fs.MeasurementSeriesIDIterator([]byte("cpu")) 134 if itr == nil { 135 t.Fatalf("expected iterator") 136 } 137 138 if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{ 139 "cpu,[{region east}]", 140 "cpu,[{region north}]", 141 "cpu,[{region west}]", 142 }) { 143 t.Fatalf("unexpected keys: %s", result) 144 } 145 }) 146} 147 148// Ensure fileset can return an iterator over all measurements for the index. 149func TestFileSet_MeasurementIterator(t *testing.T) { 150 idx := MustOpenIndex(1) 151 defer idx.Close() 152 153 // Create initial set of series. 154 if err := idx.CreateSeriesSliceIfNotExists([]Series{ 155 {Name: []byte("cpu")}, 156 {Name: []byte("mem")}, 157 }); err != nil { 158 t.Fatal(err) 159 } 160 161 // Verify initial set of series. 162 idx.Run(t, func(t *testing.T) { 163 fs, err := idx.PartitionAt(0).RetainFileSet() 164 if err != nil { 165 t.Fatal(err) 166 } 167 defer fs.Release() 168 169 itr := fs.MeasurementIterator() 170 if itr == nil { 171 t.Fatal("expected iterator") 172 } 173 174 expectedNames := []string{"cpu", "mem", ""} // Empty string implies end 175 for _, name := range expectedNames { 176 e := itr.Next() 177 if name == "" && e != nil { 178 t.Errorf("got measurement %s, expected nil measurement", e.Name()) 179 } else if e == nil && name != "" { 180 t.Errorf("got nil measurement, expected %s", name) 181 } else if e != nil && string(e.Name()) != name { 182 t.Errorf("got measurement %s, expected %s", e.Name(), name) 183 } 184 } 185 }) 186 187 // Add more series. 188 if err := idx.CreateSeriesSliceIfNotExists([]Series{ 189 {Name: []byte("disk"), Tags: models.NewTags(map[string]string{"foo": "bar"})}, 190 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north", "x": "y"})}, 191 }); err != nil { 192 t.Fatal(err) 193 } 194 195 // Verify additional series. 196 idx.Run(t, func(t *testing.T) { 197 fs, err := idx.PartitionAt(0).RetainFileSet() 198 if err != nil { 199 t.Fatal(err) 200 } 201 defer fs.Release() 202 203 itr := fs.MeasurementIterator() 204 if itr == nil { 205 t.Fatal("expected iterator") 206 } 207 208 expectedNames := []string{"cpu", "disk", "mem", ""} // Empty string implies end 209 for _, name := range expectedNames { 210 e := itr.Next() 211 if name == "" && e != nil { 212 t.Errorf("got measurement %s, expected nil measurement", e.Name()) 213 } else if e == nil && name != "" { 214 t.Errorf("got nil measurement, expected %s", name) 215 } else if e != nil && string(e.Name()) != name { 216 t.Errorf("got measurement %s, expected %s", e.Name(), name) 217 } 218 } 219 }) 220} 221 222// Ensure fileset can return an iterator over all keys for one measurement. 223func TestFileSet_TagKeyIterator(t *testing.T) { 224 idx := MustOpenIndex(1) 225 defer idx.Close() 226 227 // Create initial set of series. 228 if err := idx.CreateSeriesSliceIfNotExists([]Series{ 229 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, 230 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west", "type": "gpu"})}, 231 {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east", "misc": "other"})}, 232 }); err != nil { 233 t.Fatal(err) 234 } 235 236 // Verify initial set of series. 237 idx.Run(t, func(t *testing.T) { 238 fs, err := idx.PartitionAt(0).RetainFileSet() 239 if err != nil { 240 t.Fatal(err) 241 } 242 defer fs.Release() 243 244 itr := fs.TagKeyIterator([]byte("cpu")) 245 if itr == nil { 246 t.Fatalf("expected iterator") 247 } 248 249 if e := itr.Next(); string(e.Key()) != `region` { 250 t.Fatalf("unexpected key: %s", e.Key()) 251 } else if e := itr.Next(); string(e.Key()) != `type` { 252 t.Fatalf("unexpected key: %s", e.Key()) 253 } else if e := itr.Next(); e != nil { 254 t.Fatalf("expected nil key: %s", e.Key()) 255 } 256 }) 257 258 // Add more series. 259 if err := idx.CreateSeriesSliceIfNotExists([]Series{ 260 {Name: []byte("disk"), Tags: models.NewTags(map[string]string{"foo": "bar"})}, 261 {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north", "x": "y"})}, 262 }); err != nil { 263 t.Fatal(err) 264 } 265 266 // Verify additional series. 267 idx.Run(t, func(t *testing.T) { 268 fs, err := idx.PartitionAt(0).RetainFileSet() 269 if err != nil { 270 t.Fatal(err) 271 } 272 defer fs.Release() 273 274 itr := fs.TagKeyIterator([]byte("cpu")) 275 if itr == nil { 276 t.Fatal("expected iterator") 277 } 278 279 if e := itr.Next(); string(e.Key()) != `region` { 280 t.Fatalf("unexpected key: %s", e.Key()) 281 } else if e := itr.Next(); string(e.Key()) != `type` { 282 t.Fatalf("unexpected key: %s", e.Key()) 283 } else if e := itr.Next(); string(e.Key()) != `x` { 284 t.Fatalf("unexpected key: %s", e.Key()) 285 } else if e := itr.Next(); e != nil { 286 t.Fatalf("expected nil key: %s", e.Key()) 287 } 288 }) 289} 290 291func MustReadAllSeriesIDIteratorString(sfile *tsdb.SeriesFile, itr tsdb.SeriesIDIterator) []string { 292 // Read all ids. 293 ids, err := tsdb.ReadAllSeriesIDIterator(itr) 294 if err != nil { 295 panic(err) 296 } 297 298 // Convert to keys and sort. 299 keys := sfile.SeriesKeys(ids) 300 sort.Slice(keys, func(i, j int) bool { return tsdb.CompareSeriesKeys(keys[i], keys[j]) == -1 }) 301 302 // Convert to strings. 303 a := make([]string, len(keys)) 304 for i := range a { 305 name, tags := tsdb.ParseSeriesKey(keys[i]) 306 a[i] = fmt.Sprintf("%s,%s", name, tags.String()) 307 } 308 return a 309} 310