// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2http

import (
	"encoding/json"
	"errors"
	"expvar"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"path"
	"strconv"
	"strings"
	"time"

	etcdErr "github.com/coreos/etcd/error"
	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/etcdserver/api"
	"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
	"github.com/coreos/etcd/etcdserver/auth"
	"github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/etcdserver/stats"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/store"
	"github.com/coreos/etcd/version"
	"github.com/coreos/pkg/capnslog"
	"github.com/jonboulle/clockwork"
	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/net/context"
)

// URL paths and path prefixes of the client-facing HTTP API. The "*Prefix"
// constants are the roots of endpoint families; the "*Path" constants name
// single endpoints (configPath is extended with "/local/log" when it is
// registered).
const (
	// authPrefix is not referenced in this part of the file; presumably it is
	// the root of the routes installed by handleAuth — confirm there.
	authPrefix = "/v2/auth"
	// keysPrefix is the root of the key-value API; the remainder of the URL
	// path is the key.
	keysPrefix = "/v2/keys"
	// deprecatedMachinesPrefix serves the comma-separated list of client URLs.
	deprecatedMachinesPrefix = "/v2/machines"
	// membersPrefix is the root of the cluster membership API.
	membersPrefix = "/v2/members"
	// statsPrefix is the root of the "/store", "/self" and "/leader" stats
	// endpoints.
	statsPrefix = "/v2/stats"
	// varsPath serves the expvar variables as JSON.
	varsPath = "/debug/vars"
	// metricsPath serves the Prometheus metrics handler.
	metricsPath = "/metrics"
	// healthPath serves the health check.
	healthPath = "/health"
	// versionPath serves the server and cluster versions.
	versionPath = "/version"
	// configPath is the root of runtime configuration endpoints.
	configPath = "/config"
)

// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler {
	// One auth store is shared by the keys, members and auth handlers.
	sec := auth.NewStore(server, timeout)

	kh := &keysHandler{
		sec:                   sec,
		server:                server,
		cluster:               server.Cluster(),
		timer:                 server,
		timeout:               timeout,
		clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
	}

	sh := &statsHandler{
		stats: server,
	}

	mh := &membersHandler{
		sec:                   sec,
		server:                server,
		cluster:               server.Cluster(),
		timeout:               timeout,
		clock:                 clockwork.NewRealClock(),
		clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
	}

	dmh := &deprecatedMachinesHandler{
		cluster: server.Cluster(),
	}

	sech := &authHandler{
		sec:                   sec,
		cluster:               server.Cluster(),
		clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
	}

	mux := http.NewServeMux()
	// Anything not matched by a more specific pattern below is a 404.
	mux.HandleFunc("/", http.NotFound)
	mux.Handle(healthPath, healthHandler(server))
	mux.HandleFunc(versionPath, versionHandler(server.Cluster(), serveVersion))
	// Prefix handlers are registered both with and without the trailing
	// slash so that the bare prefix and every sub-path reach the handler.
	mux.Handle(keysPrefix, kh)
	mux.Handle(keysPrefix+"/", kh)
	mux.HandleFunc(statsPrefix+"/store", sh.serveStore)
	mux.HandleFunc(statsPrefix+"/self", sh.serveSelf)
	mux.HandleFunc(statsPrefix+"/leader", sh.serveLeader)
	mux.HandleFunc(varsPath, serveVars)
	mux.HandleFunc(configPath+"/local/log", logHandleFunc)
	mux.Handle(metricsPath, prometheus.Handler())
	mux.Handle(membersPrefix, mh)
	mux.Handle(membersPrefix+"/", mh)
	mux.Handle(deprecatedMachinesPrefix, dmh)
	handleAuth(mux, sech)

	// The whole mux is wrapped by requestLogger (defined elsewhere).
	return requestLogger(mux)
}

// keysHandler serves the key-value API registered under keysPrefix.
type keysHandler struct {
	sec     auth.Store        // consulted for per-key access checks
	server  etcdserver.Server // executes the parsed requests
	cluster api.Cluster       // source of the X-Etcd-Cluster-ID header
	timer   etcdserver.RaftTimer
	// timeout bounds each call to server.Do; watches use
	// defaultWatchTimeout instead once a watcher has been obtained.
	timeout               time.Duration
	clientCertAuthEnabled bool
}

