1// Copyright 2012-2015 Oliver Eilhard. All rights reserved.
2// Use of this source code is governed by a MIT-license.
3// See http://olivere.mit-license.org/license.txt for details.
4
5package elastic
6
7import (
8	"bytes"
9	"encoding/json"
10	"errors"
11	"fmt"
12	"net/url"
13
14	"gopkg.in/olivere/elastic.v2/uritemplates"
15)
16
// BulkService collects bulkable requests and sends them to the _bulk
// endpoint in a single call to Do. The setter methods return the service
// itself so that calls can be chained.
type BulkService struct {
	client *Client // used by Do to perform the HTTP request

	index    string            // optional index name placed in the request path
	_type    string            // optional type name placed in the request path
	requests []BulkableRequest // actions queued via Add and sent by Do
	//replicationType string
	//consistencyLevel string
	timeout string // "timeout" URL parameter; omitted when empty
	refresh *bool  // "refresh" URL parameter; omitted when nil
	pretty  bool   // when true, "pretty=true" is added to the URL
}
29
30func NewBulkService(client *Client) *BulkService {
31	builder := &BulkService{
32		client:   client,
33		requests: make([]BulkableRequest, 0),
34	}
35	return builder
36}
37
38func (s *BulkService) reset() {
39	s.requests = make([]BulkableRequest, 0)
40}
41
// Index sets the index name that Do places in the request path, i.e.
// /{index}/_bulk. An empty name leaves the index out of the path.
// It returns the service to allow call chaining.
func (s *BulkService) Index(index string) *BulkService {
	s.index = index
	return s
}
46
// Type sets the type name that Do appends to the request path ahead of
// _bulk. An empty name leaves the type out of the path.
// It returns the service to allow call chaining.
func (s *BulkService) Type(_type string) *BulkService {
	s._type = _type
	return s
}
51
// Timeout sets the value that Do sends as the "timeout" URL parameter.
// The string is passed through as given; an empty string omits the
// parameter. It returns the service to allow call chaining.
func (s *BulkService) Timeout(timeout string) *BulkService {
	s.timeout = timeout
	return s
}
56
// Refresh sets the value that Do sends as the "refresh" URL parameter.
// The flag is stored by pointer so that Do can tell "never set" (parameter
// omitted) apart from an explicit false (sent as "refresh=false").
// It returns the service to allow call chaining.
func (s *BulkService) Refresh(refresh bool) *BulkService {
	s.refresh = &refresh
	return s
}
61
// Pretty controls the "pretty" URL parameter: when pretty is true, Do adds
// "pretty=true" to the request; when false the parameter is omitted.
// It returns the service to allow call chaining.
func (s *BulkService) Pretty(pretty bool) *BulkService {
	s.pretty = pretty
	return s
}
66
// Add appends r to the list of requests that the next call to Do sends.
// Requests are kept in the order in which they were added.
// It returns the service to allow call chaining.
func (s *BulkService) Add(r BulkableRequest) *BulkService {
	s.requests = append(s.requests, r)
	return s
}
71
// NumberOfActions returns the number of requests currently queued on the
// service, i.e. those added via Add since creation or the last reset.
func (s *BulkService) NumberOfActions() int {
	return len(s.requests)
}
75
76func (s *BulkService) bodyAsString() (string, error) {
77	buf := bytes.NewBufferString("")
78
79	for _, req := range s.requests {
80		source, err := req.Source()
81		if err != nil {
82			return "", err
83		}
84		for _, line := range source {
85			_, err := buf.WriteString(fmt.Sprintf("%s\n", line))
86			if err != nil {
87				return "", nil
88			}
89		}
90	}
91
92	return buf.String(), nil
93}
94
95func (s *BulkService) Do() (*BulkResponse, error) {
96	// No actions?
97	if s.NumberOfActions() == 0 {
98		return nil, errors.New("elastic: No bulk actions to commit")
99	}
100
101	// Get body
102	body, err := s.bodyAsString()
103	if err != nil {
104		return nil, err
105	}
106
107	// Build url
108	path := "/"
109	if s.index != "" {
110		index, err := uritemplates.Expand("{index}", map[string]string{
111			"index": s.index,
112		})
113		if err != nil {
114			return nil, err
115		}
116		path += index + "/"
117	}
118	if s._type != "" {
119		typ, err := uritemplates.Expand("{type}", map[string]string{
120			"type": s._type,
121		})
122		if err != nil {
123			return nil, err
124		}
125		path += typ + "/"
126	}
127	path += "_bulk"
128
129	// Parameters
130	params := make(url.Values)
131	if s.pretty {
132		params.Set("pretty", fmt.Sprintf("%v", s.pretty))
133	}
134	if s.refresh != nil {
135		params.Set("refresh", fmt.Sprintf("%v", *s.refresh))
136	}
137	if s.timeout != "" {
138		params.Set("timeout", s.timeout)
139	}
140
141	// Get response
142	res, err := s.client.PerformRequest("POST", path, params, body)
143	if err != nil {
144		return nil, err
145	}
146
147	// Return results
148	ret := new(BulkResponse)
149	if err := json.Unmarshal(res.Body, ret); err != nil {
150		return nil, err
151	}
152
153	// Reset so the request can be reused
154	s.reset()
155
156	return ret, nil
157}
158
// BulkResponse is a response to a bulk execution.
//
// Took and Errors hold the top-level "took" and "errors" fields of the
// reply. Items holds one map per entry of the "items" array; each map is
// keyed by the action name (e.g. "index", "create", "update" or "delete")
// and points to the result reported for that action.
//
// Example:
//
//	{
//	  "took":3,
//	  "errors":false,
//	  "items":[{
//	    "index":{
//	      "_index":"index1",
//	      "_type":"tweet",
//	      "_id":"1",
//	      "_version":3,
//	      "status":201
//	    }
//	  },{
//	    "index":{
//	      "_index":"index2",
//	      "_type":"tweet",
//	      "_id":"2",
//	      "_version":3,
//	      "status":200
//	    }
//	  },{
//	    "delete":{
//	      "_index":"index1",
//	      "_type":"tweet",
//	      "_id":"1",
//	      "_version":4,
//	      "status":200,
//	      "found":true
//	    }
//	  },{
//	    "update":{
//	      "_index":"index2",
//	      "_type":"tweet",
//	      "_id":"2",
//	      "_version":4,
//	      "status":200
//	    }
//	  }]
//	}
type BulkResponse struct {
	Took   int                            `json:"took,omitempty"`
	Errors bool                           `json:"errors,omitempty"`
	Items  []map[string]*BulkResponseItem `json:"items,omitempty"`
}
205
// BulkResponseItem is the result of a single bulk request.
//
// Its fields mirror the JSON keys of one action result: "_index", "_type",
// "_id", "_version", "status", "found" and "error". Status is the per-item
// status code that Failed and Succeeded test against the 200-299 range.
// A key that is absent from the reply leaves its field at the zero value.
type BulkResponseItem struct {
	Index   string `json:"_index,omitempty"`
	Type    string `json:"_type,omitempty"`
	Id      string `json:"_id,omitempty"`
	Version int    `json:"_version,omitempty"`
	Status  int    `json:"status,omitempty"`
	Found   bool   `json:"found,omitempty"`
	Error   string `json:"error,omitempty"`
}
216
// Indexed returns all bulk request results of "index" actions.
// It is shorthand for ByAction("index") and, like it, returns nil when the
// response has no items.
func (r *BulkResponse) Indexed() []*BulkResponseItem {
	return r.ByAction("index")
}
221
// Created returns all bulk request results of "create" actions.
// It is shorthand for ByAction("create") and, like it, returns nil when the
// response has no items.
func (r *BulkResponse) Created() []*BulkResponseItem {
	return r.ByAction("create")
}
226
// Updated returns all bulk request results of "update" actions.
// It is shorthand for ByAction("update") and, like it, returns nil when the
// response has no items.
func (r *BulkResponse) Updated() []*BulkResponseItem {
	return r.ByAction("update")
}
231
// Deleted returns all bulk request results of "delete" actions.
// It is shorthand for ByAction("delete") and, like it, returns nil when the
// response has no items.
func (r *BulkResponse) Deleted() []*BulkResponseItem {
	return r.ByAction("delete")
}
236
237// ByAction returns all bulk request results of a certain action,
238// e.g. "index" or "delete".
239func (r *BulkResponse) ByAction(action string) []*BulkResponseItem {
240	if r.Items == nil {
241		return nil
242	}
243	items := make([]*BulkResponseItem, 0)
244	for _, item := range r.Items {
245		if result, found := item[action]; found {
246			items = append(items, result)
247		}
248	}
249	return items
250}
251
252// ById returns all bulk request results of a given document id,
253// regardless of the action ("index", "delete" etc.).
254func (r *BulkResponse) ById(id string) []*BulkResponseItem {
255	if r.Items == nil {
256		return nil
257	}
258	items := make([]*BulkResponseItem, 0)
259	for _, item := range r.Items {
260		for _, result := range item {
261			if result.Id == id {
262				items = append(items, result)
263			}
264		}
265	}
266	return items
267}
268
269// Failed returns those items of a bulk response that have errors,
270// i.e. those that don't have a status code between 200 and 299.
271func (r *BulkResponse) Failed() []*BulkResponseItem {
272	if r.Items == nil {
273		return nil
274	}
275	errors := make([]*BulkResponseItem, 0)
276	for _, item := range r.Items {
277		for _, result := range item {
278			if !(result.Status >= 200 && result.Status <= 299) {
279				errors = append(errors, result)
280			}
281		}
282	}
283	return errors
284}
285
286// Succeeded returns those items of a bulk response that have no errors,
287// i.e. those have a status code between 200 and 299.
288func (r *BulkResponse) Succeeded() []*BulkResponseItem {
289	if r.Items == nil {
290		return nil
291	}
292	succeeded := make([]*BulkResponseItem, 0)
293	for _, item := range r.Items {
294		for _, result := range item {
295			if result.Status >= 200 && result.Status <= 299 {
296				succeeded = append(succeeded, result)
297			}
298		}
299	}
300	return succeeded
301}
302