package gen

import (
	"bufio"
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"sort"
	"unicode/utf8"

	"github.com/BurntSushi/toml"
	"github.com/influxdata/influxdb/models"
	"github.com/pkg/errors"
)

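// Spec is a fully resolved data-generation specification: an optional cap on
// the total number of series plus the set of measurement specs to generate.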
type Spec struct {
	SeriesLimit  *int64
	Measurements []MeasurementSpec
}

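// NewSeriesGeneratorFromSpec builds one SeriesGenerator per measurement spec
// and merges them, applying the spec's series limit when one is set.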
func NewSeriesGeneratorFromSpec(s *Spec, tr TimeRange) SeriesGenerator {
	sg := make([]SeriesGenerator, len(s.Measurements))
	for i := range s.Measurements {
		sg[i] = newSeriesGeneratorFromMeasurementSpec(&s.Measurements[i], tr)
	}
	if s.SeriesLimit == nil {
		return NewMergedSeriesGenerator(sg)
	}
	return NewMergedSeriesGeneratorLimit(sg, *s.SeriesLimit)
}

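// MeasurementSpec describes the series to generate for a single
// measurement/field pair, including its tags and time-value sequence.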
type MeasurementSpec struct {
	Name            string
	SeriesLimit     *SeriesLimit
	TagsSpec        *TagsSpec
	FieldValuesSpec *FieldValuesSpec
}

func newSeriesGeneratorFromMeasurementSpec(ms *MeasurementSpec, tr TimeRange) SeriesGenerator {
	if ms.SeriesLimit == nil {
		return NewSeriesGenerator(
			[]byte(ms.Name),
			[]byte(ms.FieldValuesSpec.Name),
			newTimeValuesSequenceFromFieldValuesSpec(ms.FieldValuesSpec, tr),
			newTagsSequenceFromTagsSpec(ms.TagsSpec))
	}
	return NewSeriesGeneratorLimit(
		[]byte(ms.Name),
		[]byte(ms.FieldValuesSpec.Name),
		newTimeValuesSequenceFromFieldValuesSpec(ms.FieldValuesSpec, tr),
		newTagsSequenceFromTagsSpec(ms.TagsSpec),
		int64(*ms.SeriesLimit))
}

// NewTimeValuesSequenceFn returns a TimeValuesSequence that will generate a
// sequence of values based on the spec.
type NewTimeValuesSequenceFn func(spec TimeSequenceSpec) TimeValuesSequence

type NewTagsValuesSequenceFn func() TagsSequence

type NewCountableSequenceFn func() CountableSequence

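// TagsSpec describes the tag keys and value sequences for a measurement, along
// with an optional sampling rate applied to the tag-value cross product.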
type TagsSpec struct {
	Tags   []*TagValuesSpec
	Sample *sample
}

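// newTagsSequenceFromTagsSpec builds the tags sequence for a measurement,
// enabling sampling when the configured rate is less than 1.0.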
func newTagsSequenceFromTagsSpec(ts *TagsSpec) TagsSequence {
	var keys []string
	var vals []CountableSequence
	for _, spec := range ts.Tags {
		keys = append(keys, spec.TagKey)
		vals = append(vals, spec.Values())
	}

	var opts []tagsValuesOption
	if ts.Sample != nil && *ts.Sample != 1.0 {
		opts = append(opts, TagValuesSampleOption(float64(*ts.Sample)))
	}

	return NewTagsValuesSequenceKeysValues(keys, vals, opts...)
}

type TagValuesSpec struct {
	TagKey string
	Values NewCountableSequenceFn
}

type FieldValuesSpec struct {
	TimeSequenceSpec
	Name     string
	DataType models.FieldType
	Values   NewTimeValuesSequenceFn
}

func newTimeValuesSequenceFromFieldValuesSpec(fs *FieldValuesSpec, tr TimeRange) TimeValuesSequence {
	return fs.Values(fs.TimeSequenceSpec.ForTimeRange(tr))
}

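// NewSpecFromToml decodes a TOML schema document from s and converts it to a
// Spec. Relative file references in the schema are resolved against the
// current working directory; use NewSpecFromPath to resolve them against the
// schema file's location.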
func NewSpecFromToml(s string) (*Spec, error) {
	var out Schema
	if _, err := toml.Decode(s, &out); err != nil {
		return nil, err
	}
	return NewSpecFromSchema(&out)
}

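// NewSpecFromPath decodes the TOML schema file at p and converts it to a Spec.
// Relative file references in the schema (for example tag file sources) are
// resolved against the schema file's directory.
//
// Illustrative use (tr is a previously constructed TimeRange):
//
//	spec, err := NewSpecFromPath("/path/to/schema.toml")
//	if err != nil {
//		// handle error
//	}
//	sg := NewSeriesGeneratorFromSpec(spec, tr)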
func NewSpecFromPath(p string) (*Spec, error) {
	var err error
	p, err = filepath.Abs(p)
	if err != nil {
		return nil, err
	}

	var out Schema
	if _, err := toml.DecodeFile(p, &out); err != nil {
		return nil, err
	}
	return newSpecFromSchema(&out, schemaDir(filepath.Dir(p)))
}

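// NewSchemaFromPath decodes the TOML schema file at path without converting it
// to a Spec.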
func NewSchemaFromPath(path string) (*Schema, error) {
	var out Schema
	if _, err := toml.DecodeFile(path, &out); err != nil {
		return nil, err
	}
	return &out, nil
}

type schemaToSpecState int

const (
	stateOk schemaToSpecState = iota
	stateErr
)

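// schemaToSpec is a Visitor that converts a Schema tree into a Spec. The tree
// is walked bottom-up (see WalkUp): child nodes push intermediate values onto
// the stack and parent nodes pop and combine them.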
type schemaToSpec struct {
	schemaDir string
	stack     []interface{}
	state     schemaToSpecState
	spec      *Spec
	err       error
}

func (s *schemaToSpec) push(v interface{}) {
	s.stack = append(s.stack, v)
}

func (s *schemaToSpec) pop() interface{} {
	tail := len(s.stack) - 1
	v := s.stack[tail]
	s.stack[tail] = nil
	s.stack = s.stack[:tail]
	return v
}

func (s *schemaToSpec) peek() interface{} {
	if len(s.stack) == 0 {
		return nil
	}
	return s.stack[len(s.stack)-1]
}

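// Visit converts nodes while the conversion is healthy. Once visit reports a
// failure, the visitor switches to stateErr and visitErr wraps s.err with the
// context of any further nodes it sees.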
func (s *schemaToSpec) Visit(node SchemaNode) (w Visitor) {
	switch s.state {
	case stateOk:
		if s.visit(node) {
			return s
		}
		s.state = stateErr

	case stateErr:
		s.visitErr(node)
	}

	return nil
}

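// visit converts a single schema node, popping any values pushed by its
// children and pushing or updating its own result. It returns false after
// recording s.err when the node is invalid.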
func (s *schemaToSpec) visit(node SchemaNode) bool {
	switch n := node.(type) {
	case *Schema:
		s.spec.Measurements = s.pop().([]MeasurementSpec)
		if n.SeriesLimit != nil {
			sl := int64(*n.SeriesLimit)
			s.spec.SeriesLimit = &sl
		}

	case Measurements:
		// flatten measurements
		var mss []MeasurementSpec
		for {
			if specs, ok := s.peek().([]MeasurementSpec); ok {
				s.pop()
				mss = append(mss, specs...)
				continue
			}
			break
		}
		sort.Slice(mss, func(i, j int) bool {
			return mss[i].Name < mss[j].Name
		})

		// validate field types are homogeneous for a single measurement
		mg := make(map[string]models.FieldType)
		for i := range mss {
			spec := &mss[i]
			key := spec.Name + "." + spec.FieldValuesSpec.Name
			ft := spec.FieldValuesSpec.DataType
			if dt, ok := mg[key]; !ok {
				mg[key] = ft
			} else if dt != ft {
				s.err = fmt.Errorf("field %q data-type conflict, found %s and %s",
					key,
					dt,
					ft)
				return false
			}
		}

		s.push(mss)

	case *Measurement:
		if len(n.Name) == 0 {
			s.err = errors.New("missing measurement name")
			return false
		}

		fields := s.pop().([]*FieldValuesSpec)
		tagsSpec := s.pop().(*TagsSpec)

		tagsSpec.Sample = n.Sample

		// default: sample 50%
		if n.Sample == nil {
			defaultSample := sample(0.5)
			tagsSpec.Sample = &defaultSample
		}

		if *tagsSpec.Sample <= 0.0 || *tagsSpec.Sample > 1.0 {
			s.err = errors.New("invalid sample, must be 0 < sample ≤ 1.0")
			return false
		}

		var ms []MeasurementSpec
		for _, spec := range fields {
			ms = append(ms, MeasurementSpec{
				Name:            n.Name,
				SeriesLimit:     n.SeriesLimit,
				TagsSpec:        tagsSpec,
				FieldValuesSpec: spec,
			})
		}

		// NOTE: sort by field name to ensure the series for this measurement
		// are produced in the correct order
		sort.Slice(ms, func(i, j int) bool {
			return ms[i].FieldValuesSpec.Name < ms[j].FieldValuesSpec.Name
		})
		s.push(ms)

	case Tags:
		var ts TagsSpec
		for {
			if spec, ok := s.peek().(*TagValuesSpec); ok {
				s.pop()
				ts.Tags = append(ts.Tags, spec)
				continue
			}
			break
		}
		// Tag keys must be sorted to produce a valid series key sequence
		sort.Slice(ts.Tags, func(i, j int) bool {
			return ts.Tags[i].TagKey < ts.Tags[j].TagKey
		})

		for i := 1; i < len(ts.Tags); i++ {
			if ts.Tags[i-1].TagKey == ts.Tags[i].TagKey {
				s.err = fmt.Errorf("duplicate tag key %q", ts.Tags[i].TagKey)
				return false
			}
		}

		s.push(&ts)

	case Fields:
		// combine fields
		var fs []*FieldValuesSpec
		for {
			if spec, ok := s.peek().(*FieldValuesSpec); ok {
				s.pop()
				fs = append(fs, spec)
				continue
			}
			break
		}

		sort.Slice(fs, func(i, j int) bool {
			return fs[i].Name < fs[j].Name
		})

		for i := 1; i < len(fs); i++ {
			if fs[i-1].Name == fs[i].Name {
				s.err = fmt.Errorf("duplicate field name %q", fs[i].Name)
				return false
			}
		}

		s.push(fs)

	case *Field:
		fs, ok := s.peek().(*FieldValuesSpec)
		if !ok {
			panic(fmt.Sprintf("unexpected type %T", s.peek()))
		}

		fs.TimeSequenceSpec = n.TimeSequenceSpec()
		fs.Name = n.Name

	case *FieldConstantValue:
		var fs FieldValuesSpec
		switch v := n.Value.(type) {
		case float64:
			fs.DataType = models.Float
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeFloatValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewFloatConstantValuesSequence(v),
				)
			}
		case int64:
			fs.DataType = models.Integer
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeIntegerValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewIntegerConstantValuesSequence(v),
				)
			}
		case string:
			fs.DataType = models.String
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeStringValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewStringConstantValuesSequence(v),
				)
			}
		case bool:
			fs.DataType = models.Boolean
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeBooleanValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewBooleanConstantValuesSequence(v),
				)
			}
		default:
			panic(fmt.Sprintf("unexpected type %T", v))
		}

		s.push(&fs)

	case *FieldArraySource:
		var fs FieldValuesSpec
		switch v := n.Value.(type) {
		case []float64:
			fs.DataType = models.Float
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeFloatValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewFloatArrayValuesSequence(v),
				)
			}
		case []int64:
			fs.DataType = models.Integer
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeIntegerValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewIntegerArrayValuesSequence(v),
				)
			}
		case []string:
			fs.DataType = models.String
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeStringValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewStringArrayValuesSequence(v),
				)
			}
		case []bool:
			fs.DataType = models.Boolean
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeBooleanValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewBooleanArrayValuesSequence(v),
				)
			}
		default:
			panic(fmt.Sprintf("unexpected type %T", v))
		}

		s.push(&fs)

	case *FieldFloatRandomSource:
		var fs FieldValuesSpec
		fs.DataType = models.Float
		fs.Values = NewTimeValuesSequenceFn(func(spec TimeSequenceSpec) TimeValuesSequence {
			return NewTimeFloatValuesSequence(
				spec.Count,
				NewTimestampSequenceFromSpec(spec),
				NewFloatRandomValuesSequence(n.Min, n.Max, rand.New(rand.NewSource(n.Seed))),
			)
		})
		s.push(&fs)

	case *FieldIntegerZipfSource:
		var fs FieldValuesSpec
		fs.DataType = models.Integer
		fs.Values = NewTimeValuesSequenceFn(func(spec TimeSequenceSpec) TimeValuesSequence {
			return NewTimeIntegerValuesSequence(
				spec.Count,
				NewTimestampSequenceFromSpec(spec),
				NewIntegerZipfValuesSequence(n),
			)
		})
		s.push(&fs)

	case *Tag:
		s.push(&TagValuesSpec{
			TagKey: n.Name,
			Values: s.pop().(NewCountableSequenceFn),
		})

	case *TagSequenceSource:
		s.push(NewCountableSequenceFn(func() CountableSequence {
			return NewCounterByteSequence(n.Format, int(n.Start), int(n.Start+n.Count))
		}))

	case *TagFileSource:
		p, err := s.resolvePath(n.Path)
		if err != nil {
			s.err = err
			return false
		}

		lines, err := s.readLines(p)
		if err != nil {
			s.err = err
			return false
		}

		s.push(NewCountableSequenceFn(func() CountableSequence {
			return NewStringArraySequence(lines)
		}))

	case *TagArraySource:
		s.push(NewCountableSequenceFn(func() CountableSequence {
			return NewStringArraySequence(n.Values)
		}))

	case nil:

	default:
		panic(fmt.Sprintf("unexpected type %T", node))
	}

	return true
}

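// visitErr wraps s.err with the context of each enclosing schema node as the
// failed walk unwinds.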
func (s *schemaToSpec) visitErr(node SchemaNode) {
	switch n := node.(type) {
	case *Schema:
		s.err = fmt.Errorf("error processing schema: %v", s.err)
	case *Measurement:
		s.err = fmt.Errorf("measurement %q: %v", n.Name, s.err)
	case *Tag:
		s.err = fmt.Errorf("tag %q: %v", n.Name, s.err)
	case *Field:
		s.err = fmt.Errorf("field %q: %v", n.Name, s.err)
	}
}

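// resolvePath expands environment variables in p, resolves relative paths
// against the schema directory, and verifies the result names an existing
// file (not a directory).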
func (s *schemaToSpec) resolvePath(p string) (string, error) {
	fullPath := os.ExpandEnv(p)
	if !filepath.IsAbs(fullPath) {
		fullPath = filepath.Join(s.schemaDir, fullPath)
	}

	fi, err := os.Stat(fullPath)
	if err != nil {
		return "", fmt.Errorf("error resolving path %q: %v", p, err)
	}

	if fi.IsDir() {
		return "", fmt.Errorf("path %q is not a file: resolved to %s", p, fullPath)
	}

	return fullPath, nil
}

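// readLines returns the non-empty lines of the file at p, rejecting any line
// that is not valid UTF-8.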
func (s *schemaToSpec) readLines(p string) ([]string, error) {
	fp, err := s.resolvePath(p)
	if err != nil {
		return nil, err
	}

	f, err := os.Open(fp)
	if err != nil {
		return nil, fmt.Errorf("error opening %q: %v", fp, err)
	}
	defer f.Close()
	scan := bufio.NewScanner(f)
	scan.Split(bufio.ScanLines)

	n := 0
	var lines []string

	for scan.Scan() {
		n++ // track the 1-based line number for error reporting

		if len(scan.Bytes()) == 0 {
			// skip empty lines
			continue
		}

		if !utf8.Valid(scan.Bytes()) {
			return nil, fmt.Errorf("path %q, invalid UTF-8 on line %d", p, n)
		}
		lines = append(lines, scan.Text())
	}

	if scan.Err() != nil {
		return nil, scan.Err()
	}

	return lines, nil
}

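// option configures a schemaToSpec conversion.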
type option func(s *schemaToSpec)

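// schemaDir sets the directory against which relative file references in the
// schema are resolved.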
func schemaDir(p string) option {
	return func(s *schemaToSpec) {
		s.schemaDir = p
	}
}

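// NewSpecFromSchema converts a Schema to a Spec. Relative file references in
// the schema are resolved against the current working directory; use
// NewSpecFromPath to resolve them against the schema file's location.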
func NewSpecFromSchema(root *Schema) (*Spec, error) {
	return newSpecFromSchema(root)
}

func newSpecFromSchema(root *Schema, opts ...option) (*Spec, error) {
	var spec Spec

	vis := &schemaToSpec{spec: &spec}
	for _, o := range opts {
		o(vis)
	}

	WalkUp(vis, root)
	if vis.err != nil {
		return nil, vis.err
	}

	return &spec, nil
}