// ServeHTTP handles HEAD, GET, PUT, POST and DELETE requests on the keys API.
// The request is parsed into an etcdserverpb.Request, checked against the auth
// store, and submitted to the server; the resulting event is written as JSON,
// or, when the server returns a watcher, the watch is served until an event
// arrives, the client disconnects or the watch times out.
func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if !allowMethod(w, r.Method, "HEAD", "GET", "PUT", "POST", "DELETE") {
		return
	}

	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())

	ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
	defer cancel()
	clock := clockwork.NewRealClock()
	// startTime is handed to reportRequestCompleted below.
	startTime := clock.Now()
	rr, noValueOnSuccess, err := parseKeyRequest(r, clock)
	if err != nil {
		writeKeyError(w, err)
		return
	}
	// The path must be valid at this point (we've parsed the request successfully).
	if !hasKeyPrefixAccess(h.sec, r, r.URL.Path[len(keysPrefix):], rr.Recursive, h.clientCertAuthEnabled) {
		writeKeyNoAuth(w)
		return
	}
	// Watch (wait) requests are not reported as received.
	if !rr.Wait {
		reportRequestReceived(rr)
	}
	resp, err := h.server.Do(ctx, rr)
	if err != nil {
		// Strip the internal store prefix from the error cause before it is
		// written to the client.
		err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
		writeKeyError(w, err)
		reportRequestFailed(rr, err)
		return
	}
	switch {
	case resp.Event != nil:
		if err := writeKeyEvent(w, resp.Event, noValueOnSuccess, h.timer); err != nil {
			// Should never be reached
			plog.Errorf("error writing event (%v)", err)
		}
		reportRequestCompleted(rr, resp, startTime)
	case resp.Watcher != nil:
		// The watch gets its own context: it is bounded by
		// defaultWatchTimeout rather than by the request timeout above.
		ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
		defer cancel()
		handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
	default:
		writeKeyError(w, errors.New("received response with no Event/Watcher!"))
	}
}

// deprecatedMachinesHandler serves the list of client URLs of the cluster at
// deprecatedMachinesPrefix.
type deprecatedMachinesHandler struct {
	cluster api.Cluster
}

// ServeHTTP answers GET and HEAD requests with the cluster's client URLs
// joined by ", ".
func (h *deprecatedMachinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if !allowMethod(w, r.Method, "GET", "HEAD") {
		return
	}
	endpoints := h.cluster.ClientURLs()
	w.Write([]byte(strings.Join(endpoints, ", ")))
}

// membersHandler serves the cluster membership API registered under
// membersPrefix.
type membersHandler struct {
	sec     auth.Store        // consulted for the root-access check
	server  etcdserver.Server // performs membership changes and reports the leader
	cluster api.Cluster       // source of the member list and cluster ID
	timeout time.Duration     // bounds each membership operation
	clock   clockwork.Clock   // supplies the timestamp passed to membership.NewMember
	clientCertAuthEnabled bool
}

