// Tests for reads.NewGroupResultSet: verify how series are partitioned into
// groups by the requested group keys, how groups and their series are ordered,
// and that empty results yield a nil result set.
package reads_test

import (
	"context"
	"strings"
	"testing"

	"github.com/google/go-cmp/cmp"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/data/gen"
	"github.com/influxdata/influxdb/storage/reads"
	"github.com/influxdata/influxdb/storage/reads/datatypes"
)

// TestGroupGroupResultSetSorting checks that a GroupBy read request partitions
// series by the requested group-key tag values and emits groups (and the
// series within them) in the order captured by each case's expected string.
// Series missing a group tag render "<nil>" in the partition key.
func TestGroupGroupResultSetSorting(t *testing.T) {
	tests := []struct {
		name  string
		cur   reads.SeriesCursor
		group datatypes.ReadRequest_Group
		keys  []string
		exp   string
	}{
		{
			// All series carry tag1: one group per tag1 value.
			name: "group by tag1 in all series",
			cur: &sliceSeriesCursor{
				rows: newSeriesRows(
					"cpu,tag0=val00,tag1=val10",
					"cpu,tag0=val00,tag1=val11",
					"cpu,tag0=val00,tag1=val12",
					"cpu,tag0=val01,tag1=val10",
					"cpu,tag0=val01,tag1=val11",
					"cpu,tag0=val01,tag1=val12",
				)},
			group: datatypes.GroupBy,
			keys:  []string{"tag1"},
			exp: `group:
  tag key      : _m,tag0,tag1
  partition key: val10
    series: _m=cpu,tag0=val00,tag1=val10
    series: _m=cpu,tag0=val01,tag1=val10
group:
  tag key      : _m,tag0,tag1
  partition key: val11
    series: _m=cpu,tag0=val00,tag1=val11
    series: _m=cpu,tag0=val01,tag1=val11
group:
  tag key      : _m,tag0,tag1
  partition key: val12
    series: _m=cpu,tag0=val00,tag1=val12
    series: _m=cpu,tag0=val01,tag1=val12
`,
		},
		{
			// The "aaa" series have no tag1 and land in a trailing <nil> group.
			// NOTE(review): within the val11/val12 groups the expected series
			// order is val01 before val00 — this pins the implementation's
			// actual ordering, which is not plain lexicographic here.
			name: "group by tag1 in partial series",
			cur: &sliceSeriesCursor{
				rows: newSeriesRows(
					"aaa,tag0=val00",
					"aaa,tag0=val01",
					"cpu,tag0=val00,tag1=val10",
					"cpu,tag0=val00,tag1=val11",
					"cpu,tag0=val00,tag1=val12",
					"cpu,tag0=val01,tag1=val10",
					"cpu,tag0=val01,tag1=val11",
					"cpu,tag0=val01,tag1=val12",
				)},
			group: datatypes.GroupBy,
			keys:  []string{"tag1"},
			exp: `group:
  tag key      : _m,tag0,tag1
  partition key: val10
    series: _m=cpu,tag0=val00,tag1=val10
    series: _m=cpu,tag0=val01,tag1=val10
group:
  tag key      : _m,tag0,tag1
  partition key: val11
    series: _m=cpu,tag0=val01,tag1=val11
    series: _m=cpu,tag0=val00,tag1=val11
group:
  tag key      : _m,tag0,tag1
  partition key: val12
    series: _m=cpu,tag0=val01,tag1=val12
    series: _m=cpu,tag0=val00,tag1=val12
group:
  tag key      : _m,tag0
  partition key: <nil>
    series: _m=aaa,tag0=val00
    series: _m=aaa,tag0=val01
`,
		},
		{
			// Two group keys: partition key is "tag2,tag1"; series missing
			// either tag get <nil> in that position, nil sorting high.
			name: "group by tag2,tag1 with partial series",
			cur: &sliceSeriesCursor{
				rows: newSeriesRows(
					"aaa,tag0=val00",
					"aaa,tag0=val01",
					"cpu,tag0=val00,tag1=val10",
					"cpu,tag0=val00,tag1=val11",
					"cpu,tag0=val00,tag1=val12",
					"mem,tag1=val10,tag2=val20",
					"mem,tag1=val11,tag2=val20",
					"mem,tag1=val11,tag2=val21",
				)},
			group: datatypes.GroupBy,
			keys:  []string{"tag2", "tag1"},
			exp: `group:
  tag key      : _m,tag1,tag2
  partition key: val20,val10
    series: _m=mem,tag1=val10,tag2=val20
group:
  tag key      : _m,tag1,tag2
  partition key: val20,val11
    series: _m=mem,tag1=val11,tag2=val20
group:
  tag key      : _m,tag1,tag2
  partition key: val21,val11
    series: _m=mem,tag1=val11,tag2=val21
group:
  tag key      : _m,tag0,tag1
  partition key: <nil>,val10
    series: _m=cpu,tag0=val00,tag1=val10
group:
  tag key      : _m,tag0,tag1
  partition key: <nil>,val11
    series: _m=cpu,tag0=val00,tag1=val11
group:
  tag key      : _m,tag0,tag1
  partition key: <nil>,val12
    series: _m=cpu,tag0=val00,tag1=val12
group:
  tag key      : _m,tag0
  partition key: <nil>,<nil>
    series: _m=aaa,tag0=val00
    series: _m=aaa,tag0=val01
`,
		},
		{
			// Groups that span measurements: the val00 group contains both
			// "aaa" and "cpu" series, and its tag key is the union of their
			// tag keys (_m,tag0,tag1).
			name: "group by tag0,tag2 with partial series",
			cur: &sliceSeriesCursor{
				rows: newSeriesRows(
					"aaa,tag0=val00",
					"aaa,tag0=val01",
					"cpu,tag0=val00,tag1=val10",
					"cpu,tag0=val00,tag1=val11",
					"cpu,tag0=val00,tag1=val12",
					"mem,tag1=val10,tag2=val20",
					"mem,tag1=val11,tag2=val20",
					"mem,tag1=val11,tag2=val21",
				)},
			group: datatypes.GroupBy,
			keys:  []string{"tag0", "tag2"},
			exp: `group:
  tag key      : _m,tag0,tag1
  partition key: val00,<nil>
    series: _m=aaa,tag0=val00
    series: _m=cpu,tag0=val00,tag1=val10
    series: _m=cpu,tag0=val00,tag1=val11
    series: _m=cpu,tag0=val00,tag1=val12
group:
  tag key      : _m,tag0
  partition key: val01,<nil>
    series: _m=aaa,tag0=val01
group:
  tag key      : _m,tag1,tag2
  partition key: <nil>,val20
    series: _m=mem,tag1=val10,tag2=val20
    series: _m=mem,tag1=val11,tag2=val20
group:
  tag key      : _m,tag1,tag2
  partition key: <nil>,val21
    series: _m=mem,tag1=val11,tag2=val21
`,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {

			newCursor := func() (reads.SeriesCursor, error) {
				return tt.cur, nil
			}

			// NOTE(review): all grouping tests set the schema-all-time hint;
			// presumably NewGroupResultSet requires it to group from series
			// keys alone — confirm against the implementation.
			var hints datatypes.HintFlags
			hints.SetHintSchemaAllTime()
			rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: tt.group, GroupKeys: tt.keys, Hints: hints}, newCursor)

			// Render the result set to text and compare line-by-line so the
			// diff pinpoints the first divergent group/series.
			sb := new(strings.Builder)
			GroupResultSetToString(sb, rs, SkipNilCursor())

			if got := sb.String(); !cmp.Equal(got, tt.exp) {
				t.Errorf("unexpected value; -got/+exp\n%s", cmp.Diff(strings.Split(got, "\n"), strings.Split(tt.exp, "\n")))
			}
		})
	}
}

