1// Copyright 2012-present Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package elastic 6 7import ( 8 "context" 9 "fmt" 10 "net/url" 11 12 "gopkg.in/olivere/elastic.v5/uritemplates" 13) 14 15// IndexService adds or updates a typed JSON document in a specified index, 16// making it searchable. 17// 18// See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-index_.html 19// for details. 20type IndexService struct { 21 client *Client 22 pretty bool 23 id string 24 index string 25 typ string 26 parent string 27 routing string 28 timeout string 29 timestamp string 30 ttl string 31 version interface{} 32 opType string 33 versionType string 34 refresh string 35 waitForActiveShards string 36 pipeline string 37 bodyJson interface{} 38 bodyString string 39} 40 41// NewIndexService creates a new IndexService. 42func NewIndexService(client *Client) *IndexService { 43 return &IndexService{ 44 client: client, 45 } 46} 47 48// Id is the document ID. 49func (s *IndexService) Id(id string) *IndexService { 50 s.id = id 51 return s 52} 53 54// Index is the name of the index. 55func (s *IndexService) Index(index string) *IndexService { 56 s.index = index 57 return s 58} 59 60// Type is the type of the document. 61func (s *IndexService) Type(typ string) *IndexService { 62 s.typ = typ 63 return s 64} 65 66// WaitForActiveShards sets the number of shard copies that must be active 67// before proceeding with the index operation. Defaults to 1, meaning the 68// primary shard only. Set to `all` for all shard copies, otherwise set to 69// any non-negative value less than or equal to the total number of copies 70// for the shard (number of replicas + 1). 71func (s *IndexService) WaitForActiveShards(waitForActiveShards string) *IndexService { 72 s.waitForActiveShards = waitForActiveShards 73 return s 74} 75 76// Pipeline specifies the pipeline id to preprocess incoming documents with. 77func (s *IndexService) Pipeline(pipeline string) *IndexService { 78 s.pipeline = pipeline 79 return s 80} 81 82// Refresh the index after performing the operation. 83func (s *IndexService) Refresh(refresh string) *IndexService { 84 s.refresh = refresh 85 return s 86} 87 88// Ttl is an expiration time for the document. 89func (s *IndexService) Ttl(ttl string) *IndexService { 90 s.ttl = ttl 91 return s 92} 93 94// TTL is an expiration time for the document (alias for Ttl). 95func (s *IndexService) TTL(ttl string) *IndexService { 96 s.ttl = ttl 97 return s 98} 99 100// Version is an explicit version number for concurrency control. 101func (s *IndexService) Version(version interface{}) *IndexService { 102 s.version = version 103 return s 104} 105 106// OpType is an explicit operation type, i.e. "create" or "index" (default). 107func (s *IndexService) OpType(opType string) *IndexService { 108 s.opType = opType 109 return s 110} 111 112// Parent is the ID of the parent document. 113func (s *IndexService) Parent(parent string) *IndexService { 114 s.parent = parent 115 return s 116} 117 118// Routing is a specific routing value. 119func (s *IndexService) Routing(routing string) *IndexService { 120 s.routing = routing 121 return s 122} 123 124// Timeout is an explicit operation timeout. 125func (s *IndexService) Timeout(timeout string) *IndexService { 126 s.timeout = timeout 127 return s 128} 129 130// Timestamp is an explicit timestamp for the document. 131func (s *IndexService) Timestamp(timestamp string) *IndexService { 132 s.timestamp = timestamp 133 return s 134} 135 136// VersionType is a specific version type. 137func (s *IndexService) VersionType(versionType string) *IndexService { 138 s.versionType = versionType 139 return s 140} 141 142// Pretty indicates that the JSON response be indented and human readable. 143func (s *IndexService) Pretty(pretty bool) *IndexService { 144 s.pretty = pretty 145 return s 146} 147 148// BodyJson is the document as a serializable JSON interface. 149func (s *IndexService) BodyJson(body interface{}) *IndexService { 150 s.bodyJson = body 151 return s 152} 153 154// BodyString is the document encoded as a string. 155func (s *IndexService) BodyString(body string) *IndexService { 156 s.bodyString = body 157 return s 158} 159 160// buildURL builds the URL for the operation. 161func (s *IndexService) buildURL() (string, string, url.Values, error) { 162 var err error 163 var method, path string 164 165 if s.id != "" { 166 // Create document with manual id 167 method = "PUT" 168 path, err = uritemplates.Expand("/{index}/{type}/{id}", map[string]string{ 169 "id": s.id, 170 "index": s.index, 171 "type": s.typ, 172 }) 173 } else { 174 // Automatic ID generation 175 // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-index_.html#index-creation 176 method = "POST" 177 path, err = uritemplates.Expand("/{index}/{type}/", map[string]string{ 178 "index": s.index, 179 "type": s.typ, 180 }) 181 } 182 if err != nil { 183 return "", "", url.Values{}, err 184 } 185 186 // Add query string parameters 187 params := url.Values{} 188 if s.pretty { 189 params.Set("pretty", "1") 190 } 191 if s.waitForActiveShards != "" { 192 params.Set("wait_for_active_shards", s.waitForActiveShards) 193 } 194 if s.refresh != "" { 195 params.Set("refresh", s.refresh) 196 } 197 if s.opType != "" { 198 params.Set("op_type", s.opType) 199 } 200 if s.parent != "" { 201 params.Set("parent", s.parent) 202 } 203 if s.pipeline != "" { 204 params.Set("pipeline", s.pipeline) 205 } 206 if s.routing != "" { 207 params.Set("routing", s.routing) 208 } 209 if s.timeout != "" { 210 params.Set("timeout", s.timeout) 211 } 212 if s.timestamp != "" { 213 params.Set("timestamp", s.timestamp) 214 } 215 if s.ttl != "" { 216 params.Set("ttl", s.ttl) 217 } 218 if s.version != nil { 219 params.Set("version", fmt.Sprintf("%v", s.version)) 220 } 221 if s.versionType != "" { 222 params.Set("version_type", s.versionType) 223 } 224 return method, path, params, nil 225} 226 227// Validate checks if the operation is valid. 228func (s *IndexService) Validate() error { 229 var invalid []string 230 if s.index == "" { 231 invalid = append(invalid, "Index") 232 } 233 if s.typ == "" { 234 invalid = append(invalid, "Type") 235 } 236 if s.bodyString == "" && s.bodyJson == nil { 237 invalid = append(invalid, "BodyJson") 238 } 239 if len(invalid) > 0 { 240 return fmt.Errorf("missing required fields: %v", invalid) 241 } 242 return nil 243} 244 245// Do executes the operation. 246func (s *IndexService) Do(ctx context.Context) (*IndexResponse, error) { 247 // Check pre-conditions 248 if err := s.Validate(); err != nil { 249 return nil, err 250 } 251 252 // Get URL for request 253 method, path, params, err := s.buildURL() 254 if err != nil { 255 return nil, err 256 } 257 258 // Setup HTTP request body 259 var body interface{} 260 if s.bodyJson != nil { 261 body = s.bodyJson 262 } else { 263 body = s.bodyString 264 } 265 266 // Get HTTP response 267 res, err := s.client.PerformRequest(ctx, method, path, params, body) 268 if err != nil { 269 return nil, err 270 } 271 272 // Return operation response 273 ret := new(IndexResponse) 274 if err := s.client.decoder.Decode(res.Body, ret); err != nil { 275 return nil, err 276 } 277 return ret, nil 278} 279 280// IndexResponse is the result of indexing a document in Elasticsearch. 281type IndexResponse struct { 282 // TODO _shards { total, failed, successful } 283 Index string `json:"_index"` 284 Type string `json:"_type"` 285 Id string `json:"_id"` 286 Version int `json:"_version"` 287 Created bool `json:"created"` 288} 289