1// Copyright 2012-2015 Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package elastic 6 7import ( 8 "bytes" 9 "encoding/json" 10 "errors" 11 "fmt" 12 "net/url" 13 14 "gopkg.in/olivere/elastic.v2/uritemplates" 15) 16 17type BulkService struct { 18 client *Client 19 20 index string 21 _type string 22 requests []BulkableRequest 23 //replicationType string 24 //consistencyLevel string 25 timeout string 26 refresh *bool 27 pretty bool 28} 29 30func NewBulkService(client *Client) *BulkService { 31 builder := &BulkService{ 32 client: client, 33 requests: make([]BulkableRequest, 0), 34 } 35 return builder 36} 37 38func (s *BulkService) reset() { 39 s.requests = make([]BulkableRequest, 0) 40} 41 42func (s *BulkService) Index(index string) *BulkService { 43 s.index = index 44 return s 45} 46 47func (s *BulkService) Type(_type string) *BulkService { 48 s._type = _type 49 return s 50} 51 52func (s *BulkService) Timeout(timeout string) *BulkService { 53 s.timeout = timeout 54 return s 55} 56 57func (s *BulkService) Refresh(refresh bool) *BulkService { 58 s.refresh = &refresh 59 return s 60} 61 62func (s *BulkService) Pretty(pretty bool) *BulkService { 63 s.pretty = pretty 64 return s 65} 66 67func (s *BulkService) Add(r BulkableRequest) *BulkService { 68 s.requests = append(s.requests, r) 69 return s 70} 71 72func (s *BulkService) NumberOfActions() int { 73 return len(s.requests) 74} 75 76func (s *BulkService) bodyAsString() (string, error) { 77 buf := bytes.NewBufferString("") 78 79 for _, req := range s.requests { 80 source, err := req.Source() 81 if err != nil { 82 return "", err 83 } 84 for _, line := range source { 85 _, err := buf.WriteString(fmt.Sprintf("%s\n", line)) 86 if err != nil { 87 return "", nil 88 } 89 } 90 } 91 92 return buf.String(), nil 93} 94 95func (s *BulkService) Do() (*BulkResponse, error) { 96 // No actions? 97 if s.NumberOfActions() == 0 { 98 return nil, errors.New("elastic: No bulk actions to commit") 99 } 100 101 // Get body 102 body, err := s.bodyAsString() 103 if err != nil { 104 return nil, err 105 } 106 107 // Build url 108 path := "/" 109 if s.index != "" { 110 index, err := uritemplates.Expand("{index}", map[string]string{ 111 "index": s.index, 112 }) 113 if err != nil { 114 return nil, err 115 } 116 path += index + "/" 117 } 118 if s._type != "" { 119 typ, err := uritemplates.Expand("{type}", map[string]string{ 120 "type": s._type, 121 }) 122 if err != nil { 123 return nil, err 124 } 125 path += typ + "/" 126 } 127 path += "_bulk" 128 129 // Parameters 130 params := make(url.Values) 131 if s.pretty { 132 params.Set("pretty", fmt.Sprintf("%v", s.pretty)) 133 } 134 if s.refresh != nil { 135 params.Set("refresh", fmt.Sprintf("%v", *s.refresh)) 136 } 137 if s.timeout != "" { 138 params.Set("timeout", s.timeout) 139 } 140 141 // Get response 142 res, err := s.client.PerformRequest("POST", path, params, body) 143 if err != nil { 144 return nil, err 145 } 146 147 // Return results 148 ret := new(BulkResponse) 149 if err := json.Unmarshal(res.Body, ret); err != nil { 150 return nil, err 151 } 152 153 // Reset so the request can be reused 154 s.reset() 155 156 return ret, nil 157} 158 159// BulkResponse is a response to a bulk execution. 160// 161// Example: 162// { 163// "took":3, 164// "errors":false, 165// "items":[{ 166// "index":{ 167// "_index":"index1", 168// "_type":"tweet", 169// "_id":"1", 170// "_version":3, 171// "status":201 172// } 173// },{ 174// "index":{ 175// "_index":"index2", 176// "_type":"tweet", 177// "_id":"2", 178// "_version":3, 179// "status":200 180// } 181// },{ 182// "delete":{ 183// "_index":"index1", 184// "_type":"tweet", 185// "_id":"1", 186// "_version":4, 187// "status":200, 188// "found":true 189// } 190// },{ 191// "update":{ 192// "_index":"index2", 193// "_type":"tweet", 194// "_id":"2", 195// "_version":4, 196// "status":200 197// } 198// }] 199// } 200type BulkResponse struct { 201 Took int `json:"took,omitempty"` 202 Errors bool `json:"errors,omitempty"` 203 Items []map[string]*BulkResponseItem `json:"items,omitempty"` 204} 205 206// BulkResponseItem is the result of a single bulk request. 207type BulkResponseItem struct { 208 Index string `json:"_index,omitempty"` 209 Type string `json:"_type,omitempty"` 210 Id string `json:"_id,omitempty"` 211 Version int `json:"_version,omitempty"` 212 Status int `json:"status,omitempty"` 213 Found bool `json:"found,omitempty"` 214 Error string `json:"error,omitempty"` 215} 216 217// Indexed returns all bulk request results of "index" actions. 218func (r *BulkResponse) Indexed() []*BulkResponseItem { 219 return r.ByAction("index") 220} 221 222// Created returns all bulk request results of "create" actions. 223func (r *BulkResponse) Created() []*BulkResponseItem { 224 return r.ByAction("create") 225} 226 227// Updated returns all bulk request results of "update" actions. 228func (r *BulkResponse) Updated() []*BulkResponseItem { 229 return r.ByAction("update") 230} 231 232// Deleted returns all bulk request results of "delete" actions. 233func (r *BulkResponse) Deleted() []*BulkResponseItem { 234 return r.ByAction("delete") 235} 236 237// ByAction returns all bulk request results of a certain action, 238// e.g. "index" or "delete". 239func (r *BulkResponse) ByAction(action string) []*BulkResponseItem { 240 if r.Items == nil { 241 return nil 242 } 243 items := make([]*BulkResponseItem, 0) 244 for _, item := range r.Items { 245 if result, found := item[action]; found { 246 items = append(items, result) 247 } 248 } 249 return items 250} 251 252// ById returns all bulk request results of a given document id, 253// regardless of the action ("index", "delete" etc.). 254func (r *BulkResponse) ById(id string) []*BulkResponseItem { 255 if r.Items == nil { 256 return nil 257 } 258 items := make([]*BulkResponseItem, 0) 259 for _, item := range r.Items { 260 for _, result := range item { 261 if result.Id == id { 262 items = append(items, result) 263 } 264 } 265 } 266 return items 267} 268 269// Failed returns those items of a bulk response that have errors, 270// i.e. those that don't have a status code between 200 and 299. 271func (r *BulkResponse) Failed() []*BulkResponseItem { 272 if r.Items == nil { 273 return nil 274 } 275 errors := make([]*BulkResponseItem, 0) 276 for _, item := range r.Items { 277 for _, result := range item { 278 if !(result.Status >= 200 && result.Status <= 299) { 279 errors = append(errors, result) 280 } 281 } 282 } 283 return errors 284} 285 286// Succeeded returns those items of a bulk response that have no errors, 287// i.e. those have a status code between 200 and 299. 288func (r *BulkResponse) Succeeded() []*BulkResponseItem { 289 if r.Items == nil { 290 return nil 291 } 292 succeeded := make([]*BulkResponseItem, 0) 293 for _, item := range r.Items { 294 for _, result := range item { 295 if result.Status >= 200 && result.Status <= 299 { 296 succeeded = append(succeeded, result) 297 } 298 } 299 } 300 return succeeded 301} 302