// ServeHTTP dispatches membership requests by HTTP method:
//
//	GET    lists all members, or returns the leader for the "leader" sub-path
//	POST   adds a member from a MemberCreateRequest body (201 on success)
//	DELETE removes the member whose ID is in the path (204 on success)
//	PUT    updates the peer URLs of the member whose ID is in the path (204)
//
// The hasWriteRootAccess check runs before the method switch, so it applies
// to every method, including GET.
func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") {
		return
	}
	if !hasWriteRootAccess(h.sec, r, h.clientCertAuthEnabled) {
		writeNoAuth(w, r)
		return
	}
	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())

	ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
	defer cancel()

	switch r.Method {
	case "GET":
		// The sub-path after membersPrefix selects what is returned.
		switch trimPrefix(r.URL.Path, membersPrefix) {
		case "":
			mc := newMemberCollection(h.cluster.Members())
			w.Header().Set("Content-Type", "application/json")
			if err := json.NewEncoder(w).Encode(mc); err != nil {
				plog.Warningf("failed to encode members response (%v)", err)
			}
		case "leader":
			id := h.server.Leader()
			// A zero leader ID is reported to the client as "During election".
			if id == 0 {
				writeError(w, r, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election"))
				return
			}
			// NOTE(review): newMember dereferences its argument; this assumes
			// cluster.Member always finds the leader ID — confirm.
			m := newMember(h.cluster.Member(id))
			w.Header().Set("Content-Type", "application/json")
			if err := json.NewEncoder(w).Encode(m); err != nil {
				plog.Warningf("failed to encode members response (%v)", err)
			}
		default:
			writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, "Not found"))
		}
	case "POST":
		req := httptypes.MemberCreateRequest{}
		// unmarshalRequest has already written the error response on failure.
		if ok := unmarshalRequest(r, &req, w); !ok {
			return
		}
		now := h.clock.Now()
		m := membership.NewMember("", req.PeerURLs, "", &now)
		err := h.server.AddMember(ctx, *m)
		switch {
		case err == membership.ErrIDExists || err == membership.ErrPeerURLexists:
			writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
			return
		case err != nil:
			plog.Errorf("error adding member %s (%v)", m.ID, err)
			writeError(w, r, err)
			return
		}
		res := newMember(m)
		// Headers must be set before WriteHeader sends them.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusCreated)
		if err := json.NewEncoder(w).Encode(res); err != nil {
			plog.Warningf("failed to encode members response (%v)", err)
		}
	case "DELETE":
		// getID has already written the error response on failure.
		id, ok := getID(r.URL.Path, w)
		if !ok {
			return
		}
		err := h.server.RemoveMember(ctx, uint64(id))
		switch {
		case err == membership.ErrIDRemoved:
			writeError(w, r, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id)))
		case err == membership.ErrIDNotFound:
			writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
		case err != nil:
			plog.Errorf("error removing member %s (%v)", id, err)
			writeError(w, r, err)
		default:
			w.WriteHeader(http.StatusNoContent)
		}
	case "PUT":
		id, ok := getID(r.URL.Path, w)
		if !ok {
			return
		}
		req := httptypes.MemberUpdateRequest{}
		if ok := unmarshalRequest(r, &req, w); !ok {
			return
		}
		// Only the ID and the peer URLs are populated for an update.
		m := membership.Member{
			ID:             id,
			RaftAttributes: membership.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()},
		}
		err := h.server.UpdateMember(ctx, m)
		switch {
		case err == membership.ErrPeerURLexists:
			writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
		case err == membership.ErrIDNotFound:
			writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
		case err != nil:
			plog.Errorf("error updating member %s (%v)", m.ID, err)
			writeError(w, r, err)
		default:
			w.WriteHeader(http.StatusNoContent)
		}
	}
}

// statsHandler serves the statistics endpoints registered under statsPrefix.
type statsHandler struct {
	stats stats.Stats
}

// serveStore answers GET requests with the bytes returned by StoreStats,
// labelled as JSON.
func (h *statsHandler) serveStore(w http.ResponseWriter, r *http.Request) {
	if !allowMethod(w, r.Method, "GET") {
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(h.stats.StoreStats())
}

// serveSelf answers GET requests with the bytes returned by SelfStats,
// labelled as JSON.
func (h *statsHandler) serveSelf(w http.ResponseWriter, r *http.Request) {
	if !allowMethod(w, r.Method, "GET") {
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(h.stats.SelfStats())
}

// serveLeader answers GET requests with the bytes returned by LeaderStats,
// labelled as JSON. A nil result is reported as 403 "not current leader".
func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
	if !allowMethod(w, r.Method, "GET") {
		return
	}
	stats := h.stats.LeaderStats()
	if stats == nil {
		writeError(w, r, httptypes.NewHTTPError(http.StatusForbidden, "not current leader"))
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(stats)
}

// serveVars answers GET requests with every published expvar variable,
// written as one JSON object with one "key: value" pair per line.
func serveVars(w http.ResponseWriter, r *http.Request) {
	if !allowMethod(w, r.Method, "GET") {
		return
	}

	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	fmt.Fprintf(w, "{\n")
	// first suppresses the separator before the first pair.
	first := true
	expvar.Do(func(kv expvar.KeyValue) {
		if !first {
			fmt.Fprintf(w, ",\n")
		}
		first = false
		fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
	})
	fmt.Fprintf(w, "\n}\n")
}

// Prometheus counters incremented by healthHandler for each successful and
// each failed health check.
var (
	healthSuccess = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "server",
		Name:      "health_success",
		Help:      "The total number of successful health checks",
	})
	healthFailed = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "server",
		Name:      "health_failures",
		Help:      "The total number of failed health checks",
	})
)