// TestNewGroupResultSet_GroupNone_NoDataReturnsNil verifies that a GroupNone
// request over a cursor whose series have no data returns a nil result set.
func TestNewGroupResultSet_GroupNone_NoDataReturnsNil(t *testing.T) {
	newCursor := func() (reads.SeriesCursor, error) {
		return &sliceSeriesCursor{
			rows: newSeriesRows(
				"aaa,tag0=val00",
				"aaa,tag0=val01",
			)}, nil
	}

	// No schema-all-time hint here, unlike the sorting tests above.
	rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupNone}, newCursor)
	if rs != nil {
		t.Errorf("expected nil cursor")
	}
}

// TestNewGroupResultSet_GroupBy_NoDataReturnsNil verifies the same nil-result
// contract for a GroupBy request.
func TestNewGroupResultSet_GroupBy_NoDataReturnsNil(t *testing.T) {
	newCursor := func() (reads.SeriesCursor, error) {
		return &sliceSeriesCursor{
			rows: newSeriesRows(
				"aaa,tag0=val00",
				"aaa,tag0=val01",
			)}, nil
	}

	rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupBy, GroupKeys: []string{"tag0"}}, newCursor)
	if rs != nil {
		t.Errorf("expected nil cursor")
	}
}

// TestNewGroupResultSet_Sorting exercises the GroupOption hooks that control
// where series with missing ("nil") group-tag values sort: high (default)
// versus low (GroupOptionNilSortLo).
func TestNewGroupResultSet_Sorting(t *testing.T) {
	tests := []struct {
		name string
		keys []string
		opts []reads.GroupOption
		exp  string
	}{
		{
			// Default: nil tag values sort after present values.
			name: "nil hi",
			keys: []string{"tag0", "tag2"},
			exp: `group:
  tag key      : _m,tag0,tag1
  partition key: val00,<nil>
    series: _m=aaa,tag0=val00
    series: _m=cpu,tag0=val00,tag1=val10
    series: _m=cpu,tag0=val00,tag1=val11
    series: _m=cpu,tag0=val00,tag1=val12
group:
  tag key      : _m,tag0
  partition key: val01,<nil>
    series: _m=aaa,tag0=val01
group:
  tag key      : _m,tag1,tag2
  partition key: <nil>,val20
    series: _m=mem,tag1=val10,tag2=val20
    series: _m=mem,tag1=val11,tag2=val20
group:
  tag key      : _m,tag1,tag2
  partition key: <nil>,val21
    series: _m=mem,tag1=val11,tag2=val21
`,
		},
		{
			// GroupOptionNilSortLo flips nil values to sort first; note the
			// within-group series orders also change (pinned as-is).
			name: "nil lo",
			keys: []string{"tag0", "tag2"},
			opts: []reads.GroupOption{reads.GroupOptionNilSortLo()},
			exp: `group:
  tag key      : _m,tag1,tag2
  partition key: <nil>,val20
    series: _m=mem,tag1=val11,tag2=val20
    series: _m=mem,tag1=val10,tag2=val20
group:
  tag key      : _m,tag1,tag2
  partition key: <nil>,val21
    series: _m=mem,tag1=val11,tag2=val21
group:
  tag key      : _m,tag0,tag1
  partition key: val00,<nil>
    series: _m=cpu,tag0=val00,tag1=val10
    series: _m=cpu,tag0=val00,tag1=val11
    series: _m=cpu,tag0=val00,tag1=val12
    series: _m=aaa,tag0=val00
group:
  tag key      : _m,tag0
  partition key: val01,<nil>
    series: _m=aaa,tag0=val01
`,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			newCursor := func() (reads.SeriesCursor, error) {
				return &sliceSeriesCursor{
					rows: newSeriesRows(
						"aaa,tag0=val00",
						"aaa,tag0=val01",
						"cpu,tag0=val00,tag1=val10",
						"cpu,tag0=val00,tag1=val11",
						"cpu,tag0=val00,tag1=val12",
						"mem,tag1=val10,tag2=val20",
						"mem,tag1=val11,tag2=val20",
						"mem,tag1=val11,tag2=val21",
					)}, nil
			}

			var hints datatypes.HintFlags
			hints.SetHintSchemaAllTime()
			rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupBy, GroupKeys: tt.keys, Hints: hints}, newCursor, tt.opts...)

			sb := new(strings.Builder)
			GroupResultSetToString(sb, rs, SkipNilCursor())

			if got := sb.String(); !cmp.Equal(got, tt.exp) {
				t.Errorf("unexpected value; -got/+exp\n%s", cmp.Diff(strings.Split(got, "\n"), strings.Split(tt.exp, "\n")))
			}
		})
	}
}

