1package stress // import "github.com/influxdata/influxdb/stress" 2 3import ( 4 "bytes" 5 "fmt" 6 "net/http" 7 "sync" 8 "time" 9) 10 11// Point is an interface that is used to represent 12// the abstract idea of a point in InfluxDB. 13type Point interface { 14 Line() []byte 15 Graphite() []byte 16 OpenJSON() []byte 17 OpenTelnet() []byte 18} 19 20/////////////////////////////////////////////////// 21// Example Implementation of the Point Interface // 22/////////////////////////////////////////////////// 23 24// KeyValue is an intermediate type that is used 25// to express Tag and Field similarly. 26type KeyValue struct { 27 Key string 28 Value string 29} 30 31// Tag is a struct for a tag in influxdb. 32type Tag KeyValue 33 34// Field is a struct for a field in influxdb. 35type Field KeyValue 36 37// Tags is an slice of all the tags for a point. 38type Tags []Tag 39 40// Fields is an slice of all the fields for a point. 41type Fields []Field 42 43// tagset returns a byte array for a points tagset. 44func (t Tags) tagset() []byte { 45 var buf bytes.Buffer 46 for _, tag := range t { 47 buf.Write([]byte(fmt.Sprintf("%v=%v,", tag.Key, tag.Value))) 48 } 49 50 b := buf.Bytes() 51 b = b[0 : len(b)-1] 52 53 return b 54} 55 56// fieldset returns a byte array for a points fieldset. 57func (f Fields) fieldset() []byte { 58 var buf bytes.Buffer 59 for _, field := range f { 60 buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, field.Value))) 61 } 62 63 b := buf.Bytes() 64 b = b[0 : len(b)-1] 65 66 return b 67} 68 69// StdPoint represents a point in InfluxDB 70type StdPoint struct { 71 Measurement string 72 Tags Tags 73 Fields Fields 74 Timestamp int64 75} 76 77// Line returns a byte array for a point in 78// line-protocol format 79func (p StdPoint) Line() []byte { 80 var buf bytes.Buffer 81 82 buf.Write([]byte(fmt.Sprintf("%v,", p.Measurement))) 83 buf.Write(p.Tags.tagset()) 84 buf.Write([]byte(" ")) 85 buf.Write(p.Fields.fieldset()) 86 buf.Write([]byte(" ")) 87 buf.Write([]byte(fmt.Sprintf("%v", p.Timestamp))) 88 89 byt := buf.Bytes() 90 91 return byt 92} 93 94// Graphite returns a byte array for a point 95// in graphite-protocol format 96func (p StdPoint) Graphite() []byte { 97 // TODO: implement 98 // timestamp is at second level resolution 99 // but can be specified as a float to get nanosecond 100 // level precision 101 t := "tag_1.tag_2.measurement[.field] acutal_value timestamp" 102 return []byte(t) 103} 104 105// OpenJSON returns a byte array for a point 106// in JSON format 107func (p StdPoint) OpenJSON() []byte { 108 // TODO: implement 109 //[ 110 // { 111 // "metric": "sys.cpu.nice", 112 // "timestamp": 1346846400, 113 // "value": 18, 114 // "tags": { 115 // "host": "web01", 116 // "dc": "lga" 117 // } 118 // }, 119 // { 120 // "metric": "sys.cpu.nice", 121 // "timestamp": 1346846400, 122 // "value": 9, 123 // "tags": { 124 // "host": "web02", 125 // "dc": "lga" 126 // } 127 // } 128 //] 129 return []byte("hello") 130} 131 132// OpenTelnet returns a byte array for a point 133// in OpenTSDB-telnet format 134func (p StdPoint) OpenTelnet() []byte { 135 // TODO: implement 136 // timestamp can be 13 digits at most 137 // sys.cpu.nice timestamp value tag_key_1=tag_value_1 tag_key_2=tag_value_2 138 return []byte("hello") 139} 140 141//////////////////////////////////////// 142 143// response is the results making 144// a request to influxdb. 145type response struct { 146 Resp *http.Response 147 Time time.Time 148 Timer *Timer 149} 150 151// Success returns true if the request 152// was successful and false otherwise. 153func (r response) Success() bool { 154 // ADD success for tcp, udp, etc 155 return !(r.Resp == nil || r.Resp.StatusCode != 204) 156} 157 158// WriteResponse is a response for a Writer 159type WriteResponse response 160 161// QueryResponse is a response for a Querier 162type QueryResponse struct { 163 response 164 Body string 165} 166 167/////////////////////////////// 168// Definition of the Writer /// 169/////////////////////////////// 170 171// PointGenerator is an interface for generating points. 172type PointGenerator interface { 173 Generate() (<-chan Point, error) 174 Time() time.Time 175} 176 177// InfluxClient is an interface for writing data to the database. 178type InfluxClient interface { 179 Batch(ps <-chan Point, r chan<- response) error 180 send(b []byte) (response, error) 181 //ResponseHandler 182} 183 184// Writer is a PointGenerator and an InfluxClient. 185type Writer struct { 186 PointGenerator 187 InfluxClient 188} 189 190// NewWriter returns a Writer. 191func NewWriter(p PointGenerator, i InfluxClient) Writer { 192 w := Writer{ 193 PointGenerator: p, 194 InfluxClient: i, 195 } 196 197 return w 198} 199 200//////////////////////////////// 201// Definition of the Querier /// 202//////////////////////////////// 203 204// Query is query 205type Query string 206 207// QueryGenerator is an interface that is used 208// to define queries that will be ran on the DB. 209type QueryGenerator interface { 210 QueryGenerate(f func() time.Time) (<-chan Query, error) 211 SetTime(t time.Time) 212} 213 214// QueryClient is an interface that can write a query 215// to an InfluxDB instance. 216type QueryClient interface { 217 Query(q Query) (response, error) 218 Exec(qs <-chan Query, r chan<- response) error 219} 220 221// Querier queries the database. 222type Querier struct { 223 QueryGenerator 224 QueryClient 225} 226 227// NewQuerier returns a Querier. 228func NewQuerier(q QueryGenerator, c QueryClient) Querier { 229 r := Querier{ 230 QueryGenerator: q, 231 QueryClient: c, 232 } 233 234 return r 235} 236 237/////////////////////////////////// 238// Definition of the Provisioner // 239/////////////////////////////////// 240 241// Provisioner is an interface that provisions an 242// InfluxDB instance 243type Provisioner interface { 244 Provision() error 245} 246 247///////////////////////////////// 248// Definition of StressTest ///// 249///////////////////////////////// 250 251// StressTest is a struct that contains all of 252// the logic required to execute a Stress Test 253type StressTest struct { 254 Provisioner 255 Writer 256 Querier 257} 258 259// responseHandler 260type responseHandler func(r <-chan response, t *Timer) 261 262// Start executes the Stress Test 263func (s *StressTest) Start(wHandle responseHandler, rHandle responseHandler) { 264 var wg sync.WaitGroup 265 266 // Provision the Instance 267 s.Provision() 268 269 wg.Add(1) 270 // Starts Writing 271 go func() { 272 r := make(chan response) 273 wt := NewTimer() 274 275 go func() { 276 defer wt.StopTimer() 277 defer close(r) 278 p, err := s.Generate() 279 if err != nil { 280 fmt.Println(err) 281 return 282 } 283 284 err = s.Batch(p, r) 285 if err != nil { 286 fmt.Println(err) 287 return 288 } 289 }() 290 291 // Write Results Handler 292 wHandle(r, wt) 293 wg.Done() 294 }() 295 296 wg.Add(1) 297 // Starts Querying 298 go func() { 299 r := make(chan response) 300 rt := NewTimer() 301 302 go func() { 303 defer rt.StopTimer() 304 defer close(r) 305 q, err := s.QueryGenerate(s.Time) 306 if err != nil { 307 fmt.Println(err) 308 return 309 } 310 311 err = s.Exec(q, r) 312 if err != nil { 313 fmt.Println(err) 314 return 315 } 316 }() 317 318 // Read Results Handler 319 rHandle(r, rt) 320 wg.Done() 321 }() 322 323 wg.Wait() 324} 325 326// NewStressTest returns an instance of a StressTest 327func NewStressTest(p Provisioner, w Writer, r Querier) StressTest { 328 s := StressTest{ 329 Provisioner: p, 330 Writer: w, 331 Querier: r, 332 } 333 334 return s 335} 336