// init registers the health counters with the default Prometheus registry.
func init() {
	prometheus.MustRegister(healthSuccess)
	prometheus.MustRegister(healthFailed)
}

// healthHandler returns a handler for GET health checks. It responds with
// 503 and {"health": "false"} when the server reports no leader or when a
// QGET request does not succeed within one second, and with 200 and
// {"health": "true"} otherwise. Each outcome increments the matching counter.
func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !allowMethod(w, r.Method, "GET") {
			return
		}
		if uint64(server.Leader()) == raft.None {
			http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
			healthFailed.Inc()
			return
		}
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()
		if _, err := server.Do(ctx, etcdserverpb.Request{Method: "QGET"}); err != nil {
			http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
			healthFailed.Inc()
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(`{"health": "true"}`))
		healthSuccess.Inc()
	}
}

// versionHandler adapts fn into an http.HandlerFunc by passing it the
// cluster version as a string, or "not_decided" while c.Version() is nil.
func versionHandler(c api.Cluster, fn func(http.ResponseWriter, *http.Request, string)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		v := c.Version()
		if v != nil {
			fn(w, r, v.String())
		} else {
			fn(w, r, "not_decided")
		}
	}
}

// serveVersion answers GET requests with a JSON-encoded version.Versions
// holding this server's version and the given cluster version.
func serveVersion(w http.ResponseWriter, r *http.Request, clusterV string) {
	if !allowMethod(w, r.Method, "GET") {
		return
	}
	vs := version.Versions{
		Server:  version.Version,
		Cluster: clusterV,
	}

	w.Header().Set("Content-Type", "application/json")
	b, err := json.Marshal(&vs)
	if err != nil {
		plog.Panicf("cannot marshal versions to json (%v)", err)
	}
	w.Write(b)
}

// logHandleFunc handles PUT requests that change the global log level. The
// body is a JSON object with a "Level" field; the level is upper-cased and
// parsed with capnslog.ParseLevel. Bad JSON or an unknown level yields 400;
// success yields 204.
func logHandleFunc(w http.ResponseWriter, r *http.Request) {
	if !allowMethod(w, r.Method, "PUT") {
		return
	}

	in := struct{ Level string }{}

	d := json.NewDecoder(r.Body)
	if err := d.Decode(&in); err != nil {
		writeError(w, r, httptypes.NewHTTPError(http.StatusBadRequest, "Invalid json body"))
		return
	}

	logl, err := capnslog.ParseLevel(strings.ToUpper(in.Level))
	if err != nil {
		writeError(w, r, httptypes.NewHTTPError(http.StatusBadRequest, "Invalid log level "+in.Level))
		return
	}

	plog.Noticef("globalLogLevel set to %q", logl.String())
	capnslog.SetGlobalLogLevel(logl)
	w.WriteHeader(http.StatusNoContent)
}

