1package gen 2 3import ( 4 "bufio" 5 "fmt" 6 "math/rand" 7 "os" 8 "path" 9 "path/filepath" 10 "sort" 11 "unicode/utf8" 12 13 "github.com/BurntSushi/toml" 14 "github.com/influxdata/influxdb/models" 15 "github.com/pkg/errors" 16) 17 18type Spec struct { 19 SeriesLimit *int64 20 Measurements []MeasurementSpec 21} 22 23func NewSeriesGeneratorFromSpec(s *Spec, tr TimeRange) SeriesGenerator { 24 sg := make([]SeriesGenerator, len(s.Measurements)) 25 for i := range s.Measurements { 26 sg[i] = newSeriesGeneratorFromMeasurementSpec(&s.Measurements[i], tr) 27 } 28 if s.SeriesLimit == nil { 29 return NewMergedSeriesGenerator(sg) 30 } 31 return NewMergedSeriesGeneratorLimit(sg, *s.SeriesLimit) 32} 33 34type MeasurementSpec struct { 35 Name string 36 SeriesLimit *SeriesLimit 37 TagsSpec *TagsSpec 38 FieldValuesSpec *FieldValuesSpec 39} 40 41func newSeriesGeneratorFromMeasurementSpec(ms *MeasurementSpec, tr TimeRange) SeriesGenerator { 42 if ms.SeriesLimit == nil { 43 return NewSeriesGenerator( 44 []byte(ms.Name), 45 []byte(ms.FieldValuesSpec.Name), 46 newTimeValuesSequenceFromFieldValuesSpec(ms.FieldValuesSpec, tr), 47 newTagsSequenceFromTagsSpec(ms.TagsSpec)) 48 } 49 return NewSeriesGeneratorLimit( 50 []byte(ms.Name), 51 []byte(ms.FieldValuesSpec.Name), 52 newTimeValuesSequenceFromFieldValuesSpec(ms.FieldValuesSpec, tr), 53 newTagsSequenceFromTagsSpec(ms.TagsSpec), 54 int64(*ms.SeriesLimit)) 55} 56 57// NewTimeValuesSequenceFn returns a TimeValuesSequence that will generate a 58// sequence of values based on the spec. 
59type NewTimeValuesSequenceFn func(spec TimeSequenceSpec) TimeValuesSequence 60 61type NewTagsValuesSequenceFn func() TagsSequence 62 63type NewCountableSequenceFn func() CountableSequence 64 65type TagsSpec struct { 66 Tags []*TagValuesSpec 67 Sample *sample 68} 69 70func newTagsSequenceFromTagsSpec(ts *TagsSpec) TagsSequence { 71 var keys []string 72 var vals []CountableSequence 73 for _, spec := range ts.Tags { 74 keys = append(keys, spec.TagKey) 75 vals = append(vals, spec.Values()) 76 } 77 78 var opts []tagsValuesOption 79 if ts.Sample != nil && *ts.Sample != 1.0 { 80 opts = append(opts, TagValuesSampleOption(float64(*ts.Sample))) 81 } 82 83 return NewTagsValuesSequenceKeysValues(keys, vals, opts...) 84} 85 86type TagValuesSpec struct { 87 TagKey string 88 Values NewCountableSequenceFn 89} 90 91type FieldValuesSpec struct { 92 TimeSequenceSpec 93 Name string 94 DataType models.FieldType 95 Values NewTimeValuesSequenceFn 96} 97 98func newTimeValuesSequenceFromFieldValuesSpec(fs *FieldValuesSpec, tr TimeRange) TimeValuesSequence { 99 return fs.Values(fs.TimeSequenceSpec.ForTimeRange(tr)) 100} 101 102func NewSpecFromToml(s string) (*Spec, error) { 103 var out Schema 104 if _, err := toml.Decode(s, &out); err != nil { 105 return nil, err 106 } 107 return NewSpecFromSchema(&out) 108} 109 110func NewSpecFromPath(p string) (*Spec, error) { 111 var err error 112 p, err = filepath.Abs(p) 113 if err != nil { 114 return nil, err 115 } 116 117 var out Schema 118 if _, err := toml.DecodeFile(p, &out); err != nil { 119 return nil, err 120 } 121 return newSpecFromSchema(&out, schemaDir(path.Dir(p))) 122} 123 124func NewSchemaFromPath(path string) (*Schema, error) { 125 var out Schema 126 if _, err := toml.DecodeFile(path, &out); err != nil { 127 return nil, err 128 } 129 return &out, nil 130} 131 132type schemaToSpecState int 133 134const ( 135 stateOk schemaToSpecState = iota 136 stateErr 137) 138 139type schemaToSpec struct { 140 schemaDir string 141 stack []interface{} 
	// state selects between normal visiting and error annotation.
	state schemaToSpecState
	// spec accumulates the conversion result.
	spec *Spec
	// err holds the first error encountered during the walk.
	err error
}

