1// Copyright 2012-present Oliver Eilhard. All rights reserved.
2// Use of this source code is governed by a MIT-license.
3// See http://olivere.mit-license.org/license.txt for details.
4
5package elastic
6
7import (
8	"context"
9	"fmt"
10	"io"
11	"net/http"
12	"net/url"
13	"strings"
14	"sync"
15
16	"github.com/olivere/elastic/uritemplates"
17)
18
// DefaultScrollKeepAlive is the keep-alive time applied to a scroll cursor
// when no other value has been configured.
const DefaultScrollKeepAlive = "5m"
23
24// ScrollService iterates over pages of search results from Elasticsearch.
25type ScrollService struct {
26	client  *Client
27	retrier Retrier
28
29	pretty     *bool       // pretty format the returned JSON response
30	human      *bool       // return human readable values for statistics
31	errorTrace *bool       // include the stack trace of returned errors
32	filterPath []string    // list of filters used to reduce the response
33	headers    http.Header // custom request-level HTTP headers
34
35	indices           []string
36	types             []string
37	keepAlive         string
38	body              interface{}
39	ss                *SearchSource
40	size              *int
41	routing           string
42	preference        string
43	ignoreUnavailable *bool
44	allowNoIndices    *bool
45	expandWildcards   string
46	maxResponseSize   int64
47
48	mu       sync.RWMutex
49	scrollId string
50}
51
52// NewScrollService initializes and returns a new ScrollService.
53func NewScrollService(client *Client) *ScrollService {
54	builder := &ScrollService{
55		client:    client,
56		ss:        NewSearchSource(),
57		keepAlive: DefaultScrollKeepAlive,
58	}
59	return builder
60}
61
62// Pretty tells Elasticsearch whether to return a formatted JSON response.
63func (s *ScrollService) Pretty(pretty bool) *ScrollService {
64	s.pretty = &pretty
65	return s
66}
67
68// Human specifies whether human readable values should be returned in
69// the JSON response, e.g. "7.5mb".
70func (s *ScrollService) Human(human bool) *ScrollService {
71	s.human = &human
72	return s
73}
74
75// ErrorTrace specifies whether to include the stack trace of returned errors.
76func (s *ScrollService) ErrorTrace(errorTrace bool) *ScrollService {
77	s.errorTrace = &errorTrace
78	return s
79}
80
81// FilterPath specifies a list of filters used to reduce the response.
82func (s *ScrollService) FilterPath(filterPath ...string) *ScrollService {
83	s.filterPath = filterPath
84	return s
85}
86
87// Header adds a header to the request.
88func (s *ScrollService) Header(name string, value string) *ScrollService {
89	if s.headers == nil {
90		s.headers = http.Header{}
91	}
92	s.headers.Add(name, value)
93	return s
94}
95
96// Headers specifies the headers of the request.
97func (s *ScrollService) Headers(headers http.Header) *ScrollService {
98	s.headers = headers
99	return s
100}
101
102// Retrier allows to set specific retry logic for this ScrollService.
103// If not specified, it will use the client's default retrier.
104func (s *ScrollService) Retrier(retrier Retrier) *ScrollService {
105	s.retrier = retrier
106	return s
107}
108
109// Index sets the name of one or more indices to iterate over.
110func (s *ScrollService) Index(indices ...string) *ScrollService {
111	if s.indices == nil {
112		s.indices = make([]string, 0)
113	}
114	s.indices = append(s.indices, indices...)
115	return s
116}
117
118// Type sets the name of one or more types to iterate over.
119func (s *ScrollService) Type(types ...string) *ScrollService {
120	if s.types == nil {
121		s.types = make([]string, 0)
122	}
123	s.types = append(s.types, types...)
124	return s
125}
126
127// Scroll is an alias for KeepAlive, the time to keep
128// the cursor alive (e.g. "5m" for 5 minutes).
129func (s *ScrollService) Scroll(keepAlive string) *ScrollService {
130	s.keepAlive = keepAlive
131	return s
132}
133
134// KeepAlive sets the maximum time after which the cursor will expire.
135// It is "5m" by default.
136func (s *ScrollService) KeepAlive(keepAlive string) *ScrollService {
137	s.keepAlive = keepAlive
138	return s
139}
140
141// Size specifies the number of documents Elasticsearch should return
142// from each shard, per page.
143func (s *ScrollService) Size(size int) *ScrollService {
144	s.size = &size
145	return s
146}
147
148// Highlight allows to highlight search results on one or more fields
149func (s *ScrollService) Highlight(highlight *Highlight) *ScrollService {
150	s.ss = s.ss.Highlight(highlight)
151	return s
152}
153
154// Body sets the raw body to send to Elasticsearch. This can be e.g. a string,
155// a map[string]interface{} or anything that can be serialized into JSON.
156// Notice that setting the body disables the use of SearchSource and many
157// other properties of the ScanService.
158func (s *ScrollService) Body(body interface{}) *ScrollService {
159	s.body = body
160	return s
161}
162
163// SearchSource sets the search source builder to use with this iterator.
164// Notice that only a certain number of properties can be used when scrolling,
165// e.g. query and sorting.
166func (s *ScrollService) SearchSource(searchSource *SearchSource) *ScrollService {
167	s.ss = searchSource
168	if s.ss == nil {
169		s.ss = NewSearchSource()
170	}
171	return s
172}
173
174// Query sets the query to perform, e.g. a MatchAllQuery.
175func (s *ScrollService) Query(query Query) *ScrollService {
176	s.ss = s.ss.Query(query)
177	return s
178}
179
180// PostFilter is executed as the last filter. It only affects the
181// search hits but not facets. See
182// https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-post-filter.html
183// for details.
184func (s *ScrollService) PostFilter(postFilter Query) *ScrollService {
185	s.ss = s.ss.PostFilter(postFilter)
186	return s
187}
188
189// Slice allows slicing the scroll request into several batches.
190// This is supported in Elasticsearch 5.0 or later.
191// See https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-scroll.html#sliced-scroll
192// for details.
193func (s *ScrollService) Slice(sliceQuery Query) *ScrollService {
194	s.ss = s.ss.Slice(sliceQuery)
195	return s
196}
197
198// FetchSource indicates whether the response should contain the stored
199// _source for every hit.
200func (s *ScrollService) FetchSource(fetchSource bool) *ScrollService {
201	s.ss = s.ss.FetchSource(fetchSource)
202	return s
203}
204
205// FetchSourceContext indicates how the _source should be fetched.
206func (s *ScrollService) FetchSourceContext(fetchSourceContext *FetchSourceContext) *ScrollService {
207	s.ss = s.ss.FetchSourceContext(fetchSourceContext)
208	return s
209}
210
211// Version can be set to true to return a version for each search hit.
212// See https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-version.html.
213func (s *ScrollService) Version(version bool) *ScrollService {
214	s.ss = s.ss.Version(version)
215	return s
216}
217
218// Sort adds a sort order. This can have negative effects on the performance
219// of the scroll operation as Elasticsearch needs to sort first.
220func (s *ScrollService) Sort(field string, ascending bool) *ScrollService {
221	s.ss = s.ss.Sort(field, ascending)
222	return s
223}
224
225// SortWithInfo specifies a sort order. Notice that sorting can have a
226// negative impact on scroll performance.
227func (s *ScrollService) SortWithInfo(info SortInfo) *ScrollService {
228	s.ss = s.ss.SortWithInfo(info)
229	return s
230}
231
232// SortBy specifies a sort order. Notice that sorting can have a
233// negative impact on scroll performance.
234func (s *ScrollService) SortBy(sorter ...Sorter) *ScrollService {
235	s.ss = s.ss.SortBy(sorter...)
236	return s
237}
238
239// Routing is a list of specific routing values to control the shards
240// the search will be executed on.
241func (s *ScrollService) Routing(routings ...string) *ScrollService {
242	s.routing = strings.Join(routings, ",")
243	return s
244}
245
246// Preference sets the preference to execute the search. Defaults to
247// randomize across shards ("random"). Can be set to "_local" to prefer
248// local shards, "_primary" to execute on primary shards only,
249// or a custom value which guarantees that the same order will be used
250// across different requests.
251func (s *ScrollService) Preference(preference string) *ScrollService {
252	s.preference = preference
253	return s
254}
255
256// IgnoreUnavailable indicates whether the specified concrete indices
257// should be ignored when unavailable (missing or closed).
258func (s *ScrollService) IgnoreUnavailable(ignoreUnavailable bool) *ScrollService {
259	s.ignoreUnavailable = &ignoreUnavailable
260	return s
261}
262
263// AllowNoIndices indicates whether to ignore if a wildcard indices
264// expression resolves into no concrete indices. (This includes `_all` string
265// or when no indices have been specified).
266func (s *ScrollService) AllowNoIndices(allowNoIndices bool) *ScrollService {
267	s.allowNoIndices = &allowNoIndices
268	return s
269}
270
271// ExpandWildcards indicates whether to expand wildcard expression to
272// concrete indices that are open, closed or both.
273func (s *ScrollService) ExpandWildcards(expandWildcards string) *ScrollService {
274	s.expandWildcards = expandWildcards
275	return s
276}
277
278// MaxResponseSize sets an upper limit on the response body size that we accept,
279// to guard against OOM situations.
280func (s *ScrollService) MaxResponseSize(maxResponseSize int64) *ScrollService {
281	s.maxResponseSize = maxResponseSize
282	return s
283}
284
285// ScrollId specifies the identifier of a scroll in action.
286func (s *ScrollService) ScrollId(scrollId string) *ScrollService {
287	s.mu.Lock()
288	s.scrollId = scrollId
289	s.mu.Unlock()
290	return s
291}
292
293// Do returns the next search result. It will return io.EOF as error if there
294// are no more search results.
295func (s *ScrollService) Do(ctx context.Context) (*SearchResult, error) {
296	s.mu.RLock()
297	nextScrollId := s.scrollId
298	s.mu.RUnlock()
299	if len(nextScrollId) == 0 {
300		return s.first(ctx)
301	}
302	return s.next(ctx)
303}
304
305// Clear cancels the current scroll operation. If you don't do this manually,
306// the scroll will be expired automatically by Elasticsearch. You can control
307// how long a scroll cursor is kept alive with the KeepAlive func.
308func (s *ScrollService) Clear(ctx context.Context) error {
309	s.mu.RLock()
310	scrollId := s.scrollId
311	s.mu.RUnlock()
312	if len(scrollId) == 0 {
313		return nil
314	}
315
316	path := "/_search/scroll"
317	params := url.Values{}
318	if v := s.pretty; v != nil {
319		params.Set("pretty", fmt.Sprint(*v))
320	}
321	if v := s.human; v != nil {
322		params.Set("human", fmt.Sprint(*v))
323	}
324	if v := s.errorTrace; v != nil {
325		params.Set("error_trace", fmt.Sprint(*v))
326	}
327	if len(s.filterPath) > 0 {
328		params.Set("filter_path", strings.Join(s.filterPath, ","))
329	}
330	body := struct {
331		ScrollId []string `json:"scroll_id,omitempty"`
332	}{
333		ScrollId: []string{scrollId},
334	}
335
336	_, err := s.client.PerformRequest(ctx, PerformRequestOptions{
337		Method:  "DELETE",
338		Path:    path,
339		Params:  params,
340		Body:    body,
341		Retrier: s.retrier,
342	})
343	if err != nil {
344		return err
345	}
346
347	return nil
348}
349
350// -- First --
351
352// first takes the first page of search results.
353func (s *ScrollService) first(ctx context.Context) (*SearchResult, error) {
354	// Get URL and parameters for request
355	path, params, err := s.buildFirstURL()
356	if err != nil {
357		return nil, err
358	}
359
360	// Get HTTP request body
361	body, err := s.bodyFirst()
362	if err != nil {
363		return nil, err
364	}
365
366	// Get HTTP response
367	res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
368		Method:          "POST",
369		Path:            path,
370		Params:          params,
371		Body:            body,
372		Retrier:         s.retrier,
373		Headers:         s.headers,
374		MaxResponseSize: s.maxResponseSize,
375	})
376	if err != nil {
377		return nil, err
378	}
379
380	// Return operation response
381	ret := new(SearchResult)
382	if err := s.client.decoder.Decode(res.Body, ret); err != nil {
383		return nil, err
384	}
385	s.mu.Lock()
386	s.scrollId = ret.ScrollId
387	s.mu.Unlock()
388	if ret.Hits == nil || len(ret.Hits.Hits) == 0 {
389		return ret, io.EOF
390	}
391	return ret, nil
392}
393
394// buildFirstURL builds the URL for retrieving the first page.
395func (s *ScrollService) buildFirstURL() (string, url.Values, error) {
396	// Build URL
397	var err error
398	var path string
399	if len(s.indices) == 0 && len(s.types) == 0 {
400		path = "/_search"
401	} else if len(s.indices) > 0 && len(s.types) == 0 {
402		path, err = uritemplates.Expand("/{index}/_search", map[string]string{
403			"index": strings.Join(s.indices, ","),
404		})
405	} else if len(s.indices) == 0 && len(s.types) > 0 {
406		path, err = uritemplates.Expand("/_all/{typ}/_search", map[string]string{
407			"typ": strings.Join(s.types, ","),
408		})
409	} else {
410		path, err = uritemplates.Expand("/{index}/{typ}/_search", map[string]string{
411			"index": strings.Join(s.indices, ","),
412			"typ":   strings.Join(s.types, ","),
413		})
414	}
415	if err != nil {
416		return "", url.Values{}, err
417	}
418
419	// Add query string parameters
420	params := url.Values{}
421	if v := s.pretty; v != nil {
422		params.Set("pretty", fmt.Sprint(*v))
423	}
424	if v := s.human; v != nil {
425		params.Set("human", fmt.Sprint(*v))
426	}
427	if v := s.errorTrace; v != nil {
428		params.Set("error_trace", fmt.Sprint(*v))
429	}
430	if len(s.filterPath) > 0 {
431		// Always add "hits._scroll_id", otherwise we cannot scroll
432		s.filterPath = append(s.filterPath, "_scroll_id")
433		params.Set("filter_path", strings.Join(s.filterPath, ","))
434	}
435	if s.size != nil && *s.size > 0 {
436		params.Set("size", fmt.Sprintf("%d", *s.size))
437	}
438	if len(s.keepAlive) > 0 {
439		params.Set("scroll", s.keepAlive)
440	}
441	if len(s.routing) > 0 {
442		params.Set("routing", s.routing)
443	}
444	if len(s.preference) > 0 {
445		params.Set("preference", s.preference)
446	}
447	if s.allowNoIndices != nil {
448		params.Set("allow_no_indices", fmt.Sprintf("%v", *s.allowNoIndices))
449	}
450	if len(s.expandWildcards) > 0 {
451		params.Set("expand_wildcards", s.expandWildcards)
452	}
453	if s.ignoreUnavailable != nil {
454		params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable))
455	}
456
457	return path, params, nil
458}
459
460// bodyFirst returns the request to fetch the first batch of results.
461func (s *ScrollService) bodyFirst() (interface{}, error) {
462	var err error
463	var body interface{}
464
465	if s.body != nil {
466		body = s.body
467	} else {
468		// Use _doc sort by default if none is specified
469		if !s.ss.hasSort() {
470			// Use efficient sorting when no user-defined query/body is specified
471			s.ss = s.ss.SortBy(SortByDoc{})
472		}
473
474		// Body from search source
475		body, err = s.ss.Source()
476		if err != nil {
477			return nil, err
478		}
479	}
480
481	return body, nil
482}
483
484// -- Next --
485
486func (s *ScrollService) next(ctx context.Context) (*SearchResult, error) {
487	// Get URL for request
488	path, params, err := s.buildNextURL()
489	if err != nil {
490		return nil, err
491	}
492
493	// Setup HTTP request body
494	body, err := s.bodyNext()
495	if err != nil {
496		return nil, err
497	}
498
499	// Get HTTP response
500	res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
501		Method:          "POST",
502		Path:            path,
503		Params:          params,
504		Body:            body,
505		Retrier:         s.retrier,
506		Headers:         s.headers,
507		MaxResponseSize: s.maxResponseSize,
508	})
509	if err != nil {
510		return nil, err
511	}
512
513	// Return operation response
514	ret := new(SearchResult)
515	if err := s.client.decoder.Decode(res.Body, ret); err != nil {
516		return nil, err
517	}
518	s.mu.Lock()
519	s.scrollId = ret.ScrollId
520	s.mu.Unlock()
521	if ret.Hits == nil || len(ret.Hits.Hits) == 0 {
522		return ret, io.EOF
523	}
524	return ret, nil
525}
526
527// buildNextURL builds the URL for the operation.
528func (s *ScrollService) buildNextURL() (string, url.Values, error) {
529	path := "/_search/scroll"
530
531	// Add query string parameters
532	params := url.Values{}
533	if v := s.pretty; v != nil {
534		params.Set("pretty", fmt.Sprint(*v))
535	}
536	if v := s.human; v != nil {
537		params.Set("human", fmt.Sprint(*v))
538	}
539	if v := s.errorTrace; v != nil {
540		params.Set("error_trace", fmt.Sprint(*v))
541	}
542	if len(s.filterPath) > 0 {
543		// Always add "hits._scroll_id", otherwise we cannot scroll
544		s.filterPath = append(s.filterPath, "_scroll_id")
545		params.Set("filter_path", strings.Join(s.filterPath, ","))
546	}
547
548	return path, params, nil
549}
550
551// body returns the request to fetch the next batch of results.
552func (s *ScrollService) bodyNext() (interface{}, error) {
553	s.mu.RLock()
554	body := struct {
555		Scroll   string `json:"scroll"`
556		ScrollId string `json:"scroll_id,omitempty"`
557	}{
558		Scroll:   s.keepAlive,
559		ScrollId: s.scrollId,
560	}
561	s.mu.RUnlock()
562	return body, nil
563}
564