// sliceSeriesCursor is an in-memory reads.SeriesCursor backed by a slice of
// rows, giving the tests a deterministic series stream.
type sliceSeriesCursor struct {
	rows []reads.SeriesRow
	i    int // index of the next row Next will return
}

// newSeriesRows parses each series key (e.g. "cpu,tag0=val00") into a
// reads.SeriesRow. Tags is a clone of the parsed series tags with the
// measurement name added under the synthetic "_m" tag key, matching the
// "_m=..." form in the expected outputs above.
func newSeriesRows(keys ...string) []reads.SeriesRow {
	rows := make([]reads.SeriesRow, len(keys))
	for i := range keys {
		rows[i].Name, rows[i].SeriesTags = models.ParseKeyBytes([]byte(keys[i]))
		rows[i].Tags = rows[i].SeriesTags.Clone()
		rows[i].Tags.Set([]byte("_m"), rows[i].Name)
	}
	return rows
}

// Close and Err are no-ops; the cursor owns no resources and never fails.
func (s *sliceSeriesCursor) Close()     {}
func (s *sliceSeriesCursor) Err() error { return nil }

// Next returns a pointer to the next row, or nil once the slice is exhausted.
func (s *sliceSeriesCursor) Next() *reads.SeriesRow {
	if s.i < len(s.rows) {
		s.i++
		return &s.rows[s.i-1]
	}
	return nil
}

// BenchmarkNewGroupResultSet_GroupBy measures construction of a group result
// set over a generated 10x10x10 tag-cardinality series set (1000 series),
// grouped by a single tag. The cursor is rewound (cur.i = 0) on each reopen
// so every iteration consumes the same rows.
func BenchmarkNewGroupResultSet_GroupBy(b *testing.B) {
	card := []int{10, 10, 10}
	vals := make([]gen.CountableSequence, len(card))
	for i := range card {
		vals[i] = gen.NewCounterByteSequenceCount(card[i])
	}

	tags := gen.NewTagsValuesSequenceValues("tag", vals)
	rows := make([]reads.SeriesRow, tags.Count())
	for i := range rows {
		tags.Next()
		t := tags.Value().Clone()
		// Unlike newSeriesRows, SeriesTags and Tags share the same clone and
		// no "_m" tag is added — the benchmark only needs grouping inputs.
		rows[i].SeriesTags = t
		rows[i].Tags = t
		rows[i].Name = []byte("m0")
	}

	cur := &sliceSeriesCursor{rows: rows}
	newCursor := func() (reads.SeriesCursor, error) {
		cur.i = 0
		return cur, nil
	}

	b.ResetTimer()
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		var hints datatypes.HintFlags
		hints.SetHintSchemaAllTime()
		rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupBy, GroupKeys: []string{"tag2"}, Hints: hints}, newCursor)
		rs.Close()
	}
}