1package stress // import "github.com/influxdata/influxdb/stress"
2
3import (
4	"bytes"
5	"fmt"
6	"net/http"
7	"sync"
8	"time"
9)
10
// Point is an interface that represents the abstract idea of a point in
// InfluxDB. Each method returns the point serialized for one write format.
type Point interface {
	// Line returns the point in line-protocol format.
	Line() []byte
	// Graphite returns the point in graphite-protocol format.
	Graphite() []byte
	// OpenJSON returns the point in JSON format.
	OpenJSON() []byte
	// OpenTelnet returns the point in OpenTSDB-telnet format.
	OpenTelnet() []byte
}
19
20///////////////////////////////////////////////////
21// Example Implementation of the Point Interface //
22///////////////////////////////////////////////////
23
// KeyValue is an intermediate type that is used to express Tag and Field
// similarly: both are defined in terms of it. Key and Value are both held
// as strings.
type KeyValue struct {
	Key   string
	Value string
}
30
// Tag is a single tag (key/value pair) of a point in InfluxDB.
type Tag KeyValue
33
// Field is a single field (key/value pair) of a point in InfluxDB.
type Field KeyValue
36
// Tags is a slice of all the tags for a point.
type Tags []Tag
39
// Fields is a slice of all the fields for a point.
type Fields []Field
42
43// tagset returns a byte array for a points tagset.
44func (t Tags) tagset() []byte {
45	var buf bytes.Buffer
46	for _, tag := range t {
47		buf.Write([]byte(fmt.Sprintf("%v=%v,", tag.Key, tag.Value)))
48	}
49
50	b := buf.Bytes()
51	b = b[0 : len(b)-1]
52
53	return b
54}
55
56// fieldset returns a byte array for a points fieldset.
57func (f Fields) fieldset() []byte {
58	var buf bytes.Buffer
59	for _, field := range f {
60		buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, field.Value)))
61	}
62
63	b := buf.Bytes()
64	b = b[0 : len(b)-1]
65
66	return b
67}
68
// StdPoint represents a point in InfluxDB and is the example implementation
// of the Point interface. Line writes the fields in declaration order:
// Measurement, then Tags, then Fields, then Timestamp as a plain integer.
// NOTE(review): the unit of Timestamp is not fixed by this file — confirm
// against the code that generates points.
type StdPoint struct {
	Measurement string
	Tags        Tags
	Fields      Fields
	Timestamp   int64
}
76
77// Line returns a byte array for a point in
78// line-protocol format
79func (p StdPoint) Line() []byte {
80	var buf bytes.Buffer
81
82	buf.Write([]byte(fmt.Sprintf("%v,", p.Measurement)))
83	buf.Write(p.Tags.tagset())
84	buf.Write([]byte(" "))
85	buf.Write(p.Fields.fieldset())
86	buf.Write([]byte(" "))
87	buf.Write([]byte(fmt.Sprintf("%v", p.Timestamp)))
88
89	byt := buf.Bytes()
90
91	return byt
92}
93
94// Graphite returns a byte array for a point
95// in graphite-protocol format
96func (p StdPoint) Graphite() []byte {
97	// TODO: implement
98	// timestamp is at second level resolution
99	// but can be specified as a float to get nanosecond
100	// level precision
101	t := "tag_1.tag_2.measurement[.field] acutal_value timestamp"
102	return []byte(t)
103}
104
105// OpenJSON returns a byte array for a point
106// in JSON format
107func (p StdPoint) OpenJSON() []byte {
108	// TODO: implement
109	//[
110	//    {
111	//        "metric": "sys.cpu.nice",
112	//        "timestamp": 1346846400,
113	//        "value": 18,
114	//        "tags": {
115	//           "host": "web01",
116	//           "dc": "lga"
117	//        }
118	//    },
119	//    {
120	//        "metric": "sys.cpu.nice",
121	//        "timestamp": 1346846400,
122	//        "value": 9,
123	//        "tags": {
124	//           "host": "web02",
125	//           "dc": "lga"
126	//        }
127	//    }
128	//]
129	return []byte("hello")
130}
131
132// OpenTelnet returns a byte array for a point
133// in OpenTSDB-telnet format
134func (p StdPoint) OpenTelnet() []byte {
135	// TODO: implement
136	// timestamp can be 13 digits at most
137	// sys.cpu.nice timestamp value tag_key_1=tag_value_1 tag_key_2=tag_value_2
138	return []byte("hello")
139}
140
141////////////////////////////////////////
142
// response is the result of making a request to influxdb.
//
// Resp is the HTTP response; Success treats a nil Resp as a failure.
// Time is a timestamp attached to the request — presumably when it was
// issued; TODO confirm against the client code that fills it in.
// Timer is the Timer associated with the request (the type is declared
// elsewhere in this package).
type response struct {
	Resp  *http.Response
	Time  time.Time
	Timer *Timer
}
150
151// Success returns true if the request
152// was successful and false otherwise.
153func (r response) Success() bool {
154	// ADD success for tcp, udp, etc
155	return !(r.Resp == nil || r.Resp.StatusCode != 204)
156}
157
// WriteResponse is a response for a Writer. It is a distinct type with the
// same fields as response; as a newly defined type it does not carry over
// response's methods (such as Success).
type WriteResponse response
160
// QueryResponse is a response for a Querier. It embeds response, so its
// fields and the Success method are promoted, and adds Body.
// NOTE(review): Body presumably holds the query result payload — confirm
// against the query client that populates it.
type QueryResponse struct {
	response
	Body string
}
166
167///////////////////////////////
168// Definition of the Writer ///
169///////////////////////////////
170
// PointGenerator is an interface for generating points.
type PointGenerator interface {
	// Generate returns a receive-only channel of points, or an error.
	// StressTest.Start hands that channel to InfluxClient.Batch.
	Generate() (<-chan Point, error)
	// Time returns a timestamp from the generator. StressTest.Start passes
	// this method to QueryGenerator.QueryGenerate.
	// NOTE(review): which instant it reports is up to the implementation.
	Time() time.Time
}
176
// InfluxClient is an interface for writing data to the database.
// Because send is unexported, only types declared in this package can
// implement it.
type InfluxClient interface {
	// Batch receives points on ps and can send responses on r.
	// NOTE(review): batch sizing and error semantics are defined by the
	// implementation, which is not visible in this file.
	Batch(ps <-chan Point, r chan<- response) error
	// send takes a raw payload b and returns the resulting response.
	// Presumably b is an already-encoded request body — TODO confirm.
	send(b []byte) (response, error)
	//ResponseHandler
}
183
// Writer is a PointGenerator and an InfluxClient. Both interfaces are
// embedded, so their methods are promoted onto Writer.
type Writer struct {
	PointGenerator
	InfluxClient
}
189
190// NewWriter returns a Writer.
191func NewWriter(p PointGenerator, i InfluxClient) Writer {
192	w := Writer{
193		PointGenerator: p,
194		InfluxClient:   i,
195	}
196
197	return w
198}
199
200////////////////////////////////
201// Definition of the Querier ///
202////////////////////////////////
203
// Query is a single query to run against the database, held as a string.
type Query string
206
// QueryGenerator is an interface that is used to define the queries that
// will be run on the DB.
type QueryGenerator interface {
	// QueryGenerate returns a receive-only channel of queries, or an
	// error. f supplies a time on demand; StressTest.Start passes the
	// PointGenerator's Time method for it.
	QueryGenerate(f func() time.Time) (<-chan Query, error)
	// SetTime gives the generator a time t.
	// NOTE(review): how t is used is up to the implementation.
	SetTime(t time.Time)
}
213
// QueryClient is an interface that can send a query to an InfluxDB
// instance.
type QueryClient interface {
	// Query runs the single query q and returns its response.
	Query(q Query) (response, error)
	// Exec receives queries on qs and can send responses on r.
	// NOTE(review): concurrency and error semantics are defined by the
	// implementation, which is not visible in this file.
	Exec(qs <-chan Query, r chan<- response) error
}
220
// Querier queries the database. It embeds a QueryGenerator and a
// QueryClient, so the methods of both are promoted onto Querier.
type Querier struct {
	QueryGenerator
	QueryClient
}
226
227// NewQuerier returns a Querier.
228func NewQuerier(q QueryGenerator, c QueryClient) Querier {
229	r := Querier{
230		QueryGenerator: q,
231		QueryClient:    c,
232	}
233
234	return r
235}
236
237///////////////////////////////////
238// Definition of the Provisioner //
239///////////////////////////////////
240
// Provisioner is an interface that provisions an InfluxDB instance.
type Provisioner interface {
	// Provision prepares the instance and reports any failure.
	// StressTest.Start calls it once, before any writes or queries begin.
	Provision() error
}
246
247/////////////////////////////////
248// Definition of StressTest /////
249/////////////////////////////////
250
// StressTest is a struct that contains all of the logic required to
// execute a stress test. It embeds a Provisioner, a Writer and a Querier,
// so their methods are promoted and can be called directly on a
// StressTest, as Start does.
type StressTest struct {
	Provisioner
	Writer
	Querier
}
258
// responseHandler is a function that consumes the responses arriving on r
// for one workload and is given the Timer t for that workload.
// StressTest.Start calls one handler for writes and one for queries, and
// closes r when the corresponding workload has finished.
type responseHandler func(r <-chan response, t *Timer)
261
262// Start executes the Stress Test
263func (s *StressTest) Start(wHandle responseHandler, rHandle responseHandler) {
264	var wg sync.WaitGroup
265
266	// Provision the Instance
267	s.Provision()
268
269	wg.Add(1)
270	// Starts Writing
271	go func() {
272		r := make(chan response)
273		wt := NewTimer()
274
275		go func() {
276			defer wt.StopTimer()
277			defer close(r)
278			p, err := s.Generate()
279			if err != nil {
280				fmt.Println(err)
281				return
282			}
283
284			err = s.Batch(p, r)
285			if err != nil {
286				fmt.Println(err)
287				return
288			}
289		}()
290
291		// Write Results Handler
292		wHandle(r, wt)
293		wg.Done()
294	}()
295
296	wg.Add(1)
297	// Starts Querying
298	go func() {
299		r := make(chan response)
300		rt := NewTimer()
301
302		go func() {
303			defer rt.StopTimer()
304			defer close(r)
305			q, err := s.QueryGenerate(s.Time)
306			if err != nil {
307				fmt.Println(err)
308				return
309			}
310
311			err = s.Exec(q, r)
312			if err != nil {
313				fmt.Println(err)
314				return
315			}
316		}()
317
318		// Read Results Handler
319		rHandle(r, rt)
320		wg.Done()
321	}()
322
323	wg.Wait()
324}
325
326// NewStressTest returns an instance of a StressTest
327func NewStressTest(p Provisioner, w Writer, r Querier) StressTest {
328	s := StressTest{
329		Provisioner: p,
330		Writer:      w,
331		Querier:     r,
332	}
333
334	return s
335}
336