1// Copyright 2012-present Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package elastic 6 7import ( 8 "fmt" 9 "net/url" 10 11 "golang.org/x/net/context" 12) 13 14// ReindexService is a method to copy documents from one index to another. 15// It was introduced in Elasticsearch 2.3.0. 16// 17// Notice that Elastic already had a Reindexer service that pre-dated 18// the Reindex API. Use that if you're on an earlier version of Elasticsearch. 19// 20// It is documented at https://www.elastic.co/guide/en/elasticsearch/plugins/master/plugins-reindex.html. 21type ReindexService struct { 22 client *Client 23 pretty bool 24 consistency string 25 refresh *bool 26 timeout string 27 waitForCompletion *bool 28 bodyJson interface{} 29 bodyString string 30 source *ReindexSource 31 destination *ReindexDestination 32 conflicts string 33 size *int 34 script *Script 35} 36 37// NewReindexService creates a new ReindexService. 38func NewReindexService(client *Client) *ReindexService { 39 return &ReindexService{ 40 client: client, 41 } 42} 43 44// Consistency specifies an explicit write consistency setting for the operation. 45func (s *ReindexService) Consistency(consistency string) *ReindexService { 46 s.consistency = consistency 47 return s 48} 49 50// Refresh indicates whether Elasticsearch should refresh the effected indexes 51// immediately. 52func (s *ReindexService) Refresh(refresh bool) *ReindexService { 53 s.refresh = &refresh 54 return s 55} 56 57// Timeout is the time each individual bulk request should wait for shards 58// that are unavailable. 59func (s *ReindexService) Timeout(timeout string) *ReindexService { 60 s.timeout = timeout 61 return s 62} 63 64// WaitForCompletion indicates whether Elasticsearch should block until the 65// reindex is complete. 
66func (s *ReindexService) WaitForCompletion(waitForCompletion bool) *ReindexService { 67 s.waitForCompletion = &waitForCompletion 68 return s 69} 70 71// Pretty indicates that the JSON response be indented and human readable. 72func (s *ReindexService) Pretty(pretty bool) *ReindexService { 73 s.pretty = pretty 74 return s 75} 76 77// Source specifies the source of the reindexing process. 78func (s *ReindexService) Source(source *ReindexSource) *ReindexService { 79 s.source = source 80 return s 81} 82 83// SourceIndex specifies the source index of the reindexing process. 84func (s *ReindexService) SourceIndex(index string) *ReindexService { 85 if s.source == nil { 86 s.source = NewReindexSource() 87 } 88 s.source = s.source.Index(index) 89 return s 90} 91 92// Destination specifies the destination of the reindexing process. 93func (s *ReindexService) Destination(destination *ReindexDestination) *ReindexService { 94 s.destination = destination 95 return s 96} 97 98// DestinationIndex specifies the destination index of the reindexing process. 99func (s *ReindexService) DestinationIndex(index string) *ReindexService { 100 if s.destination == nil { 101 s.destination = NewReindexDestination() 102 } 103 s.destination = s.destination.Index(index) 104 return s 105} 106 107// DestinationIndexAndType specifies both the destination index and type 108// of the reindexing process. 109func (s *ReindexService) DestinationIndexAndType(index, typ string) *ReindexService { 110 if s.destination == nil { 111 s.destination = NewReindexDestination() 112 } 113 s.destination = s.destination.Index(index) 114 s.destination = s.destination.Type(typ) 115 return s 116} 117 118// Conflicts indicates what to do when the process detects version conflicts. 119// Possible values are "proceed" and "abort". 120func (s *ReindexService) Conflicts(conflicts string) *ReindexService { 121 s.conflicts = conflicts 122 return s 123} 124 125// AbortOnVersionConflict aborts the request on version conflicts. 
126// It is an alias to setting Conflicts("abort"). 127func (s *ReindexService) AbortOnVersionConflict() *ReindexService { 128 s.conflicts = "abort" 129 return s 130} 131 132// ProceedOnVersionConflict aborts the request on version conflicts. 133// It is an alias to setting Conflicts("proceed"). 134func (s *ReindexService) ProceedOnVersionConflict() *ReindexService { 135 s.conflicts = "proceed" 136 return s 137} 138 139// Size sets an upper limit for the number of processed documents. 140func (s *ReindexService) Size(size int) *ReindexService { 141 s.size = &size 142 return s 143} 144 145// Script allows for modification of the documents as they are reindexed 146// from source to destination. 147func (s *ReindexService) Script(script *Script) *ReindexService { 148 s.script = script 149 return s 150} 151 152// BodyJson specifies e.g. the query to restrict the results specified with the 153// Query DSL (optional). The interface{} will be serialized to a JSON document, 154// so use a map[string]interface{}. 155func (s *ReindexService) BodyJson(body interface{}) *ReindexService { 156 s.bodyJson = body 157 return s 158} 159 160// Body specifies e.g. a query to restrict the results specified with 161// the Query DSL (optional). 162func (s *ReindexService) BodyString(body string) *ReindexService { 163 s.bodyString = body 164 return s 165} 166 167// buildURL builds the URL for the operation. 
168func (s *ReindexService) buildURL() (string, url.Values, error) { 169 // Build URL path 170 path := "/_reindex" 171 172 // Add query string parameters 173 params := url.Values{} 174 if s.pretty { 175 params.Set("pretty", "1") 176 } 177 if s.consistency != "" { 178 params.Set("consistency", s.consistency) 179 } 180 if s.refresh != nil { 181 params.Set("refresh", fmt.Sprintf("%v", *s.refresh)) 182 } 183 if s.timeout != "" { 184 params.Set("timeout", s.timeout) 185 } 186 if s.waitForCompletion != nil { 187 params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion)) 188 } 189 return path, params, nil 190} 191 192// Validate checks if the operation is valid. 193func (s *ReindexService) Validate() error { 194 var invalid []string 195 if s.source == nil { 196 invalid = append(invalid, "Source") 197 } else { 198 if len(s.source.indices) == 0 { 199 invalid = append(invalid, "Source.Index") 200 } 201 } 202 if s.destination == nil { 203 invalid = append(invalid, "Destination") 204 } 205 if len(invalid) > 0 { 206 return fmt.Errorf("missing required fields: %v", invalid) 207 } 208 return nil 209} 210 211// body returns the body part of the document request. 212func (s *ReindexService) body() (interface{}, error) { 213 if s.bodyJson != nil { 214 return s.bodyJson, nil 215 } 216 if s.bodyString != "" { 217 return s.bodyString, nil 218 } 219 220 body := make(map[string]interface{}) 221 222 if s.conflicts != "" { 223 body["conflicts"] = s.conflicts 224 } 225 if s.size != nil { 226 body["size"] = *s.size 227 } 228 if s.script != nil { 229 out, err := s.script.Source() 230 if err != nil { 231 return nil, err 232 } 233 body["script"] = out 234 } 235 236 src, err := s.source.Source() 237 if err != nil { 238 return nil, err 239 } 240 body["source"] = src 241 242 dst, err := s.destination.Source() 243 if err != nil { 244 return nil, err 245 } 246 body["dest"] = dst 247 248 return body, nil 249} 250 251// Do executes the operation. 
252func (s *ReindexService) Do() (*ReindexResponse, error) { 253 return s.DoC(nil) 254} 255 256// DoC executes the operation. 257func (s *ReindexService) DoC(ctx context.Context) (*ReindexResponse, error) { 258 // Check pre-conditions 259 if err := s.Validate(); err != nil { 260 return nil, err 261 } 262 263 // Get URL for request 264 path, params, err := s.buildURL() 265 if err != nil { 266 return nil, err 267 } 268 269 // Setup HTTP request body 270 body, err := s.body() 271 if err != nil { 272 return nil, err 273 } 274 275 // Get HTTP response 276 res, err := s.client.PerformRequestC(ctx, "POST", path, params, body) 277 if err != nil { 278 return nil, err 279 } 280 281 // Return operation response 282 ret := new(ReindexResponse) 283 if err := s.client.decoder.Decode(res.Body, ret); err != nil { 284 return nil, err 285 } 286 return ret, nil 287} 288 289// ReindexResponse is the response of ReindexService.Do. 290type ReindexResponse struct { 291 Took interface{} `json:"took"` // 2.3.0 returns "37.7ms" while 2.2 returns 38 for took 292 TimedOut bool `json:"timed_out"` 293 Total int64 `json:"total"` 294 Updated int64 `json:"updated"` 295 Created int64 `json:"created"` 296 Deleted int64 `json:"deleted"` 297 Batches int64 `json:"batches"` 298 VersionConflicts int64 `json:"version_conflicts"` 299 Noops int64 `json:"noops"` 300 Retries int64 `json:"retries"` 301 Canceled string `json:"canceled"` 302 Failures []shardOperationFailure `json:"failures"` 303} 304 305// -- Source of Reindex -- 306 307// ReindexSource specifies the source of a Reindex process. 308type ReindexSource struct { 309 searchType string // default in ES is "query_then_fetch" 310 indices []string 311 types []string 312 routing *string 313 preference *string 314 requestCache *bool 315 scroll string 316 query Query 317 sorts []SortInfo 318 sorters []Sorter 319 searchSource *SearchSource 320} 321 322// NewReindexSource creates a new ReindexSource. 
323func NewReindexSource() *ReindexSource { 324 return &ReindexSource{ 325 indices: make([]string, 0), 326 types: make([]string, 0), 327 sorts: make([]SortInfo, 0), 328 sorters: make([]Sorter, 0), 329 } 330} 331 332// SearchType is the search operation type. Possible values are 333// "query_then_fetch" and "dfs_query_then_fetch". 334func (r *ReindexSource) SearchType(searchType string) *ReindexSource { 335 r.searchType = searchType 336 return r 337} 338 339func (r *ReindexSource) SearchTypeDfsQueryThenFetch() *ReindexSource { 340 return r.SearchType("dfs_query_then_fetch") 341} 342 343func (r *ReindexSource) SearchTypeQueryThenFetch() *ReindexSource { 344 return r.SearchType("query_then_fetch") 345} 346 347func (r *ReindexSource) Index(indices ...string) *ReindexSource { 348 r.indices = append(r.indices, indices...) 349 return r 350} 351 352func (r *ReindexSource) Type(types ...string) *ReindexSource { 353 r.types = append(r.types, types...) 354 return r 355} 356 357func (r *ReindexSource) Preference(preference string) *ReindexSource { 358 r.preference = &preference 359 return r 360} 361 362func (r *ReindexSource) RequestCache(requestCache bool) *ReindexSource { 363 r.requestCache = &requestCache 364 return r 365} 366 367func (r *ReindexSource) Scroll(scroll string) *ReindexSource { 368 r.scroll = scroll 369 return r 370} 371 372func (r *ReindexSource) Query(query Query) *ReindexSource { 373 r.query = query 374 return r 375} 376 377// Sort adds a sort order. 378func (s *ReindexSource) Sort(field string, ascending bool) *ReindexSource { 379 s.sorts = append(s.sorts, SortInfo{Field: field, Ascending: ascending}) 380 return s 381} 382 383// SortWithInfo adds a sort order. 384func (s *ReindexSource) SortWithInfo(info SortInfo) *ReindexSource { 385 s.sorts = append(s.sorts, info) 386 return s 387} 388 389// SortBy adds a sort order. 390func (s *ReindexSource) SortBy(sorter ...Sorter) *ReindexSource { 391 s.sorters = append(s.sorters, sorter...) 
392 return s 393} 394 395// Source returns a serializable JSON request for the request. 396func (r *ReindexSource) Source() (interface{}, error) { 397 source := make(map[string]interface{}) 398 399 if r.query != nil { 400 src, err := r.query.Source() 401 if err != nil { 402 return nil, err 403 } 404 source["query"] = src 405 } else if r.searchSource != nil { 406 src, err := r.searchSource.Source() 407 if err != nil { 408 return nil, err 409 } 410 source["source"] = src 411 } 412 413 if r.searchType != "" { 414 source["search_type"] = r.searchType 415 } 416 417 switch len(r.indices) { 418 case 0: 419 case 1: 420 source["index"] = r.indices[0] 421 default: 422 source["index"] = r.indices 423 } 424 425 switch len(r.types) { 426 case 0: 427 case 1: 428 source["type"] = r.types[0] 429 default: 430 source["type"] = r.types 431 } 432 433 if r.preference != nil && *r.preference != "" { 434 source["preference"] = *r.preference 435 } 436 437 if r.requestCache != nil { 438 source["request_cache"] = fmt.Sprintf("%v", *r.requestCache) 439 } 440 441 if r.scroll != "" { 442 source["scroll"] = r.scroll 443 } 444 445 if len(r.sorters) > 0 { 446 var sortarr []interface{} 447 for _, sorter := range r.sorters { 448 src, err := sorter.Source() 449 if err != nil { 450 return nil, err 451 } 452 sortarr = append(sortarr, src) 453 } 454 source["sort"] = sortarr 455 } else if len(r.sorts) > 0 { 456 var sortarr []interface{} 457 for _, sort := range r.sorts { 458 src, err := sort.Source() 459 if err != nil { 460 return nil, err 461 } 462 sortarr = append(sortarr, src) 463 } 464 source["sort"] = sortarr 465 } 466 467 return source, nil 468} 469 470// -source Destination of Reindex -- 471 472// ReindexDestination is the destination of a Reindex API call. 473// It is basically the meta data of a BulkIndexRequest. 474// 475// See https://www.elastic.co/guide/en/elasticsearch/reference/2.3/docs-reindex.html 476// fsourcer details. 
type ReindexDestination struct {
	index       string
	typ         string
	routing     string
	parent      string
	opType      string
	version     int64  // default is MATCH_ANY
	versionType string // default is "internal"
}

