1// Copyright 2012-present Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package elastic 6 7import ( 8 "fmt" 9 "net/url" 10 "strings" 11 12 "golang.org/x/net/context" 13 14 "gopkg.in/olivere/elastic.v3/uritemplates" 15) 16 17// UpdateByQueryService is documented at https://www.elastic.co/guide/en/elasticsearch/plugins/master/plugins-reindex.html. 18type UpdateByQueryService struct { 19 client *Client 20 pretty bool 21 index []string 22 typ []string 23 xSource []string 24 xSourceExclude []string 25 xSourceInclude []string 26 allowNoIndices *bool 27 analyzeWildcard *bool 28 analyzer string 29 conflicts string 30 consistency string 31 defaultOperator string 32 df string 33 expandWildcards string 34 explain *bool 35 fielddataFields []string 36 fields []string 37 from *int 38 ignoreUnavailable *bool 39 lenient *bool 40 lowercaseExpandedTerms *bool 41 preference string 42 q string 43 refresh *bool 44 requestCache *bool 45 routing []string 46 scroll string 47 scrollSize *int 48 searchTimeout string 49 searchType string 50 size *int 51 sort []string 52 stats []string 53 suggestField string 54 suggestMode string 55 suggestSize *int 56 suggestText string 57 terminateAfter *int 58 timeout string 59 trackScores *bool 60 version *bool 61 versionType *bool 62 waitForCompletion *bool 63 script *Script 64 query Query 65 bodyJson interface{} 66 bodyString string 67} 68 69// NewUpdateByQueryService creates a new UpdateByQueryService. 70func NewUpdateByQueryService(client *Client) *UpdateByQueryService { 71 return &UpdateByQueryService{ 72 client: client, 73 xSource: make([]string, 0), 74 xSourceExclude: make([]string, 0), 75 xSourceInclude: make([]string, 0), 76 fielddataFields: make([]string, 0), 77 fields: make([]string, 0), 78 routing: make([]string, 0), 79 sort: make([]string, 0), 80 stats: make([]string, 0), 81 } 82} 83 84// Type is a list of document types to search; leave empty to perform 85// the operation on all types. 86func (s *UpdateByQueryService) Type(typ ...string) *UpdateByQueryService { 87 s.typ = append(s.typ, typ...) 88 return s 89} 90 91// Index is a list of index names to search; use `_all` or empty string to 92// perform the operation on all indices. 93func (s *UpdateByQueryService) Index(index ...string) *UpdateByQueryService { 94 s.index = append(s.index, index...) 95 return s 96} 97 98// XSource is true or false to return the _source field or not, 99// or a list of fields to return. 100func (s *UpdateByQueryService) XSource(xSource ...string) *UpdateByQueryService { 101 s.xSource = append(s.xSource, xSource...) 102 return s 103} 104 105// XSourceExclude represents a list of fields to exclude from the returned _source field. 106func (s *UpdateByQueryService) XSourceExclude(xSourceExclude ...string) *UpdateByQueryService { 107 s.xSourceExclude = append(s.xSourceExclude, xSourceExclude...) 108 return s 109} 110 111// XSourceInclude represents a list of fields to extract and return from the _source field. 112func (s *UpdateByQueryService) XSourceInclude(xSourceInclude ...string) *UpdateByQueryService { 113 s.xSourceInclude = append(s.xSourceInclude, xSourceInclude...) 114 return s 115} 116 117// AllowNoIndices indicates whether to ignore if a wildcard indices expression 118// resolves into no concrete indices. (This includes `_all` string or when 119// no indices have been specified). 120func (s *UpdateByQueryService) AllowNoIndices(allowNoIndices bool) *UpdateByQueryService { 121 s.allowNoIndices = &allowNoIndices 122 return s 123} 124 125// AnalyzeWildcard specifies whether wildcard and prefix queries should be 126// analyzed (default: false). 127func (s *UpdateByQueryService) AnalyzeWildcard(analyzeWildcard bool) *UpdateByQueryService { 128 s.analyzeWildcard = &analyzeWildcard 129 return s 130} 131 132// Analyzer specifies the analyzer to use for the query string. 133func (s *UpdateByQueryService) Analyzer(analyzer string) *UpdateByQueryService { 134 s.analyzer = analyzer 135 return s 136} 137 138// Conflicts indicates what to do when the process detects version conflicts. 139// Possible values are "proceed" and "abort". 140func (s *UpdateByQueryService) Conflicts(conflicts string) *UpdateByQueryService { 141 s.conflicts = conflicts 142 return s 143} 144 145// AbortOnVersionConflict aborts the request on version conflicts. 146// It is an alias to setting Conflicts("abort"). 147func (s *UpdateByQueryService) AbortOnVersionConflict() *UpdateByQueryService { 148 s.conflicts = "abort" 149 return s 150} 151 152// ProceedOnVersionConflict aborts the request on version conflicts. 153// It is an alias to setting Conflicts("proceed"). 154func (s *UpdateByQueryService) ProceedOnVersionConflict() *UpdateByQueryService { 155 s.conflicts = "proceed" 156 return s 157} 158 159// Consistency sets an explicit write consistency setting for the operation. 160// Possible values are "one", "quorum", and "all". 161func (s *UpdateByQueryService) Consistency(consistency string) *UpdateByQueryService { 162 s.consistency = consistency 163 return s 164} 165 166// DefaultOperator is the default operator for query string query (AND or OR). 167func (s *UpdateByQueryService) DefaultOperator(defaultOperator string) *UpdateByQueryService { 168 s.defaultOperator = defaultOperator 169 return s 170} 171 172// Df specifies the field to use as default where no field prefix is given in the query string. 173func (s *UpdateByQueryService) Df(df string) *UpdateByQueryService { 174 s.df = df 175 return s 176} 177 178// ExpandWildcards indicates whether to expand wildcard expression to 179// concrete indices that are open, closed or both. 180func (s *UpdateByQueryService) ExpandWildcards(expandWildcards string) *UpdateByQueryService { 181 s.expandWildcards = expandWildcards 182 return s 183} 184 185// Explain specifies whether to return detailed information about score 186// computation as part of a hit. 187func (s *UpdateByQueryService) Explain(explain bool) *UpdateByQueryService { 188 s.explain = &explain 189 return s 190} 191 192// FielddataFields is a list of fields to return as the field data 193// representation of a field for each hit. 194func (s *UpdateByQueryService) FielddataFields(fielddataFields ...string) *UpdateByQueryService { 195 s.fielddataFields = append(s.fielddataFields, fielddataFields...) 196 return s 197} 198 199// Fields is a list of fields to return as part of a hit. 200func (s *UpdateByQueryService) Fields(fields ...string) *UpdateByQueryService { 201 s.fields = append(s.fields, fields...) 202 return s 203} 204 205// From is the starting offset (default: 0). 206func (s *UpdateByQueryService) From(from int) *UpdateByQueryService { 207 s.from = &from 208 return s 209} 210 211// IgnoreUnavailable indicates whether specified concrete indices should be 212// ignored when unavailable (missing or closed). 213func (s *UpdateByQueryService) IgnoreUnavailable(ignoreUnavailable bool) *UpdateByQueryService { 214 s.ignoreUnavailable = &ignoreUnavailable 215 return s 216} 217 218// Lenient specifies whether format-based query failures 219// (such as providing text to a numeric field) should be ignored. 220func (s *UpdateByQueryService) Lenient(lenient bool) *UpdateByQueryService { 221 s.lenient = &lenient 222 return s 223} 224 225// LowercaseExpandedTerms specifies whether query terms should be lowercased. 226func (s *UpdateByQueryService) LowercaseExpandedTerms(lowercaseExpandedTerms bool) *UpdateByQueryService { 227 s.lowercaseExpandedTerms = &lowercaseExpandedTerms 228 return s 229} 230 231// Preference specifies the node or shard the operation should be performed on 232// (default: random). 233func (s *UpdateByQueryService) Preference(preference string) *UpdateByQueryService { 234 s.preference = preference 235 return s 236} 237 238// Query in the Lucene query string syntax. 239func (s *UpdateByQueryService) Q(q string) *UpdateByQueryService { 240 s.q = q 241 return s 242} 243 244// Refresh indicates whether the effected indexes should be refreshed. 245func (s *UpdateByQueryService) Refresh(refresh bool) *UpdateByQueryService { 246 s.refresh = &refresh 247 return s 248} 249 250// RequestCache specifies if request cache should be used for this request 251// or not, defaults to index level setting. 252func (s *UpdateByQueryService) RequestCache(requestCache bool) *UpdateByQueryService { 253 s.requestCache = &requestCache 254 return s 255} 256 257// Routing is a list of specific routing values. 258func (s *UpdateByQueryService) Routing(routing ...string) *UpdateByQueryService { 259 s.routing = append(s.routing, routing...) 260 return s 261} 262 263// Scroll specifies how long a consistent view of the index should be maintained 264// for scrolled search. 265func (s *UpdateByQueryService) Scroll(scroll string) *UpdateByQueryService { 266 s.scroll = scroll 267 return s 268} 269 270// ScrollSize is the size on the scroll request powering the update_by_query. 271func (s *UpdateByQueryService) ScrollSize(scrollSize int) *UpdateByQueryService { 272 s.scrollSize = &scrollSize 273 return s 274} 275 276// SearchTimeout defines an explicit timeout for each search request. 277// Defaults to no timeout. 278func (s *UpdateByQueryService) SearchTimeout(searchTimeout string) *UpdateByQueryService { 279 s.searchTimeout = searchTimeout 280 return s 281} 282 283// SearchType is the search operation type. Possible values are 284// "query_then_fetch" and "dfs_query_then_fetch". 285func (s *UpdateByQueryService) SearchType(searchType string) *UpdateByQueryService { 286 s.searchType = searchType 287 return s 288} 289 290// Size represents the number of hits to return (default: 10). 291func (s *UpdateByQueryService) Size(size int) *UpdateByQueryService { 292 s.size = &size 293 return s 294} 295 296// Sort is a list of <field>:<direction> pairs. 297func (s *UpdateByQueryService) Sort(sort ...string) *UpdateByQueryService { 298 s.sort = append(s.sort, sort...) 299 return s 300} 301 302// SortByField adds a sort order. 303func (s *UpdateByQueryService) SortByField(field string, ascending bool) *UpdateByQueryService { 304 if ascending { 305 s.sort = append(s.sort, fmt.Sprintf("%s:asc", field)) 306 } else { 307 s.sort = append(s.sort, fmt.Sprintf("%s:desc", field)) 308 } 309 return s 310} 311 312// Stats specifies specific tag(s) of the request for logging and statistical purposes. 313func (s *UpdateByQueryService) Stats(stats ...string) *UpdateByQueryService { 314 s.stats = append(s.stats, stats...) 315 return s 316} 317 318// SuggestField specifies which field to use for suggestions. 319func (s *UpdateByQueryService) SuggestField(suggestField string) *UpdateByQueryService { 320 s.suggestField = suggestField 321 return s 322} 323 324// SuggestMode specifies the suggest mode. Possible values are 325// "missing", "popular", and "always". 326func (s *UpdateByQueryService) SuggestMode(suggestMode string) *UpdateByQueryService { 327 s.suggestMode = suggestMode 328 return s 329} 330 331// SuggestSize specifies how many suggestions to return in response. 332func (s *UpdateByQueryService) SuggestSize(suggestSize int) *UpdateByQueryService { 333 s.suggestSize = &suggestSize 334 return s 335} 336 337// SuggestText specifies the source text for which the suggestions should be returned. 338func (s *UpdateByQueryService) SuggestText(suggestText string) *UpdateByQueryService { 339 s.suggestText = suggestText 340 return s 341} 342 343// TerminateAfter indicates the maximum number of documents to collect 344// for each shard, upon reaching which the query execution will terminate early. 345func (s *UpdateByQueryService) TerminateAfter(terminateAfter int) *UpdateByQueryService { 346 s.terminateAfter = &terminateAfter 347 return s 348} 349 350// Timeout is the time each individual bulk request should wait for shards 351// that are unavailable. 352func (s *UpdateByQueryService) Timeout(timeout string) *UpdateByQueryService { 353 s.timeout = timeout 354 return s 355} 356 357// TimeoutInMillis sets the timeout in milliseconds. 358func (s *UpdateByQueryService) TimeoutInMillis(timeoutInMillis int) *UpdateByQueryService { 359 s.timeout = fmt.Sprintf("%dms", timeoutInMillis) 360 return s 361} 362 363// TrackScores indicates whether to calculate and return scores even if 364// they are not used for sorting. 365func (s *UpdateByQueryService) TrackScores(trackScores bool) *UpdateByQueryService { 366 s.trackScores = &trackScores 367 return s 368} 369 370// Version specifies whether to return document version as part of a hit. 371func (s *UpdateByQueryService) Version(version bool) *UpdateByQueryService { 372 s.version = &version 373 return s 374} 375 376// VersionType indicates if the document increment the version number (internal) 377// on hit or not (reindex). 378func (s *UpdateByQueryService) VersionType(versionType bool) *UpdateByQueryService { 379 s.versionType = &versionType 380 return s 381} 382 383// WaitForCompletion indicates if the request should block until the reindex is complete. 384func (s *UpdateByQueryService) WaitForCompletion(waitForCompletion bool) *UpdateByQueryService { 385 s.waitForCompletion = &waitForCompletion 386 return s 387} 388 389// Pretty indicates that the JSON response be indented and human readable. 390func (s *UpdateByQueryService) Pretty(pretty bool) *UpdateByQueryService { 391 s.pretty = pretty 392 return s 393} 394 395// Script sets an update script. 396func (s *UpdateByQueryService) Script(script *Script) *UpdateByQueryService { 397 s.script = script 398 return s 399} 400 401// Query sets a query definition using the Query DSL. 402func (s *UpdateByQueryService) Query(query Query) *UpdateByQueryService { 403 s.query = query 404 return s 405} 406 407// BodyJson specifies e.g. the query to restrict the results specified with the 408// Query DSL (optional). The interface{} will be serialized to a JSON document, 409// so use a map[string]interface{}. 410func (s *UpdateByQueryService) BodyJson(body interface{}) *UpdateByQueryService { 411 s.bodyJson = body 412 return s 413} 414 415// Body specifies e.g. a query to restrict the results specified with 416// the Query DSL (optional). 417func (s *UpdateByQueryService) BodyString(body string) *UpdateByQueryService { 418 s.bodyString = body 419 return s 420} 421 422// buildURL builds the URL for the operation. 423func (s *UpdateByQueryService) buildURL() (string, url.Values, error) { 424 // Build URL 425 var err error 426 var path string 427 if len(s.index) > 0 && len(s.typ) > 0 { 428 path, err = uritemplates.Expand("/{index}/{type}/_update_by_query", map[string]string{ 429 "index": strings.Join(s.index, ","), 430 "type": strings.Join(s.typ, ","), 431 }) 432 } else if len(s.index) > 0 && len(s.typ) == 0 { 433 path, err = uritemplates.Expand("/{index}/_update_by_query", map[string]string{ 434 "index": strings.Join(s.index, ","), 435 }) 436 } else if len(s.index) == 0 && len(s.typ) > 0 { 437 path, err = uritemplates.Expand("/_all/{type}/_update_by_query", map[string]string{ 438 "type": strings.Join(s.typ, ","), 439 }) 440 } else { 441 path = "/_all/_update_by_query" 442 } 443 if err != nil { 444 return "", url.Values{}, err 445 } 446 447 // Add query string parameters 448 params := url.Values{} 449 if s.pretty { 450 params.Set("pretty", "1") 451 } 452 if len(s.xSource) > 0 { 453 params.Set("_source", strings.Join(s.xSource, ",")) 454 } 455 if len(s.xSourceExclude) > 0 { 456 params.Set("_source_exclude", strings.Join(s.xSourceExclude, ",")) 457 } 458 if len(s.xSourceInclude) > 0 { 459 params.Set("_source_include", strings.Join(s.xSourceInclude, ",")) 460 } 461 if s.allowNoIndices != nil { 462 params.Set("allow_no_indices", fmt.Sprintf("%v", *s.allowNoIndices)) 463 } 464 if s.analyzeWildcard != nil { 465 params.Set("analyze_wildcard", fmt.Sprintf("%v", *s.analyzeWildcard)) 466 } 467 if s.analyzer != "" { 468 params.Set("analyzer", s.analyzer) 469 } 470 if s.conflicts != "" { 471 params.Set("conflicts", s.conflicts) 472 } 473 if s.consistency != "" { 474 params.Set("consistency", s.consistency) 475 } 476 if s.defaultOperator != "" { 477 params.Set("default_operator", s.defaultOperator) 478 } 479 if s.df != "" { 480 params.Set("df", s.df) 481 } 482 if s.expandWildcards != "" { 483 params.Set("expand_wildcards", s.expandWildcards) 484 } 485 if s.explain != nil { 486 params.Set("explain", fmt.Sprintf("%v", *s.explain)) 487 } 488 if len(s.fielddataFields) > 0 { 489 params.Set("fielddata_fields", strings.Join(s.fielddataFields, ",")) 490 } 491 if len(s.fields) > 0 { 492 params.Set("fields", strings.Join(s.fields, ",")) 493 } 494 if s.from != nil { 495 params.Set("from", fmt.Sprintf("%d", *s.from)) 496 } 497 if s.ignoreUnavailable != nil { 498 params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable)) 499 } 500 if s.lenient != nil { 501 params.Set("lenient", fmt.Sprintf("%v", *s.lenient)) 502 } 503 if s.lowercaseExpandedTerms != nil { 504 params.Set("lowercase_expanded_terms", fmt.Sprintf("%v", *s.lowercaseExpandedTerms)) 505 } 506 if s.preference != "" { 507 params.Set("preference", s.preference) 508 } 509 if s.q != "" { 510 params.Set("q", s.q) 511 } 512 if s.refresh != nil { 513 params.Set("refresh", fmt.Sprintf("%v", *s.refresh)) 514 } 515 if s.requestCache != nil { 516 params.Set("request_cache", fmt.Sprintf("%v", *s.requestCache)) 517 } 518 if len(s.routing) > 0 { 519 params.Set("routing", strings.Join(s.routing, ",")) 520 } 521 if s.scroll != "" { 522 params.Set("scroll", s.scroll) 523 } 524 if s.scrollSize != nil { 525 params.Set("scroll_size", fmt.Sprintf("%d", *s.scrollSize)) 526 } 527 if s.searchTimeout != "" { 528 params.Set("search_timeout", s.searchTimeout) 529 } 530 if s.searchType != "" { 531 params.Set("search_type", s.searchType) 532 } 533 if s.size != nil { 534 params.Set("size", fmt.Sprintf("%d", *s.size)) 535 } 536 if len(s.sort) > 0 { 537 params.Set("sort", strings.Join(s.sort, ",")) 538 } 539 if len(s.stats) > 0 { 540 params.Set("stats", strings.Join(s.stats, ",")) 541 } 542 if s.suggestField != "" { 543 params.Set("suggest_field", s.suggestField) 544 } 545 if s.suggestMode != "" { 546 params.Set("suggest_mode", s.suggestMode) 547 } 548 if s.suggestSize != nil { 549 params.Set("suggest_size", fmt.Sprintf("%v", *s.suggestSize)) 550 } 551 if s.suggestText != "" { 552 params.Set("suggest_text", s.suggestText) 553 } 554 if s.terminateAfter != nil { 555 params.Set("terminate_after", fmt.Sprintf("%v", *s.terminateAfter)) 556 } 557 if s.timeout != "" { 558 params.Set("timeout", s.timeout) 559 } 560 if s.trackScores != nil { 561 params.Set("track_scores", fmt.Sprintf("%v", *s.trackScores)) 562 } 563 if s.version != nil { 564 params.Set("version", fmt.Sprintf("%v", *s.version)) 565 } 566 if s.versionType != nil { 567 params.Set("version_type", fmt.Sprintf("%v", *s.versionType)) 568 } 569 if s.waitForCompletion != nil { 570 params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion)) 571 } 572 return path, params, nil 573} 574 575// Validate checks if the operation is valid. 576func (s *UpdateByQueryService) Validate() error { 577 return nil 578} 579 580// body returns the body part of the document request. 581func (s *UpdateByQueryService) body() (interface{}, error) { 582 if s.bodyJson != nil { 583 return s.bodyJson, nil 584 } 585 if s.bodyString != "" { 586 return s.bodyString, nil 587 } 588 589 source := make(map[string]interface{}) 590 591 if s.script != nil { 592 src, err := s.script.Source() 593 if err != nil { 594 return nil, err 595 } 596 source["script"] = src 597 } 598 599 if s.query != nil { 600 src, err := s.query.Source() 601 if err != nil { 602 return nil, err 603 } 604 source["query"] = src 605 } 606 607 return source, nil 608} 609 610// Do executes the operation. 611func (s *UpdateByQueryService) Do() (*UpdateByQueryResponse, error) { 612 return s.DoC(nil) 613} 614 615// DoC executes the operation. 616func (s *UpdateByQueryService) DoC(ctx context.Context) (*UpdateByQueryResponse, error) { 617 // Check pre-conditions 618 if err := s.Validate(); err != nil { 619 return nil, err 620 } 621 622 // Get URL for request 623 path, params, err := s.buildURL() 624 if err != nil { 625 return nil, err 626 } 627 628 // Setup HTTP request body 629 body, err := s.body() 630 if err != nil { 631 return nil, err 632 } 633 634 // Get HTTP response 635 res, err := s.client.PerformRequestC(ctx, "POST", path, params, body) 636 if err != nil { 637 return nil, err 638 } 639 640 // Return operation response 641 ret := new(UpdateByQueryResponse) 642 if err := s.client.decoder.Decode(res.Body, ret); err != nil { 643 return nil, err 644 } 645 return ret, nil 646} 647 648// UpdateByQueryResponse is the response of UpdateByQueryService.Do. 649type UpdateByQueryResponse struct { 650 Took int64 `json:"took"` 651 TimedOut bool `json:"timed_out"` 652 Total int64 `json:"total"` 653 Updated int64 `json:"updated"` 654 Created int64 `json:"created"` 655 Deleted int64 `json:"deleted"` 656 Batches int64 `json:"batches"` 657 VersionConflicts int64 `json:"version_conflicts"` 658 Noops int64 `json:"noops"` 659 Retries int64 `json:"retries"` 660 Canceled string `json:"canceled"` 661 Failures []shardOperationFailure `json:"failures"` 662} 663