1// Copyright 2012-present Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package elastic 6 7//go:generate easyjson bulk_update_request.go 8 9import ( 10 "encoding/json" 11 "fmt" 12 "strings" 13) 14 15// BulkUpdateRequest is a request to update a document in Elasticsearch. 16// 17// See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-bulk.html 18// for details. 19type BulkUpdateRequest struct { 20 BulkableRequest 21 index string 22 typ string 23 id string 24 25 routing string 26 parent string 27 script *Script 28 scriptedUpsert *bool 29 version int64 // default is MATCH_ANY 30 versionType string // default is "internal" 31 retryOnConflict *int 32 upsert interface{} 33 docAsUpsert *bool 34 detectNoop *bool 35 doc interface{} 36 returnSource *bool 37 ifSeqNo *int64 38 ifPrimaryTerm *int64 39 40 source []string 41 42 useEasyJSON bool 43} 44 45//easyjson:json 46type bulkUpdateRequestCommand map[string]bulkUpdateRequestCommandOp 47 48//easyjson:json 49type bulkUpdateRequestCommandOp struct { 50 Index string `json:"_index,omitempty"` 51 Type string `json:"_type,omitempty"` 52 Id string `json:"_id,omitempty"` 53 Parent string `json:"parent,omitempty"` 54 // RetryOnConflict is "_retry_on_conflict" for 6.0 and "retry_on_conflict" for 6.1+. 55 RetryOnConflict *int `json:"retry_on_conflict,omitempty"` 56 Routing string `json:"routing,omitempty"` 57 Version int64 `json:"version,omitempty"` 58 VersionType string `json:"version_type,omitempty"` 59 IfSeqNo *int64 `json:"if_seq_no,omitempty"` 60 IfPrimaryTerm *int64 `json:"if_primary_term,omitempty"` 61} 62 63//easyjson:json 64type bulkUpdateRequestCommandData struct { 65 DetectNoop *bool `json:"detect_noop,omitempty"` 66 Doc interface{} `json:"doc,omitempty"` 67 DocAsUpsert *bool `json:"doc_as_upsert,omitempty"` 68 Script interface{} `json:"script,omitempty"` 69 ScriptedUpsert *bool `json:"scripted_upsert,omitempty"` 70 Upsert interface{} `json:"upsert,omitempty"` 71 Source *bool `json:"_source,omitempty"` 72} 73 74// NewBulkUpdateRequest returns a new BulkUpdateRequest. 75func NewBulkUpdateRequest() *BulkUpdateRequest { 76 return &BulkUpdateRequest{} 77} 78 79// UseEasyJSON is an experimental setting that enables serialization 80// with github.com/mailru/easyjson, which should in faster serialization 81// time and less allocations, but removed compatibility with encoding/json, 82// usage of unsafe etc. See https://github.com/mailru/easyjson#issues-notes-and-limitations 83// for details. This setting is disabled by default. 84func (r *BulkUpdateRequest) UseEasyJSON(enable bool) *BulkUpdateRequest { 85 r.useEasyJSON = enable 86 return r 87} 88 89// Index specifies the Elasticsearch index to use for this update request. 90// If unspecified, the index set on the BulkService will be used. 91func (r *BulkUpdateRequest) Index(index string) *BulkUpdateRequest { 92 r.index = index 93 r.source = nil 94 return r 95} 96 97// Type specifies the Elasticsearch type to use for this update request. 98// If unspecified, the type set on the BulkService will be used. 99func (r *BulkUpdateRequest) Type(typ string) *BulkUpdateRequest { 100 r.typ = typ 101 r.source = nil 102 return r 103} 104 105// Id specifies the identifier of the document to update. 106func (r *BulkUpdateRequest) Id(id string) *BulkUpdateRequest { 107 r.id = id 108 r.source = nil 109 return r 110} 111 112// Routing specifies a routing value for the request. 113func (r *BulkUpdateRequest) Routing(routing string) *BulkUpdateRequest { 114 r.routing = routing 115 r.source = nil 116 return r 117} 118 119// Parent specifies the identifier of the parent document (if available). 120func (r *BulkUpdateRequest) Parent(parent string) *BulkUpdateRequest { 121 r.parent = parent 122 r.source = nil 123 return r 124} 125 126// Script specifies an update script. 127// See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-bulk.html#bulk-update 128// and https://www.elastic.co/guide/en/elasticsearch/reference/7.0/modules-scripting.html 129// for details. 130func (r *BulkUpdateRequest) Script(script *Script) *BulkUpdateRequest { 131 r.script = script 132 r.source = nil 133 return r 134} 135 136// ScripedUpsert specifies if your script will run regardless of 137// whether the document exists or not. 138// 139// See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-update.html#_literal_scripted_upsert_literal 140func (r *BulkUpdateRequest) ScriptedUpsert(upsert bool) *BulkUpdateRequest { 141 r.scriptedUpsert = &upsert 142 r.source = nil 143 return r 144} 145 146// RetryOnConflict specifies how often to retry in case of a version conflict. 147func (r *BulkUpdateRequest) RetryOnConflict(retryOnConflict int) *BulkUpdateRequest { 148 r.retryOnConflict = &retryOnConflict 149 r.source = nil 150 return r 151} 152 153// Version indicates the version of the document as part of an optimistic 154// concurrency model. 155func (r *BulkUpdateRequest) Version(version int64) *BulkUpdateRequest { 156 r.version = version 157 r.source = nil 158 return r 159} 160 161// VersionType can be "internal" (default), "external", "external_gte", 162// or "external_gt". 163func (r *BulkUpdateRequest) VersionType(versionType string) *BulkUpdateRequest { 164 r.versionType = versionType 165 r.source = nil 166 return r 167} 168 169// IfSeqNo indicates to only perform the index operation if the last 170// operation that has changed the document has the specified sequence number. 171func (r *BulkUpdateRequest) IfSeqNo(ifSeqNo int64) *BulkUpdateRequest { 172 r.ifSeqNo = &ifSeqNo 173 return r 174} 175 176// IfPrimaryTerm indicates to only perform the index operation if the 177// last operation that has changed the document has the specified primary term. 178func (r *BulkUpdateRequest) IfPrimaryTerm(ifPrimaryTerm int64) *BulkUpdateRequest { 179 r.ifPrimaryTerm = &ifPrimaryTerm 180 return r 181} 182 183// Doc specifies the updated document. 184func (r *BulkUpdateRequest) Doc(doc interface{}) *BulkUpdateRequest { 185 r.doc = doc 186 r.source = nil 187 return r 188} 189 190// DocAsUpsert indicates whether the contents of Doc should be used as 191// the Upsert value. 192// 193// See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-update.html#_literal_doc_as_upsert_literal 194// for details. 195func (r *BulkUpdateRequest) DocAsUpsert(docAsUpsert bool) *BulkUpdateRequest { 196 r.docAsUpsert = &docAsUpsert 197 r.source = nil 198 return r 199} 200 201// DetectNoop specifies whether changes that don't affect the document 202// should be ignored (true) or unignored (false). This is enabled by default 203// in Elasticsearch. 204func (r *BulkUpdateRequest) DetectNoop(detectNoop bool) *BulkUpdateRequest { 205 r.detectNoop = &detectNoop 206 r.source = nil 207 return r 208} 209 210// Upsert specifies the document to use for upserts. It will be used for 211// create if the original document does not exist. 212func (r *BulkUpdateRequest) Upsert(doc interface{}) *BulkUpdateRequest { 213 r.upsert = doc 214 r.source = nil 215 return r 216} 217 218// ReturnSource specifies whether Elasticsearch should return the source 219// after the update. In the request, this responds to the `_source` field. 220// It is false by default. 221func (r *BulkUpdateRequest) ReturnSource(source bool) *BulkUpdateRequest { 222 r.returnSource = &source 223 r.source = nil 224 return r 225} 226 227// String returns the on-wire representation of the update request, 228// concatenated as a single string. 229func (r *BulkUpdateRequest) String() string { 230 lines, err := r.Source() 231 if err != nil { 232 return fmt.Sprintf("error: %v", err) 233 } 234 return strings.Join(lines, "\n") 235} 236 237// Source returns the on-wire representation of the update request, 238// split into an action-and-meta-data line and an (optional) source line. 239// See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-bulk.html 240// for details. 241func (r *BulkUpdateRequest) Source() ([]string, error) { 242 // { "update" : { "_index" : "test", "_type" : "type1", "_id" : "1", ... } } 243 // { "doc" : { "field1" : "value1", ... } } 244 // or 245 // { "update" : { "_index" : "test", "_type" : "type1", "_id" : "1", ... } } 246 // { "script" : { ... } } 247 248 if r.source != nil { 249 return r.source, nil 250 } 251 252 lines := make([]string, 2) 253 254 // "update" ... 255 updateCommand := bulkUpdateRequestCommandOp{ 256 Index: r.index, 257 Type: r.typ, 258 Id: r.id, 259 Routing: r.routing, 260 Parent: r.parent, 261 Version: r.version, 262 VersionType: r.versionType, 263 RetryOnConflict: r.retryOnConflict, 264 IfSeqNo: r.ifSeqNo, 265 IfPrimaryTerm: r.ifPrimaryTerm, 266 } 267 command := bulkUpdateRequestCommand{ 268 "update": updateCommand, 269 } 270 271 var err error 272 var body []byte 273 if r.useEasyJSON { 274 // easyjson 275 body, err = command.MarshalJSON() 276 } else { 277 // encoding/json 278 body, err = json.Marshal(command) 279 } 280 if err != nil { 281 return nil, err 282 } 283 284 lines[0] = string(body) 285 286 // 2nd line: {"doc" : { ... }} or {"script": {...}} 287 var doc interface{} 288 if r.doc != nil { 289 // Automatically serialize strings as raw JSON 290 switch t := r.doc.(type) { 291 default: 292 doc = r.doc 293 case string: 294 if len(t) > 0 { 295 doc = json.RawMessage(t) 296 } 297 case *string: 298 if t != nil && len(*t) > 0 { 299 doc = json.RawMessage(*t) 300 } 301 } 302 } 303 data := bulkUpdateRequestCommandData{ 304 DocAsUpsert: r.docAsUpsert, 305 DetectNoop: r.detectNoop, 306 Upsert: r.upsert, 307 ScriptedUpsert: r.scriptedUpsert, 308 Doc: doc, 309 Source: r.returnSource, 310 } 311 if r.script != nil { 312 script, err := r.script.Source() 313 if err != nil { 314 return nil, err 315 } 316 data.Script = script 317 } 318 319 if r.useEasyJSON { 320 // easyjson 321 body, err = data.MarshalJSON() 322 } else { 323 // encoding/json 324 body, err = json.Marshal(data) 325 } 326 if err != nil { 327 return nil, err 328 } 329 330 lines[1] = string(body) 331 332 r.source = lines 333 return lines, nil 334} 335