// NewReindexDestination returns a new ReindexDestination with every
// setting left at its zero value.
func NewReindexDestination() *ReindexDestination {
	return &ReindexDestination{}
}

// Index specifies name of the Elasticsearch index to use as the destination
// of a reindexing process.
func (r *ReindexDestination) Index(index string) *ReindexDestination {
	r.index = index
	return r
}

// Type specifies the Elasticsearch type to use for reindexing.
func (r *ReindexDestination) Type(typ string) *ReindexDestination {
	r.typ = typ
	return r
}

// Routing specifies a routing value for the reindexing request.
// It can be "keep", "discard", or start with "=". The latter specifies
// the routing on the bulk request.
func (r *ReindexDestination) Routing(routing string) *ReindexDestination {
	r.routing = routing
	return r
}

// Keep sets the routing on the bulk request sent for each match to the routing
// of the match (the default). It is equivalent to Routing("keep").
func (r *ReindexDestination) Keep() *ReindexDestination {
	r.routing = "keep"
	return r
}

// Discard sets the routing on the bulk request sent for each match to null.
// It is equivalent to Routing("discard").
func (r *ReindexDestination) Discard() *ReindexDestination {
	r.routing = "discard"
	return r
}

// Parent specifies the identifier of the parent document (if available).
func (r *ReindexDestination) Parent(parent string) *ReindexDestination {
	r.parent = parent
	return r
}

