1// Copyright 2012-present Oliver Eilhard. All rights reserved.
2// Use of this source code is governed by a MIT-license.
3// See http://olivere.mit-license.org/license.txt for details.
4
5package elastic
6
7import (
8	"fmt"
9	"net/url"
10	"strings"
11
12	"golang.org/x/net/context"
13
14	"gopkg.in/olivere/elastic.v3/uritemplates"
15)
16
// UpdateByQueryService is documented at https://www.elastic.co/guide/en/elasticsearch/plugins/master/plugins-reindex.html.
//
// It is a builder for an update-by-query request: the setter methods record
// options on the struct, buildURL turns them into the request path and query
// string, body assembles the request body, and DoC sends the request through
// the client.
type UpdateByQueryService struct {
	// client performs the HTTP request and decodes the response in DoC.
	client                 *Client
	// pretty, when true, adds pretty=1 to the query string.
	pretty                 bool
	// index and typ select the path segments in front of _update_by_query;
	// multiple values are joined with commas by buildURL.
	index                  []string
	typ                    []string
	// The fields from xSource through waitForCompletion are optional
	// query-string parameters. buildURL emits only those that were set
	// (non-empty slice, non-empty string, or non-nil pointer).
	xSource                []string
	xSourceExclude         []string
	xSourceInclude         []string
	allowNoIndices         *bool
	analyzeWildcard        *bool
	analyzer               string
	conflicts              string
	consistency            string
	defaultOperator        string
	df                     string
	expandWildcards        string
	explain                *bool
	fielddataFields        []string
	fields                 []string
	from                   *int
	ignoreUnavailable      *bool
	lenient                *bool
	lowercaseExpandedTerms *bool
	preference             string
	q                      string
	refresh                *bool
	requestCache           *bool
	routing                []string
	scroll                 string
	scrollSize             *int
	searchTimeout          string
	searchType             string
	size                   *int
	sort                   []string
	stats                  []string
	suggestField           string
	suggestMode            string
	suggestSize            *int
	suggestText            string
	terminateAfter         *int
	timeout                string
	trackScores            *bool
	version                *bool
	versionType            *bool
	waitForCompletion      *bool
	// script and query fill the "script" and "query" keys of the request
	// body that body() generates.
	script                 *Script
	query                  Query
	// bodyJson and bodyString, when set, replace the generated body;
	// bodyJson takes precedence over bodyString.
	bodyJson               interface{}
	bodyString             string
}
68
69// NewUpdateByQueryService creates a new UpdateByQueryService.
70func NewUpdateByQueryService(client *Client) *UpdateByQueryService {
71	return &UpdateByQueryService{
72		client:          client,
73		xSource:         make([]string, 0),
74		xSourceExclude:  make([]string, 0),
75		xSourceInclude:  make([]string, 0),
76		fielddataFields: make([]string, 0),
77		fields:          make([]string, 0),
78		routing:         make([]string, 0),
79		sort:            make([]string, 0),
80		stats:           make([]string, 0),
81	}
82}
83
// Type adds one or more document types to search; leave empty to perform
// the operation on all types. Repeated calls accumulate, and buildURL
// comma-joins the types into the {type} segment of the request path.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Type(typ ...string) *UpdateByQueryService {
	s.typ = append(s.typ, typ...)
	return s
}
90
// Index adds one or more index names to search; use `_all` or leave empty
// to perform the operation on all indices (buildURL falls back to `_all`
// when no index was given). Repeated calls accumulate, and the names are
// comma-joined into the {index} segment of the request path.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Index(index ...string) *UpdateByQueryService {
	s.index = append(s.index, index...)
	return s
}
97
// XSource is true or false to return the _source field or not,
// or a list of fields to return. Repeated calls accumulate; the values are
// comma-joined and sent as the `_source` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) XSource(xSource ...string) *UpdateByQueryService {
	s.xSource = append(s.xSource, xSource...)
	return s
}
104
// XSourceExclude adds fields to exclude from the returned _source field.
// Repeated calls accumulate; the values are comma-joined and sent as the
// `_source_exclude` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) XSourceExclude(xSourceExclude ...string) *UpdateByQueryService {
	s.xSourceExclude = append(s.xSourceExclude, xSourceExclude...)
	return s
}
110
// XSourceInclude adds fields to extract and return from the _source field.
// Repeated calls accumulate; the values are comma-joined and sent as the
// `_source_include` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) XSourceInclude(xSourceInclude ...string) *UpdateByQueryService {
	s.xSourceInclude = append(s.xSourceInclude, xSourceInclude...)
	return s
}
116
// AllowNoIndices indicates whether to ignore if a wildcard indices expression
// resolves into no concrete indices. (This includes `_all` string or when
// no indices have been specified). The value is sent as the
// `allow_no_indices` query parameter; the parameter is omitted when this
// method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) AllowNoIndices(allowNoIndices bool) *UpdateByQueryService {
	s.allowNoIndices = &allowNoIndices
	return s
}
124
// AnalyzeWildcard specifies whether wildcard and prefix queries should be
// analyzed (default: false). The value is sent as the `analyze_wildcard`
// query parameter; the parameter is omitted when this method was never
// called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) AnalyzeWildcard(analyzeWildcard bool) *UpdateByQueryService {
	s.analyzeWildcard = &analyzeWildcard
	return s
}
131
// Analyzer specifies the analyzer to use for the query string. A non-empty
// value is sent as the `analyzer` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Analyzer(analyzer string) *UpdateByQueryService {
	s.analyzer = analyzer
	return s
}
137
// Conflicts indicates what to do when the process detects version conflicts.
// Possible values are "proceed" and "abort". A non-empty value is sent as
// the `conflicts` query parameter. AbortOnVersionConflict and
// ProceedOnVersionConflict write the same setting, so the last call wins.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Conflicts(conflicts string) *UpdateByQueryService {
	s.conflicts = conflicts
	return s
}
144
// AbortOnVersionConflict aborts the request on version conflicts.
// It is an alias to setting Conflicts("abort") and overrides any earlier
// conflicts setting.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) AbortOnVersionConflict() *UpdateByQueryService {
	s.conflicts = "abort"
	return s
}
151
// ProceedOnVersionConflict lets the request proceed on version conflicts
// instead of aborting.
// It is an alias to setting Conflicts("proceed") and overrides any earlier
// conflicts setting.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) ProceedOnVersionConflict() *UpdateByQueryService {
	s.conflicts = "proceed"
	return s
}
158
// Consistency sets an explicit write consistency setting for the operation.
// Possible values are "one", "quorum", and "all". A non-empty value is sent
// as the `consistency` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Consistency(consistency string) *UpdateByQueryService {
	s.consistency = consistency
	return s
}
165
// DefaultOperator is the default operator for query string query (AND or OR).
// A non-empty value is sent as the `default_operator` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) DefaultOperator(defaultOperator string) *UpdateByQueryService {
	s.defaultOperator = defaultOperator
	return s
}
171
// Df specifies the field to use as default where no field prefix is given in
// the query string. A non-empty value is sent as the `df` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Df(df string) *UpdateByQueryService {
	s.df = df
	return s
}
177
// ExpandWildcards indicates whether to expand wildcard expression to
// concrete indices that are open, closed or both. A non-empty value is sent
// as the `expand_wildcards` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) ExpandWildcards(expandWildcards string) *UpdateByQueryService {
	s.expandWildcards = expandWildcards
	return s
}
184
// Explain specifies whether to return detailed information about score
// computation as part of a hit. The value is sent as the `explain` query
// parameter; the parameter is omitted when this method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Explain(explain bool) *UpdateByQueryService {
	s.explain = &explain
	return s
}
191
// FielddataFields adds fields to return as the field data representation of
// a field for each hit. Repeated calls accumulate; the values are
// comma-joined and sent as the `fielddata_fields` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) FielddataFields(fielddataFields ...string) *UpdateByQueryService {
	s.fielddataFields = append(s.fielddataFields, fielddataFields...)
	return s
}
198
// Fields adds fields to return as part of a hit. Repeated calls accumulate;
// the values are comma-joined and sent as the `fields` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Fields(fields ...string) *UpdateByQueryService {
	s.fields = append(s.fields, fields...)
	return s
}
204
// From is the starting offset (default: 0). The value is sent as the `from`
// query parameter; the parameter is omitted when this method was never
// called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) From(from int) *UpdateByQueryService {
	s.from = &from
	return s
}
210
// IgnoreUnavailable indicates whether specified concrete indices should be
// ignored when unavailable (missing or closed). The value is sent as the
// `ignore_unavailable` query parameter; the parameter is omitted when this
// method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) IgnoreUnavailable(ignoreUnavailable bool) *UpdateByQueryService {
	s.ignoreUnavailable = &ignoreUnavailable
	return s
}
217
// Lenient specifies whether format-based query failures
// (such as providing text to a numeric field) should be ignored.
// The value is sent as the `lenient` query parameter; the parameter is
// omitted when this method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Lenient(lenient bool) *UpdateByQueryService {
	s.lenient = &lenient
	return s
}
224
// LowercaseExpandedTerms specifies whether query terms should be lowercased.
// The value is sent as the `lowercase_expanded_terms` query parameter; the
// parameter is omitted when this method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) LowercaseExpandedTerms(lowercaseExpandedTerms bool) *UpdateByQueryService {
	s.lowercaseExpandedTerms = &lowercaseExpandedTerms
	return s
}
230
// Preference specifies the node or shard the operation should be performed on
// (default: random). A non-empty value is sent as the `preference` query
// parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Preference(preference string) *UpdateByQueryService {
	s.preference = preference
	return s
}
237
// Q sets a query in the Lucene query string syntax. A non-empty value is
// sent as the `q` query parameter. It is independent of Query, which puts a
// Query DSL definition into the request body.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Q(q string) *UpdateByQueryService {
	s.q = q
	return s
}
243
// Refresh indicates whether the affected indexes should be refreshed.
// The value is sent as the `refresh` query parameter; the parameter is
// omitted when this method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Refresh(refresh bool) *UpdateByQueryService {
	s.refresh = &refresh
	return s
}
249
// RequestCache specifies if request cache should be used for this request
// or not, defaults to index level setting. The value is sent as the
// `request_cache` query parameter; the parameter is omitted when this
// method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) RequestCache(requestCache bool) *UpdateByQueryService {
	s.requestCache = &requestCache
	return s
}
256
// Routing adds specific routing values. Repeated calls accumulate; the
// values are comma-joined and sent as the `routing` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Routing(routing ...string) *UpdateByQueryService {
	s.routing = append(s.routing, routing...)
	return s
}
262
// Scroll specifies how long a consistent view of the index should be maintained
// for scrolled search. A non-empty value is sent as the `scroll` query
// parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Scroll(scroll string) *UpdateByQueryService {
	s.scroll = scroll
	return s
}
269
// ScrollSize is the size on the scroll request powering the update_by_query.
// The value is sent as the `scroll_size` query parameter; the parameter is
// omitted when this method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) ScrollSize(scrollSize int) *UpdateByQueryService {
	s.scrollSize = &scrollSize
	return s
}
275
// SearchTimeout defines an explicit timeout for each search request.
// Defaults to no timeout. A non-empty value is sent as the `search_timeout`
// query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) SearchTimeout(searchTimeout string) *UpdateByQueryService {
	s.searchTimeout = searchTimeout
	return s
}
282
// SearchType is the search operation type. Possible values are
// "query_then_fetch" and "dfs_query_then_fetch". A non-empty value is sent
// as the `search_type` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) SearchType(searchType string) *UpdateByQueryService {
	s.searchType = searchType
	return s
}
289
// Size represents the number of hits to return (default: 10). The value is
// sent as the `size` query parameter; the parameter is omitted when this
// method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Size(size int) *UpdateByQueryService {
	s.size = &size
	return s
}
295
// Sort adds <field>:<direction> pairs. Repeated calls accumulate and share
// the same list as SortByField; the pairs are comma-joined and sent as the
// `sort` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Sort(sort ...string) *UpdateByQueryService {
	s.sort = append(s.sort, sort...)
	return s
}
301
302// SortByField adds a sort order.
303func (s *UpdateByQueryService) SortByField(field string, ascending bool) *UpdateByQueryService {
304	if ascending {
305		s.sort = append(s.sort, fmt.Sprintf("%s:asc", field))
306	} else {
307		s.sort = append(s.sort, fmt.Sprintf("%s:desc", field))
308	}
309	return s
310}
311
// Stats adds specific tag(s) of the request for logging and statistical
// purposes. Repeated calls accumulate; the tags are comma-joined and sent
// as the `stats` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Stats(stats ...string) *UpdateByQueryService {
	s.stats = append(s.stats, stats...)
	return s
}
317
// SuggestField specifies which field to use for suggestions. A non-empty
// value is sent as the `suggest_field` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) SuggestField(suggestField string) *UpdateByQueryService {
	s.suggestField = suggestField
	return s
}
323
// SuggestMode specifies the suggest mode. Possible values are
// "missing", "popular", and "always". A non-empty value is sent as the
// `suggest_mode` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) SuggestMode(suggestMode string) *UpdateByQueryService {
	s.suggestMode = suggestMode
	return s
}
330
// SuggestSize specifies how many suggestions to return in response.
// The value is sent as the `suggest_size` query parameter; the parameter is
// omitted when this method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) SuggestSize(suggestSize int) *UpdateByQueryService {
	s.suggestSize = &suggestSize
	return s
}
336
// SuggestText specifies the source text for which the suggestions should be
// returned. A non-empty value is sent as the `suggest_text` query parameter.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) SuggestText(suggestText string) *UpdateByQueryService {
	s.suggestText = suggestText
	return s
}
342
// TerminateAfter indicates the maximum number of documents to collect
// for each shard, upon reaching which the query execution will terminate early.
// The value is sent as the `terminate_after` query parameter; the parameter
// is omitted when this method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) TerminateAfter(terminateAfter int) *UpdateByQueryService {
	s.terminateAfter = &terminateAfter
	return s
}
349
// Timeout is the time each individual bulk request should wait for shards
// that are unavailable. A non-empty value is sent verbatim as the `timeout`
// query parameter. TimeoutInMillis writes the same setting, so the last
// call wins.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Timeout(timeout string) *UpdateByQueryService {
	s.timeout = timeout
	return s
}
356
357// TimeoutInMillis sets the timeout in milliseconds.
358func (s *UpdateByQueryService) TimeoutInMillis(timeoutInMillis int) *UpdateByQueryService {
359	s.timeout = fmt.Sprintf("%dms", timeoutInMillis)
360	return s
361}
362
// TrackScores indicates whether to calculate and return scores even if
// they are not used for sorting. The value is sent as the `track_scores`
// query parameter; the parameter is omitted when this method was never
// called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) TrackScores(trackScores bool) *UpdateByQueryService {
	s.trackScores = &trackScores
	return s
}
369
// Version specifies whether to return document version as part of a hit.
// The value is sent as the `version` query parameter; the parameter is
// omitted when this method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Version(version bool) *UpdateByQueryService {
	s.version = &version
	return s
}
375
// VersionType indicates if the document increment the version number (internal)
// on hit or not (reindex). The value is sent as the `version_type` query
// parameter, rendered as "true" or "false"; the parameter is omitted when
// this method was never called.
// NOTE(review): the description names "internal"/"reindex" while a boolean
// is transmitted — confirm which form the server expects.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) VersionType(versionType bool) *UpdateByQueryService {
	s.versionType = &versionType
	return s
}
382
// WaitForCompletion indicates if the request should block until the reindex
// is complete. The value is sent as the `wait_for_completion` query
// parameter; the parameter is omitted when this method was never called.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) WaitForCompletion(waitForCompletion bool) *UpdateByQueryService {
	s.waitForCompletion = &waitForCompletion
	return s
}
388
// Pretty indicates that the JSON response be indented and human readable.
// When true, buildURL adds `pretty=1` to the query string; when false the
// parameter is omitted.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Pretty(pretty bool) *UpdateByQueryService {
	s.pretty = pretty
	return s
}
394
// Script sets an update script. Its serialized form is placed under the
// "script" key of the generated request body; it is ignored when a body was
// supplied via BodyJson or BodyString.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Script(script *Script) *UpdateByQueryService {
	s.script = script
	return s
}
400
// Query sets a query definition using the Query DSL. Its serialized form is
// placed under the "query" key of the generated request body; it is ignored
// when a body was supplied via BodyJson or BodyString.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) Query(query Query) *UpdateByQueryService {
	s.query = query
	return s
}
406
// BodyJson specifies e.g. the query to restrict the results specified with the
// Query DSL (optional). The interface{} will be serialized to a JSON document,
// so use a map[string]interface{}. A non-nil value is used as the request
// body as-is and takes precedence over BodyString, Script and Query.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) BodyJson(body interface{}) *UpdateByQueryService {
	s.bodyJson = body
	return s
}
414
// BodyString specifies e.g. a query to restrict the results specified with
// the Query DSL (optional). A non-empty string is used as the request body
// when no BodyJson was set, and takes precedence over Script and Query.
// It returns the service to allow call chaining.
func (s *UpdateByQueryService) BodyString(body string) *UpdateByQueryService {
	s.bodyString = body
	return s
}
421
422// buildURL builds the URL for the operation.
423func (s *UpdateByQueryService) buildURL() (string, url.Values, error) {
424	// Build URL
425	var err error
426	var path string
427	if len(s.index) > 0 && len(s.typ) > 0 {
428		path, err = uritemplates.Expand("/{index}/{type}/_update_by_query", map[string]string{
429			"index": strings.Join(s.index, ","),
430			"type":  strings.Join(s.typ, ","),
431		})
432	} else if len(s.index) > 0 && len(s.typ) == 0 {
433		path, err = uritemplates.Expand("/{index}/_update_by_query", map[string]string{
434			"index": strings.Join(s.index, ","),
435		})
436	} else if len(s.index) == 0 && len(s.typ) > 0 {
437		path, err = uritemplates.Expand("/_all/{type}/_update_by_query", map[string]string{
438			"type": strings.Join(s.typ, ","),
439		})
440	} else {
441		path = "/_all/_update_by_query"
442	}
443	if err != nil {
444		return "", url.Values{}, err
445	}
446
447	// Add query string parameters
448	params := url.Values{}
449	if s.pretty {
450		params.Set("pretty", "1")
451	}
452	if len(s.xSource) > 0 {
453		params.Set("_source", strings.Join(s.xSource, ","))
454	}
455	if len(s.xSourceExclude) > 0 {
456		params.Set("_source_exclude", strings.Join(s.xSourceExclude, ","))
457	}
458	if len(s.xSourceInclude) > 0 {
459		params.Set("_source_include", strings.Join(s.xSourceInclude, ","))
460	}
461	if s.allowNoIndices != nil {
462		params.Set("allow_no_indices", fmt.Sprintf("%v", *s.allowNoIndices))
463	}
464	if s.analyzeWildcard != nil {
465		params.Set("analyze_wildcard", fmt.Sprintf("%v", *s.analyzeWildcard))
466	}
467	if s.analyzer != "" {
468		params.Set("analyzer", s.analyzer)
469	}
470	if s.conflicts != "" {
471		params.Set("conflicts", s.conflicts)
472	}
473	if s.consistency != "" {
474		params.Set("consistency", s.consistency)
475	}
476	if s.defaultOperator != "" {
477		params.Set("default_operator", s.defaultOperator)
478	}
479	if s.df != "" {
480		params.Set("df", s.df)
481	}
482	if s.expandWildcards != "" {
483		params.Set("expand_wildcards", s.expandWildcards)
484	}
485	if s.explain != nil {
486		params.Set("explain", fmt.Sprintf("%v", *s.explain))
487	}
488	if len(s.fielddataFields) > 0 {
489		params.Set("fielddata_fields", strings.Join(s.fielddataFields, ","))
490	}
491	if len(s.fields) > 0 {
492		params.Set("fields", strings.Join(s.fields, ","))
493	}
494	if s.from != nil {
495		params.Set("from", fmt.Sprintf("%d", *s.from))
496	}
497	if s.ignoreUnavailable != nil {
498		params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable))
499	}
500	if s.lenient != nil {
501		params.Set("lenient", fmt.Sprintf("%v", *s.lenient))
502	}
503	if s.lowercaseExpandedTerms != nil {
504		params.Set("lowercase_expanded_terms", fmt.Sprintf("%v", *s.lowercaseExpandedTerms))
505	}
506	if s.preference != "" {
507		params.Set("preference", s.preference)
508	}
509	if s.q != "" {
510		params.Set("q", s.q)
511	}
512	if s.refresh != nil {
513		params.Set("refresh", fmt.Sprintf("%v", *s.refresh))
514	}
515	if s.requestCache != nil {
516		params.Set("request_cache", fmt.Sprintf("%v", *s.requestCache))
517	}
518	if len(s.routing) > 0 {
519		params.Set("routing", strings.Join(s.routing, ","))
520	}
521	if s.scroll != "" {
522		params.Set("scroll", s.scroll)
523	}
524	if s.scrollSize != nil {
525		params.Set("scroll_size", fmt.Sprintf("%d", *s.scrollSize))
526	}
527	if s.searchTimeout != "" {
528		params.Set("search_timeout", s.searchTimeout)
529	}
530	if s.searchType != "" {
531		params.Set("search_type", s.searchType)
532	}
533	if s.size != nil {
534		params.Set("size", fmt.Sprintf("%d", *s.size))
535	}
536	if len(s.sort) > 0 {
537		params.Set("sort", strings.Join(s.sort, ","))
538	}
539	if len(s.stats) > 0 {
540		params.Set("stats", strings.Join(s.stats, ","))
541	}
542	if s.suggestField != "" {
543		params.Set("suggest_field", s.suggestField)
544	}
545	if s.suggestMode != "" {
546		params.Set("suggest_mode", s.suggestMode)
547	}
548	if s.suggestSize != nil {
549		params.Set("suggest_size", fmt.Sprintf("%v", *s.suggestSize))
550	}
551	if s.suggestText != "" {
552		params.Set("suggest_text", s.suggestText)
553	}
554	if s.terminateAfter != nil {
555		params.Set("terminate_after", fmt.Sprintf("%v", *s.terminateAfter))
556	}
557	if s.timeout != "" {
558		params.Set("timeout", s.timeout)
559	}
560	if s.trackScores != nil {
561		params.Set("track_scores", fmt.Sprintf("%v", *s.trackScores))
562	}
563	if s.version != nil {
564		params.Set("version", fmt.Sprintf("%v", *s.version))
565	}
566	if s.versionType != nil {
567		params.Set("version_type", fmt.Sprintf("%v", *s.versionType))
568	}
569	if s.waitForCompletion != nil {
570		params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion))
571	}
572	return path, params, nil
573}
574
// Validate checks if the operation is valid. The service currently has no
// required settings, so it always returns nil. DoC calls it before building
// the request.
func (s *UpdateByQueryService) Validate() error {
	return nil
}
579
580// body returns the body part of the document request.
581func (s *UpdateByQueryService) body() (interface{}, error) {
582	if s.bodyJson != nil {
583		return s.bodyJson, nil
584	}
585	if s.bodyString != "" {
586		return s.bodyString, nil
587	}
588
589	source := make(map[string]interface{})
590
591	if s.script != nil {
592		src, err := s.script.Source()
593		if err != nil {
594			return nil, err
595		}
596		source["script"] = src
597	}
598
599	if s.query != nil {
600		src, err := s.query.Source()
601		if err != nil {
602			return nil, err
603		}
604		source["query"] = src
605	}
606
607	return source, nil
608}
609
// Do executes the operation. It is a convenience wrapper that calls DoC with
// a nil context and returns its result unchanged.
// NOTE(review): how a nil context is treated is decided by
// Client.PerformRequestC, which is not visible in this file.
func (s *UpdateByQueryService) Do() (*UpdateByQueryResponse, error) {
	return s.DoC(nil)
}
614
615// DoC executes the operation.
616func (s *UpdateByQueryService) DoC(ctx context.Context) (*UpdateByQueryResponse, error) {
617	// Check pre-conditions
618	if err := s.Validate(); err != nil {
619		return nil, err
620	}
621
622	// Get URL for request
623	path, params, err := s.buildURL()
624	if err != nil {
625		return nil, err
626	}
627
628	// Setup HTTP request body
629	body, err := s.body()
630	if err != nil {
631		return nil, err
632	}
633
634	// Get HTTP response
635	res, err := s.client.PerformRequestC(ctx, "POST", path, params, body)
636	if err != nil {
637		return nil, err
638	}
639
640	// Return operation response
641	ret := new(UpdateByQueryResponse)
642	if err := s.client.decoder.Decode(res.Body, ret); err != nil {
643		return nil, err
644	}
645	return ret, nil
646}
647
// UpdateByQueryResponse is the response of UpdateByQueryService.Do.
// DoC decodes the response body into it; every field is filled from the
// JSON key named in its struct tag.
// NOTE(review): units (e.g. of Took) and the exact server-side meaning of
// the counters are not established in this file — confirm against the
// Elasticsearch documentation.
type UpdateByQueryResponse struct {
	Took             int64                   `json:"took"`
	TimedOut         bool                    `json:"timed_out"`
	Total            int64                   `json:"total"`
	Updated          int64                   `json:"updated"`
	Created          int64                   `json:"created"`
	Deleted          int64                   `json:"deleted"`
	Batches          int64                   `json:"batches"`
	VersionConflicts int64                   `json:"version_conflicts"`
	Noops            int64                   `json:"noops"`
	Retries          int64                   `json:"retries"`
	Canceled         string                  `json:"canceled"`
	Failures         []shardOperationFailure `json:"failures"`
}
663