1// Copyright 2012-present Oliver Eilhard. All rights reserved.
2// Use of this source code is governed by a MIT-license.
3// See http://olivere.mit-license.org/license.txt for details.
4
5package main
6
7import (
8	"context"
9	"encoding/json"
10	"errors"
11	"flag"
12	"fmt"
13	"log"
14	"math/rand"
15	"os"
16	"runtime"
17	"strings"
18	"sync/atomic"
19	"time"
20
21	"github.com/olivere/elastic"
22)
23
24type Tweet struct {
25	User     string                `json:"user"`
26	Message  string                `json:"message"`
27	Retweets int                   `json:"retweets"`
28	Image    string                `json:"image,omitempty"`
29	Created  time.Time             `json:"created,omitempty"`
30	Tags     []string              `json:"tags,omitempty"`
31	Location string                `json:"location,omitempty"`
32	Suggest  *elastic.SuggestField `json:"suggest_field,omitempty"`
33}
34
35var (
36	nodes         = flag.String("nodes", "", "comma-separated list of ES URLs (e.g. 'http://192.168.2.10:9200,http://192.168.2.11:9200')")
37	n             = flag.Int("n", 5, "number of goroutines that run searches")
38	index         = flag.String("index", "twitter", "name of ES index to use")
39	errorlogfile  = flag.String("errorlog", "", "error log file")
40	infologfile   = flag.String("infolog", "", "info log file")
41	tracelogfile  = flag.String("tracelog", "", "trace log file")
42	retries       = flag.Int("retries", 0, "number of retries")
43	sniff         = flag.Bool("sniff", elastic.DefaultSnifferEnabled, "enable or disable sniffer")
44	sniffer       = flag.Duration("sniffer", elastic.DefaultSnifferInterval, "sniffer interval")
45	healthcheck   = flag.Bool("healthcheck", elastic.DefaultHealthcheckEnabled, "enable or disable healthchecks")
46	healthchecker = flag.Duration("healthchecker", elastic.DefaultHealthcheckInterval, "healthcheck interval")
47)
48
49func main() {
50	flag.Parse()
51
52	runtime.GOMAXPROCS(runtime.NumCPU())
53
54	if *nodes == "" {
55		log.Fatal("no nodes specified")
56	}
57	urls := strings.SplitN(*nodes, ",", -1)
58
59	testcase, err := NewTestCase(*index, urls)
60	if err != nil {
61		log.Fatal(err)
62	}
63
64	testcase.SetErrorLogFile(*errorlogfile)
65	testcase.SetInfoLogFile(*infologfile)
66	testcase.SetTraceLogFile(*tracelogfile)
67	testcase.SetMaxRetries(*retries)
68	testcase.SetHealthcheck(*healthcheck)
69	testcase.SetHealthcheckInterval(*healthchecker)
70	testcase.SetSniff(*sniff)
71	testcase.SetSnifferInterval(*sniffer)
72
73	if err := testcase.Run(*n); err != nil {
74		log.Fatal(err)
75	}
76
77	select {}
78}
79
80type RunInfo struct {
81	Success bool
82}
83
84type TestCase struct {
85	nodes               []string
86	client              *elastic.Client
87	runs                int64
88	failures            int64
89	runCh               chan RunInfo
90	index               string
91	errorlogfile        string
92	infologfile         string
93	tracelogfile        string
94	maxRetries          int
95	healthcheck         bool
96	healthcheckInterval time.Duration
97	sniff               bool
98	snifferInterval     time.Duration
99}
100
101func NewTestCase(index string, nodes []string) (*TestCase, error) {
102	if index == "" {
103		return nil, errors.New("no index name specified")
104	}
105
106	return &TestCase{
107		index: index,
108		nodes: nodes,
109		runCh: make(chan RunInfo),
110	}, nil
111}
112
113func (t *TestCase) SetIndex(name string) {
114	t.index = name
115}
116
117func (t *TestCase) SetErrorLogFile(name string) {
118	t.errorlogfile = name
119}
120
121func (t *TestCase) SetInfoLogFile(name string) {
122	t.infologfile = name
123}
124
125func (t *TestCase) SetTraceLogFile(name string) {
126	t.tracelogfile = name
127}
128
129func (t *TestCase) SetMaxRetries(n int) {
130	t.maxRetries = n
131}
132
133func (t *TestCase) SetSniff(enabled bool) {
134	t.sniff = enabled
135}
136
137func (t *TestCase) SetSnifferInterval(d time.Duration) {
138	t.snifferInterval = d
139}
140
141func (t *TestCase) SetHealthcheck(enabled bool) {
142	t.healthcheck = enabled
143}
144
145func (t *TestCase) SetHealthcheckInterval(d time.Duration) {
146	t.healthcheckInterval = d
147}
148
149func (t *TestCase) Run(n int) error {
150	if err := t.setup(); err != nil {
151		return err
152	}
153
154	for i := 1; i < n; i++ {
155		go t.search()
156	}
157
158	go t.monitor()
159
160	return nil
161}
162
163func (t *TestCase) monitor() {
164	print := func() {
165		fmt.Printf("\033[32m%5d\033[0m; \033[31m%5d\033[0m: %s%s\r", t.runs, t.failures, t.client.String(), "    ")
166	}
167
168	for {
169		select {
170		case run := <-t.runCh:
171			atomic.AddInt64(&t.runs, 1)
172			if !run.Success {
173				atomic.AddInt64(&t.failures, 1)
174				fmt.Println()
175			}
176			print()
177		case <-time.After(5 * time.Second):
178			// Print stats after some inactivity
179			print()
180			break
181		}
182	}
183}
184
185func (t *TestCase) setup() error {
186	var options []elastic.ClientOptionFunc
187
188	if t.errorlogfile != "" {
189		f, err := os.OpenFile(t.errorlogfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
190		if err != nil {
191			return err
192		}
193		logger := log.New(f, "", log.Ltime|log.Lmicroseconds|log.Lshortfile)
194		options = append(options, elastic.SetErrorLog(logger))
195	}
196
197	if t.infologfile != "" {
198		f, err := os.OpenFile(t.infologfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
199		if err != nil {
200			return err
201		}
202		logger := log.New(f, "", log.LstdFlags)
203		options = append(options, elastic.SetInfoLog(logger))
204	}
205
206	// Trace request and response details like this
207	if t.tracelogfile != "" {
208		f, err := os.OpenFile(t.tracelogfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
209		if err != nil {
210			return err
211		}
212		logger := log.New(f, "", log.LstdFlags)
213		options = append(options, elastic.SetTraceLog(logger))
214	}
215
216	options = append(options, elastic.SetURL(t.nodes...))
217	options = append(options, elastic.SetMaxRetries(t.maxRetries))
218	options = append(options, elastic.SetSniff(t.sniff))
219	options = append(options, elastic.SetSnifferInterval(t.snifferInterval))
220	options = append(options, elastic.SetHealthcheck(t.healthcheck))
221	options = append(options, elastic.SetHealthcheckInterval(t.healthcheckInterval))
222
223	client, err := elastic.NewClient(options...)
224	if err != nil {
225		// Handle error
226		return err
227	}
228	t.client = client
229
230	ctx := context.Background()
231
232	// Use the IndexExists service to check if a specified index exists.
233	exists, err := t.client.IndexExists(t.index).Do(ctx)
234	if err != nil {
235		return err
236	}
237	if exists {
238		deleteIndex, err := t.client.DeleteIndex(t.index).Do(ctx)
239		if err != nil {
240			return err
241		}
242		if !deleteIndex.Acknowledged {
243			return errors.New("delete index not acknowledged")
244		}
245	}
246
247	// Create a new index.
248	createIndex, err := t.client.CreateIndex(t.index).Do(ctx)
249	if err != nil {
250		return err
251	}
252	if !createIndex.Acknowledged {
253		return errors.New("create index not acknowledged")
254	}
255
256	// Index a tweet (using JSON serialization)
257	tweet1 := Tweet{User: "olivere", Message: "Take Five", Retweets: 0}
258	_, err = t.client.Index().
259		Index(t.index).
260		Type("tweet").
261		Id("1").
262		BodyJson(tweet1).
263		Do(ctx)
264	if err != nil {
265		return err
266	}
267
268	// Index a second tweet (by string)
269	tweet2 := `{"user" : "olivere", "message" : "It's a Raggy Waltz"}`
270	_, err = t.client.Index().
271		Index(t.index).
272		Type("tweet").
273		Id("2").
274		BodyString(tweet2).
275		Do(ctx)
276	if err != nil {
277		return err
278	}
279
280	// Flush to make sure the documents got written.
281	_, err = t.client.Flush().Index(t.index).Do(ctx)
282	if err != nil {
283		return err
284	}
285
286	return nil
287}
288
289func (t *TestCase) search() {
290	ctx := context.Background()
291
292	// Loop forever to check for connection issues
293	for {
294		// Get tweet with specified ID
295		_, err := t.client.Get().
296			Index(t.index).
297			Type("tweet").
298			Id("1").
299			Do(ctx)
300		if err != nil {
301			//failf("Get failed: %v", err)
302			t.runCh <- RunInfo{Success: false}
303			continue
304		}
305		if elastic.IsNotFound(err) {
306			//log.Printf("Document %s not found\n", "1")
307			//fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
308			t.runCh <- RunInfo{Success: false}
309			continue
310		}
311
312		// Search with a term query
313		searchResult, err := t.client.Search().
314			Index(t.index).                                 // search in index t.index
315			Query(elastic.NewTermQuery("user", "olivere")). // specify the query
316			Sort("user", true).                             // sort by "user" field, ascending
317			From(0).Size(10).                               // take documents 0-9
318			Pretty(true).                                   // pretty print request and response JSON
319			Do(ctx)                                         // execute
320		if err != nil {
321			//failf("Search failed: %v\n", err)
322			t.runCh <- RunInfo{Success: false}
323			continue
324		}
325
326		// searchResult is of type SearchResult and returns hits, suggestions,
327		// and all kinds of other information from Elasticsearch.
328		//fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)
329
330		// Number of hits
331		if searchResult.Hits.TotalHits > 0 {
332			//fmt.Printf("Found a total of %d tweets\n", searchResult.Hits.TotalHits)
333
334			// Iterate through results
335			for _, hit := range searchResult.Hits.Hits {
336				// hit.Index contains the name of the index
337
338				// Deserialize hit.Source into a Tweet (could also be just a map[string]interface{}).
339				var tweet Tweet
340				err := json.Unmarshal(*hit.Source, &tweet)
341				if err != nil {
342					// Deserialization failed
343					//failf("Deserialize failed: %v\n", err)
344					t.runCh <- RunInfo{Success: false}
345					continue
346				}
347
348				// Work with tweet
349				//fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
350			}
351		} else {
352			// No hits
353			//fmt.Print("Found no tweets\n")
354		}
355
356		t.runCh <- RunInfo{Success: true}
357
358		// Sleep some time
359		time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
360	}
361}
362