1// Copyright 2012-present Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package main 6 7import ( 8 "context" 9 "encoding/json" 10 "errors" 11 "flag" 12 "fmt" 13 "log" 14 "math/rand" 15 "os" 16 "runtime" 17 "strings" 18 "sync/atomic" 19 "time" 20 21 "github.com/olivere/elastic" 22) 23 24type Tweet struct { 25 User string `json:"user"` 26 Message string `json:"message"` 27 Retweets int `json:"retweets"` 28 Image string `json:"image,omitempty"` 29 Created time.Time `json:"created,omitempty"` 30 Tags []string `json:"tags,omitempty"` 31 Location string `json:"location,omitempty"` 32 Suggest *elastic.SuggestField `json:"suggest_field,omitempty"` 33} 34 35var ( 36 nodes = flag.String("nodes", "", "comma-separated list of ES URLs (e.g. 'http://192.168.2.10:9200,http://192.168.2.11:9200')") 37 n = flag.Int("n", 5, "number of goroutines that run searches") 38 index = flag.String("index", "twitter", "name of ES index to use") 39 errorlogfile = flag.String("errorlog", "", "error log file") 40 infologfile = flag.String("infolog", "", "info log file") 41 tracelogfile = flag.String("tracelog", "", "trace log file") 42 retries = flag.Int("retries", 0, "number of retries") 43 sniff = flag.Bool("sniff", elastic.DefaultSnifferEnabled, "enable or disable sniffer") 44 sniffer = flag.Duration("sniffer", elastic.DefaultSnifferInterval, "sniffer interval") 45 healthcheck = flag.Bool("healthcheck", elastic.DefaultHealthcheckEnabled, "enable or disable healthchecks") 46 healthchecker = flag.Duration("healthchecker", elastic.DefaultHealthcheckInterval, "healthcheck interval") 47) 48 49func main() { 50 flag.Parse() 51 52 runtime.GOMAXPROCS(runtime.NumCPU()) 53 54 if *nodes == "" { 55 log.Fatal("no nodes specified") 56 } 57 urls := strings.SplitN(*nodes, ",", -1) 58 59 testcase, err := NewTestCase(*index, urls) 60 if err != nil { 61 log.Fatal(err) 62 } 63 64 testcase.SetErrorLogFile(*errorlogfile) 65 testcase.SetInfoLogFile(*infologfile) 66 testcase.SetTraceLogFile(*tracelogfile) 67 testcase.SetMaxRetries(*retries) 68 testcase.SetHealthcheck(*healthcheck) 69 testcase.SetHealthcheckInterval(*healthchecker) 70 testcase.SetSniff(*sniff) 71 testcase.SetSnifferInterval(*sniffer) 72 73 if err := testcase.Run(*n); err != nil { 74 log.Fatal(err) 75 } 76 77 select {} 78} 79 80type RunInfo struct { 81 Success bool 82} 83 84type TestCase struct { 85 nodes []string 86 client *elastic.Client 87 runs int64 88 failures int64 89 runCh chan RunInfo 90 index string 91 errorlogfile string 92 infologfile string 93 tracelogfile string 94 maxRetries int 95 healthcheck bool 96 healthcheckInterval time.Duration 97 sniff bool 98 snifferInterval time.Duration 99} 100 101func NewTestCase(index string, nodes []string) (*TestCase, error) { 102 if index == "" { 103 return nil, errors.New("no index name specified") 104 } 105 106 return &TestCase{ 107 index: index, 108 nodes: nodes, 109 runCh: make(chan RunInfo), 110 }, nil 111} 112 113func (t *TestCase) SetIndex(name string) { 114 t.index = name 115} 116 117func (t *TestCase) SetErrorLogFile(name string) { 118 t.errorlogfile = name 119} 120 121func (t *TestCase) SetInfoLogFile(name string) { 122 t.infologfile = name 123} 124 125func (t *TestCase) SetTraceLogFile(name string) { 126 t.tracelogfile = name 127} 128 129func (t *TestCase) SetMaxRetries(n int) { 130 t.maxRetries = n 131} 132 133func (t *TestCase) SetSniff(enabled bool) { 134 t.sniff = enabled 135} 136 137func (t *TestCase) SetSnifferInterval(d time.Duration) { 138 t.snifferInterval = d 139} 140 141func (t *TestCase) SetHealthcheck(enabled bool) { 142 t.healthcheck = enabled 143} 144 145func (t *TestCase) SetHealthcheckInterval(d time.Duration) { 146 t.healthcheckInterval = d 147} 148 149func (t *TestCase) Run(n int) error { 150 if err := t.setup(); err != nil { 151 return err 152 } 153 154 for i := 1; i < n; i++ { 155 go t.search() 156 } 157 158 go t.monitor() 159 160 return nil 161} 162 163func (t *TestCase) monitor() { 164 print := func() { 165 fmt.Printf("\033[32m%5d\033[0m; \033[31m%5d\033[0m: %s%s\r", t.runs, t.failures, t.client.String(), " ") 166 } 167 168 for { 169 select { 170 case run := <-t.runCh: 171 atomic.AddInt64(&t.runs, 1) 172 if !run.Success { 173 atomic.AddInt64(&t.failures, 1) 174 fmt.Println() 175 } 176 print() 177 case <-time.After(5 * time.Second): 178 // Print stats after some inactivity 179 print() 180 break 181 } 182 } 183} 184 185func (t *TestCase) setup() error { 186 var options []elastic.ClientOptionFunc 187 188 if t.errorlogfile != "" { 189 f, err := os.OpenFile(t.errorlogfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664) 190 if err != nil { 191 return err 192 } 193 logger := log.New(f, "", log.Ltime|log.Lmicroseconds|log.Lshortfile) 194 options = append(options, elastic.SetErrorLog(logger)) 195 } 196 197 if t.infologfile != "" { 198 f, err := os.OpenFile(t.infologfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664) 199 if err != nil { 200 return err 201 } 202 logger := log.New(f, "", log.LstdFlags) 203 options = append(options, elastic.SetInfoLog(logger)) 204 } 205 206 // Trace request and response details like this 207 if t.tracelogfile != "" { 208 f, err := os.OpenFile(t.tracelogfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664) 209 if err != nil { 210 return err 211 } 212 logger := log.New(f, "", log.LstdFlags) 213 options = append(options, elastic.SetTraceLog(logger)) 214 } 215 216 options = append(options, elastic.SetURL(t.nodes...)) 217 options = append(options, elastic.SetMaxRetries(t.maxRetries)) 218 options = append(options, elastic.SetSniff(t.sniff)) 219 options = append(options, elastic.SetSnifferInterval(t.snifferInterval)) 220 options = append(options, elastic.SetHealthcheck(t.healthcheck)) 221 options = append(options, elastic.SetHealthcheckInterval(t.healthcheckInterval)) 222 223 client, err := elastic.NewClient(options...) 224 if err != nil { 225 // Handle error 226 return err 227 } 228 t.client = client 229 230 ctx := context.Background() 231 232 // Use the IndexExists service to check if a specified index exists. 233 exists, err := t.client.IndexExists(t.index).Do(ctx) 234 if err != nil { 235 return err 236 } 237 if exists { 238 deleteIndex, err := t.client.DeleteIndex(t.index).Do(ctx) 239 if err != nil { 240 return err 241 } 242 if !deleteIndex.Acknowledged { 243 return errors.New("delete index not acknowledged") 244 } 245 } 246 247 // Create a new index. 248 createIndex, err := t.client.CreateIndex(t.index).Do(ctx) 249 if err != nil { 250 return err 251 } 252 if !createIndex.Acknowledged { 253 return errors.New("create index not acknowledged") 254 } 255 256 // Index a tweet (using JSON serialization) 257 tweet1 := Tweet{User: "olivere", Message: "Take Five", Retweets: 0} 258 _, err = t.client.Index(). 259 Index(t.index). 260 Type("tweet"). 261 Id("1"). 262 BodyJson(tweet1). 263 Do(ctx) 264 if err != nil { 265 return err 266 } 267 268 // Index a second tweet (by string) 269 tweet2 := `{"user" : "olivere", "message" : "It's a Raggy Waltz"}` 270 _, err = t.client.Index(). 271 Index(t.index). 272 Type("tweet"). 273 Id("2"). 274 BodyString(tweet2). 275 Do(ctx) 276 if err != nil { 277 return err 278 } 279 280 // Flush to make sure the documents got written. 281 _, err = t.client.Flush().Index(t.index).Do(ctx) 282 if err != nil { 283 return err 284 } 285 286 return nil 287} 288 289func (t *TestCase) search() { 290 ctx := context.Background() 291 292 // Loop forever to check for connection issues 293 for { 294 // Get tweet with specified ID 295 _, err := t.client.Get(). 296 Index(t.index). 297 Type("tweet"). 298 Id("1"). 299 Do(ctx) 300 if err != nil { 301 //failf("Get failed: %v", err) 302 t.runCh <- RunInfo{Success: false} 303 continue 304 } 305 if elastic.IsNotFound(err) { 306 //log.Printf("Document %s not found\n", "1") 307 //fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type) 308 t.runCh <- RunInfo{Success: false} 309 continue 310 } 311 312 // Search with a term query 313 searchResult, err := t.client.Search(). 314 Index(t.index). // search in index t.index 315 Query(elastic.NewTermQuery("user", "olivere")). // specify the query 316 Sort("user", true). // sort by "user" field, ascending 317 From(0).Size(10). // take documents 0-9 318 Pretty(true). // pretty print request and response JSON 319 Do(ctx) // execute 320 if err != nil { 321 //failf("Search failed: %v\n", err) 322 t.runCh <- RunInfo{Success: false} 323 continue 324 } 325 326 // searchResult is of type SearchResult and returns hits, suggestions, 327 // and all kinds of other information from Elasticsearch. 328 //fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis) 329 330 // Number of hits 331 if searchResult.Hits.TotalHits > 0 { 332 //fmt.Printf("Found a total of %d tweets\n", searchResult.Hits.TotalHits) 333 334 // Iterate through results 335 for _, hit := range searchResult.Hits.Hits { 336 // hit.Index contains the name of the index 337 338 // Deserialize hit.Source into a Tweet (could also be just a map[string]interface{}). 339 var tweet Tweet 340 err := json.Unmarshal(*hit.Source, &tweet) 341 if err != nil { 342 // Deserialization failed 343 //failf("Deserialize failed: %v\n", err) 344 t.runCh <- RunInfo{Success: false} 345 continue 346 } 347 348 // Work with tweet 349 //fmt.Printf("Tweet by %s: %s\n", t.User, t.Message) 350 } 351 } else { 352 // No hits 353 //fmt.Print("Found no tweets\n") 354 } 355 356 t.runCh <- RunInfo{Success: true} 357 358 // Sleep some time 359 time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) 360 } 361} 362