// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"sync"

	"github.com/olivere/elastic/uritemplates"
)

const (
	// DefaultScrollKeepAlive is the default time a scroll cursor will be kept alive.
	DefaultScrollKeepAlive = "5m"
)

// ScrollService iterates over pages of search results from Elasticsearch.
type ScrollService struct {
	client            *Client
	retrier           Retrier
	indices           []string
	types             []string
	keepAlive         string
	body              interface{}
	ss                *SearchSource
	size              *int
	pretty            bool
	routing           string
	preference        string
	ignoreUnavailable *bool
	allowNoIndices    *bool
	expandWildcards   string
	headers           http.Header
	maxResponseSize   int64
	filterPath        []string

	mu       sync.RWMutex
	scrollId string
}

// NewScrollService initializes and returns a new ScrollService.
func NewScrollService(client *Client) *ScrollService {
	builder := &ScrollService{
		client:    client,
		ss:        NewSearchSource(),
		keepAlive: DefaultScrollKeepAlive,
	}
	return builder
}

// Header adds a header to the request.
func (s *ScrollService) Header(name string, value string) *ScrollService {
	if s.headers == nil {
		s.headers = http.Header{}
	}
	s.headers.Add(name, value)
	return s
}

// Retrier allows setting request-specific retry logic for this ScrollService.
// If not specified, the client's default retrier is used.
func (s *ScrollService) Retrier(retrier Retrier) *ScrollService {
	s.retrier = retrier
	return s
}

// Index sets the name of one or more indices to iterate over.
func (s *ScrollService) Index(indices ...string) *ScrollService {
	if s.indices == nil {
		s.indices = make([]string, 0)
	}
	s.indices = append(s.indices, indices...)
	return s
}

// Type sets the name of one or more types to iterate over.
func (s *ScrollService) Type(types ...string) *ScrollService {
	if s.types == nil {
		s.types = make([]string, 0)
	}
	s.types = append(s.types, types...)
	return s
}

// Scroll is an alias for KeepAlive, the time to keep
// the cursor alive (e.g. "5m" for 5 minutes).
func (s *ScrollService) Scroll(keepAlive string) *ScrollService {
	s.keepAlive = keepAlive
	return s
}

// KeepAlive sets the maximum time after which the cursor will expire.
// It is "5m" by default.
func (s *ScrollService) KeepAlive(keepAlive string) *ScrollService {
	s.keepAlive = keepAlive
	return s
}

// Size specifies the number of documents Elasticsearch should return
// from each shard, per page.
func (s *ScrollService) Size(size int) *ScrollService {
	s.size = &size
	return s
}

// Body sets the raw body to send to Elasticsearch. This can be e.g. a string,
// a map[string]interface{} or anything that can be serialized into JSON.
// Notice that setting the body disables the use of SearchSource and many
// other properties of the ScrollService.
func (s *ScrollService) Body(body interface{}) *ScrollService {
	s.body = body
	return s
}

// SearchSource sets the search source builder to use with this iterator.
// Notice that only a certain number of properties can be used when scrolling,
// e.g. query and sorting.
func (s *ScrollService) SearchSource(searchSource *SearchSource) *ScrollService {
	s.ss = searchSource
	if s.ss == nil {
		s.ss = NewSearchSource()
	}
	return s
}

// Query sets the query to perform, e.g. a MatchAllQuery.
func (s *ScrollService) Query(query Query) *ScrollService {
	s.ss = s.ss.Query(query)
	return s
}

// PostFilter is executed as the last filter. It only affects the
// search hits but not aggregations. See
// https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-request-post-filter.html
// for details.
func (s *ScrollService) PostFilter(postFilter Query) *ScrollService {
	s.ss = s.ss.PostFilter(postFilter)
	return s
}

// Slice allows slicing the scroll request into several batches.
// This is supported in Elasticsearch 5.0 or later.
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-request-scroll.html#sliced-scroll
// for details.
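//
// A minimal sketch, assuming the package's NewSliceQuery builder and a
// hypothetical "twitter" index, of splitting one scroll into two parallel,
// disjoint slices:
//
//	svc0 := client.Scroll("twitter").Slice(NewSliceQuery().Id(0).Max(2)).Size(100)
//	svc1 := client.Scroll("twitter").Slice(NewSliceQuery().Id(1).Max(2)).Size(100)
//	// Iterate svc0 and svc1 (e.g. in separate goroutines); together they
//	// cover all matching documents exactly once.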
func (s *ScrollService) Slice(sliceQuery Query) *ScrollService {
	s.ss = s.ss.Slice(sliceQuery)
	return s
}

// FetchSource indicates whether the response should contain the stored
// _source for every hit.
func (s *ScrollService) FetchSource(fetchSource bool) *ScrollService {
	s.ss = s.ss.FetchSource(fetchSource)
	return s
}

// FetchSourceContext indicates how the _source should be fetched.
func (s *ScrollService) FetchSourceContext(fetchSourceContext *FetchSourceContext) *ScrollService {
	s.ss = s.ss.FetchSourceContext(fetchSourceContext)
	return s
}

// Version can be set to true to return a version for each search hit.
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-request-version.html.
func (s *ScrollService) Version(version bool) *ScrollService {
	s.ss = s.ss.Version(version)
	return s
}

// Sort adds a sort order. This can have negative effects on the performance
// of the scroll operation as Elasticsearch needs to sort first.
func (s *ScrollService) Sort(field string, ascending bool) *ScrollService {
	s.ss = s.ss.Sort(field, ascending)
	return s
}

// SortWithInfo specifies a sort order. Notice that sorting can have a
// negative impact on scroll performance.
func (s *ScrollService) SortWithInfo(info SortInfo) *ScrollService {
	s.ss = s.ss.SortWithInfo(info)
	return s
}

// SortBy specifies a sort order. Notice that sorting can have a
// negative impact on scroll performance.
func (s *ScrollService) SortBy(sorter ...Sorter) *ScrollService {
	s.ss = s.ss.SortBy(sorter...)
	return s
}

// Pretty asks Elasticsearch to pretty-print the returned JSON.
func (s *ScrollService) Pretty(pretty bool) *ScrollService {
	s.pretty = pretty
	return s
}

// Routing is a list of specific routing values to control the shards
// the search will be executed on.
func (s *ScrollService) Routing(routings ...string) *ScrollService {
	s.routing = strings.Join(routings, ",")
	return s
}

// Preference sets the preference to execute the search. Defaults to
// randomize across shards ("random"). Can be set to "_local" to prefer
// local shards, "_primary" to execute on primary shards only,
// or a custom value which guarantees that the same order will be used
// across different requests.
func (s *ScrollService) Preference(preference string) *ScrollService {
	s.preference = preference
	return s
}

// IgnoreUnavailable indicates whether the specified concrete indices
// should be ignored when unavailable (missing or closed).
func (s *ScrollService) IgnoreUnavailable(ignoreUnavailable bool) *ScrollService {
	s.ignoreUnavailable = &ignoreUnavailable
	return s
}

// AllowNoIndices indicates whether to ignore if a wildcard indices
// expression resolves into no concrete indices. (This includes the `_all`
// string or when no indices have been specified.)
func (s *ScrollService) AllowNoIndices(allowNoIndices bool) *ScrollService {
	s.allowNoIndices = &allowNoIndices
	return s
}

// ExpandWildcards indicates whether to expand wildcard expressions to
// concrete indices that are open, closed or both.
func (s *ScrollService) ExpandWildcards(expandWildcards string) *ScrollService {
	s.expandWildcards = expandWildcards
	return s
}

// MaxResponseSize sets an upper limit on the response body size that we accept,
// to guard against OOM situations.
func (s *ScrollService) MaxResponseSize(maxResponseSize int64) *ScrollService {
	s.maxResponseSize = maxResponseSize
	return s
}

// FilterPath allows reducing the response, a mechanism known as
// response filtering and described here:
// https://www.elastic.co/guide/en/elasticsearch/reference/6.7/common-options.html#common-options-response-filtering.
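//
// A minimal sketch (the index name and paths are illustrative) that trims the
// response down to document IDs and sources; note that "_scroll_id" is always
// re-added internally so that scrolling keeps working:
//
//	svc := client.Scroll("twitter").
//		FilterPath("hits.hits._id", "hits.hits._source").
//		Size(100)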
func (s *ScrollService) FilterPath(filterPath ...string) *ScrollService {
	s.filterPath = append(s.filterPath, filterPath...)
	return s
}

// ScrollId specifies the identifier of a scroll in action.
func (s *ScrollService) ScrollId(scrollId string) *ScrollService {
	s.mu.Lock()
	s.scrollId = scrollId
	s.mu.Unlock()
	return s
}

// Do returns the next search result. It will return io.EOF as error if there
// are no more search results.
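//
// A minimal usage sketch, assuming a configured *Client and a hypothetical
// "twitter" index; Do is called repeatedly until it returns io.EOF:
//
//	svc := client.Scroll("twitter").Size(100)
//	for {
//		res, err := svc.Do(ctx)
//		if err == io.EOF {
//			break // no more pages
//		}
//		if err != nil {
//			return err
//		}
//		for _, hit := range res.Hits.Hits {
//			_ = hit // process hit.Source here
//		}
//	}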
func (s *ScrollService) Do(ctx context.Context) (*SearchResult, error) {
	s.mu.RLock()
	nextScrollId := s.scrollId
	s.mu.RUnlock()
	if len(nextScrollId) == 0 {
		return s.first(ctx)
	}
	return s.next(ctx)
}

// Clear cancels the current scroll operation. If you don't do this manually,
// the scroll will be expired automatically by Elasticsearch. You can control
// how long a scroll cursor is kept alive with the KeepAlive func.
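//
// A minimal sketch: callers that may stop scrolling early can release the
// server-side scroll context right away instead of waiting for the keep-alive
// to expire (client is assumed to be configured by the caller):
//
//	svc := client.Scroll("twitter").KeepAlive("1m").Size(100)
//	defer svc.Clear(context.Background()) // error ignored for brevity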
func (s *ScrollService) Clear(ctx context.Context) error {
	s.mu.RLock()
	scrollId := s.scrollId
	s.mu.RUnlock()
	if len(scrollId) == 0 {
		return nil
	}

	path := "/_search/scroll"
	params := url.Values{}
	body := struct {
		ScrollId []string `json:"scroll_id,omitempty"`
	}{
		ScrollId: []string{scrollId},
	}

	_, err := s.client.PerformRequest(ctx, PerformRequestOptions{
		Method:  "DELETE",
		Path:    path,
		Params:  params,
		Body:    body,
		Retrier: s.retrier,
	})
	if err != nil {
		return err
	}

	return nil
}

// -- First --

// first retrieves the first page of search results.
func (s *ScrollService) first(ctx context.Context) (*SearchResult, error) {
	// Get URL and parameters for request
	path, params, err := s.buildFirstURL()
	if err != nil {
		return nil, err
	}

	// Get HTTP request body
	body, err := s.bodyFirst()
	if err != nil {
		return nil, err
	}

	// Get HTTP response
	res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
		Method:          "POST",
		Path:            path,
		Params:          params,
		Body:            body,
		Retrier:         s.retrier,
		Headers:         s.headers,
		MaxResponseSize: s.maxResponseSize,
	})
	if err != nil {
		return nil, err
	}

	// Return operation response
	ret := new(SearchResult)
	if err := s.client.decoder.Decode(res.Body, ret); err != nil {
		return nil, err
	}
	s.mu.Lock()
	s.scrollId = ret.ScrollId
	s.mu.Unlock()
	if ret.Hits == nil || len(ret.Hits.Hits) == 0 {
		return ret, io.EOF
	}
	return ret, nil
}

// buildFirstURL builds the URL for retrieving the first page.
func (s *ScrollService) buildFirstURL() (string, url.Values, error) {
	// Build URL
	var err error
	var path string
	if len(s.indices) == 0 && len(s.types) == 0 {
		path = "/_search"
	} else if len(s.indices) > 0 && len(s.types) == 0 {
		path, err = uritemplates.Expand("/{index}/_search", map[string]string{
			"index": strings.Join(s.indices, ","),
		})
	} else if len(s.indices) == 0 && len(s.types) > 0 {
		path, err = uritemplates.Expand("/_all/{typ}/_search", map[string]string{
			"typ": strings.Join(s.types, ","),
		})
	} else {
		path, err = uritemplates.Expand("/{index}/{typ}/_search", map[string]string{
			"index": strings.Join(s.indices, ","),
			"typ":   strings.Join(s.types, ","),
		})
	}
	if err != nil {
		return "", url.Values{}, err
	}

	// Add query string parameters
	params := url.Values{}
	if s.pretty {
		params.Set("pretty", "true")
	}
	if s.size != nil && *s.size > 0 {
		params.Set("size", fmt.Sprintf("%d", *s.size))
	}
	if len(s.keepAlive) > 0 {
		params.Set("scroll", s.keepAlive)
	}
	if len(s.routing) > 0 {
		params.Set("routing", s.routing)
	}
	if len(s.preference) > 0 {
		params.Set("preference", s.preference)
	}
	if s.allowNoIndices != nil {
		params.Set("allow_no_indices", fmt.Sprintf("%v", *s.allowNoIndices))
	}
	if len(s.expandWildcards) > 0 {
		params.Set("expand_wildcards", s.expandWildcards)
	}
	if s.ignoreUnavailable != nil {
		params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable))
	}
	if len(s.filterPath) > 0 {
		// Always add "_scroll_id", otherwise we cannot scroll
		s.filterPath = append(s.filterPath, "_scroll_id")
		params.Set("filter_path", strings.Join(s.filterPath, ","))
	}

	return path, params, nil
}

// bodyFirst returns the request body to fetch the first batch of results.
func (s *ScrollService) bodyFirst() (interface{}, error) {
	var err error
	var body interface{}

	if s.body != nil {
		body = s.body
	} else {
		// Sort by _doc by default: this is the most efficient way to scroll
		// when the caller has not specified a sort order.
		if !s.ss.hasSort() {
			s.ss = s.ss.SortBy(SortByDoc{})
		}

		// Body from search source
		body, err = s.ss.Source()
		if err != nil {
			return nil, err
		}
	}

	return body, nil
}

// -- Next --

// next retrieves the next page of search results.
func (s *ScrollService) next(ctx context.Context) (*SearchResult, error) {
	// Get URL for request
	path, params, err := s.buildNextURL()
	if err != nil {
		return nil, err
	}

	// Setup HTTP request body
	body, err := s.bodyNext()
	if err != nil {
		return nil, err
	}

	// Get HTTP response
	res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
		Method:          "POST",
		Path:            path,
		Params:          params,
		Body:            body,
		Retrier:         s.retrier,
		Headers:         s.headers,
		MaxResponseSize: s.maxResponseSize,
	})
	if err != nil {
		return nil, err
	}

	// Return operation response
	ret := new(SearchResult)
	if err := s.client.decoder.Decode(res.Body, ret); err != nil {
		return nil, err
	}
	s.mu.Lock()
	s.scrollId = ret.ScrollId
	s.mu.Unlock()
	if ret.Hits == nil || len(ret.Hits.Hits) == 0 {
		return ret, io.EOF
	}
	return ret, nil
}

// buildNextURL builds the URL for retrieving the next page.
func (s *ScrollService) buildNextURL() (string, url.Values, error) {
	path := "/_search/scroll"

	// Add query string parameters
	params := url.Values{}
	if s.pretty {
		params.Set("pretty", "true")
	}
	if len(s.filterPath) > 0 {
		// Always add "_scroll_id", otherwise we cannot scroll
		s.filterPath = append(s.filterPath, "_scroll_id")
		params.Set("filter_path", strings.Join(s.filterPath, ","))
	}

	return path, params, nil
}

// bodyNext returns the request body to fetch the next batch of results.
func (s *ScrollService) bodyNext() (interface{}, error) {
	s.mu.RLock()
	body := struct {
		Scroll   string `json:"scroll"`
		ScrollId string `json:"scroll_id,omitempty"`
	}{
		Scroll:   s.keepAlive,
		ScrollId: s.scrollId,
	}
	s.mu.RUnlock()
	return body, nil
}