1// Copyright 2012-present Oliver Eilhard. All rights reserved.
2// Use of this source code is governed by a MIT-license.
3// See http://olivere.mit-license.org/license.txt for details.
4
5package elastic
6
7import (
8	"context"
9	"encoding/json"
10	"io"
11	_ "net/http"
12	"testing"
13)
14
15func TestScroll(t *testing.T) {
16	// client := setupTestClientAndCreateIndexAndLog(t)
17	client := setupTestClientAndCreateIndex(t)
18
19	tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."}
20	tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."}
21	tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."}
22
23	// Add all documents
24	_, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO())
25	if err != nil {
26		t.Fatal(err)
27	}
28
29	_, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO())
30	if err != nil {
31		t.Fatal(err)
32	}
33
34	_, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO())
35	if err != nil {
36		t.Fatal(err)
37	}
38
39	_, err = client.Flush().Index(testIndexName).Do(context.TODO())
40	if err != nil {
41		t.Fatal(err)
42	}
43
44	// Should return all documents. Just don't call Do yet!
45	svc := client.Scroll(testIndexName).Size(1)
46
47	pages := 0
48	docs := 0
49
50	for {
51		res, err := svc.Do(context.TODO())
52		if err == io.EOF {
53			break
54		}
55		if err != nil {
56			t.Fatal(err)
57		}
58		if res == nil {
59			t.Fatal("expected results != nil; got nil")
60		}
61		if res.Hits == nil {
62			t.Fatal("expected results.Hits != nil; got nil")
63		}
64		if want, have := int64(3), res.Hits.TotalHits; want != have {
65			t.Fatalf("expected results.Hits.TotalHits = %d; got %d", want, have)
66		}
67		if want, have := 1, len(res.Hits.Hits); want != have {
68			t.Fatalf("expected len(results.Hits.Hits) = %d; got %d", want, have)
69		}
70
71		pages++
72
73		for _, hit := range res.Hits.Hits {
74			if hit.Index != testIndexName {
75				t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index)
76			}
77			item := make(map[string]interface{})
78			err := json.Unmarshal(*hit.Source, &item)
79			if err != nil {
80				t.Fatal(err)
81			}
82			docs++
83		}
84
85		if len(res.ScrollId) == 0 {
86			t.Fatalf("expected scrollId in results; got %q", res.ScrollId)
87		}
88	}
89
90	if want, have := 3, pages; want != have {
91		t.Fatalf("expected to retrieve %d pages; got %d", want, have)
92	}
93	if want, have := 3, docs; want != have {
94		t.Fatalf("expected to retrieve %d hits; got %d", want, have)
95	}
96
97	err = svc.Clear(context.TODO())
98	if err != nil {
99		t.Fatal(err)
100	}
101
102	_, err = svc.Do(context.TODO())
103	if err == nil {
104		t.Fatal("expected to fail")
105	}
106}
107
108func TestScrollWithQueryAndSort(t *testing.T) {
109	client := setupTestClientAndCreateIndex(t)
110	// client := setupTestClientAndCreateIndexAndAddDocs(t, SetTraceLog(log.New(os.Stdout, "", log.LstdFlags)))
111
112	tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."}
113	tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."}
114	tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."}
115
116	// Add all documents
117	_, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO())
118	if err != nil {
119		t.Fatal(err)
120	}
121
122	_, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO())
123	if err != nil {
124		t.Fatal(err)
125	}
126
127	_, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO())
128	if err != nil {
129		t.Fatal(err)
130	}
131
132	_, err = client.Flush().Index(testIndexName).Do(context.TODO())
133	if err != nil {
134		t.Fatal(err)
135	}
136
137	// Create a scroll service that returns tweets from user olivere
138	// and returns them sorted by "message", in reverse order.
139	//
140	// Just don't call Do yet!
141	svc := client.Scroll(testIndexName).
142		Query(NewTermQuery("user", "olivere")).
143		Sort("message", false).
144		Size(1)
145
146	docs := 0
147	pages := 0
148	for {
149		res, err := svc.Do(context.TODO())
150		if err == io.EOF {
151			break
152		}
153		if err != nil {
154			t.Fatal(err)
155		}
156		if err != nil {
157			t.Fatal(err)
158		}
159		if res == nil {
160			t.Fatal("expected results != nil; got nil")
161		}
162		if res.Hits == nil {
163			t.Fatal("expected results.Hits != nil; got nil")
164		}
165		if want, have := int64(2), res.Hits.TotalHits; want != have {
166			t.Fatalf("expected results.Hits.TotalHits = %d; got %d", want, have)
167		}
168		if want, have := 1, len(res.Hits.Hits); want != have {
169			t.Fatalf("expected len(results.Hits.Hits) = %d; got %d", want, have)
170		}
171
172		pages++
173
174		for _, hit := range res.Hits.Hits {
175			if hit.Index != testIndexName {
176				t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index)
177			}
178			item := make(map[string]interface{})
179			err := json.Unmarshal(*hit.Source, &item)
180			if err != nil {
181				t.Fatal(err)
182			}
183			docs++
184		}
185	}
186
187	if want, have := 2, pages; want != have {
188		t.Fatalf("expected to retrieve %d pages; got %d", want, have)
189	}
190	if want, have := 2, docs; want != have {
191		t.Fatalf("expected to retrieve %d hits; got %d", want, have)
192	}
193}
194
195func TestScrollWithBody(t *testing.T) {
196	// client := setupTestClientAndCreateIndexAndLog(t)
197	client := setupTestClientAndCreateIndex(t)
198
199	tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch.", Retweets: 4}
200	tweet2 := tweet{User: "olivere", Message: "Another unrelated topic.", Retweets: 10}
201	tweet3 := tweet{User: "sandrae", Message: "Cycling is fun.", Retweets: 3}
202
203	// Add all documents
204	_, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO())
205	if err != nil {
206		t.Fatal(err)
207	}
208
209	_, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO())
210	if err != nil {
211		t.Fatal(err)
212	}
213
214	_, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO())
215	if err != nil {
216		t.Fatal(err)
217	}
218
219	_, err = client.Flush().Index(testIndexName).Do(context.TODO())
220	if err != nil {
221		t.Fatal(err)
222	}
223
224	// Test with simple strings and a map
225	var tests = []struct {
226		Body              interface{}
227		ExpectedTotalHits int64
228		ExpectedDocs      int
229		ExpectedPages     int
230	}{
231		{
232			Body:              `{"query":{"match_all":{}}}`,
233			ExpectedTotalHits: 3,
234			ExpectedDocs:      3,
235			ExpectedPages:     3,
236		},
237		{
238			Body:              `{"query":{"term":{"user":"olivere"}},"sort":["_doc"]}`,
239			ExpectedTotalHits: 2,
240			ExpectedDocs:      2,
241			ExpectedPages:     2,
242		},
243		{
244			Body:              `{"query":{"term":{"user":"olivere"}},"sort":[{"retweets":"desc"}]}`,
245			ExpectedTotalHits: 2,
246			ExpectedDocs:      2,
247			ExpectedPages:     2,
248		},
249		{
250			Body: map[string]interface{}{
251				"query": map[string]interface{}{
252					"term": map[string]interface{}{
253						"user": "olivere",
254					},
255				},
256				"sort": []interface{}{"_doc"},
257			},
258			ExpectedTotalHits: 2,
259			ExpectedDocs:      2,
260			ExpectedPages:     2,
261		},
262	}
263
264	for i, tt := range tests {
265		// Should return all documents. Just don't call Do yet!
266		svc := client.Scroll(testIndexName).Size(1).Body(tt.Body)
267
268		pages := 0
269		docs := 0
270
271		for {
272			res, err := svc.Do(context.TODO())
273			if err == io.EOF {
274				break
275			}
276			if err != nil {
277				t.Fatal(err)
278			}
279			if res == nil {
280				t.Fatalf("#%d: expected results != nil; got nil", i)
281			}
282			if res.Hits == nil {
283				t.Fatalf("#%d: expected results.Hits != nil; got nil", i)
284			}
285			if want, have := tt.ExpectedTotalHits, res.Hits.TotalHits; want != have {
286				t.Fatalf("#%d: expected results.Hits.TotalHits = %d; got %d", i, want, have)
287			}
288			if want, have := 1, len(res.Hits.Hits); want != have {
289				t.Fatalf("#%d: expected len(results.Hits.Hits) = %d; got %d", i, want, have)
290			}
291
292			pages++
293
294			for _, hit := range res.Hits.Hits {
295				if hit.Index != testIndexName {
296					t.Fatalf("#%d: expected SearchResult.Hits.Hit.Index = %q; got %q", i, testIndexName, hit.Index)
297				}
298				item := make(map[string]interface{})
299				err := json.Unmarshal(*hit.Source, &item)
300				if err != nil {
301					t.Fatalf("#%d: %v", i, err)
302				}
303				docs++
304			}
305
306			if len(res.ScrollId) == 0 {
307				t.Fatalf("#%d: expected scrollId in results; got %q", i, res.ScrollId)
308			}
309		}
310
311		if want, have := tt.ExpectedPages, pages; want != have {
312			t.Fatalf("#%d: expected to retrieve %d pages; got %d", i, want, have)
313		}
314		if want, have := tt.ExpectedDocs, docs; want != have {
315			t.Fatalf("#%d: expected to retrieve %d hits; got %d", i, want, have)
316		}
317
318		err = svc.Clear(context.TODO())
319		if err != nil {
320			t.Fatalf("#%d: failed to clear scroll context: %v", i, err)
321		}
322
323		_, err = svc.Do(context.TODO())
324		if err == nil {
325			t.Fatalf("#%d: expected to fail", i)
326		}
327	}
328}
329
330func TestScrollWithSlice(t *testing.T) {
331	client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0)))
332
333	// Should return all documents. Just don't call Do yet!
334	sliceQuery := NewSliceQuery().Id(0).Max(2)
335	svc := client.Scroll(testIndexName).Slice(sliceQuery).Size(1)
336
337	pages := 0
338	docs := 0
339
340	for {
341		res, err := svc.Do(context.TODO())
342		if err == io.EOF {
343			break
344		}
345		if err != nil {
346			t.Fatal(err)
347		}
348		if res == nil {
349			t.Fatal("expected results != nil; got nil")
350		}
351		if res.Hits == nil {
352			t.Fatal("expected results.Hits != nil; got nil")
353		}
354
355		pages++
356
357		for _, hit := range res.Hits.Hits {
358			if hit.Index != testIndexName {
359				t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index)
360			}
361			item := make(map[string]interface{})
362			err := json.Unmarshal(*hit.Source, &item)
363			if err != nil {
364				t.Fatal(err)
365			}
366			docs++
367		}
368
369		if len(res.ScrollId) == 0 {
370			t.Fatalf("expected scrollId in results; got %q", res.ScrollId)
371		}
372	}
373
374	if pages == 0 {
375		t.Fatal("expected to retrieve some pages")
376	}
377	if docs == 0 {
378		t.Fatal("expected to retrieve some hits")
379	}
380
381	if err := svc.Clear(context.TODO()); err != nil {
382		t.Fatal(err)
383	}
384
385	if _, err := svc.Do(context.TODO()); err == nil {
386		t.Fatal("expected to fail")
387	}
388}
389
390func TestScrollWithMaxResponseSize(t *testing.T) {
391	client := setupTestClientAndCreateIndex(t)
392
393	tweet1 := tweet{User: "sandrae", Message: "Cycling is fun.", Retweets: 3}
394	tweet2 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch.", Retweets: 4}
395
396	// Add all documents
397	_, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO())
398	if err != nil {
399		t.Fatal(err)
400	}
401
402	_, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO())
403	if err != nil {
404		t.Fatal(err)
405	}
406
407	_, err = client.Flush().Index(testIndexName).Do(context.TODO())
408	if err != nil {
409		t.Fatal(err)
410	}
411
412	// Test response size error on first scroll request (first response is 391 bytes)
413	svc := client.Scroll(testIndexName).Size(1).MaxResponseSize(300)
414	_, err = svc.Do(context.TODO())
415	if err != ErrResponseSize {
416		t.Fatalf("expected response size error")
417	}
418
419	// Test response size error on second scroll request (first response is 391 bytes, second is 401 bytes)
420	svc = client.Scroll(testIndexName).Size(1).MaxResponseSize(400)
421	_, err = svc.Do(context.TODO())
422	if err != nil {
423		t.Fatal(err)
424	}
425
426	_, err = svc.Do(context.TODO())
427	if err != ErrResponseSize {
428		t.Fatalf("expected response size error")
429	}
430}
431
432func TestScrollWithFilterPath(t *testing.T) {
433	// client := setupTestClientAndCreateIndexAndLog(t)
434	client := setupTestClientAndCreateIndex(t)
435
436	tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."}
437	tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."}
438	tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."}
439
440	// Add all documents
441	_, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO())
442	if err != nil {
443		t.Fatal(err)
444	}
445
446	_, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO())
447	if err != nil {
448		t.Fatal(err)
449	}
450
451	_, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO())
452	if err != nil {
453		t.Fatal(err)
454	}
455
456	_, err = client.Flush().Index(testIndexName).Do(context.TODO())
457	if err != nil {
458		t.Fatal(err)
459	}
460
461	// Should return all documents. Just don't call Do yet!
462	// Notice that we don't have to add "_scroll_id" to the FilterPath here:
463	// It's been added automatically by the ScrollService.
464	svc := client.Scroll(testIndexName).Size(1).
465		FilterPath("hits.total", "hits.hits._index", "hits.hits._id", "hits.hits._source")
466
467	pages := 0
468	docs := 0
469
470	for {
471		res, err := svc.Do(context.TODO())
472		if err == io.EOF {
473			break
474		}
475		if err != nil {
476			t.Fatal(err)
477		}
478		if res == nil {
479			t.Fatal("expected results != nil; got nil")
480		}
481		if res.Hits == nil {
482			t.Fatal("expected results.Hits != nil; got nil")
483		}
484		if want, have := int64(3), res.Hits.TotalHits; want != have {
485			t.Fatalf("expected results.Hits.TotalHits = %d; got %d", want, have)
486		}
487		if want, have := 1, len(res.Hits.Hits); want != have {
488			t.Fatalf("expected len(results.Hits.Hits) = %d; got %d", want, have)
489		}
490
491		pages++
492
493		for _, hit := range res.Hits.Hits {
494			if hit.Index != testIndexName {
495				t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index)
496			}
497			item := make(map[string]interface{})
498			err := json.Unmarshal(*hit.Source, &item)
499			if err != nil {
500				t.Fatal(err)
501			}
502			docs++
503		}
504
505		if len(res.ScrollId) == 0 {
506			t.Fatalf("expected scrollId in results; got %q", res.ScrollId)
507		}
508	}
509
510	if want, have := 3, pages; want != have {
511		t.Fatalf("expected to retrieve %d pages; got %d", want, have)
512	}
513	if want, have := 3, docs; want != have {
514		t.Fatalf("expected to retrieve %d hits; got %d", want, have)
515	}
516
517	err = svc.Clear(context.TODO())
518	if err != nil {
519		t.Fatal(err)
520	}
521
522	_, err = svc.Do(context.TODO())
523	if err == nil {
524		t.Fatal("expected to fail")
525	}
526}
527
528func TestScrollWithFilterPathKeepingContext(t *testing.T) {
529	// client := setupTestClientAndCreateIndexAndLog(t)
530	client := setupTestClientAndCreateIndex(t)
531
532	tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."}
533	tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."}
534	tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."}
535
536	// Add all documents
537	_, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO())
538	if err != nil {
539		t.Fatal(err)
540	}
541
542	_, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO())
543	if err != nil {
544		t.Fatal(err)
545	}
546
547	_, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO())
548	if err != nil {
549		t.Fatal(err)
550	}
551
552	_, err = client.Flush().Index(testIndexName).Do(context.TODO())
553	if err != nil {
554		t.Fatal(err)
555	}
556
557	// Should return all documents. Just don't call Do yet!
558	// Notice that we don't have to add "_scroll_id" to the FilterPath here:
559	// It's been added automatically by the ScrollService.
560	svc := client.Scroll(testIndexName).Size(1).
561		FilterPath("hits.total", "hits.hits._index", "hits.hits._id")
562
563	pages := 0
564	docs := 0
565
566	for {
567		res, err := svc.Do(context.TODO())
568		if err == io.EOF {
569			break
570		}
571		if err != nil {
572			t.Fatal(err)
573		}
574		if res == nil {
575			t.Fatal("expected results != nil; got nil")
576		}
577		if res.Hits == nil {
578			t.Fatal("expected results.Hits != nil; got nil")
579		}
580		if want, have := int64(3), res.Hits.TotalHits; want != have {
581			t.Fatalf("expected results.Hits.TotalHits = %d; got %d", want, have)
582		}
583		if want, have := 1, len(res.Hits.Hits); want != have {
584			t.Fatalf("expected len(results.Hits.Hits) = %d; got %d", want, have)
585		}
586
587		pages++
588
589		for _, hit := range res.Hits.Hits {
590			if hit.Index != testIndexName {
591				t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index)
592			}
593			if hit.Source != nil {
594				t.Fatal("expected SearchResult.Hits.Hit.Source = nil")
595			}
596			docs++
597		}
598
599		if len(res.ScrollId) == 0 {
600			t.Fatalf("expected scrollId in results; got %q", res.ScrollId)
601		}
602	}
603
604	if want, have := 3, pages; want != have {
605		t.Fatalf("expected to retrieve %d pages; got %d", want, have)
606	}
607	if want, have := 3, docs; want != have {
608		t.Fatalf("expected to retrieve %d hits; got %d", want, have)
609	}
610
611	err = svc.Clear(context.TODO())
612	if err != nil {
613		t.Fatal(err)
614	}
615
616	_, err = svc.Do(context.TODO())
617	if err == nil {
618		t.Fatal("expected to fail")
619	}
620}
621