// push appends v to the work stack.
func (s *schemaToSpec) push(v interface{}) {
	s.stack = append(s.stack, v)
}

// pop removes and returns the top of the stack. The vacated slot is nilled
// so the backing array does not retain the popped value. Panics (index out
// of range) when the stack is empty.
func (s *schemaToSpec) pop() interface{} {
	tail := len(s.stack) - 1
	v := s.stack[tail]
	s.stack[tail] = nil
	s.stack = s.stack[:tail]
	return v
}

// peek returns the top of the stack without removing it, or nil when the
// stack is empty.
func (s *schemaToSpec) peek() interface{} {
	if len(s.stack) == 0 {
		return nil
	}
	return s.stack[len(s.stack)-1]
}

// Visit dispatches node to visit while the walk is healthy; after the first
// failure it switches to visitErr so the error is wrapped with positional
// context as the walk unwinds. It returns s to continue walking and nil to
// stop descending.
func (s *schemaToSpec) Visit(node SchemaNode) (w Visitor) {
	switch s.state {
	case stateOk:
		if s.visit(node) {
			return s
		}
		s.state = stateErr

	case stateErr:
		s.visitErr(node)
	}

	return nil
}

// visit processes one schema node, consuming the results its children pushed
// onto the stack and pushing its own. It returns false (with s.err set) on
// validation failure. The walk is bottom-up, so children are guaranteed to
// have run before their parent — the pop order below mirrors the child
// order and must not be changed.
func (s *schemaToSpec) visit(node SchemaNode) bool {
	switch n := node.(type) {
	case *Schema:
		// Root: the flattened measurement specs are the final result.
		s.spec.Measurements = s.pop().([]MeasurementSpec)
		if n.SeriesLimit != nil {
			sl := int64(*n.SeriesLimit)
			s.spec.SeriesLimit = &sl
		}

	case Measurements:
		// flatten measurements
		// Each child *Measurement pushed a []MeasurementSpec; drain them all.
		var mss []MeasurementSpec
		for {
			if specs, ok := s.peek().([]MeasurementSpec); ok {
				s.pop()
				mss = append(mss, specs...)
				continue
			}
			break
		}
		sort.Slice(mss, func(i, j int) bool {
			return mss[i].Name < mss[j].Name
		})

		// validate field types are homogeneous for a single measurement
		mg := make(map[string]models.FieldType)
		for i := range mss {
			spec := &mss[i]
			// Key on "measurement.field" so the same field name may differ
			// in type across distinct measurements.
			key := spec.Name + "." + spec.FieldValuesSpec.Name
			ft := spec.FieldValuesSpec.DataType
			if dt, ok := mg[key]; !ok {
				mg[key] = ft
			} else if dt != ft {
				s.err = fmt.Errorf("field %q data-type conflict, found %s and %s",
					key,
					dt,
					ft)
				return false
			}
		}

		s.push(mss)

	case *Measurement:
		if len(n.Name) == 0 {
			s.err = errors.New("missing measurement name")
			return false
		}

		// Children pushed tags first, then fields — so fields are on top.
		fields := s.pop().([]*FieldValuesSpec)
		tagsSpec := s.pop().(*TagsSpec)

		tagsSpec.Sample = n.Sample

		// default: sample 50%
		if n.Sample == nil {
			// NOTE(review): this local s shadows the receiver within the
			// if-block; harmless here but easy to trip over when editing.
			s := sample(0.5)
			tagsSpec.Sample = &s
		}

		if *tagsSpec.Sample <= 0.0 || *tagsSpec.Sample > 1.0 {
			s.err = errors.New("invalid sample, must be 0 < sample ≤ 1.0")
			return false
		}

		// Fan out: one MeasurementSpec per field, all sharing the tag spec.
		var ms []MeasurementSpec
		for _, spec := range fields {
			ms = append(ms, MeasurementSpec{
				Name:            n.Name,
				SeriesLimit:     n.SeriesLimit,
				TagsSpec:        tagsSpec,
				FieldValuesSpec: spec,
			})
		}

		// NOTE: sort each measurement name + field name to ensure series are produced
		// in correct order
		sort.Slice(ms, func(i, j int) bool {
			return ms[i].FieldValuesSpec.Name < ms[j].FieldValuesSpec.Name
		})
		s.push(ms)

	case Tags:
		// Drain the *TagValuesSpec values pushed by each child *Tag.
		var ts TagsSpec
		for {
			if spec, ok := s.peek().(*TagValuesSpec); ok {
				s.pop()
				ts.Tags = append(ts.Tags, spec)
				continue
			}
			break
		}
		// Tag keys must be sorted to produce a valid series key sequence
		sort.Slice(ts.Tags, func(i, j int) bool {
			return ts.Tags[i].TagKey < ts.Tags[j].TagKey
		})

		// Adjacent duplicates after sorting ⇒ duplicate tag keys.
		for i := 1; i < len(ts.Tags); i++ {
			if ts.Tags[i-1].TagKey == ts.Tags[i].TagKey {
				s.err = fmt.Errorf("duplicate tag keys %q", ts.Tags[i].TagKey)
				return false
			}
		}

		s.push(&ts)

	case Fields:
		// combine fields
		var fs []*FieldValuesSpec
		for {
			if spec, ok := s.peek().(*FieldValuesSpec); ok {
				s.pop()
				fs = append(fs, spec)
				continue
			}
			break
		}

		sort.Slice(fs, func(i, j int) bool {
			return fs[i].Name < fs[j].Name
		})

		// Adjacent duplicates after sorting ⇒ duplicate field names.
		for i := 1; i < len(fs); i++ {
			if fs[i-1].Name == fs[i].Name {
				s.err = fmt.Errorf("duplicate field names %q", fs[i].Name)
				return false
			}
		}

		s.push(fs)

	case *Field:
		// The field's source child already pushed a *FieldValuesSpec;
		// annotate it in place with the name and time-sequence parameters.
		fs, ok := s.peek().(*FieldValuesSpec)
		if !ok {
			// NOTE(review): on failure fs is the nil *FieldValuesSpec, so %T
			// always prints *FieldValuesSpec here rather than the actual
			// unexpected type — the message is misleading.
			panic(fmt.Sprintf("unexpected type %T", fs))
		}

		fs.TimeSequenceSpec = n.TimeSequenceSpec()
		fs.Name = n.Name

	case *FieldConstantValue:
		// Build a value-sequence factory matching the constant's Go type.
		var fs FieldValuesSpec
		switch v := n.Value.(type) {
		case float64:
			fs.DataType = models.Float
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeFloatValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewFloatConstantValuesSequence(v),
				)
			}
		case int64:
			fs.DataType = models.Integer
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeIntegerValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewIntegerConstantValuesSequence(v),
				)
			}
		case string:
			fs.DataType = models.String
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeStringValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewStringConstantValuesSequence(v),
				)
			}
		case bool:
			fs.DataType = models.Boolean
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeBooleanValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewBooleanConstantValuesSequence(v),
				)
			}
		default:
			panic(fmt.Sprintf("unexpected type %T", v))
		}

		s.push(&fs)

	case *FieldArraySource:
		// Same shape as *FieldConstantValue, but cycling over an array.
		var fs FieldValuesSpec
		switch v := n.Value.(type) {
		case []float64:
			fs.DataType = models.Float
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeFloatValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewFloatArrayValuesSequence(v),
				)
			}
		case []int64:
			fs.DataType = models.Integer
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeIntegerValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewIntegerArrayValuesSequence(v),
				)
			}
		case []string:
			fs.DataType = models.String
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeStringValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewStringArrayValuesSequence(v),
				)
			}
		case []bool:
			fs.DataType = models.Boolean
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeBooleanValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewBooleanArrayValuesSequence(v),
				)
			}
		default:
			panic(fmt.Sprintf("unexpected type %T", v))
		}

		s.push(&fs)

	case *FieldFloatRandomSource:
		// Seeded explicitly, so generation is reproducible for a given spec.
		var fs FieldValuesSpec
		fs.DataType = models.Float
		fs.Values = NewTimeValuesSequenceFn(func(spec TimeSequenceSpec) TimeValuesSequence {
			return NewTimeFloatValuesSequence(
				spec.Count,
				NewTimestampSequenceFromSpec(spec),
				NewFloatRandomValuesSequence(n.Min, n.Max, rand.New(rand.NewSource(n.Seed))),
			)
		})
		s.push(&fs)

	case *FieldIntegerZipfSource:
		var fs FieldValuesSpec
		fs.DataType = models.Integer
		fs.Values = NewTimeValuesSequenceFn(func(spec TimeSequenceSpec) TimeValuesSequence {
			return NewTimeIntegerValuesSequence(
				spec.Count,
				NewTimestampSequenceFromSpec(spec),
				NewIntegerZipfValuesSequence(n),
			)
		})
		s.push(&fs)

	case *Tag:
		// The tag's source child pushed a NewCountableSequenceFn; pair it
		// with the tag key.
		s.push(&TagValuesSpec{
			TagKey: n.Name,
			Values: s.pop().(NewCountableSequenceFn),
		})

	case *TagSequenceSource:
		s.push(NewCountableSequenceFn(func() CountableSequence {
			return NewCounterByteSequence(n.Format, int(n.Start), int(n.Start+n.Count))
})) 445 446 case *TagFileSource: 447 p, err := s.resolvePath(n.Path) 448 if err != nil { 449 s.err = err 450 return false 451 } 452 453 lines, err := s.readLines(p) 454 if err != nil { 455 s.err = err 456 return false 457 } 458 459 s.push(NewCountableSequenceFn(func() CountableSequence { 460 return NewStringArraySequence(lines) 461 })) 462 463 case *TagArraySource: 464 s.push(NewCountableSequenceFn(func() CountableSequence { 465 return NewStringArraySequence(n.Values) 466 })) 467 468 case nil: 469 470 default: 471 panic(fmt.Sprintf("unexpected type %T", node)) 472 } 473 474 return true 475} 476 477func (s *schemaToSpec) visitErr(node SchemaNode) { 478 switch n := node.(type) { 479 case *Schema: 480 s.err = fmt.Errorf("error processing schema: %v", s.err) 481 case *Measurement: 482 s.err = fmt.Errorf("measurement %q: %v", n.Name, s.err) 483 case *Tag: 484 s.err = fmt.Errorf("tag %q: %v", n.Name, s.err) 485 case *Field: 486 s.err = fmt.Errorf("field %q: %v", n.Name, s.err) 487 } 488} 489 490func (s *schemaToSpec) resolvePath(p string) (string, error) { 491 fullPath := os.ExpandEnv(p) 492 if !filepath.IsAbs(fullPath) { 493 fullPath = filepath.Join(s.schemaDir, fullPath) 494 } 495 496 fi, err := os.Stat(fullPath) 497 if err != nil { 498 return "", fmt.Errorf("error resolving path %q: %v", p, err) 499 } 500 501 if fi.IsDir() { 502 return "", fmt.Errorf("path %q is not a file: resolved to %s", p, fullPath) 503 } 504 505 return fullPath, nil 506} 507 508func (s *schemaToSpec) readLines(p string) ([]string, error) { 509 fp, err := s.resolvePath(p) 510 if err != nil { 511 return nil, err 512 } 513 514 f, err := os.Open(fp) 515 if err != nil { 516 return nil, fmt.Errorf("path error: %v", err) 517 } 518 defer f.Close() 519 scan := bufio.NewScanner(f) 520 scan.Split(bufio.ScanLines) 521 522 n := 0 523 var lines []string 524 525 for scan.Scan() { 526 if len(scan.Bytes()) == 0 { 527 // skip empty lines 528 continue 529 } 530 531 if !utf8.Valid(scan.Bytes()) { 532 return nil, 
fmt.Errorf("path %q, invalid UTF-8 on line %d", p, n) 533 } 534 lines = append(lines, scan.Text()) 535 } 536 537 if scan.Err() != nil { 538 return nil, scan.Err() 539 } 540 541 return lines, nil 542} 543 544type option func(s *schemaToSpec) 545 546func schemaDir(p string) option { 547 return func(s *schemaToSpec) { 548 s.schemaDir = p 549 } 550} 551 552func NewSpecFromSchema(root *Schema) (*Spec, error) { 553 return newSpecFromSchema(root) 554} 555 556func newSpecFromSchema(root *Schema, opts ...option) (*Spec, error) { 557 var spec Spec 558 559 vis := &schemaToSpec{spec: &spec} 560 for _, o := range opts { 561 o(vis) 562 } 563 564 WalkUp(vis, root) 565 if vis.err != nil { 566 return nil, vis.err 567 } 568 569 return &spec, nil 570} 571