1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package client 16 17import ( 18 "context" 19 "encoding/json" 20 "errors" 21 "fmt" 22 "go.etcd.io/etcd/pkg/pathutil" 23 "net/http" 24 "net/url" 25 "strconv" 26 "strings" 27 "time" 28) 29 30const ( 31 ErrorCodeKeyNotFound = 100 32 ErrorCodeTestFailed = 101 33 ErrorCodeNotFile = 102 34 ErrorCodeNotDir = 104 35 ErrorCodeNodeExist = 105 36 ErrorCodeRootROnly = 107 37 ErrorCodeDirNotEmpty = 108 38 ErrorCodeUnauthorized = 110 39 40 ErrorCodePrevValueRequired = 201 41 ErrorCodeTTLNaN = 202 42 ErrorCodeIndexNaN = 203 43 ErrorCodeInvalidField = 209 44 ErrorCodeInvalidForm = 210 45 46 ErrorCodeRaftInternal = 300 47 ErrorCodeLeaderElect = 301 48 49 ErrorCodeWatcherCleared = 400 50 ErrorCodeEventIndexCleared = 401 51) 52 53type Error struct { 54 Code int `json:"errorCode"` 55 Message string `json:"message"` 56 Cause string `json:"cause"` 57 Index uint64 `json:"index"` 58} 59 60func (e Error) Error() string { 61 return fmt.Sprintf("%v: %v (%v) [%v]", e.Code, e.Message, e.Cause, e.Index) 62} 63 64var ( 65 ErrInvalidJSON = errors.New("client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint") 66 ErrEmptyBody = errors.New("client: response body is empty") 67) 68 69// PrevExistType is used to define an existence condition when setting 70// or deleting Nodes. 71type PrevExistType string 72 73const ( 74 PrevIgnore = PrevExistType("") 75 PrevExist = PrevExistType("true") 76 PrevNoExist = PrevExistType("false") 77) 78 79var ( 80 defaultV2KeysPrefix = "/v2/keys" 81) 82 83// NewKeysAPI builds a KeysAPI that interacts with etcd's key-value 84// API over HTTP. 85func NewKeysAPI(c Client) KeysAPI { 86 return NewKeysAPIWithPrefix(c, defaultV2KeysPrefix) 87} 88 89// NewKeysAPIWithPrefix acts like NewKeysAPI, but allows the caller 90// to provide a custom base URL path. This should only be used in 91// very rare cases. 92func NewKeysAPIWithPrefix(c Client, p string) KeysAPI { 93 return &httpKeysAPI{ 94 client: c, 95 prefix: p, 96 } 97} 98 99type KeysAPI interface { 100 // Get retrieves a set of Nodes from etcd 101 Get(ctx context.Context, key string, opts *GetOptions) (*Response, error) 102 103 // Set assigns a new value to a Node identified by a given key. The caller 104 // may define a set of conditions in the SetOptions. If SetOptions.Dir=true 105 // then value is ignored. 106 Set(ctx context.Context, key, value string, opts *SetOptions) (*Response, error) 107 108 // Delete removes a Node identified by the given key, optionally destroying 109 // all of its children as well. The caller may define a set of required 110 // conditions in an DeleteOptions object. 111 Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error) 112 113 // Create is an alias for Set w/ PrevExist=false 114 Create(ctx context.Context, key, value string) (*Response, error) 115 116 // CreateInOrder is used to atomically create in-order keys within the given directory. 117 CreateInOrder(ctx context.Context, dir, value string, opts *CreateInOrderOptions) (*Response, error) 118 119 // Update is an alias for Set w/ PrevExist=true 120 Update(ctx context.Context, key, value string) (*Response, error) 121 122 // Watcher builds a new Watcher targeted at a specific Node identified 123 // by the given key. The Watcher may be configured at creation time 124 // through a WatcherOptions object. The returned Watcher is designed 125 // to emit events that happen to a Node, and optionally to its children. 126 Watcher(key string, opts *WatcherOptions) Watcher 127} 128 129type WatcherOptions struct { 130 // AfterIndex defines the index after-which the Watcher should 131 // start emitting events. For example, if a value of 5 is 132 // provided, the first event will have an index >= 6. 133 // 134 // Setting AfterIndex to 0 (default) means that the Watcher 135 // should start watching for events starting at the current 136 // index, whatever that may be. 137 AfterIndex uint64 138 139 // Recursive specifies whether or not the Watcher should emit 140 // events that occur in children of the given keyspace. If set 141 // to false (default), events will be limited to those that 142 // occur for the exact key. 143 Recursive bool 144} 145 146type CreateInOrderOptions struct { 147 // TTL defines a period of time after-which the Node should 148 // expire and no longer exist. Values <= 0 are ignored. Given 149 // that the zero-value is ignored, TTL cannot be used to set 150 // a TTL of 0. 151 TTL time.Duration 152} 153 154type SetOptions struct { 155 // PrevValue specifies what the current value of the Node must 156 // be in order for the Set operation to succeed. 157 // 158 // Leaving this field empty means that the caller wishes to 159 // ignore the current value of the Node. This cannot be used 160 // to compare the Node's current value to an empty string. 161 // 162 // PrevValue is ignored if Dir=true 163 PrevValue string 164 165 // PrevIndex indicates what the current ModifiedIndex of the 166 // Node must be in order for the Set operation to succeed. 167 // 168 // If PrevIndex is set to 0 (default), no comparison is made. 169 PrevIndex uint64 170 171 // PrevExist specifies whether the Node must currently exist 172 // (PrevExist) or not (PrevNoExist). If the caller does not 173 // care about existence, set PrevExist to PrevIgnore, or simply 174 // leave it unset. 175 PrevExist PrevExistType 176 177 // TTL defines a period of time after-which the Node should 178 // expire and no longer exist. Values <= 0 are ignored. Given 179 // that the zero-value is ignored, TTL cannot be used to set 180 // a TTL of 0. 181 TTL time.Duration 182 183 // Refresh set to true means a TTL value can be updated 184 // without firing a watch or changing the node value. A 185 // value must not be provided when refreshing a key. 186 Refresh bool 187 188 // Dir specifies whether or not this Node should be created as a directory. 189 Dir bool 190 191 // NoValueOnSuccess specifies whether the response contains the current value of the Node. 192 // If set, the response will only contain the current value when the request fails. 193 NoValueOnSuccess bool 194} 195 196type GetOptions struct { 197 // Recursive defines whether or not all children of the Node 198 // should be returned. 199 Recursive bool 200 201 // Sort instructs the server whether or not to sort the Nodes. 202 // If true, the Nodes are sorted alphabetically by key in 203 // ascending order (A to z). If false (default), the Nodes will 204 // not be sorted and the ordering used should not be considered 205 // predictable. 206 Sort bool 207 208 // Quorum specifies whether it gets the latest committed value that 209 // has been applied in quorum of members, which ensures external 210 // consistency (or linearizability). 211 Quorum bool 212} 213 214type DeleteOptions struct { 215 // PrevValue specifies what the current value of the Node must 216 // be in order for the Delete operation to succeed. 217 // 218 // Leaving this field empty means that the caller wishes to 219 // ignore the current value of the Node. This cannot be used 220 // to compare the Node's current value to an empty string. 221 PrevValue string 222 223 // PrevIndex indicates what the current ModifiedIndex of the 224 // Node must be in order for the Delete operation to succeed. 225 // 226 // If PrevIndex is set to 0 (default), no comparison is made. 227 PrevIndex uint64 228 229 // Recursive defines whether or not all children of the Node 230 // should be deleted. If set to true, all children of the Node 231 // identified by the given key will be deleted. If left unset 232 // or explicitly set to false, only a single Node will be 233 // deleted. 234 Recursive bool 235 236 // Dir specifies whether or not this Node should be removed as a directory. 237 Dir bool 238} 239 240type Watcher interface { 241 // Next blocks until an etcd event occurs, then returns a Response 242 // representing that event. The behavior of Next depends on the 243 // WatcherOptions used to construct the Watcher. Next is designed to 244 // be called repeatedly, each time blocking until a subsequent event 245 // is available. 246 // 247 // If the provided context is cancelled, Next will return a non-nil 248 // error. Any other failures encountered while waiting for the next 249 // event (connection issues, deserialization failures, etc) will 250 // also result in a non-nil error. 251 Next(context.Context) (*Response, error) 252} 253 254type Response struct { 255 // Action is the name of the operation that occurred. Possible values 256 // include get, set, delete, update, create, compareAndSwap, 257 // compareAndDelete and expire. 258 Action string `json:"action"` 259 260 // Node represents the state of the relevant etcd Node. 261 Node *Node `json:"node"` 262 263 // PrevNode represents the previous state of the Node. PrevNode is non-nil 264 // only if the Node existed before the action occurred and the action 265 // caused a change to the Node. 266 PrevNode *Node `json:"prevNode"` 267 268 // Index holds the cluster-level index at the time the Response was generated. 269 // This index is not tied to the Node(s) contained in this Response. 270 Index uint64 `json:"-"` 271 272 // ClusterID holds the cluster-level ID reported by the server. This 273 // should be different for different etcd clusters. 274 ClusterID string `json:"-"` 275} 276 277type Node struct { 278 // Key represents the unique location of this Node (e.g. "/foo/bar"). 279 Key string `json:"key"` 280 281 // Dir reports whether node describes a directory. 282 Dir bool `json:"dir,omitempty"` 283 284 // Value is the current data stored on this Node. If this Node 285 // is a directory, Value will be empty. 286 Value string `json:"value"` 287 288 // Nodes holds the children of this Node, only if this Node is a directory. 289 // This slice of will be arbitrarily deep (children, grandchildren, great- 290 // grandchildren, etc.) if a recursive Get or Watch request were made. 291 Nodes Nodes `json:"nodes"` 292 293 // CreatedIndex is the etcd index at-which this Node was created. 294 CreatedIndex uint64 `json:"createdIndex"` 295 296 // ModifiedIndex is the etcd index at-which this Node was last modified. 297 ModifiedIndex uint64 `json:"modifiedIndex"` 298 299 // Expiration is the server side expiration time of the key. 300 Expiration *time.Time `json:"expiration,omitempty"` 301 302 // TTL is the time to live of the key in second. 303 TTL int64 `json:"ttl,omitempty"` 304} 305 306func (n *Node) String() string { 307 return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d, TTL: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex, n.TTL) 308} 309 310// TTLDuration returns the Node's TTL as a time.Duration object 311func (n *Node) TTLDuration() time.Duration { 312 return time.Duration(n.TTL) * time.Second 313} 314 315type Nodes []*Node 316 317// interfaces for sorting 318 319func (ns Nodes) Len() int { return len(ns) } 320func (ns Nodes) Less(i, j int) bool { return ns[i].Key < ns[j].Key } 321func (ns Nodes) Swap(i, j int) { ns[i], ns[j] = ns[j], ns[i] } 322 323type httpKeysAPI struct { 324 client httpClient 325 prefix string 326} 327 328func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions) (*Response, error) { 329 act := &setAction{ 330 Prefix: k.prefix, 331 Key: key, 332 Value: val, 333 } 334 335 if opts != nil { 336 act.PrevValue = opts.PrevValue 337 act.PrevIndex = opts.PrevIndex 338 act.PrevExist = opts.PrevExist 339 act.TTL = opts.TTL 340 act.Refresh = opts.Refresh 341 act.Dir = opts.Dir 342 act.NoValueOnSuccess = opts.NoValueOnSuccess 343 } 344 345 doCtx := ctx 346 if act.PrevExist == PrevNoExist { 347 doCtx = context.WithValue(doCtx, &oneShotCtxValue, &oneShotCtxValue) 348 } 349 resp, body, err := k.client.Do(doCtx, act) 350 if err != nil { 351 return nil, err 352 } 353 354 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body) 355} 356 357func (k *httpKeysAPI) Create(ctx context.Context, key, val string) (*Response, error) { 358 return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevNoExist}) 359} 360 361func (k *httpKeysAPI) CreateInOrder(ctx context.Context, dir, val string, opts *CreateInOrderOptions) (*Response, error) { 362 act := &createInOrderAction{ 363 Prefix: k.prefix, 364 Dir: dir, 365 Value: val, 366 } 367 368 if opts != nil { 369 act.TTL = opts.TTL 370 } 371 372 resp, body, err := k.client.Do(ctx, act) 373 if err != nil { 374 return nil, err 375 } 376 377 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body) 378} 379 380func (k *httpKeysAPI) Update(ctx context.Context, key, val string) (*Response, error) { 381 return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevExist}) 382} 383 384func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error) { 385 act := &deleteAction{ 386 Prefix: k.prefix, 387 Key: key, 388 } 389 390 if opts != nil { 391 act.PrevValue = opts.PrevValue 392 act.PrevIndex = opts.PrevIndex 393 act.Dir = opts.Dir 394 act.Recursive = opts.Recursive 395 } 396 397 doCtx := context.WithValue(ctx, &oneShotCtxValue, &oneShotCtxValue) 398 resp, body, err := k.client.Do(doCtx, act) 399 if err != nil { 400 return nil, err 401 } 402 403 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body) 404} 405 406func (k *httpKeysAPI) Get(ctx context.Context, key string, opts *GetOptions) (*Response, error) { 407 act := &getAction{ 408 Prefix: k.prefix, 409 Key: key, 410 } 411 412 if opts != nil { 413 act.Recursive = opts.Recursive 414 act.Sorted = opts.Sort 415 act.Quorum = opts.Quorum 416 } 417 418 resp, body, err := k.client.Do(ctx, act) 419 if err != nil { 420 return nil, err 421 } 422 423 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body) 424} 425 426func (k *httpKeysAPI) Watcher(key string, opts *WatcherOptions) Watcher { 427 act := waitAction{ 428 Prefix: k.prefix, 429 Key: key, 430 } 431 432 if opts != nil { 433 act.Recursive = opts.Recursive 434 if opts.AfterIndex > 0 { 435 act.WaitIndex = opts.AfterIndex + 1 436 } 437 } 438 439 return &httpWatcher{ 440 client: k.client, 441 nextWait: act, 442 } 443} 444 445type httpWatcher struct { 446 client httpClient 447 nextWait waitAction 448} 449 450func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) { 451 for { 452 httpresp, body, err := hw.client.Do(ctx, &hw.nextWait) 453 if err != nil { 454 return nil, err 455 } 456 457 resp, err := unmarshalHTTPResponse(httpresp.StatusCode, httpresp.Header, body) 458 if err != nil { 459 if err == ErrEmptyBody { 460 continue 461 } 462 return nil, err 463 } 464 465 hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1 466 return resp, nil 467 } 468} 469 470// v2KeysURL forms a URL representing the location of a key. 471// The endpoint argument represents the base URL of an etcd 472// server. The prefix is the path needed to route from the 473// provided endpoint's path to the root of the keys API 474// (typically "/v2/keys"). 475func v2KeysURL(ep url.URL, prefix, key string) *url.URL { 476 // We concatenate all parts together manually. We cannot use 477 // path.Join because it does not reserve trailing slash. 478 // We call CanonicalURLPath to further cleanup the path. 479 if prefix != "" && prefix[0] != '/' { 480 prefix = "/" + prefix 481 } 482 if key != "" && key[0] != '/' { 483 key = "/" + key 484 } 485 ep.Path = pathutil.CanonicalURLPath(ep.Path + prefix + key) 486 return &ep 487} 488 489type getAction struct { 490 Prefix string 491 Key string 492 Recursive bool 493 Sorted bool 494 Quorum bool 495} 496 497func (g *getAction) HTTPRequest(ep url.URL) *http.Request { 498 u := v2KeysURL(ep, g.Prefix, g.Key) 499 500 params := u.Query() 501 params.Set("recursive", strconv.FormatBool(g.Recursive)) 502 params.Set("sorted", strconv.FormatBool(g.Sorted)) 503 params.Set("quorum", strconv.FormatBool(g.Quorum)) 504 u.RawQuery = params.Encode() 505 506 req, _ := http.NewRequest("GET", u.String(), nil) 507 return req 508} 509 510type waitAction struct { 511 Prefix string 512 Key string 513 WaitIndex uint64 514 Recursive bool 515} 516 517func (w *waitAction) HTTPRequest(ep url.URL) *http.Request { 518 u := v2KeysURL(ep, w.Prefix, w.Key) 519 520 params := u.Query() 521 params.Set("wait", "true") 522 params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10)) 523 params.Set("recursive", strconv.FormatBool(w.Recursive)) 524 u.RawQuery = params.Encode() 525 526 req, _ := http.NewRequest("GET", u.String(), nil) 527 return req 528} 529 530type setAction struct { 531 Prefix string 532 Key string 533 Value string 534 PrevValue string 535 PrevIndex uint64 536 PrevExist PrevExistType 537 TTL time.Duration 538 Refresh bool 539 Dir bool 540 NoValueOnSuccess bool 541} 542 543func (a *setAction) HTTPRequest(ep url.URL) *http.Request { 544 u := v2KeysURL(ep, a.Prefix, a.Key) 545 546 params := u.Query() 547 form := url.Values{} 548 549 // we're either creating a directory or setting a key 550 if a.Dir { 551 params.Set("dir", strconv.FormatBool(a.Dir)) 552 } else { 553 // These options are only valid for setting a key 554 if a.PrevValue != "" { 555 params.Set("prevValue", a.PrevValue) 556 } 557 form.Add("value", a.Value) 558 } 559 560 // Options which apply to both setting a key and creating a dir 561 if a.PrevIndex != 0 { 562 params.Set("prevIndex", strconv.FormatUint(a.PrevIndex, 10)) 563 } 564 if a.PrevExist != PrevIgnore { 565 params.Set("prevExist", string(a.PrevExist)) 566 } 567 if a.TTL > 0 { 568 form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10)) 569 } 570 571 if a.Refresh { 572 form.Add("refresh", "true") 573 } 574 if a.NoValueOnSuccess { 575 params.Set("noValueOnSuccess", strconv.FormatBool(a.NoValueOnSuccess)) 576 } 577 578 u.RawQuery = params.Encode() 579 body := strings.NewReader(form.Encode()) 580 581 req, _ := http.NewRequest("PUT", u.String(), body) 582 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 583 584 return req 585} 586 587type deleteAction struct { 588 Prefix string 589 Key string 590 PrevValue string 591 PrevIndex uint64 592 Dir bool 593 Recursive bool 594} 595 596func (a *deleteAction) HTTPRequest(ep url.URL) *http.Request { 597 u := v2KeysURL(ep, a.Prefix, a.Key) 598 599 params := u.Query() 600 if a.PrevValue != "" { 601 params.Set("prevValue", a.PrevValue) 602 } 603 if a.PrevIndex != 0 { 604 params.Set("prevIndex", strconv.FormatUint(a.PrevIndex, 10)) 605 } 606 if a.Dir { 607 params.Set("dir", "true") 608 } 609 if a.Recursive { 610 params.Set("recursive", "true") 611 } 612 u.RawQuery = params.Encode() 613 614 req, _ := http.NewRequest("DELETE", u.String(), nil) 615 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 616 617 return req 618} 619 620type createInOrderAction struct { 621 Prefix string 622 Dir string 623 Value string 624 TTL time.Duration 625} 626 627func (a *createInOrderAction) HTTPRequest(ep url.URL) *http.Request { 628 u := v2KeysURL(ep, a.Prefix, a.Dir) 629 630 form := url.Values{} 631 form.Add("value", a.Value) 632 if a.TTL > 0 { 633 form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10)) 634 } 635 body := strings.NewReader(form.Encode()) 636 637 req, _ := http.NewRequest("POST", u.String(), body) 638 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 639 return req 640} 641 642func unmarshalHTTPResponse(code int, header http.Header, body []byte) (res *Response, err error) { 643 switch code { 644 case http.StatusOK, http.StatusCreated: 645 if len(body) == 0 { 646 return nil, ErrEmptyBody 647 } 648 res, err = unmarshalSuccessfulKeysResponse(header, body) 649 default: 650 err = unmarshalFailedKeysResponse(body) 651 } 652 return res, err 653} 654 655var jsonIterator = caseSensitiveJsonIterator() 656 657func unmarshalSuccessfulKeysResponse(header http.Header, body []byte) (*Response, error) { 658 var res Response 659 err := jsonIterator.Unmarshal(body, &res) 660 if err != nil { 661 return nil, ErrInvalidJSON 662 } 663 if header.Get("X-Etcd-Index") != "" { 664 res.Index, err = strconv.ParseUint(header.Get("X-Etcd-Index"), 10, 64) 665 if err != nil { 666 return nil, err 667 } 668 } 669 res.ClusterID = header.Get("X-Etcd-Cluster-ID") 670 return &res, nil 671} 672 673func unmarshalFailedKeysResponse(body []byte) error { 674 var etcdErr Error 675 if err := json.Unmarshal(body, &etcdErr); err != nil { 676 return ErrInvalidJSON 677 } 678 return etcdErr 679} 680