1// Copyright 2012-present Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package elastic 6 7import ( 8 "context" 9 "fmt" 10 "io" 11 "net/http" 12 "net/url" 13 "strings" 14 "sync" 15 16 "github.com/olivere/elastic/uritemplates" 17) 18 19const ( 20 // DefaultScrollKeepAlive is the default time a scroll cursor will be kept alive. 21 DefaultScrollKeepAlive = "5m" 22) 23 24// ScrollService iterates over pages of search results from Elasticsearch. 25type ScrollService struct { 26 client *Client 27 retrier Retrier 28 29 pretty *bool // pretty format the returned JSON response 30 human *bool // return human readable values for statistics 31 errorTrace *bool // include the stack trace of returned errors 32 filterPath []string // list of filters used to reduce the response 33 headers http.Header // custom request-level HTTP headers 34 35 indices []string 36 types []string 37 keepAlive string 38 body interface{} 39 ss *SearchSource 40 size *int 41 routing string 42 preference string 43 ignoreUnavailable *bool 44 allowNoIndices *bool 45 expandWildcards string 46 maxResponseSize int64 47 48 mu sync.RWMutex 49 scrollId string 50} 51 52// NewScrollService initializes and returns a new ScrollService. 53func NewScrollService(client *Client) *ScrollService { 54 builder := &ScrollService{ 55 client: client, 56 ss: NewSearchSource(), 57 keepAlive: DefaultScrollKeepAlive, 58 } 59 return builder 60} 61 62// Pretty tells Elasticsearch whether to return a formatted JSON response. 63func (s *ScrollService) Pretty(pretty bool) *ScrollService { 64 s.pretty = &pretty 65 return s 66} 67 68// Human specifies whether human readable values should be returned in 69// the JSON response, e.g. "7.5mb". 70func (s *ScrollService) Human(human bool) *ScrollService { 71 s.human = &human 72 return s 73} 74 75// ErrorTrace specifies whether to include the stack trace of returned errors. 76func (s *ScrollService) ErrorTrace(errorTrace bool) *ScrollService { 77 s.errorTrace = &errorTrace 78 return s 79} 80 81// FilterPath specifies a list of filters used to reduce the response. 82func (s *ScrollService) FilterPath(filterPath ...string) *ScrollService { 83 s.filterPath = filterPath 84 return s 85} 86 87// Header adds a header to the request. 88func (s *ScrollService) Header(name string, value string) *ScrollService { 89 if s.headers == nil { 90 s.headers = http.Header{} 91 } 92 s.headers.Add(name, value) 93 return s 94} 95 96// Headers specifies the headers of the request. 97func (s *ScrollService) Headers(headers http.Header) *ScrollService { 98 s.headers = headers 99 return s 100} 101 102// Retrier allows to set specific retry logic for this ScrollService. 103// If not specified, it will use the client's default retrier. 104func (s *ScrollService) Retrier(retrier Retrier) *ScrollService { 105 s.retrier = retrier 106 return s 107} 108 109// Index sets the name of one or more indices to iterate over. 110func (s *ScrollService) Index(indices ...string) *ScrollService { 111 if s.indices == nil { 112 s.indices = make([]string, 0) 113 } 114 s.indices = append(s.indices, indices...) 115 return s 116} 117 118// Type sets the name of one or more types to iterate over. 119func (s *ScrollService) Type(types ...string) *ScrollService { 120 if s.types == nil { 121 s.types = make([]string, 0) 122 } 123 s.types = append(s.types, types...) 124 return s 125} 126 127// Scroll is an alias for KeepAlive, the time to keep 128// the cursor alive (e.g. "5m" for 5 minutes). 129func (s *ScrollService) Scroll(keepAlive string) *ScrollService { 130 s.keepAlive = keepAlive 131 return s 132} 133 134// KeepAlive sets the maximum time after which the cursor will expire. 135// It is "5m" by default. 136func (s *ScrollService) KeepAlive(keepAlive string) *ScrollService { 137 s.keepAlive = keepAlive 138 return s 139} 140 141// Size specifies the number of documents Elasticsearch should return 142// from each shard, per page. 143func (s *ScrollService) Size(size int) *ScrollService { 144 s.size = &size 145 return s 146} 147 148// Highlight allows to highlight search results on one or more fields 149func (s *ScrollService) Highlight(highlight *Highlight) *ScrollService { 150 s.ss = s.ss.Highlight(highlight) 151 return s 152} 153 154// Body sets the raw body to send to Elasticsearch. This can be e.g. a string, 155// a map[string]interface{} or anything that can be serialized into JSON. 156// Notice that setting the body disables the use of SearchSource and many 157// other properties of the ScanService. 158func (s *ScrollService) Body(body interface{}) *ScrollService { 159 s.body = body 160 return s 161} 162 163// SearchSource sets the search source builder to use with this iterator. 164// Notice that only a certain number of properties can be used when scrolling, 165// e.g. query and sorting. 166func (s *ScrollService) SearchSource(searchSource *SearchSource) *ScrollService { 167 s.ss = searchSource 168 if s.ss == nil { 169 s.ss = NewSearchSource() 170 } 171 return s 172} 173 174// Query sets the query to perform, e.g. a MatchAllQuery. 175func (s *ScrollService) Query(query Query) *ScrollService { 176 s.ss = s.ss.Query(query) 177 return s 178} 179 180// PostFilter is executed as the last filter. It only affects the 181// search hits but not facets. See 182// https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-post-filter.html 183// for details. 184func (s *ScrollService) PostFilter(postFilter Query) *ScrollService { 185 s.ss = s.ss.PostFilter(postFilter) 186 return s 187} 188 189// Slice allows slicing the scroll request into several batches. 190// This is supported in Elasticsearch 5.0 or later. 191// See https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-scroll.html#sliced-scroll 192// for details. 193func (s *ScrollService) Slice(sliceQuery Query) *ScrollService { 194 s.ss = s.ss.Slice(sliceQuery) 195 return s 196} 197 198// FetchSource indicates whether the response should contain the stored 199// _source for every hit. 200func (s *ScrollService) FetchSource(fetchSource bool) *ScrollService { 201 s.ss = s.ss.FetchSource(fetchSource) 202 return s 203} 204 205// FetchSourceContext indicates how the _source should be fetched. 206func (s *ScrollService) FetchSourceContext(fetchSourceContext *FetchSourceContext) *ScrollService { 207 s.ss = s.ss.FetchSourceContext(fetchSourceContext) 208 return s 209} 210 211// Version can be set to true to return a version for each search hit. 212// See https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-version.html. 213func (s *ScrollService) Version(version bool) *ScrollService { 214 s.ss = s.ss.Version(version) 215 return s 216} 217 218// Sort adds a sort order. This can have negative effects on the performance 219// of the scroll operation as Elasticsearch needs to sort first. 220func (s *ScrollService) Sort(field string, ascending bool) *ScrollService { 221 s.ss = s.ss.Sort(field, ascending) 222 return s 223} 224 225// SortWithInfo specifies a sort order. Notice that sorting can have a 226// negative impact on scroll performance. 227func (s *ScrollService) SortWithInfo(info SortInfo) *ScrollService { 228 s.ss = s.ss.SortWithInfo(info) 229 return s 230} 231 232// SortBy specifies a sort order. Notice that sorting can have a 233// negative impact on scroll performance. 234func (s *ScrollService) SortBy(sorter ...Sorter) *ScrollService { 235 s.ss = s.ss.SortBy(sorter...) 236 return s 237} 238 239// Routing is a list of specific routing values to control the shards 240// the search will be executed on. 241func (s *ScrollService) Routing(routings ...string) *ScrollService { 242 s.routing = strings.Join(routings, ",") 243 return s 244} 245 246// Preference sets the preference to execute the search. Defaults to 247// randomize across shards ("random"). Can be set to "_local" to prefer 248// local shards, "_primary" to execute on primary shards only, 249// or a custom value which guarantees that the same order will be used 250// across different requests. 251func (s *ScrollService) Preference(preference string) *ScrollService { 252 s.preference = preference 253 return s 254} 255 256// IgnoreUnavailable indicates whether the specified concrete indices 257// should be ignored when unavailable (missing or closed). 258func (s *ScrollService) IgnoreUnavailable(ignoreUnavailable bool) *ScrollService { 259 s.ignoreUnavailable = &ignoreUnavailable 260 return s 261} 262 263// AllowNoIndices indicates whether to ignore if a wildcard indices 264// expression resolves into no concrete indices. (This includes `_all` string 265// or when no indices have been specified). 266func (s *ScrollService) AllowNoIndices(allowNoIndices bool) *ScrollService { 267 s.allowNoIndices = &allowNoIndices 268 return s 269} 270 271// ExpandWildcards indicates whether to expand wildcard expression to 272// concrete indices that are open, closed or both. 273func (s *ScrollService) ExpandWildcards(expandWildcards string) *ScrollService { 274 s.expandWildcards = expandWildcards 275 return s 276} 277 278// MaxResponseSize sets an upper limit on the response body size that we accept, 279// to guard against OOM situations. 280func (s *ScrollService) MaxResponseSize(maxResponseSize int64) *ScrollService { 281 s.maxResponseSize = maxResponseSize 282 return s 283} 284 285// ScrollId specifies the identifier of a scroll in action. 286func (s *ScrollService) ScrollId(scrollId string) *ScrollService { 287 s.mu.Lock() 288 s.scrollId = scrollId 289 s.mu.Unlock() 290 return s 291} 292 293// Do returns the next search result. It will return io.EOF as error if there 294// are no more search results. 295func (s *ScrollService) Do(ctx context.Context) (*SearchResult, error) { 296 s.mu.RLock() 297 nextScrollId := s.scrollId 298 s.mu.RUnlock() 299 if len(nextScrollId) == 0 { 300 return s.first(ctx) 301 } 302 return s.next(ctx) 303} 304 305// Clear cancels the current scroll operation. If you don't do this manually, 306// the scroll will be expired automatically by Elasticsearch. You can control 307// how long a scroll cursor is kept alive with the KeepAlive func. 308func (s *ScrollService) Clear(ctx context.Context) error { 309 s.mu.RLock() 310 scrollId := s.scrollId 311 s.mu.RUnlock() 312 if len(scrollId) == 0 { 313 return nil 314 } 315 316 path := "/_search/scroll" 317 params := url.Values{} 318 if v := s.pretty; v != nil { 319 params.Set("pretty", fmt.Sprint(*v)) 320 } 321 if v := s.human; v != nil { 322 params.Set("human", fmt.Sprint(*v)) 323 } 324 if v := s.errorTrace; v != nil { 325 params.Set("error_trace", fmt.Sprint(*v)) 326 } 327 if len(s.filterPath) > 0 { 328 params.Set("filter_path", strings.Join(s.filterPath, ",")) 329 } 330 body := struct { 331 ScrollId []string `json:"scroll_id,omitempty"` 332 }{ 333 ScrollId: []string{scrollId}, 334 } 335 336 _, err := s.client.PerformRequest(ctx, PerformRequestOptions{ 337 Method: "DELETE", 338 Path: path, 339 Params: params, 340 Body: body, 341 Retrier: s.retrier, 342 }) 343 if err != nil { 344 return err 345 } 346 347 return nil 348} 349 350// -- First -- 351 352// first takes the first page of search results. 353func (s *ScrollService) first(ctx context.Context) (*SearchResult, error) { 354 // Get URL and parameters for request 355 path, params, err := s.buildFirstURL() 356 if err != nil { 357 return nil, err 358 } 359 360 // Get HTTP request body 361 body, err := s.bodyFirst() 362 if err != nil { 363 return nil, err 364 } 365 366 // Get HTTP response 367 res, err := s.client.PerformRequest(ctx, PerformRequestOptions{ 368 Method: "POST", 369 Path: path, 370 Params: params, 371 Body: body, 372 Retrier: s.retrier, 373 Headers: s.headers, 374 MaxResponseSize: s.maxResponseSize, 375 }) 376 if err != nil { 377 return nil, err 378 } 379 380 // Return operation response 381 ret := new(SearchResult) 382 if err := s.client.decoder.Decode(res.Body, ret); err != nil { 383 return nil, err 384 } 385 s.mu.Lock() 386 s.scrollId = ret.ScrollId 387 s.mu.Unlock() 388 if ret.Hits == nil || len(ret.Hits.Hits) == 0 { 389 return ret, io.EOF 390 } 391 return ret, nil 392} 393 394// buildFirstURL builds the URL for retrieving the first page. 395func (s *ScrollService) buildFirstURL() (string, url.Values, error) { 396 // Build URL 397 var err error 398 var path string 399 if len(s.indices) == 0 && len(s.types) == 0 { 400 path = "/_search" 401 } else if len(s.indices) > 0 && len(s.types) == 0 { 402 path, err = uritemplates.Expand("/{index}/_search", map[string]string{ 403 "index": strings.Join(s.indices, ","), 404 }) 405 } else if len(s.indices) == 0 && len(s.types) > 0 { 406 path, err = uritemplates.Expand("/_all/{typ}/_search", map[string]string{ 407 "typ": strings.Join(s.types, ","), 408 }) 409 } else { 410 path, err = uritemplates.Expand("/{index}/{typ}/_search", map[string]string{ 411 "index": strings.Join(s.indices, ","), 412 "typ": strings.Join(s.types, ","), 413 }) 414 } 415 if err != nil { 416 return "", url.Values{}, err 417 } 418 419 // Add query string parameters 420 params := url.Values{} 421 if v := s.pretty; v != nil { 422 params.Set("pretty", fmt.Sprint(*v)) 423 } 424 if v := s.human; v != nil { 425 params.Set("human", fmt.Sprint(*v)) 426 } 427 if v := s.errorTrace; v != nil { 428 params.Set("error_trace", fmt.Sprint(*v)) 429 } 430 if len(s.filterPath) > 0 { 431 // Always add "hits._scroll_id", otherwise we cannot scroll 432 s.filterPath = append(s.filterPath, "_scroll_id") 433 params.Set("filter_path", strings.Join(s.filterPath, ",")) 434 } 435 if s.size != nil && *s.size > 0 { 436 params.Set("size", fmt.Sprintf("%d", *s.size)) 437 } 438 if len(s.keepAlive) > 0 { 439 params.Set("scroll", s.keepAlive) 440 } 441 if len(s.routing) > 0 { 442 params.Set("routing", s.routing) 443 } 444 if len(s.preference) > 0 { 445 params.Set("preference", s.preference) 446 } 447 if s.allowNoIndices != nil { 448 params.Set("allow_no_indices", fmt.Sprintf("%v", *s.allowNoIndices)) 449 } 450 if len(s.expandWildcards) > 0 { 451 params.Set("expand_wildcards", s.expandWildcards) 452 } 453 if s.ignoreUnavailable != nil { 454 params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable)) 455 } 456 457 return path, params, nil 458} 459 460// bodyFirst returns the request to fetch the first batch of results. 461func (s *ScrollService) bodyFirst() (interface{}, error) { 462 var err error 463 var body interface{} 464 465 if s.body != nil { 466 body = s.body 467 } else { 468 // Use _doc sort by default if none is specified 469 if !s.ss.hasSort() { 470 // Use efficient sorting when no user-defined query/body is specified 471 s.ss = s.ss.SortBy(SortByDoc{}) 472 } 473 474 // Body from search source 475 body, err = s.ss.Source() 476 if err != nil { 477 return nil, err 478 } 479 } 480 481 return body, nil 482} 483 484// -- Next -- 485 486func (s *ScrollService) next(ctx context.Context) (*SearchResult, error) { 487 // Get URL for request 488 path, params, err := s.buildNextURL() 489 if err != nil { 490 return nil, err 491 } 492 493 // Setup HTTP request body 494 body, err := s.bodyNext() 495 if err != nil { 496 return nil, err 497 } 498 499 // Get HTTP response 500 res, err := s.client.PerformRequest(ctx, PerformRequestOptions{ 501 Method: "POST", 502 Path: path, 503 Params: params, 504 Body: body, 505 Retrier: s.retrier, 506 Headers: s.headers, 507 MaxResponseSize: s.maxResponseSize, 508 }) 509 if err != nil { 510 return nil, err 511 } 512 513 // Return operation response 514 ret := new(SearchResult) 515 if err := s.client.decoder.Decode(res.Body, ret); err != nil { 516 return nil, err 517 } 518 s.mu.Lock() 519 s.scrollId = ret.ScrollId 520 s.mu.Unlock() 521 if ret.Hits == nil || len(ret.Hits.Hits) == 0 { 522 return ret, io.EOF 523 } 524 return ret, nil 525} 526 527// buildNextURL builds the URL for the operation. 528func (s *ScrollService) buildNextURL() (string, url.Values, error) { 529 path := "/_search/scroll" 530 531 // Add query string parameters 532 params := url.Values{} 533 if v := s.pretty; v != nil { 534 params.Set("pretty", fmt.Sprint(*v)) 535 } 536 if v := s.human; v != nil { 537 params.Set("human", fmt.Sprint(*v)) 538 } 539 if v := s.errorTrace; v != nil { 540 params.Set("error_trace", fmt.Sprint(*v)) 541 } 542 if len(s.filterPath) > 0 { 543 // Always add "hits._scroll_id", otherwise we cannot scroll 544 s.filterPath = append(s.filterPath, "_scroll_id") 545 params.Set("filter_path", strings.Join(s.filterPath, ",")) 546 } 547 548 return path, params, nil 549} 550 551// body returns the request to fetch the next batch of results. 552func (s *ScrollService) bodyNext() (interface{}, error) { 553 s.mu.RLock() 554 body := struct { 555 Scroll string `json:"scroll"` 556 ScrollId string `json:"scroll_id,omitempty"` 557 }{ 558 Scroll: s.keepAlive, 559 ScrollId: s.scrollId, 560 } 561 s.mu.RUnlock() 562 return body, nil 563} 564