1// Copyright 2012-present Oliver Eilhard. All rights reserved.
2// Use of this source code is governed by a MIT-license.
3// See http://olivere.mit-license.org/license.txt for details.
4
5package elastic
6
7import (
8	"fmt"
9	"net/url"
10
11	"golang.org/x/net/context"
12)
13
// ReindexService is a method to copy documents from one index to another.
// It was introduced in Elasticsearch 2.3.0.
//
// Notice that Elastic already had a Reindexer service that pre-dated
// the Reindex API. Use that if you're on an earlier version of Elasticsearch.
//
// It is documented at https://www.elastic.co/guide/en/elasticsearch/plugins/master/plugins-reindex.html.
//
// The first group of fields ends up in the query string of the request,
// the remaining ones make up the request body.
type ReindexService struct {
	client            *Client             // performs the HTTP request and decodes the response
	pretty            bool                // sent as the "pretty" query parameter when true
	consistency       string              // sent as the "consistency" query parameter when non-empty
	refresh           *bool               // sent as the "refresh" query parameter when set
	timeout           string              // sent as the "timeout" query parameter when non-empty
	waitForCompletion *bool               // sent as the "wait_for_completion" query parameter when set
	bodyJson          interface{}         // raw request body; when non-nil it is sent instead of the generated body
	bodyString        string              // raw request body, used when bodyJson is nil and this is non-empty
	source            *ReindexSource      // serialized under the "source" key of the generated body
	destination       *ReindexDestination // serialized under the "dest" key of the generated body
	conflicts         string              // serialized under the "conflicts" key when non-empty
	size              *int                // serialized under the "size" key when set
	script            *Script             // serialized under the "script" key when set
}
36
37// NewReindexService creates a new ReindexService.
38func NewReindexService(client *Client) *ReindexService {
39	return &ReindexService{
40		client: client,
41	}
42}
43
// Consistency specifies an explicit write consistency setting for the operation.
// A non-empty value is sent as the "consistency" query string parameter.
// It returns the service to allow call chaining.
func (s *ReindexService) Consistency(consistency string) *ReindexService {
	s.consistency = consistency
	return s
}
49
// Refresh indicates whether Elasticsearch should refresh the affected indexes
// immediately. The value is sent as the "refresh" query string parameter;
// the parameter is omitted if Refresh is never called.
// It returns the service to allow call chaining.
func (s *ReindexService) Refresh(refresh bool) *ReindexService {
	s.refresh = &refresh
	return s
}
56
// Timeout is the time each individual bulk request should wait for shards
// that are unavailable. A non-empty value is sent as the "timeout" query
// string parameter. It returns the service to allow call chaining.
func (s *ReindexService) Timeout(timeout string) *ReindexService {
	s.timeout = timeout
	return s
}
63
// WaitForCompletion indicates whether Elasticsearch should block until the
// reindex is complete. The value is sent as the "wait_for_completion" query
// string parameter; the parameter is omitted if WaitForCompletion is never
// called. It returns the service to allow call chaining.
func (s *ReindexService) WaitForCompletion(waitForCompletion bool) *ReindexService {
	s.waitForCompletion = &waitForCompletion
	return s
}
70
// Pretty indicates that the JSON response be indented and human readable.
// When true, "pretty=1" is added to the query string.
// It returns the service to allow call chaining.
func (s *ReindexService) Pretty(pretty bool) *ReindexService {
	s.pretty = pretty
	return s
}
76
// Source specifies the source of the reindexing process. It replaces any
// source configured earlier, including one created implicitly by SourceIndex.
// It returns the service to allow call chaining.
func (s *ReindexService) Source(source *ReindexSource) *ReindexService {
	s.source = source
	return s
}
82
83// SourceIndex specifies the source index of the reindexing process.
84func (s *ReindexService) SourceIndex(index string) *ReindexService {
85	if s.source == nil {
86		s.source = NewReindexSource()
87	}
88	s.source = s.source.Index(index)
89	return s
90}
91
// Destination specifies the destination of the reindexing process. It
// replaces any destination configured earlier, including one created
// implicitly by DestinationIndex or DestinationIndexAndType.
// It returns the service to allow call chaining.
func (s *ReindexService) Destination(destination *ReindexDestination) *ReindexService {
	s.destination = destination
	return s
}
97
98// DestinationIndex specifies the destination index of the reindexing process.
99func (s *ReindexService) DestinationIndex(index string) *ReindexService {
100	if s.destination == nil {
101		s.destination = NewReindexDestination()
102	}
103	s.destination = s.destination.Index(index)
104	return s
105}
106
107// DestinationIndexAndType specifies both the destination index and type
108// of the reindexing process.
109func (s *ReindexService) DestinationIndexAndType(index, typ string) *ReindexService {
110	if s.destination == nil {
111		s.destination = NewReindexDestination()
112	}
113	s.destination = s.destination.Index(index)
114	s.destination = s.destination.Type(typ)
115	return s
116}
117
// Conflicts indicates what to do when the process detects version conflicts.
// Possible values are "proceed" and "abort".
// A non-empty value is written to the "conflicts" field of the generated
// request body. It returns the service to allow call chaining.
func (s *ReindexService) Conflicts(conflicts string) *ReindexService {
	s.conflicts = conflicts
	return s
}
124
125// AbortOnVersionConflict aborts the request on version conflicts.
126// It is an alias to setting Conflicts("abort").
127func (s *ReindexService) AbortOnVersionConflict() *ReindexService {
128	s.conflicts = "abort"
129	return s
130}
131
132// ProceedOnVersionConflict aborts the request on version conflicts.
133// It is an alias to setting Conflicts("proceed").
134func (s *ReindexService) ProceedOnVersionConflict() *ReindexService {
135	s.conflicts = "proceed"
136	return s
137}
138
// Size sets an upper limit for the number of processed documents.
// The value is written to the "size" field of the generated request body;
// the field is omitted if Size is never called.
// It returns the service to allow call chaining.
func (s *ReindexService) Size(size int) *ReindexService {
	s.size = &size
	return s
}
144
// Script allows for modification of the documents as they are reindexed
// from source to destination. A non-nil script is serialized into the
// "script" field of the generated request body.
// It returns the service to allow call chaining.
func (s *ReindexService) Script(script *Script) *ReindexService {
	s.script = script
	return s
}
151
// BodyJson specifies e.g. the query to restrict the results specified with the
// Query DSL (optional). The interface{} will be serialized to a JSON document,
// so use a map[string]interface{}.
//
// A non-nil body is used as the request body as-is: it takes precedence over
// BodyString and over the body that would otherwise be generated from
// Source, Destination, Conflicts, Size and Script.
// It returns the service to allow call chaining.
func (s *ReindexService) BodyJson(body interface{}) *ReindexService {
	s.bodyJson = body
	return s
}
159
// BodyString specifies e.g. a query to restrict the results specified with
// the Query DSL (optional).
//
// A non-empty body is used as the request body as-is, unless a body was also
// set with BodyJson, which takes precedence. It replaces the body that would
// otherwise be generated from Source, Destination, Conflicts, Size and Script.
// It returns the service to allow call chaining.
func (s *ReindexService) BodyString(body string) *ReindexService {
	s.bodyString = body
	return s
}
166
167// buildURL builds the URL for the operation.
168func (s *ReindexService) buildURL() (string, url.Values, error) {
169	// Build URL path
170	path := "/_reindex"
171
172	// Add query string parameters
173	params := url.Values{}
174	if s.pretty {
175		params.Set("pretty", "1")
176	}
177	if s.consistency != "" {
178		params.Set("consistency", s.consistency)
179	}
180	if s.refresh != nil {
181		params.Set("refresh", fmt.Sprintf("%v", *s.refresh))
182	}
183	if s.timeout != "" {
184		params.Set("timeout", s.timeout)
185	}
186	if s.waitForCompletion != nil {
187		params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion))
188	}
189	return path, params, nil
190}
191
192// Validate checks if the operation is valid.
193func (s *ReindexService) Validate() error {
194	var invalid []string
195	if s.source == nil {
196		invalid = append(invalid, "Source")
197	} else {
198		if len(s.source.indices) == 0 {
199			invalid = append(invalid, "Source.Index")
200		}
201	}
202	if s.destination == nil {
203		invalid = append(invalid, "Destination")
204	}
205	if len(invalid) > 0 {
206		return fmt.Errorf("missing required fields: %v", invalid)
207	}
208	return nil
209}
210
211// body returns the body part of the document request.
212func (s *ReindexService) body() (interface{}, error) {
213	if s.bodyJson != nil {
214		return s.bodyJson, nil
215	}
216	if s.bodyString != "" {
217		return s.bodyString, nil
218	}
219
220	body := make(map[string]interface{})
221
222	if s.conflicts != "" {
223		body["conflicts"] = s.conflicts
224	}
225	if s.size != nil {
226		body["size"] = *s.size
227	}
228	if s.script != nil {
229		out, err := s.script.Source()
230		if err != nil {
231			return nil, err
232		}
233		body["script"] = out
234	}
235
236	src, err := s.source.Source()
237	if err != nil {
238		return nil, err
239	}
240	body["source"] = src
241
242	dst, err := s.destination.Source()
243	if err != nil {
244		return nil, err
245	}
246	body["dest"] = dst
247
248	return body, nil
249}
250
// Do executes the operation without a context.
// It is equivalent to calling DoC with a nil context.
func (s *ReindexService) Do() (*ReindexResponse, error) {
	return s.DoC(nil)
}
255
256// DoC executes the operation.
257func (s *ReindexService) DoC(ctx context.Context) (*ReindexResponse, error) {
258	// Check pre-conditions
259	if err := s.Validate(); err != nil {
260		return nil, err
261	}
262
263	// Get URL for request
264	path, params, err := s.buildURL()
265	if err != nil {
266		return nil, err
267	}
268
269	// Setup HTTP request body
270	body, err := s.body()
271	if err != nil {
272		return nil, err
273	}
274
275	// Get HTTP response
276	res, err := s.client.PerformRequestC(ctx, "POST", path, params, body)
277	if err != nil {
278		return nil, err
279	}
280
281	// Return operation response
282	ret := new(ReindexResponse)
283	if err := s.client.decoder.Decode(res.Body, ret); err != nil {
284		return nil, err
285	}
286	return ret, nil
287}
288
// ReindexResponse is the response of ReindexService.Do and
// ReindexService.DoC. It is filled by decoding the JSON body returned
// for the reindex request; the JSON field names are given by the struct tags.
type ReindexResponse struct {
	Took             interface{}             `json:"took"` // 2.3.0 returns "37.7ms" while 2.2 returns 38 for took
	TimedOut         bool                    `json:"timed_out"`
	Total            int64                   `json:"total"`
	Updated          int64                   `json:"updated"`
	Created          int64                   `json:"created"`
	Deleted          int64                   `json:"deleted"`
	Batches          int64                   `json:"batches"`
	VersionConflicts int64                   `json:"version_conflicts"`
	Noops            int64                   `json:"noops"`
	Retries          int64                   `json:"retries"`
	Canceled         string                  `json:"canceled"`
	Failures         []shardOperationFailure `json:"failures"`
}
304
305// -- Source of Reindex --
306
// ReindexSource specifies the source of a Reindex process.
// Its Source method serializes the settings into the "source" part of the
// reindex request body.
type ReindexSource struct {
	searchType   string        // written to "search_type" when non-empty; default in ES is "query_then_fetch"
	indices      []string      // written to "index" (as a plain string when there is exactly one)
	types        []string      // written to "type" (as a plain string when there is exactly one)
	routing      *string       // NOTE(review): neither set nor serialized in the visible code
	preference   *string       // written to "preference" when set and non-empty
	requestCache *bool         // written to "request_cache" as the string "true" or "false" when set
	scroll       string        // written to "scroll" when non-empty
	query        Query         // written to "query"; takes precedence over searchSource
	sorts        []SortInfo    // written to "sort", but only when sorters is empty
	sorters      []Sorter      // written to "sort"; takes precedence over sorts
	searchSource *SearchSource // written to "source" when query is nil
}
321
322// NewReindexSource creates a new ReindexSource.
323func NewReindexSource() *ReindexSource {
324	return &ReindexSource{
325		indices: make([]string, 0),
326		types:   make([]string, 0),
327		sorts:   make([]SortInfo, 0),
328		sorters: make([]Sorter, 0),
329	}
330}
331
// SearchType is the search operation type. Possible values are
// "query_then_fetch" and "dfs_query_then_fetch".
// A non-empty value is written to the "search_type" field of the source.
// It returns the source to allow call chaining.
func (r *ReindexSource) SearchType(searchType string) *ReindexSource {
	r.searchType = searchType
	return r
}
338
339func (r *ReindexSource) SearchTypeDfsQueryThenFetch() *ReindexSource {
340	return r.SearchType("dfs_query_then_fetch")
341}
342
343func (r *ReindexSource) SearchTypeQueryThenFetch() *ReindexSource {
344	return r.SearchType("query_then_fetch")
345}
346
// Index adds one or more indices to read documents from. Repeated calls
// accumulate; existing entries are kept.
// It returns the source to allow call chaining.
func (r *ReindexSource) Index(indices ...string) *ReindexSource {
	r.indices = append(r.indices, indices...)
	return r
}
351
// Type adds one or more types to restrict the source to. Repeated calls
// accumulate; existing entries are kept.
// It returns the source to allow call chaining.
func (r *ReindexSource) Type(types ...string) *ReindexSource {
	r.types = append(r.types, types...)
	return r
}
356
// Preference sets the search preference of the source. It is written to
// the "preference" field of the source only when non-empty.
// It returns the source to allow call chaining.
func (r *ReindexSource) Preference(preference string) *ReindexSource {
	r.preference = &preference
	return r
}
361
// RequestCache sets the request cache flag of the source. Once set, it is
// written to the "request_cache" field of the source as the string "true"
// or "false". It returns the source to allow call chaining.
func (r *ReindexSource) RequestCache(requestCache bool) *ReindexSource {
	r.requestCache = &requestCache
	return r
}
366
// Scroll sets the scroll value of the source. A non-empty value is written
// to the "scroll" field of the source.
// It returns the source to allow call chaining.
func (r *ReindexSource) Scroll(scroll string) *ReindexSource {
	r.scroll = scroll
	return r
}
371
// Query sets the query that selects the documents of the source. A non-nil
// query is serialized into the "query" field of the source.
// It returns the source to allow call chaining.
func (r *ReindexSource) Query(query Query) *ReindexSource {
	r.query = query
	return r
}
376
377// Sort adds a sort order.
378func (s *ReindexSource) Sort(field string, ascending bool) *ReindexSource {
379	s.sorts = append(s.sorts, SortInfo{Field: field, Ascending: ascending})
380	return s
381}
382
383// SortWithInfo adds a sort order.
384func (s *ReindexSource) SortWithInfo(info SortInfo) *ReindexSource {
385	s.sorts = append(s.sorts, info)
386	return s
387}
388
389// SortBy	adds a sort order.
390func (s *ReindexSource) SortBy(sorter ...Sorter) *ReindexSource {
391	s.sorters = append(s.sorters, sorter...)
392	return s
393}
394
395// Source returns a serializable JSON request for the request.
396func (r *ReindexSource) Source() (interface{}, error) {
397	source := make(map[string]interface{})
398
399	if r.query != nil {
400		src, err := r.query.Source()
401		if err != nil {
402			return nil, err
403		}
404		source["query"] = src
405	} else if r.searchSource != nil {
406		src, err := r.searchSource.Source()
407		if err != nil {
408			return nil, err
409		}
410		source["source"] = src
411	}
412
413	if r.searchType != "" {
414		source["search_type"] = r.searchType
415	}
416
417	switch len(r.indices) {
418	case 0:
419	case 1:
420		source["index"] = r.indices[0]
421	default:
422		source["index"] = r.indices
423	}
424
425	switch len(r.types) {
426	case 0:
427	case 1:
428		source["type"] = r.types[0]
429	default:
430		source["type"] = r.types
431	}
432
433	if r.preference != nil && *r.preference != "" {
434		source["preference"] = *r.preference
435	}
436
437	if r.requestCache != nil {
438		source["request_cache"] = fmt.Sprintf("%v", *r.requestCache)
439	}
440
441	if r.scroll != "" {
442		source["scroll"] = r.scroll
443	}
444
445	if len(r.sorters) > 0 {
446		var sortarr []interface{}
447		for _, sorter := range r.sorters {
448			src, err := sorter.Source()
449			if err != nil {
450				return nil, err
451			}
452			sortarr = append(sortarr, src)
453		}
454		source["sort"] = sortarr
455	} else if len(r.sorts) > 0 {
456		var sortarr []interface{}
457		for _, sort := range r.sorts {
458			src, err := sort.Source()
459			if err != nil {
460				return nil, err
461			}
462			sortarr = append(sortarr, src)
463		}
464		source["sort"] = sortarr
465	}
466
467	return source, nil
468}
469
// -- Destination of Reindex --
471
// ReindexDestination is the destination of a Reindex API call.
// It is basically the meta data of a BulkIndexRequest.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/2.3/docs-reindex.html
// for details.
type ReindexDestination struct {
	index       string // written to "index" when non-empty
	typ         string // written to "type" when non-empty
	routing     string // written to "routing" when non-empty
	parent      string // written to "parent" when non-empty
	opType      string // written to "op_type" when non-empty
	version     int64  // written to "version" when greater than zero; default is MATCH_ANY
	versionType string // written to "version_type" when non-empty; default is "internal"
}
486
487// NewReindexDestination returns a new ReindexDestination.
488func NewReindexDestination() *ReindexDestination {
489	return &ReindexDestination{}
490}
491
// Index specifies name of the Elasticsearch index to use as the destination
// of a reindexing process. A later call replaces the earlier value.
// It returns the destination to allow call chaining.
func (r *ReindexDestination) Index(index string) *ReindexDestination {
	r.index = index
	return r
}
498
// Type specifies the Elasticsearch type to use for reindexing.
// A non-empty value is written to the "type" field of the destination.
// It returns the destination to allow call chaining.
func (r *ReindexDestination) Type(typ string) *ReindexDestination {
	r.typ = typ
	return r
}
504
// Routing specifies a routing value for the reindexing request.
// It can be "keep", "discard", or start with "=". The latter specifies
// the routing on the bulk request.
// The value is stored unchanged and, when non-empty, written to the
// "routing" field of the destination.
// It returns the destination to allow call chaining.
func (r *ReindexDestination) Routing(routing string) *ReindexDestination {
	r.routing = routing
	return r
}
512
513// Keep sets the routing on the bulk request sent for each match to the routing
514// of the match (the default).
515func (r *ReindexDestination) Keep() *ReindexDestination {
516	r.routing = "keep"
517	return r
518}
519
520// Discard sets the routing on the bulk request sent for each match to null.
521func (r *ReindexDestination) Discard() *ReindexDestination {
522	r.routing = "discard"
523	return r
524}
525
// Parent specifies the identifier of the parent document (if available).
// A non-empty value is written to the "parent" field of the destination.
// It returns the destination to allow call chaining.
func (r *ReindexDestination) Parent(parent string) *ReindexDestination {
	r.parent = parent
	return r
}
531
// OpType specifies if this request should follow create-only or upsert
// behavior. This follows the OpType of the standard document index API.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#operation-type
// for details.
// A non-empty value is written to the "op_type" field of the destination.
// It returns the destination to allow call chaining.
func (r *ReindexDestination) OpType(opType string) *ReindexDestination {
	r.opType = opType
	return r
}
540
// Version indicates the version of the document as part of an optimistic
// concurrency model. Only values greater than zero are written to the
// "version" field of the destination.
// It returns the destination to allow call chaining.
func (r *ReindexDestination) Version(version int64) *ReindexDestination {
	r.version = version
	return r
}
547
// VersionType specifies how versions are created.
// A non-empty value is written to the "version_type" field of the
// destination. It returns the destination to allow call chaining.
func (r *ReindexDestination) VersionType(versionType string) *ReindexDestination {
	r.versionType = versionType
	return r
}
553
554// Source returns a serializable JSON request for the request.
555func (r *ReindexDestination) Source() (interface{}, error) {
556	source := make(map[string]interface{})
557	if r.index != "" {
558		source["index"] = r.index
559	}
560	if r.typ != "" {
561		source["type"] = r.typ
562	}
563	if r.routing != "" {
564		source["routing"] = r.routing
565	}
566	if r.opType != "" {
567		source["op_type"] = r.opType
568	}
569	if r.parent != "" {
570		source["parent"] = r.parent
571	}
572	if r.version > 0 {
573		source["version"] = r.version
574	}
575	if r.versionType != "" {
576		source["version_type"] = r.versionType
577	}
578	return source, nil
579}
580