1// Copyright 2012-present Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package elastic 6 7import ( 8 "context" 9 "fmt" 10 "io" 11 "net/http" 12 "net/url" 13 "strings" 14 "sync" 15 16 "github.com/olivere/elastic/uritemplates" 17) 18 19const ( 20 // DefaultScrollKeepAlive is the default time a scroll cursor will be kept alive. 21 DefaultScrollKeepAlive = "5m" 22) 23 24// ScrollService iterates over pages of search results from Elasticsearch. 25type ScrollService struct { 26 client *Client 27 retrier Retrier 28 indices []string 29 types []string 30 keepAlive string 31 body interface{} 32 ss *SearchSource 33 size *int 34 pretty bool 35 routing string 36 preference string 37 ignoreUnavailable *bool 38 allowNoIndices *bool 39 expandWildcards string 40 headers http.Header 41 maxResponseSize int64 42 filterPath []string 43 44 mu sync.RWMutex 45 scrollId string 46} 47 48// NewScrollService initializes and returns a new ScrollService. 49func NewScrollService(client *Client) *ScrollService { 50 builder := &ScrollService{ 51 client: client, 52 ss: NewSearchSource(), 53 keepAlive: DefaultScrollKeepAlive, 54 } 55 return builder 56} 57 58// Header sets headers on the request 59func (s *ScrollService) Header(name string, value string) *ScrollService { 60 if s.headers == nil { 61 s.headers = http.Header{} 62 } 63 s.headers.Add(name, value) 64 return s 65} 66 67// Retrier allows to set specific retry logic for this ScrollService. 68// If not specified, it will use the client's default retrier. 69func (s *ScrollService) Retrier(retrier Retrier) *ScrollService { 70 s.retrier = retrier 71 return s 72} 73 74// Index sets the name of one or more indices to iterate over. 75func (s *ScrollService) Index(indices ...string) *ScrollService { 76 if s.indices == nil { 77 s.indices = make([]string, 0) 78 } 79 s.indices = append(s.indices, indices...) 80 return s 81} 82 83// Type sets the name of one or more types to iterate over. 84func (s *ScrollService) Type(types ...string) *ScrollService { 85 if s.types == nil { 86 s.types = make([]string, 0) 87 } 88 s.types = append(s.types, types...) 89 return s 90} 91 92// Scroll is an alias for KeepAlive, the time to keep 93// the cursor alive (e.g. "5m" for 5 minutes). 94func (s *ScrollService) Scroll(keepAlive string) *ScrollService { 95 s.keepAlive = keepAlive 96 return s 97} 98 99// KeepAlive sets the maximum time after which the cursor will expire. 100// It is "5m" by default. 101func (s *ScrollService) KeepAlive(keepAlive string) *ScrollService { 102 s.keepAlive = keepAlive 103 return s 104} 105 106// Size specifies the number of documents Elasticsearch should return 107// from each shard, per page. 108func (s *ScrollService) Size(size int) *ScrollService { 109 s.size = &size 110 return s 111} 112 113// Body sets the raw body to send to Elasticsearch. This can be e.g. a string, 114// a map[string]interface{} or anything that can be serialized into JSON. 115// Notice that setting the body disables the use of SearchSource and many 116// other properties of the ScanService. 117func (s *ScrollService) Body(body interface{}) *ScrollService { 118 s.body = body 119 return s 120} 121 122// SearchSource sets the search source builder to use with this iterator. 123// Notice that only a certain number of properties can be used when scrolling, 124// e.g. query and sorting. 125func (s *ScrollService) SearchSource(searchSource *SearchSource) *ScrollService { 126 s.ss = searchSource 127 if s.ss == nil { 128 s.ss = NewSearchSource() 129 } 130 return s 131} 132 133// Query sets the query to perform, e.g. a MatchAllQuery. 134func (s *ScrollService) Query(query Query) *ScrollService { 135 s.ss = s.ss.Query(query) 136 return s 137} 138 139// PostFilter is executed as the last filter. It only affects the 140// search hits but not facets. See 141// https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-request-post-filter.html 142// for details. 143func (s *ScrollService) PostFilter(postFilter Query) *ScrollService { 144 s.ss = s.ss.PostFilter(postFilter) 145 return s 146} 147 148// Slice allows slicing the scroll request into several batches. 149// This is supported in Elasticsearch 5.0 or later. 150// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-request-scroll.html#sliced-scroll 151// for details. 152func (s *ScrollService) Slice(sliceQuery Query) *ScrollService { 153 s.ss = s.ss.Slice(sliceQuery) 154 return s 155} 156 157// FetchSource indicates whether the response should contain the stored 158// _source for every hit. 159func (s *ScrollService) FetchSource(fetchSource bool) *ScrollService { 160 s.ss = s.ss.FetchSource(fetchSource) 161 return s 162} 163 164// FetchSourceContext indicates how the _source should be fetched. 165func (s *ScrollService) FetchSourceContext(fetchSourceContext *FetchSourceContext) *ScrollService { 166 s.ss = s.ss.FetchSourceContext(fetchSourceContext) 167 return s 168} 169 170// Version can be set to true to return a version for each search hit. 171// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-request-version.html. 172func (s *ScrollService) Version(version bool) *ScrollService { 173 s.ss = s.ss.Version(version) 174 return s 175} 176 177// Sort adds a sort order. This can have negative effects on the performance 178// of the scroll operation as Elasticsearch needs to sort first. 179func (s *ScrollService) Sort(field string, ascending bool) *ScrollService { 180 s.ss = s.ss.Sort(field, ascending) 181 return s 182} 183 184// SortWithInfo specifies a sort order. Notice that sorting can have a 185// negative impact on scroll performance. 186func (s *ScrollService) SortWithInfo(info SortInfo) *ScrollService { 187 s.ss = s.ss.SortWithInfo(info) 188 return s 189} 190 191// SortBy specifies a sort order. Notice that sorting can have a 192// negative impact on scroll performance. 193func (s *ScrollService) SortBy(sorter ...Sorter) *ScrollService { 194 s.ss = s.ss.SortBy(sorter...) 195 return s 196} 197 198// Pretty asks Elasticsearch to pretty-print the returned JSON. 199func (s *ScrollService) Pretty(pretty bool) *ScrollService { 200 s.pretty = pretty 201 return s 202} 203 204// Routing is a list of specific routing values to control the shards 205// the search will be executed on. 206func (s *ScrollService) Routing(routings ...string) *ScrollService { 207 s.routing = strings.Join(routings, ",") 208 return s 209} 210 211// Preference sets the preference to execute the search. Defaults to 212// randomize across shards ("random"). Can be set to "_local" to prefer 213// local shards, "_primary" to execute on primary shards only, 214// or a custom value which guarantees that the same order will be used 215// across different requests. 216func (s *ScrollService) Preference(preference string) *ScrollService { 217 s.preference = preference 218 return s 219} 220 221// IgnoreUnavailable indicates whether the specified concrete indices 222// should be ignored when unavailable (missing or closed). 223func (s *ScrollService) IgnoreUnavailable(ignoreUnavailable bool) *ScrollService { 224 s.ignoreUnavailable = &ignoreUnavailable 225 return s 226} 227 228// AllowNoIndices indicates whether to ignore if a wildcard indices 229// expression resolves into no concrete indices. (This includes `_all` string 230// or when no indices have been specified). 231func (s *ScrollService) AllowNoIndices(allowNoIndices bool) *ScrollService { 232 s.allowNoIndices = &allowNoIndices 233 return s 234} 235 236// ExpandWildcards indicates whether to expand wildcard expression to 237// concrete indices that are open, closed or both. 238func (s *ScrollService) ExpandWildcards(expandWildcards string) *ScrollService { 239 s.expandWildcards = expandWildcards 240 return s 241} 242 243// MaxResponseSize sets an upper limit on the response body size that we accept, 244// to guard against OOM situations. 245func (s *ScrollService) MaxResponseSize(maxResponseSize int64) *ScrollService { 246 s.maxResponseSize = maxResponseSize 247 return s 248} 249 250// FilterPath allows reducing the response, a mechanism known as 251// response filtering and described here: 252// https://www.elastic.co/guide/en/elasticsearch/reference/6.7/common-options.html#common-options-response-filtering. 253func (s *ScrollService) FilterPath(filterPath ...string) *ScrollService { 254 s.filterPath = append(s.filterPath, filterPath...) 255 return s 256} 257 258// ScrollId specifies the identifier of a scroll in action. 259func (s *ScrollService) ScrollId(scrollId string) *ScrollService { 260 s.mu.Lock() 261 s.scrollId = scrollId 262 s.mu.Unlock() 263 return s 264} 265 266// Do returns the next search result. It will return io.EOF as error if there 267// are no more search results. 268func (s *ScrollService) Do(ctx context.Context) (*SearchResult, error) { 269 s.mu.RLock() 270 nextScrollId := s.scrollId 271 s.mu.RUnlock() 272 if len(nextScrollId) == 0 { 273 return s.first(ctx) 274 } 275 return s.next(ctx) 276} 277 278// Clear cancels the current scroll operation. If you don't do this manually, 279// the scroll will be expired automatically by Elasticsearch. You can control 280// how long a scroll cursor is kept alive with the KeepAlive func. 281func (s *ScrollService) Clear(ctx context.Context) error { 282 s.mu.RLock() 283 scrollId := s.scrollId 284 s.mu.RUnlock() 285 if len(scrollId) == 0 { 286 return nil 287 } 288 289 path := "/_search/scroll" 290 params := url.Values{} 291 body := struct { 292 ScrollId []string `json:"scroll_id,omitempty"` 293 }{ 294 ScrollId: []string{scrollId}, 295 } 296 297 _, err := s.client.PerformRequest(ctx, PerformRequestOptions{ 298 Method: "DELETE", 299 Path: path, 300 Params: params, 301 Body: body, 302 Retrier: s.retrier, 303 }) 304 if err != nil { 305 return err 306 } 307 308 return nil 309} 310 311// -- First -- 312 313// first takes the first page of search results. 314func (s *ScrollService) first(ctx context.Context) (*SearchResult, error) { 315 // Get URL and parameters for request 316 path, params, err := s.buildFirstURL() 317 if err != nil { 318 return nil, err 319 } 320 321 // Get HTTP request body 322 body, err := s.bodyFirst() 323 if err != nil { 324 return nil, err 325 } 326 327 // Get HTTP response 328 res, err := s.client.PerformRequest(ctx, PerformRequestOptions{ 329 Method: "POST", 330 Path: path, 331 Params: params, 332 Body: body, 333 Retrier: s.retrier, 334 Headers: s.headers, 335 MaxResponseSize: s.maxResponseSize, 336 }) 337 if err != nil { 338 return nil, err 339 } 340 341 // Return operation response 342 ret := new(SearchResult) 343 if err := s.client.decoder.Decode(res.Body, ret); err != nil { 344 return nil, err 345 } 346 s.mu.Lock() 347 s.scrollId = ret.ScrollId 348 s.mu.Unlock() 349 if ret.Hits == nil || len(ret.Hits.Hits) == 0 { 350 return ret, io.EOF 351 } 352 return ret, nil 353} 354 355// buildFirstURL builds the URL for retrieving the first page. 356func (s *ScrollService) buildFirstURL() (string, url.Values, error) { 357 // Build URL 358 var err error 359 var path string 360 if len(s.indices) == 0 && len(s.types) == 0 { 361 path = "/_search" 362 } else if len(s.indices) > 0 && len(s.types) == 0 { 363 path, err = uritemplates.Expand("/{index}/_search", map[string]string{ 364 "index": strings.Join(s.indices, ","), 365 }) 366 } else if len(s.indices) == 0 && len(s.types) > 0 { 367 path, err = uritemplates.Expand("/_all/{typ}/_search", map[string]string{ 368 "typ": strings.Join(s.types, ","), 369 }) 370 } else { 371 path, err = uritemplates.Expand("/{index}/{typ}/_search", map[string]string{ 372 "index": strings.Join(s.indices, ","), 373 "typ": strings.Join(s.types, ","), 374 }) 375 } 376 if err != nil { 377 return "", url.Values{}, err 378 } 379 380 // Add query string parameters 381 params := url.Values{} 382 if s.pretty { 383 params.Set("pretty", "true") 384 } 385 if s.size != nil && *s.size > 0 { 386 params.Set("size", fmt.Sprintf("%d", *s.size)) 387 } 388 if len(s.keepAlive) > 0 { 389 params.Set("scroll", s.keepAlive) 390 } 391 if len(s.routing) > 0 { 392 params.Set("routing", s.routing) 393 } 394 if len(s.preference) > 0 { 395 params.Set("preference", s.preference) 396 } 397 if s.allowNoIndices != nil { 398 params.Set("allow_no_indices", fmt.Sprintf("%v", *s.allowNoIndices)) 399 } 400 if len(s.expandWildcards) > 0 { 401 params.Set("expand_wildcards", s.expandWildcards) 402 } 403 if s.ignoreUnavailable != nil { 404 params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable)) 405 } 406 if len(s.filterPath) > 0 { 407 // Always add "hits._scroll_id", otherwise we cannot scroll 408 s.filterPath = append(s.filterPath, "_scroll_id") 409 params.Set("filter_path", strings.Join(s.filterPath, ",")) 410 } 411 412 return path, params, nil 413} 414 415// bodyFirst returns the request to fetch the first batch of results. 416func (s *ScrollService) bodyFirst() (interface{}, error) { 417 var err error 418 var body interface{} 419 420 if s.body != nil { 421 body = s.body 422 } else { 423 // Use _doc sort by default if none is specified 424 if !s.ss.hasSort() { 425 // Use efficient sorting when no user-defined query/body is specified 426 s.ss = s.ss.SortBy(SortByDoc{}) 427 } 428 429 // Body from search source 430 body, err = s.ss.Source() 431 if err != nil { 432 return nil, err 433 } 434 } 435 436 return body, nil 437} 438 439// -- Next -- 440 441func (s *ScrollService) next(ctx context.Context) (*SearchResult, error) { 442 // Get URL for request 443 path, params, err := s.buildNextURL() 444 if err != nil { 445 return nil, err 446 } 447 448 // Setup HTTP request body 449 body, err := s.bodyNext() 450 if err != nil { 451 return nil, err 452 } 453 454 // Get HTTP response 455 res, err := s.client.PerformRequest(ctx, PerformRequestOptions{ 456 Method: "POST", 457 Path: path, 458 Params: params, 459 Body: body, 460 Retrier: s.retrier, 461 Headers: s.headers, 462 MaxResponseSize: s.maxResponseSize, 463 }) 464 if err != nil { 465 return nil, err 466 } 467 468 // Return operation response 469 ret := new(SearchResult) 470 if err := s.client.decoder.Decode(res.Body, ret); err != nil { 471 return nil, err 472 } 473 s.mu.Lock() 474 s.scrollId = ret.ScrollId 475 s.mu.Unlock() 476 if ret.Hits == nil || len(ret.Hits.Hits) == 0 { 477 return ret, io.EOF 478 } 479 return ret, nil 480} 481 482// buildNextURL builds the URL for the operation. 483func (s *ScrollService) buildNextURL() (string, url.Values, error) { 484 path := "/_search/scroll" 485 486 // Add query string parameters 487 params := url.Values{} 488 if s.pretty { 489 params.Set("pretty", "true") 490 } 491 if len(s.filterPath) > 0 { 492 // Always add "hits._scroll_id", otherwise we cannot scroll 493 s.filterPath = append(s.filterPath, "_scroll_id") 494 params.Set("filter_path", strings.Join(s.filterPath, ",")) 495 } 496 497 return path, params, nil 498} 499 500// body returns the request to fetch the next batch of results. 501func (s *ScrollService) bodyNext() (interface{}, error) { 502 s.mu.RLock() 503 body := struct { 504 Scroll string `json:"scroll"` 505 ScrollId string `json:"scroll_id,omitempty"` 506 }{ 507 Scroll: s.keepAlive, 508 ScrollId: s.scrollId, 509 } 510 s.mu.RUnlock() 511 return body, nil 512} 513