1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package client 16 17//go:generate codecgen -d 1819 -r "Node|Response|Nodes" -o keys.generated.go keys.go 18 19import ( 20 "context" 21 "encoding/json" 22 "errors" 23 "fmt" 24 "net/http" 25 "net/url" 26 "strconv" 27 "strings" 28 "time" 29 30 "github.com/ugorji/go/codec" 31 "go.etcd.io/etcd/pkg/pathutil" 32) 33 34const ( 35 ErrorCodeKeyNotFound = 100 36 ErrorCodeTestFailed = 101 37 ErrorCodeNotFile = 102 38 ErrorCodeNotDir = 104 39 ErrorCodeNodeExist = 105 40 ErrorCodeRootROnly = 107 41 ErrorCodeDirNotEmpty = 108 42 ErrorCodeUnauthorized = 110 43 44 ErrorCodePrevValueRequired = 201 45 ErrorCodeTTLNaN = 202 46 ErrorCodeIndexNaN = 203 47 ErrorCodeInvalidField = 209 48 ErrorCodeInvalidForm = 210 49 50 ErrorCodeRaftInternal = 300 51 ErrorCodeLeaderElect = 301 52 53 ErrorCodeWatcherCleared = 400 54 ErrorCodeEventIndexCleared = 401 55) 56 57type Error struct { 58 Code int `json:"errorCode"` 59 Message string `json:"message"` 60 Cause string `json:"cause"` 61 Index uint64 `json:"index"` 62} 63 64func (e Error) Error() string { 65 return fmt.Sprintf("%v: %v (%v) [%v]", e.Code, e.Message, e.Cause, e.Index) 66} 67 68var ( 69 ErrInvalidJSON = errors.New("client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint.") 70 ErrEmptyBody = errors.New("client: response body is empty") 71) 72 73// PrevExistType is used to define an existence condition when setting 74// or deleting Nodes. 75type PrevExistType string 76 77const ( 78 PrevIgnore = PrevExistType("") 79 PrevExist = PrevExistType("true") 80 PrevNoExist = PrevExistType("false") 81) 82 83var ( 84 defaultV2KeysPrefix = "/v2/keys" 85) 86 87// NewKeysAPI builds a KeysAPI that interacts with etcd's key-value 88// API over HTTP. 89func NewKeysAPI(c Client) KeysAPI { 90 return NewKeysAPIWithPrefix(c, defaultV2KeysPrefix) 91} 92 93// NewKeysAPIWithPrefix acts like NewKeysAPI, but allows the caller 94// to provide a custom base URL path. This should only be used in 95// very rare cases. 96func NewKeysAPIWithPrefix(c Client, p string) KeysAPI { 97 return &httpKeysAPI{ 98 client: c, 99 prefix: p, 100 } 101} 102 103type KeysAPI interface { 104 // Get retrieves a set of Nodes from etcd 105 Get(ctx context.Context, key string, opts *GetOptions) (*Response, error) 106 107 // Set assigns a new value to a Node identified by a given key. The caller 108 // may define a set of conditions in the SetOptions. If SetOptions.Dir=true 109 // then value is ignored. 110 Set(ctx context.Context, key, value string, opts *SetOptions) (*Response, error) 111 112 // Delete removes a Node identified by the given key, optionally destroying 113 // all of its children as well. The caller may define a set of required 114 // conditions in an DeleteOptions object. 115 Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error) 116 117 // Create is an alias for Set w/ PrevExist=false 118 Create(ctx context.Context, key, value string) (*Response, error) 119 120 // CreateInOrder is used to atomically create in-order keys within the given directory. 121 CreateInOrder(ctx context.Context, dir, value string, opts *CreateInOrderOptions) (*Response, error) 122 123 // Update is an alias for Set w/ PrevExist=true 124 Update(ctx context.Context, key, value string) (*Response, error) 125 126 // Watcher builds a new Watcher targeted at a specific Node identified 127 // by the given key. The Watcher may be configured at creation time 128 // through a WatcherOptions object. The returned Watcher is designed 129 // to emit events that happen to a Node, and optionally to its children. 130 Watcher(key string, opts *WatcherOptions) Watcher 131} 132 133type WatcherOptions struct { 134 // AfterIndex defines the index after-which the Watcher should 135 // start emitting events. For example, if a value of 5 is 136 // provided, the first event will have an index >= 6. 137 // 138 // Setting AfterIndex to 0 (default) means that the Watcher 139 // should start watching for events starting at the current 140 // index, whatever that may be. 141 AfterIndex uint64 142 143 // Recursive specifies whether or not the Watcher should emit 144 // events that occur in children of the given keyspace. If set 145 // to false (default), events will be limited to those that 146 // occur for the exact key. 147 Recursive bool 148} 149 150type CreateInOrderOptions struct { 151 // TTL defines a period of time after-which the Node should 152 // expire and no longer exist. Values <= 0 are ignored. Given 153 // that the zero-value is ignored, TTL cannot be used to set 154 // a TTL of 0. 155 TTL time.Duration 156} 157 158type SetOptions struct { 159 // PrevValue specifies what the current value of the Node must 160 // be in order for the Set operation to succeed. 161 // 162 // Leaving this field empty means that the caller wishes to 163 // ignore the current value of the Node. This cannot be used 164 // to compare the Node's current value to an empty string. 165 // 166 // PrevValue is ignored if Dir=true 167 PrevValue string 168 169 // PrevIndex indicates what the current ModifiedIndex of the 170 // Node must be in order for the Set operation to succeed. 171 // 172 // If PrevIndex is set to 0 (default), no comparison is made. 173 PrevIndex uint64 174 175 // PrevExist specifies whether the Node must currently exist 176 // (PrevExist) or not (PrevNoExist). If the caller does not 177 // care about existence, set PrevExist to PrevIgnore, or simply 178 // leave it unset. 179 PrevExist PrevExistType 180 181 // TTL defines a period of time after-which the Node should 182 // expire and no longer exist. Values <= 0 are ignored. Given 183 // that the zero-value is ignored, TTL cannot be used to set 184 // a TTL of 0. 185 TTL time.Duration 186 187 // Refresh set to true means a TTL value can be updated 188 // without firing a watch or changing the node value. A 189 // value must not be provided when refreshing a key. 190 Refresh bool 191 192 // Dir specifies whether or not this Node should be created as a directory. 193 Dir bool 194 195 // NoValueOnSuccess specifies whether the response contains the current value of the Node. 196 // If set, the response will only contain the current value when the request fails. 197 NoValueOnSuccess bool 198} 199 200type GetOptions struct { 201 // Recursive defines whether or not all children of the Node 202 // should be returned. 203 Recursive bool 204 205 // Sort instructs the server whether or not to sort the Nodes. 206 // If true, the Nodes are sorted alphabetically by key in 207 // ascending order (A to z). If false (default), the Nodes will 208 // not be sorted and the ordering used should not be considered 209 // predictable. 210 Sort bool 211 212 // Quorum specifies whether it gets the latest committed value that 213 // has been applied in quorum of members, which ensures external 214 // consistency (or linearizability). 215 Quorum bool 216} 217 218type DeleteOptions struct { 219 // PrevValue specifies what the current value of the Node must 220 // be in order for the Delete operation to succeed. 221 // 222 // Leaving this field empty means that the caller wishes to 223 // ignore the current value of the Node. This cannot be used 224 // to compare the Node's current value to an empty string. 225 PrevValue string 226 227 // PrevIndex indicates what the current ModifiedIndex of the 228 // Node must be in order for the Delete operation to succeed. 229 // 230 // If PrevIndex is set to 0 (default), no comparison is made. 231 PrevIndex uint64 232 233 // Recursive defines whether or not all children of the Node 234 // should be deleted. If set to true, all children of the Node 235 // identified by the given key will be deleted. If left unset 236 // or explicitly set to false, only a single Node will be 237 // deleted. 238 Recursive bool 239 240 // Dir specifies whether or not this Node should be removed as a directory. 241 Dir bool 242} 243 244type Watcher interface { 245 // Next blocks until an etcd event occurs, then returns a Response 246 // representing that event. The behavior of Next depends on the 247 // WatcherOptions used to construct the Watcher. Next is designed to 248 // be called repeatedly, each time blocking until a subsequent event 249 // is available. 250 // 251 // If the provided context is cancelled, Next will return a non-nil 252 // error. Any other failures encountered while waiting for the next 253 // event (connection issues, deserialization failures, etc) will 254 // also result in a non-nil error. 255 Next(context.Context) (*Response, error) 256} 257 258type Response struct { 259 // Action is the name of the operation that occurred. Possible values 260 // include get, set, delete, update, create, compareAndSwap, 261 // compareAndDelete and expire. 262 Action string `json:"action"` 263 264 // Node represents the state of the relevant etcd Node. 265 Node *Node `json:"node"` 266 267 // PrevNode represents the previous state of the Node. PrevNode is non-nil 268 // only if the Node existed before the action occurred and the action 269 // caused a change to the Node. 270 PrevNode *Node `json:"prevNode"` 271 272 // Index holds the cluster-level index at the time the Response was generated. 273 // This index is not tied to the Node(s) contained in this Response. 274 Index uint64 `json:"-"` 275 276 // ClusterID holds the cluster-level ID reported by the server. This 277 // should be different for different etcd clusters. 278 ClusterID string `json:"-"` 279} 280 281type Node struct { 282 // Key represents the unique location of this Node (e.g. "/foo/bar"). 283 Key string `json:"key"` 284 285 // Dir reports whether node describes a directory. 286 Dir bool `json:"dir,omitempty"` 287 288 // Value is the current data stored on this Node. If this Node 289 // is a directory, Value will be empty. 290 Value string `json:"value"` 291 292 // Nodes holds the children of this Node, only if this Node is a directory. 293 // This slice of will be arbitrarily deep (children, grandchildren, great- 294 // grandchildren, etc.) if a recursive Get or Watch request were made. 295 Nodes Nodes `json:"nodes"` 296 297 // CreatedIndex is the etcd index at-which this Node was created. 298 CreatedIndex uint64 `json:"createdIndex"` 299 300 // ModifiedIndex is the etcd index at-which this Node was last modified. 301 ModifiedIndex uint64 `json:"modifiedIndex"` 302 303 // Expiration is the server side expiration time of the key. 304 Expiration *time.Time `json:"expiration,omitempty"` 305 306 // TTL is the time to live of the key in second. 307 TTL int64 `json:"ttl,omitempty"` 308} 309 310func (n *Node) String() string { 311 return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d, TTL: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex, n.TTL) 312} 313 314// TTLDuration returns the Node's TTL as a time.Duration object 315func (n *Node) TTLDuration() time.Duration { 316 return time.Duration(n.TTL) * time.Second 317} 318 319type Nodes []*Node 320 321// interfaces for sorting 322 323func (ns Nodes) Len() int { return len(ns) } 324func (ns Nodes) Less(i, j int) bool { return ns[i].Key < ns[j].Key } 325func (ns Nodes) Swap(i, j int) { ns[i], ns[j] = ns[j], ns[i] } 326 327type httpKeysAPI struct { 328 client httpClient 329 prefix string 330} 331 332func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions) (*Response, error) { 333 act := &setAction{ 334 Prefix: k.prefix, 335 Key: key, 336 Value: val, 337 } 338 339 if opts != nil { 340 act.PrevValue = opts.PrevValue 341 act.PrevIndex = opts.PrevIndex 342 act.PrevExist = opts.PrevExist 343 act.TTL = opts.TTL 344 act.Refresh = opts.Refresh 345 act.Dir = opts.Dir 346 act.NoValueOnSuccess = opts.NoValueOnSuccess 347 } 348 349 doCtx := ctx 350 if act.PrevExist == PrevNoExist { 351 doCtx = context.WithValue(doCtx, &oneShotCtxValue, &oneShotCtxValue) 352 } 353 resp, body, err := k.client.Do(doCtx, act) 354 if err != nil { 355 return nil, err 356 } 357 358 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body) 359} 360 361func (k *httpKeysAPI) Create(ctx context.Context, key, val string) (*Response, error) { 362 return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevNoExist}) 363} 364 365func (k *httpKeysAPI) CreateInOrder(ctx context.Context, dir, val string, opts *CreateInOrderOptions) (*Response, error) { 366 act := &createInOrderAction{ 367 Prefix: k.prefix, 368 Dir: dir, 369 Value: val, 370 } 371 372 if opts != nil { 373 act.TTL = opts.TTL 374 } 375 376 resp, body, err := k.client.Do(ctx, act) 377 if err != nil { 378 return nil, err 379 } 380 381 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body) 382} 383 384func (k *httpKeysAPI) Update(ctx context.Context, key, val string) (*Response, error) { 385 return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevExist}) 386} 387 388func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error) { 389 act := &deleteAction{ 390 Prefix: k.prefix, 391 Key: key, 392 } 393 394 if opts != nil { 395 act.PrevValue = opts.PrevValue 396 act.PrevIndex = opts.PrevIndex 397 act.Dir = opts.Dir 398 act.Recursive = opts.Recursive 399 } 400 401 doCtx := context.WithValue(ctx, &oneShotCtxValue, &oneShotCtxValue) 402 resp, body, err := k.client.Do(doCtx, act) 403 if err != nil { 404 return nil, err 405 } 406 407 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body) 408} 409 410func (k *httpKeysAPI) Get(ctx context.Context, key string, opts *GetOptions) (*Response, error) { 411 act := &getAction{ 412 Prefix: k.prefix, 413 Key: key, 414 } 415 416 if opts != nil { 417 act.Recursive = opts.Recursive 418 act.Sorted = opts.Sort 419 act.Quorum = opts.Quorum 420 } 421 422 resp, body, err := k.client.Do(ctx, act) 423 if err != nil { 424 return nil, err 425 } 426 427 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body) 428} 429 430func (k *httpKeysAPI) Watcher(key string, opts *WatcherOptions) Watcher { 431 act := waitAction{ 432 Prefix: k.prefix, 433 Key: key, 434 } 435 436 if opts != nil { 437 act.Recursive = opts.Recursive 438 if opts.AfterIndex > 0 { 439 act.WaitIndex = opts.AfterIndex + 1 440 } 441 } 442 443 return &httpWatcher{ 444 client: k.client, 445 nextWait: act, 446 } 447} 448 449type httpWatcher struct { 450 client httpClient 451 nextWait waitAction 452} 453 454func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) { 455 for { 456 httpresp, body, err := hw.client.Do(ctx, &hw.nextWait) 457 if err != nil { 458 return nil, err 459 } 460 461 resp, err := unmarshalHTTPResponse(httpresp.StatusCode, httpresp.Header, body) 462 if err != nil { 463 if err == ErrEmptyBody { 464 continue 465 } 466 return nil, err 467 } 468 469 hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1 470 return resp, nil 471 } 472} 473 474// v2KeysURL forms a URL representing the location of a key. 475// The endpoint argument represents the base URL of an etcd 476// server. The prefix is the path needed to route from the 477// provided endpoint's path to the root of the keys API 478// (typically "/v2/keys"). 479func v2KeysURL(ep url.URL, prefix, key string) *url.URL { 480 // We concatenate all parts together manually. We cannot use 481 // path.Join because it does not reserve trailing slash. 482 // We call CanonicalURLPath to further cleanup the path. 483 if prefix != "" && prefix[0] != '/' { 484 prefix = "/" + prefix 485 } 486 if key != "" && key[0] != '/' { 487 key = "/" + key 488 } 489 ep.Path = pathutil.CanonicalURLPath(ep.Path + prefix + key) 490 return &ep 491} 492 493type getAction struct { 494 Prefix string 495 Key string 496 Recursive bool 497 Sorted bool 498 Quorum bool 499} 500 501func (g *getAction) HTTPRequest(ep url.URL) *http.Request { 502 u := v2KeysURL(ep, g.Prefix, g.Key) 503 504 params := u.Query() 505 params.Set("recursive", strconv.FormatBool(g.Recursive)) 506 params.Set("sorted", strconv.FormatBool(g.Sorted)) 507 params.Set("quorum", strconv.FormatBool(g.Quorum)) 508 u.RawQuery = params.Encode() 509 510 req, _ := http.NewRequest("GET", u.String(), nil) 511 return req 512} 513 514type waitAction struct { 515 Prefix string 516 Key string 517 WaitIndex uint64 518 Recursive bool 519} 520 521func (w *waitAction) HTTPRequest(ep url.URL) *http.Request { 522 u := v2KeysURL(ep, w.Prefix, w.Key) 523 524 params := u.Query() 525 params.Set("wait", "true") 526 params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10)) 527 params.Set("recursive", strconv.FormatBool(w.Recursive)) 528 u.RawQuery = params.Encode() 529 530 req, _ := http.NewRequest("GET", u.String(), nil) 531 return req 532} 533 534type setAction struct { 535 Prefix string 536 Key string 537 Value string 538 PrevValue string 539 PrevIndex uint64 540 PrevExist PrevExistType 541 TTL time.Duration 542 Refresh bool 543 Dir bool 544 NoValueOnSuccess bool 545} 546 547func (a *setAction) HTTPRequest(ep url.URL) *http.Request { 548 u := v2KeysURL(ep, a.Prefix, a.Key) 549 550 params := u.Query() 551 form := url.Values{} 552 553 // we're either creating a directory or setting a key 554 if a.Dir { 555 params.Set("dir", strconv.FormatBool(a.Dir)) 556 } else { 557 // These options are only valid for setting a key 558 if a.PrevValue != "" { 559 params.Set("prevValue", a.PrevValue) 560 } 561 form.Add("value", a.Value) 562 } 563 564 // Options which apply to both setting a key and creating a dir 565 if a.PrevIndex != 0 { 566 params.Set("prevIndex", strconv.FormatUint(a.PrevIndex, 10)) 567 } 568 if a.PrevExist != PrevIgnore { 569 params.Set("prevExist", string(a.PrevExist)) 570 } 571 if a.TTL > 0 { 572 form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10)) 573 } 574 575 if a.Refresh { 576 form.Add("refresh", "true") 577 } 578 if a.NoValueOnSuccess { 579 params.Set("noValueOnSuccess", strconv.FormatBool(a.NoValueOnSuccess)) 580 } 581 582 u.RawQuery = params.Encode() 583 body := strings.NewReader(form.Encode()) 584 585 req, _ := http.NewRequest("PUT", u.String(), body) 586 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 587 588 return req 589} 590 591type deleteAction struct { 592 Prefix string 593 Key string 594 PrevValue string 595 PrevIndex uint64 596 Dir bool 597 Recursive bool 598} 599 600func (a *deleteAction) HTTPRequest(ep url.URL) *http.Request { 601 u := v2KeysURL(ep, a.Prefix, a.Key) 602 603 params := u.Query() 604 if a.PrevValue != "" { 605 params.Set("prevValue", a.PrevValue) 606 } 607 if a.PrevIndex != 0 { 608 params.Set("prevIndex", strconv.FormatUint(a.PrevIndex, 10)) 609 } 610 if a.Dir { 611 params.Set("dir", "true") 612 } 613 if a.Recursive { 614 params.Set("recursive", "true") 615 } 616 u.RawQuery = params.Encode() 617 618 req, _ := http.NewRequest("DELETE", u.String(), nil) 619 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 620 621 return req 622} 623 624type createInOrderAction struct { 625 Prefix string 626 Dir string 627 Value string 628 TTL time.Duration 629} 630 631func (a *createInOrderAction) HTTPRequest(ep url.URL) *http.Request { 632 u := v2KeysURL(ep, a.Prefix, a.Dir) 633 634 form := url.Values{} 635 form.Add("value", a.Value) 636 if a.TTL > 0 { 637 form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10)) 638 } 639 body := strings.NewReader(form.Encode()) 640 641 req, _ := http.NewRequest("POST", u.String(), body) 642 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 643 return req 644} 645 646func unmarshalHTTPResponse(code int, header http.Header, body []byte) (res *Response, err error) { 647 switch code { 648 case http.StatusOK, http.StatusCreated: 649 if len(body) == 0 { 650 return nil, ErrEmptyBody 651 } 652 res, err = unmarshalSuccessfulKeysResponse(header, body) 653 default: 654 err = unmarshalFailedKeysResponse(body) 655 } 656 return res, err 657} 658 659func unmarshalSuccessfulKeysResponse(header http.Header, body []byte) (*Response, error) { 660 var res Response 661 err := codec.NewDecoderBytes(body, new(codec.JsonHandle)).Decode(&res) 662 if err != nil { 663 return nil, ErrInvalidJSON 664 } 665 if header.Get("X-Etcd-Index") != "" { 666 res.Index, err = strconv.ParseUint(header.Get("X-Etcd-Index"), 10, 64) 667 if err != nil { 668 return nil, err 669 } 670 } 671 res.ClusterID = header.Get("X-Etcd-Cluster-ID") 672 return &res, nil 673} 674 675func unmarshalFailedKeysResponse(body []byte) error { 676 var etcdErr Error 677 if err := json.Unmarshal(body, &etcdErr); err != nil { 678 return ErrInvalidJSON 679 } 680 return etcdErr 681} 682