// OpType specifies if this request should follow create-only or upsert
// behavior. This follows the OpType of the standard document index API.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#operation-type
// for details.
536func (r *ReindexDestination) OpType(opType string) *ReindexDestination { 537 r.opType = opType 538 return r 539} 540 541// Version indicates the version of the document as part of an optimistic 542// concurrency model. 543func (r *ReindexDestination) Version(version int64) *ReindexDestination { 544 r.version = version 545 return r 546} 547 548// VersionType specifies how versions are created. 549func (r *ReindexDestination) VersionType(versionType string) *ReindexDestination { 550 r.versionType = versionType 551 return r 552} 553 554// Source returns a serializable JSON request for the request. 555func (r *ReindexDestination) Source() (interface{}, error) { 556 source := make(map[string]interface{}) 557 if r.index != "" { 558 source["index"] = r.index 559 } 560 if r.typ != "" { 561 source["type"] = r.typ 562 } 563 if r.routing != "" { 564 source["routing"] = r.routing 565 } 566 if r.opType != "" { 567 source["op_type"] = r.opType 568 } 569 if r.parent != "" { 570 source["parent"] = r.parent 571 } 572 if r.version > 0 { 573 source["version"] = r.version 574 } 575 if r.versionType != "" { 576 source["version_type"] = r.versionType 577 } 578 return source, nil 579} 580