1// Copyright 2015 CoreOS, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package etcdhttp 16 17import ( 18 "encoding/json" 19 "errors" 20 "expvar" 21 "fmt" 22 "io/ioutil" 23 "net/http" 24 "net/http/pprof" 25 "net/url" 26 "path" 27 "strconv" 28 "strings" 29 "time" 30 31 "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" 32 "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" 33 "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" 34 "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" 35 etcdErr "github.com/coreos/etcd/error" 36 "github.com/coreos/etcd/etcdserver" 37 "github.com/coreos/etcd/etcdserver/auth" 38 "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" 39 "github.com/coreos/etcd/etcdserver/etcdserverpb" 40 "github.com/coreos/etcd/etcdserver/stats" 41 "github.com/coreos/etcd/pkg/types" 42 "github.com/coreos/etcd/raft" 43 "github.com/coreos/etcd/store" 44 "github.com/coreos/etcd/version" 45) 46 47const ( 48 authPrefix = "/v2/auth" 49 keysPrefix = "/v2/keys" 50 deprecatedMachinesPrefix = "/v2/machines" 51 membersPrefix = "/v2/members" 52 statsPrefix = "/v2/stats" 53 varsPath = "/debug/vars" 54 metricsPath = "/metrics" 55 healthPath = "/health" 56 versionPath = "/version" 57 configPath = "/config" 58 pprofPrefix = "/debug/pprof" 59) 60 61// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. 62func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler { 63 go capabilityLoop(server) 64 65 sec := auth.NewStore(server, timeout) 66 67 kh := &keysHandler{ 68 sec: sec, 69 server: server, 70 cluster: server.Cluster(), 71 timer: server, 72 timeout: timeout, 73 } 74 75 sh := &statsHandler{ 76 stats: server, 77 } 78 79 mh := &membersHandler{ 80 sec: sec, 81 server: server, 82 cluster: server.Cluster(), 83 timeout: timeout, 84 clock: clockwork.NewRealClock(), 85 } 86 87 dmh := &deprecatedMachinesHandler{ 88 cluster: server.Cluster(), 89 } 90 91 sech := &authHandler{ 92 sec: sec, 93 cluster: server.Cluster(), 94 } 95 96 mux := http.NewServeMux() 97 mux.HandleFunc("/", http.NotFound) 98 mux.Handle(healthPath, healthHandler(server)) 99 mux.HandleFunc(versionPath, versionHandler(server.Cluster(), serveVersion)) 100 mux.Handle(keysPrefix, kh) 101 mux.Handle(keysPrefix+"/", kh) 102 mux.HandleFunc(statsPrefix+"/store", sh.serveStore) 103 mux.HandleFunc(statsPrefix+"/self", sh.serveSelf) 104 mux.HandleFunc(statsPrefix+"/leader", sh.serveLeader) 105 mux.HandleFunc(varsPath, serveVars) 106 mux.HandleFunc(configPath+"/local/log", logHandleFunc) 107 mux.Handle(metricsPath, prometheus.Handler()) 108 mux.Handle(membersPrefix, mh) 109 mux.Handle(membersPrefix+"/", mh) 110 mux.Handle(deprecatedMachinesPrefix, dmh) 111 handleAuth(mux, sech) 112 113 if server.IsPprofEnabled() { 114 plog.Infof("pprof is enabled under %s", pprofPrefix) 115 116 mux.HandleFunc(pprofPrefix, pprof.Index) 117 mux.HandleFunc(pprofPrefix+"/profile", pprof.Profile) 118 mux.HandleFunc(pprofPrefix+"/symbol", pprof.Symbol) 119 mux.HandleFunc(pprofPrefix+"/cmdline", pprof.Cmdline) 120 // TODO: currently, we don't create an entry for pprof.Trace, 121 // because go 1.4 doesn't provide it. After support of go 1.4 is dropped, 122 // we should add the entry. 123 124 mux.Handle(pprofPrefix+"/heap", pprof.Handler("heap")) 125 mux.Handle(pprofPrefix+"/goroutine", pprof.Handler("goroutine")) 126 mux.Handle(pprofPrefix+"/threadcreate", pprof.Handler("threadcreate")) 127 mux.Handle(pprofPrefix+"/block", pprof.Handler("block")) 128 } 129 130 return requestLogger(mux) 131} 132 133type keysHandler struct { 134 sec auth.Store 135 server etcdserver.Server 136 cluster etcdserver.Cluster 137 timer etcdserver.RaftTimer 138 timeout time.Duration 139} 140 141func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 142 if !allowMethod(w, r.Method, "HEAD", "GET", "PUT", "POST", "DELETE") { 143 return 144 } 145 146 w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) 147 148 ctx, cancel := context.WithTimeout(context.Background(), h.timeout) 149 defer cancel() 150 clock := clockwork.NewRealClock() 151 startTime := clock.Now() 152 rr, err := parseKeyRequest(r, clock) 153 if err != nil { 154 writeKeyError(w, err) 155 return 156 } 157 // The path must be valid at this point (we've parsed the request successfully). 158 if !hasKeyPrefixAccess(h.sec, r, r.URL.Path[len(keysPrefix):], rr.Recursive) { 159 writeKeyNoAuth(w) 160 return 161 } 162 if !rr.Wait { 163 reportRequestReceived(rr) 164 } 165 resp, err := h.server.Do(ctx, rr) 166 if err != nil { 167 err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix) 168 writeKeyError(w, err) 169 reportRequestFailed(rr, err) 170 return 171 } 172 switch { 173 case resp.Event != nil: 174 if err := writeKeyEvent(w, resp.Event, h.timer); err != nil { 175 // Should never be reached 176 plog.Errorf("error writing event (%v)", err) 177 } 178 reportRequestCompleted(rr, resp, startTime) 179 case resp.Watcher != nil: 180 ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout) 181 defer cancel() 182 handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer) 183 default: 184 writeKeyError(w, errors.New("received response with no Event/Watcher!")) 185 } 186} 187 188type deprecatedMachinesHandler struct { 189 cluster etcdserver.Cluster 190} 191 192func (h *deprecatedMachinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 193 if !allowMethod(w, r.Method, "GET", "HEAD") { 194 return 195 } 196 endpoints := h.cluster.ClientURLs() 197 w.Write([]byte(strings.Join(endpoints, ", "))) 198} 199 200type membersHandler struct { 201 sec auth.Store 202 server etcdserver.Server 203 cluster etcdserver.Cluster 204 timeout time.Duration 205 clock clockwork.Clock 206} 207 208func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 209 if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") { 210 return 211 } 212 if !hasWriteRootAccess(h.sec, r) { 213 writeNoAuth(w, r) 214 return 215 } 216 w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) 217 218 ctx, cancel := context.WithTimeout(context.Background(), h.timeout) 219 defer cancel() 220 221 switch r.Method { 222 case "GET": 223 switch trimPrefix(r.URL.Path, membersPrefix) { 224 case "": 225 mc := newMemberCollection(h.cluster.Members()) 226 w.Header().Set("Content-Type", "application/json") 227 if err := json.NewEncoder(w).Encode(mc); err != nil { 228 plog.Warningf("failed to encode members response (%v)", err) 229 } 230 case "leader": 231 id := h.server.Leader() 232 if id == 0 { 233 writeError(w, r, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election")) 234 return 235 } 236 m := newMember(h.cluster.Member(id)) 237 w.Header().Set("Content-Type", "application/json") 238 if err := json.NewEncoder(w).Encode(m); err != nil { 239 plog.Warningf("failed to encode members response (%v)", err) 240 } 241 default: 242 writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, "Not found")) 243 } 244 case "POST": 245 req := httptypes.MemberCreateRequest{} 246 if ok := unmarshalRequest(r, &req, w); !ok { 247 return 248 } 249 now := h.clock.Now() 250 m := etcdserver.NewMember("", req.PeerURLs, "", &now) 251 err := h.server.AddMember(ctx, *m) 252 switch { 253 case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists: 254 writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error())) 255 return 256 case err != nil: 257 plog.Errorf("error adding member %s (%v)", m.ID, err) 258 writeError(w, r, err) 259 return 260 } 261 res := newMember(m) 262 w.Header().Set("Content-Type", "application/json") 263 w.WriteHeader(http.StatusCreated) 264 if err := json.NewEncoder(w).Encode(res); err != nil { 265 plog.Warningf("failed to encode members response (%v)", err) 266 } 267 case "DELETE": 268 id, ok := getID(r.URL.Path, w) 269 if !ok { 270 return 271 } 272 err := h.server.RemoveMember(ctx, uint64(id)) 273 switch { 274 case err == etcdserver.ErrIDRemoved: 275 writeError(w, r, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id))) 276 case err == etcdserver.ErrIDNotFound: 277 writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) 278 case err != nil: 279 plog.Errorf("error removing member %s (%v)", id, err) 280 writeError(w, r, err) 281 default: 282 w.WriteHeader(http.StatusNoContent) 283 } 284 case "PUT": 285 id, ok := getID(r.URL.Path, w) 286 if !ok { 287 return 288 } 289 req := httptypes.MemberUpdateRequest{} 290 if ok := unmarshalRequest(r, &req, w); !ok { 291 return 292 } 293 m := etcdserver.Member{ 294 ID: id, 295 RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()}, 296 } 297 err := h.server.UpdateMember(ctx, m) 298 switch { 299 case err == etcdserver.ErrPeerURLexists: 300 writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error())) 301 case err == etcdserver.ErrIDNotFound: 302 writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) 303 case err != nil: 304 plog.Errorf("error updating member %s (%v)", m.ID, err) 305 writeError(w, r, err) 306 default: 307 w.WriteHeader(http.StatusNoContent) 308 } 309 } 310} 311 312type statsHandler struct { 313 stats stats.Stats 314} 315 316func (h *statsHandler) serveStore(w http.ResponseWriter, r *http.Request) { 317 if !allowMethod(w, r.Method, "GET") { 318 return 319 } 320 w.Header().Set("Content-Type", "application/json") 321 w.Write(h.stats.StoreStats()) 322} 323 324func (h *statsHandler) serveSelf(w http.ResponseWriter, r *http.Request) { 325 if !allowMethod(w, r.Method, "GET") { 326 return 327 } 328 w.Header().Set("Content-Type", "application/json") 329 w.Write(h.stats.SelfStats()) 330} 331 332func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) { 333 if !allowMethod(w, r.Method, "GET") { 334 return 335 } 336 stats := h.stats.LeaderStats() 337 if stats == nil { 338 writeError(w, r, httptypes.NewHTTPError(http.StatusForbidden, "not current leader")) 339 return 340 } 341 w.Header().Set("Content-Type", "application/json") 342 w.Write(stats) 343} 344 345func serveVars(w http.ResponseWriter, r *http.Request) { 346 if !allowMethod(w, r.Method, "GET") { 347 return 348 } 349 350 w.Header().Set("Content-Type", "application/json; charset=utf-8") 351 fmt.Fprintf(w, "{\n") 352 first := true 353 expvar.Do(func(kv expvar.KeyValue) { 354 if !first { 355 fmt.Fprintf(w, ",\n") 356 } 357 first = false 358 fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value) 359 }) 360 fmt.Fprintf(w, "\n}\n") 361} 362 363// TODO: change etcdserver to raft interface when we have it. 364// add test for healthHandler when we have the interface ready. 365func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc { 366 return func(w http.ResponseWriter, r *http.Request) { 367 if !allowMethod(w, r.Method, "GET") { 368 return 369 } 370 371 if uint64(server.Leader()) == raft.None { 372 http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable) 373 return 374 } 375 376 // wait for raft's progress 377 index := server.Index() 378 for i := 0; i < 3; i++ { 379 time.Sleep(250 * time.Millisecond) 380 if server.Index() > index { 381 w.WriteHeader(http.StatusOK) 382 w.Write([]byte(`{"health": "true"}`)) 383 return 384 } 385 } 386 387 http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable) 388 return 389 } 390} 391 392func versionHandler(c etcdserver.Cluster, fn func(http.ResponseWriter, *http.Request, string)) http.HandlerFunc { 393 return func(w http.ResponseWriter, r *http.Request) { 394 v := c.Version() 395 if v != nil { 396 fn(w, r, v.String()) 397 } else { 398 fn(w, r, "not_decided") 399 } 400 } 401} 402 403func serveVersion(w http.ResponseWriter, r *http.Request, clusterV string) { 404 if !allowMethod(w, r.Method, "GET") { 405 return 406 } 407 vs := version.Versions{ 408 Server: version.Version, 409 Cluster: clusterV, 410 } 411 412 w.Header().Set("Content-Type", "application/json") 413 b, err := json.Marshal(&vs) 414 if err != nil { 415 plog.Panicf("cannot marshal versions to json (%v)", err) 416 } 417 w.Write(b) 418} 419 420func logHandleFunc(w http.ResponseWriter, r *http.Request) { 421 if !allowMethod(w, r.Method, "PUT") { 422 return 423 } 424 425 in := struct{ Level string }{} 426 427 d := json.NewDecoder(r.Body) 428 if err := d.Decode(&in); err != nil { 429 writeError(w, r, httptypes.NewHTTPError(http.StatusBadRequest, "Invalid json body")) 430 return 431 } 432 433 logl, err := capnslog.ParseLevel(strings.ToUpper(in.Level)) 434 if err != nil { 435 writeError(w, r, httptypes.NewHTTPError(http.StatusBadRequest, "Invalid log level "+in.Level)) 436 return 437 } 438 439 plog.Noticef("globalLogLevel set to %q", logl.String()) 440 capnslog.SetGlobalLogLevel(logl) 441 w.WriteHeader(http.StatusNoContent) 442} 443 444// parseKeyRequest converts a received http.Request on keysPrefix to 445// a server Request, performing validation of supplied fields as appropriate. 446// If any validation fails, an empty Request and non-nil error is returned. 447func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, error) { 448 emptyReq := etcdserverpb.Request{} 449 450 err := r.ParseForm() 451 if err != nil { 452 return emptyReq, etcdErr.NewRequestError( 453 etcdErr.EcodeInvalidForm, 454 err.Error(), 455 ) 456 } 457 458 if !strings.HasPrefix(r.URL.Path, keysPrefix) { 459 return emptyReq, etcdErr.NewRequestError( 460 etcdErr.EcodeInvalidForm, 461 "incorrect key prefix", 462 ) 463 } 464 p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):]) 465 466 var pIdx, wIdx uint64 467 if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil { 468 return emptyReq, etcdErr.NewRequestError( 469 etcdErr.EcodeIndexNaN, 470 `invalid value for "prevIndex"`, 471 ) 472 } 473 if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil { 474 return emptyReq, etcdErr.NewRequestError( 475 etcdErr.EcodeIndexNaN, 476 `invalid value for "waitIndex"`, 477 ) 478 } 479 480 var rec, sort, wait, dir, quorum, stream bool 481 if rec, err = getBool(r.Form, "recursive"); err != nil { 482 return emptyReq, etcdErr.NewRequestError( 483 etcdErr.EcodeInvalidField, 484 `invalid value for "recursive"`, 485 ) 486 } 487 if sort, err = getBool(r.Form, "sorted"); err != nil { 488 return emptyReq, etcdErr.NewRequestError( 489 etcdErr.EcodeInvalidField, 490 `invalid value for "sorted"`, 491 ) 492 } 493 if wait, err = getBool(r.Form, "wait"); err != nil { 494 return emptyReq, etcdErr.NewRequestError( 495 etcdErr.EcodeInvalidField, 496 `invalid value for "wait"`, 497 ) 498 } 499 // TODO(jonboulle): define what parameters dir is/isn't compatible with? 500 if dir, err = getBool(r.Form, "dir"); err != nil { 501 return emptyReq, etcdErr.NewRequestError( 502 etcdErr.EcodeInvalidField, 503 `invalid value for "dir"`, 504 ) 505 } 506 if quorum, err = getBool(r.Form, "quorum"); err != nil { 507 return emptyReq, etcdErr.NewRequestError( 508 etcdErr.EcodeInvalidField, 509 `invalid value for "quorum"`, 510 ) 511 } 512 if stream, err = getBool(r.Form, "stream"); err != nil { 513 return emptyReq, etcdErr.NewRequestError( 514 etcdErr.EcodeInvalidField, 515 `invalid value for "stream"`, 516 ) 517 } 518 519 if wait && r.Method != "GET" { 520 return emptyReq, etcdErr.NewRequestError( 521 etcdErr.EcodeInvalidField, 522 `"wait" can only be used with GET requests`, 523 ) 524 } 525 526 pV := r.FormValue("prevValue") 527 if _, ok := r.Form["prevValue"]; ok && pV == "" { 528 return emptyReq, etcdErr.NewRequestError( 529 etcdErr.EcodePrevValueRequired, 530 `"prevValue" cannot be empty`, 531 ) 532 } 533 534 // TTL is nullable, so leave it null if not specified 535 // or an empty string 536 var ttl *uint64 537 if len(r.FormValue("ttl")) > 0 { 538 i, err := getUint64(r.Form, "ttl") 539 if err != nil { 540 return emptyReq, etcdErr.NewRequestError( 541 etcdErr.EcodeTTLNaN, 542 `invalid value for "ttl"`, 543 ) 544 } 545 ttl = &i 546 } 547 548 // prevExist is nullable, so leave it null if not specified 549 var pe *bool 550 if _, ok := r.Form["prevExist"]; ok { 551 bv, err := getBool(r.Form, "prevExist") 552 if err != nil { 553 return emptyReq, etcdErr.NewRequestError( 554 etcdErr.EcodeInvalidField, 555 "invalid value for prevExist", 556 ) 557 } 558 pe = &bv 559 } 560 561 // refresh is nullable, so leave it null if not specified 562 var refresh *bool 563 if _, ok := r.Form["refresh"]; ok { 564 bv, err := getBool(r.Form, "refresh") 565 if err != nil { 566 return emptyReq, etcdErr.NewRequestError( 567 etcdErr.EcodeInvalidField, 568 "invalid value for refresh", 569 ) 570 } 571 refresh = &bv 572 if refresh != nil && *refresh { 573 val := r.FormValue("value") 574 if _, ok := r.Form["value"]; ok && val != "" { 575 return emptyReq, etcdErr.NewRequestError( 576 etcdErr.EcodeRefreshValue, 577 `A value was provided on a refresh`, 578 ) 579 } 580 if ttl == nil { 581 return emptyReq, etcdErr.NewRequestError( 582 etcdErr.EcodeRefreshTTLRequired, 583 `No TTL value set`, 584 ) 585 } 586 } 587 } 588 589 rr := etcdserverpb.Request{ 590 Method: r.Method, 591 Path: p, 592 Val: r.FormValue("value"), 593 Dir: dir, 594 PrevValue: pV, 595 PrevIndex: pIdx, 596 PrevExist: pe, 597 Wait: wait, 598 Since: wIdx, 599 Recursive: rec, 600 Sorted: sort, 601 Quorum: quorum, 602 Stream: stream, 603 } 604 605 if pe != nil { 606 rr.PrevExist = pe 607 } 608 609 if refresh != nil { 610 rr.Refresh = refresh 611 } 612 613 // Null TTL is equivalent to unset Expiration 614 if ttl != nil { 615 expr := time.Duration(*ttl) * time.Second 616 rr.Expiration = clock.Now().Add(expr).UnixNano() 617 } 618 619 return rr, nil 620} 621 622// writeKeyEvent trims the prefix of key path in a single Event under 623// StoreKeysPrefix, serializes it and writes the resulting JSON to the given 624// ResponseWriter, along with the appropriate headers. 625func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error { 626 if ev == nil { 627 return errors.New("cannot write empty Event!") 628 } 629 w.Header().Set("Content-Type", "application/json") 630 w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) 631 w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) 632 w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) 633 634 if ev.IsCreated() { 635 w.WriteHeader(http.StatusCreated) 636 } 637 638 ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) 639 return json.NewEncoder(w).Encode(ev) 640} 641 642func writeKeyNoAuth(w http.ResponseWriter) { 643 e := etcdErr.NewError(etcdErr.EcodeUnauthorized, "Insufficient credentials", 0) 644 e.WriteTo(w) 645} 646 647// writeKeyError logs and writes the given Error to the ResponseWriter. 648// If Error is not an etcdErr, the error will be converted to an etcd error. 649func writeKeyError(w http.ResponseWriter, err error) { 650 if err == nil { 651 return 652 } 653 switch e := err.(type) { 654 case *etcdErr.Error: 655 e.WriteTo(w) 656 default: 657 switch err { 658 case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost: 659 mlog.MergeError(err) 660 default: 661 mlog.MergeErrorf("got unexpected response error (%v)", err) 662 } 663 ee := etcdErr.NewError(etcdErr.EcodeRaftInternal, err.Error(), 0) 664 ee.WriteTo(w) 665 } 666} 667 668func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) { 669 defer wa.Remove() 670 ech := wa.EventChan() 671 var nch <-chan bool 672 if x, ok := w.(http.CloseNotifier); ok { 673 nch = x.CloseNotify() 674 } 675 676 w.Header().Set("Content-Type", "application/json") 677 w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex())) 678 w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) 679 w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) 680 w.WriteHeader(http.StatusOK) 681 682 // Ensure headers are flushed early, in case of long polling 683 w.(http.Flusher).Flush() 684 685 for { 686 select { 687 case <-nch: 688 // Client closed connection. Nothing to do. 689 return 690 case <-ctx.Done(): 691 // Timed out. net/http will close the connection for us, so nothing to do. 692 return 693 case ev, ok := <-ech: 694 if !ok { 695 // If the channel is closed this may be an indication of 696 // that notifications are much more than we are able to 697 // send to the client in time. Then we simply end streaming. 698 return 699 } 700 ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) 701 if err := json.NewEncoder(w).Encode(ev); err != nil { 702 // Should never be reached 703 plog.Warningf("error writing event (%v)", err) 704 return 705 } 706 if !stream { 707 return 708 } 709 w.(http.Flusher).Flush() 710 } 711 } 712} 713 714func trimEventPrefix(ev *store.Event, prefix string) *store.Event { 715 if ev == nil { 716 return nil 717 } 718 // Since the *Event may reference one in the store history 719 // history, we must copy it before modifying 720 e := ev.Clone() 721 e.Node = trimNodeExternPrefix(e.Node, prefix) 722 e.PrevNode = trimNodeExternPrefix(e.PrevNode, prefix) 723 return e 724} 725 726func trimNodeExternPrefix(n *store.NodeExtern, prefix string) *store.NodeExtern { 727 if n == nil { 728 return nil 729 } 730 n.Key = strings.TrimPrefix(n.Key, prefix) 731 for _, nn := range n.Nodes { 732 nn = trimNodeExternPrefix(nn, prefix) 733 } 734 return n 735} 736 737func trimErrorPrefix(err error, prefix string) error { 738 if e, ok := err.(*etcdErr.Error); ok { 739 e.Cause = strings.TrimPrefix(e.Cause, prefix) 740 } 741 return err 742} 743 744func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool { 745 ctype := r.Header.Get("Content-Type") 746 if ctype != "application/json" { 747 writeError(w, r, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) 748 return false 749 } 750 b, err := ioutil.ReadAll(r.Body) 751 if err != nil { 752 writeError(w, r, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) 753 return false 754 } 755 if err := req.UnmarshalJSON(b); err != nil { 756 writeError(w, r, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) 757 return false 758 } 759 return true 760} 761 762func getID(p string, w http.ResponseWriter) (types.ID, bool) { 763 idStr := trimPrefix(p, membersPrefix) 764 if idStr == "" { 765 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) 766 return 0, false 767 } 768 id, err := types.IDFromString(idStr) 769 if err != nil { 770 writeError(w, nil, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) 771 return 0, false 772 } 773 return id, true 774} 775 776// getUint64 extracts a uint64 by the given key from a Form. If the key does 777// not exist in the form, 0 is returned. If the key exists but the value is 778// badly formed, an error is returned. If multiple values are present only the 779// first is considered. 780func getUint64(form url.Values, key string) (i uint64, err error) { 781 if vals, ok := form[key]; ok { 782 i, err = strconv.ParseUint(vals[0], 10, 64) 783 } 784 return 785} 786 787// getBool extracts a bool by the given key from a Form. If the key does not 788// exist in the form, false is returned. If the key exists but the value is 789// badly formed, an error is returned. If multiple values are present only the 790// first is considered. 791func getBool(form url.Values, key string) (b bool, err error) { 792 if vals, ok := form[key]; ok { 793 b, err = strconv.ParseBool(vals[0]) 794 } 795 return 796} 797 798// trimPrefix removes a given prefix and any slash following the prefix 799// e.g.: trimPrefix("foo", "foo") == trimPrefix("foo/", "foo") == "" 800func trimPrefix(p, prefix string) (s string) { 801 s = strings.TrimPrefix(p, prefix) 802 s = strings.TrimPrefix(s, "/") 803 return 804} 805 806func newMemberCollection(ms []*etcdserver.Member) *httptypes.MemberCollection { 807 c := httptypes.MemberCollection(make([]httptypes.Member, len(ms))) 808 809 for i, m := range ms { 810 c[i] = newMember(m) 811 } 812 813 return &c 814} 815 816func newMember(m *etcdserver.Member) httptypes.Member { 817 tm := httptypes.Member{ 818 ID: m.ID.String(), 819 Name: m.Name, 820 PeerURLs: make([]string, len(m.PeerURLs)), 821 ClientURLs: make([]string, len(m.ClientURLs)), 822 } 823 824 copy(tm.PeerURLs, m.PeerURLs) 825 copy(tm.ClientURLs, m.ClientURLs) 826 827 return tm 828} 829