// parseKeyRequest converts a received http.Request on keysPrefix to
// a server Request, performing validation of supplied fields as appropriate.
// If any validation fails, an empty Request and non-nil error is returned.
447func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, bool, error) { 448 noValueOnSuccess := false 449 emptyReq := etcdserverpb.Request{} 450 451 err := r.ParseForm() 452 if err != nil { 453 return emptyReq, false, etcdErr.NewRequestError( 454 etcdErr.EcodeInvalidForm, 455 err.Error(), 456 ) 457 } 458 459 if !strings.HasPrefix(r.URL.Path, keysPrefix) { 460 return emptyReq, false, etcdErr.NewRequestError( 461 etcdErr.EcodeInvalidForm, 462 "incorrect key prefix", 463 ) 464 } 465 p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):]) 466 467 var pIdx, wIdx uint64 468 if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil { 469 return emptyReq, false, etcdErr.NewRequestError( 470 etcdErr.EcodeIndexNaN, 471 `invalid value for "prevIndex"`, 472 ) 473 } 474 if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil { 475 return emptyReq, false, etcdErr.NewRequestError( 476 etcdErr.EcodeIndexNaN, 477 `invalid value for "waitIndex"`, 478 ) 479 } 480 481 var rec, sort, wait, dir, quorum, stream bool 482 if rec, err = getBool(r.Form, "recursive"); err != nil { 483 return emptyReq, false, etcdErr.NewRequestError( 484 etcdErr.EcodeInvalidField, 485 `invalid value for "recursive"`, 486 ) 487 } 488 if sort, err = getBool(r.Form, "sorted"); err != nil { 489 return emptyReq, false, etcdErr.NewRequestError( 490 etcdErr.EcodeInvalidField, 491 `invalid value for "sorted"`, 492 ) 493 } 494 if wait, err = getBool(r.Form, "wait"); err != nil { 495 return emptyReq, false, etcdErr.NewRequestError( 496 etcdErr.EcodeInvalidField, 497 `invalid value for "wait"`, 498 ) 499 } 500 // TODO(jonboulle): define what parameters dir is/isn't compatible with? 
501 if dir, err = getBool(r.Form, "dir"); err != nil { 502 return emptyReq, false, etcdErr.NewRequestError( 503 etcdErr.EcodeInvalidField, 504 `invalid value for "dir"`, 505 ) 506 } 507 if quorum, err = getBool(r.Form, "quorum"); err != nil { 508 return emptyReq, false, etcdErr.NewRequestError( 509 etcdErr.EcodeInvalidField, 510 `invalid value for "quorum"`, 511 ) 512 } 513 if stream, err = getBool(r.Form, "stream"); err != nil { 514 return emptyReq, false, etcdErr.NewRequestError( 515 etcdErr.EcodeInvalidField, 516 `invalid value for "stream"`, 517 ) 518 } 519 520 if wait && r.Method != "GET" { 521 return emptyReq, false, etcdErr.NewRequestError( 522 etcdErr.EcodeInvalidField, 523 `"wait" can only be used with GET requests`, 524 ) 525 } 526 527 pV := r.FormValue("prevValue") 528 if _, ok := r.Form["prevValue"]; ok && pV == "" { 529 return emptyReq, false, etcdErr.NewRequestError( 530 etcdErr.EcodePrevValueRequired, 531 `"prevValue" cannot be empty`, 532 ) 533 } 534 535 if noValueOnSuccess, err = getBool(r.Form, "noValueOnSuccess"); err != nil { 536 return emptyReq, false, etcdErr.NewRequestError( 537 etcdErr.EcodeInvalidField, 538 `invalid value for "noValueOnSuccess"`, 539 ) 540 } 541 542 // TTL is nullable, so leave it null if not specified 543 // or an empty string 544 var ttl *uint64 545 if len(r.FormValue("ttl")) > 0 { 546 i, err := getUint64(r.Form, "ttl") 547 if err != nil { 548 return emptyReq, false, etcdErr.NewRequestError( 549 etcdErr.EcodeTTLNaN, 550 `invalid value for "ttl"`, 551 ) 552 } 553 ttl = &i 554 } 555 556 // prevExist is nullable, so leave it null if not specified 557 var pe *bool 558 if _, ok := r.Form["prevExist"]; ok { 559 bv, err := getBool(r.Form, "prevExist") 560 if err != nil { 561 return emptyReq, false, etcdErr.NewRequestError( 562 etcdErr.EcodeInvalidField, 563 "invalid value for prevExist", 564 ) 565 } 566 pe = &bv 567 } 568 569 // refresh is nullable, so leave it null if not specified 570 var refresh *bool 571 if _, ok := 
r.Form["refresh"]; ok { 572 bv, err := getBool(r.Form, "refresh") 573 if err != nil { 574 return emptyReq, false, etcdErr.NewRequestError( 575 etcdErr.EcodeInvalidField, 576 "invalid value for refresh", 577 ) 578 } 579 refresh = &bv 580 if refresh != nil && *refresh { 581 val := r.FormValue("value") 582 if _, ok := r.Form["value"]; ok && val != "" { 583 return emptyReq, false, etcdErr.NewRequestError( 584 etcdErr.EcodeRefreshValue, 585 `A value was provided on a refresh`, 586 ) 587 } 588 if ttl == nil { 589 return emptyReq, false, etcdErr.NewRequestError( 590 etcdErr.EcodeRefreshTTLRequired, 591 `No TTL value set`, 592 ) 593 } 594 } 595 } 596 597 rr := etcdserverpb.Request{ 598 Method: r.Method, 599 Path: p, 600 Val: r.FormValue("value"), 601 Dir: dir, 602 PrevValue: pV, 603 PrevIndex: pIdx, 604 PrevExist: pe, 605 Wait: wait, 606 Since: wIdx, 607 Recursive: rec, 608 Sorted: sort, 609 Quorum: quorum, 610 Stream: stream, 611 } 612 613 if pe != nil { 614 rr.PrevExist = pe 615 } 616 617 if refresh != nil { 618 rr.Refresh = refresh 619 } 620 621 // Null TTL is equivalent to unset Expiration 622 if ttl != nil { 623 expr := time.Duration(*ttl) * time.Second 624 rr.Expiration = clock.Now().Add(expr).UnixNano() 625 } 626 627 return rr, noValueOnSuccess, nil 628} 629 630// writeKeyEvent trims the prefix of key path in a single Event under 631// StoreKeysPrefix, serializes it and writes the resulting JSON to the given 632// ResponseWriter, along with the appropriate headers. 
633func writeKeyEvent(w http.ResponseWriter, ev *store.Event, noValueOnSuccess bool, rt etcdserver.RaftTimer) error { 634 if ev == nil { 635 return errors.New("cannot write empty Event!") 636 } 637 w.Header().Set("Content-Type", "application/json") 638 w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) 639 w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) 640 w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) 641 642 if ev.IsCreated() { 643 w.WriteHeader(http.StatusCreated) 644 } 645 646 ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) 647 if noValueOnSuccess && 648 (ev.Action == store.Set || ev.Action == store.CompareAndSwap || 649 ev.Action == store.Create || ev.Action == store.Update) { 650 ev.Node = nil 651 ev.PrevNode = nil 652 } 653 return json.NewEncoder(w).Encode(ev) 654} 655 656func writeKeyNoAuth(w http.ResponseWriter) { 657 e := etcdErr.NewError(etcdErr.EcodeUnauthorized, "Insufficient credentials", 0) 658 e.WriteTo(w) 659} 660 661// writeKeyError logs and writes the given Error to the ResponseWriter. 662// If Error is not an etcdErr, the error will be converted to an etcd error. 
663func writeKeyError(w http.ResponseWriter, err error) { 664 if err == nil { 665 return 666 } 667 switch e := err.(type) { 668 case *etcdErr.Error: 669 e.WriteTo(w) 670 default: 671 switch err { 672 case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost: 673 mlog.MergeError(err) 674 default: 675 mlog.MergeErrorf("got unexpected response error (%v)", err) 676 } 677 ee := etcdErr.NewError(etcdErr.EcodeRaftInternal, err.Error(), 0) 678 ee.WriteTo(w) 679 } 680} 681 682func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) { 683 defer wa.Remove() 684 ech := wa.EventChan() 685 var nch <-chan bool 686 if x, ok := w.(http.CloseNotifier); ok { 687 nch = x.CloseNotify() 688 } 689 690 w.Header().Set("Content-Type", "application/json") 691 w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex())) 692 w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) 693 w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) 694 w.WriteHeader(http.StatusOK) 695 696 // Ensure headers are flushed early, in case of long polling 697 w.(http.Flusher).Flush() 698 699 for { 700 select { 701 case <-nch: 702 // Client closed connection. Nothing to do. 703 return 704 case <-ctx.Done(): 705 // Timed out. net/http will close the connection for us, so nothing to do. 706 return 707 case ev, ok := <-ech: 708 if !ok { 709 // If the channel is closed this may be an indication of 710 // that notifications are much more than we are able to 711 // send to the client in time. Then we simply end streaming. 
712 return 713 } 714 ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) 715 if err := json.NewEncoder(w).Encode(ev); err != nil { 716 // Should never be reached 717 plog.Warningf("error writing event (%v)", err) 718 return 719 } 720 if !stream { 721 return 722 } 723 w.(http.Flusher).Flush() 724 } 725 } 726} 727 728func trimEventPrefix(ev *store.Event, prefix string) *store.Event { 729 if ev == nil { 730 return nil 731 } 732 // Since the *Event may reference one in the store history 733 // history, we must copy it before modifying 734 e := ev.Clone() 735 trimNodeExternPrefix(e.Node, prefix) 736 trimNodeExternPrefix(e.PrevNode, prefix) 737 return e 738} 739 740func trimNodeExternPrefix(n *store.NodeExtern, prefix string) { 741 if n == nil { 742 return 743 } 744 n.Key = strings.TrimPrefix(n.Key, prefix) 745 for _, nn := range n.Nodes { 746 trimNodeExternPrefix(nn, prefix) 747 } 748} 749 750func trimErrorPrefix(err error, prefix string) error { 751 if e, ok := err.(*etcdErr.Error); ok { 752 e.Cause = strings.TrimPrefix(e.Cause, prefix) 753 } 754 return err 755} 756 757func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool { 758 ctype := r.Header.Get("Content-Type") 759 semicolonPosition := strings.Index(ctype, ";") 760 if semicolonPosition != -1 { 761 ctype = strings.TrimSpace(strings.ToLower(ctype[0:semicolonPosition])) 762 } 763 if ctype != "application/json" { 764 writeError(w, r, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) 765 return false 766 } 767 b, err := ioutil.ReadAll(r.Body) 768 if err != nil { 769 writeError(w, r, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) 770 return false 771 } 772 if err := req.UnmarshalJSON(b); err != nil { 773 writeError(w, r, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) 774 return false 775 } 776 return true 777} 778 779func getID(p string, w http.ResponseWriter) (types.ID, bool) { 780 
idStr := trimPrefix(p, membersPrefix) 781 if idStr == "" { 782 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) 783 return 0, false 784 } 785 id, err := types.IDFromString(idStr) 786 if err != nil { 787 writeError(w, nil, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) 788 return 0, false 789 } 790 return id, true 791} 792 793// getUint64 extracts a uint64 by the given key from a Form. If the key does 794// not exist in the form, 0 is returned. If the key exists but the value is 795// badly formed, an error is returned. If multiple values are present only the 796// first is considered. 797func getUint64(form url.Values, key string) (i uint64, err error) { 798 if vals, ok := form[key]; ok { 799 i, err = strconv.ParseUint(vals[0], 10, 64) 800 } 801 return 802} 803 804// getBool extracts a bool by the given key from a Form. If the key does not 805// exist in the form, false is returned. If the key exists but the value is 806// badly formed, an error is returned. If multiple values are present only the 807// first is considered. 
808func getBool(form url.Values, key string) (b bool, err error) { 809 if vals, ok := form[key]; ok { 810 b, err = strconv.ParseBool(vals[0]) 811 } 812 return 813} 814 815// trimPrefix removes a given prefix and any slash following the prefix 816// e.g.: trimPrefix("foo", "foo") == trimPrefix("foo/", "foo") == "" 817func trimPrefix(p, prefix string) (s string) { 818 s = strings.TrimPrefix(p, prefix) 819 s = strings.TrimPrefix(s, "/") 820 return 821} 822 823func newMemberCollection(ms []*membership.Member) *httptypes.MemberCollection { 824 c := httptypes.MemberCollection(make([]httptypes.Member, len(ms))) 825 826 for i, m := range ms { 827 c[i] = newMember(m) 828 } 829 830 return &c 831} 832 833func newMember(m *membership.Member) httptypes.Member { 834 tm := httptypes.Member{ 835 ID: m.ID.String(), 836 Name: m.Name, 837 PeerURLs: make([]string, len(m.PeerURLs)), 838 ClientURLs: make([]string, len(m.ClientURLs)), 839 } 840 841 copy(tm.PeerURLs, m.PeerURLs) 842 copy(tm.ClientURLs, m.ClientURLs) 843 844 return tm 845} 846