1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package leasehttp 16 17import ( 18 "bytes" 19 "context" 20 "errors" 21 "fmt" 22 "io/ioutil" 23 "net/http" 24 "time" 25 26 pb "go.etcd.io/etcd/etcdserver/etcdserverpb" 27 "go.etcd.io/etcd/lease" 28 "go.etcd.io/etcd/lease/leasepb" 29 "go.etcd.io/etcd/pkg/httputil" 30) 31 32var ( 33 LeasePrefix = "/leases" 34 LeaseInternalPrefix = "/leases/internal" 35 applyTimeout = time.Second 36 ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out") 37) 38 39// NewHandler returns an http Handler for lease renewals 40func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler { 41 return &leaseHandler{l, waitch} 42} 43 44type leaseHandler struct { 45 l lease.Lessor 46 waitch func() <-chan struct{} 47} 48 49func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 50 if r.Method != "POST" { 51 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) 52 return 53 } 54 55 defer r.Body.Close() 56 b, err := ioutil.ReadAll(r.Body) 57 if err != nil { 58 http.Error(w, "error reading body", http.StatusBadRequest) 59 return 60 } 61 62 var v []byte 63 switch r.URL.Path { 64 case LeasePrefix: 65 lreq := pb.LeaseKeepAliveRequest{} 66 if uerr := lreq.Unmarshal(b); uerr != nil { 67 http.Error(w, "error unmarshalling request", http.StatusBadRequest) 68 return 69 } 70 select { 71 case <-h.waitch(): 72 case <-time.After(applyTimeout): 73 http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout) 74 return 75 } 76 ttl, rerr := h.l.Renew(lease.LeaseID(lreq.ID)) 77 if rerr != nil { 78 if rerr == lease.ErrLeaseNotFound { 79 http.Error(w, rerr.Error(), http.StatusNotFound) 80 return 81 } 82 83 http.Error(w, rerr.Error(), http.StatusBadRequest) 84 return 85 } 86 // TODO: fill out ResponseHeader 87 resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl} 88 v, err = resp.Marshal() 89 if err != nil { 90 http.Error(w, err.Error(), http.StatusInternalServerError) 91 return 92 } 93 94 case LeaseInternalPrefix: 95 lreq := leasepb.LeaseInternalRequest{} 96 if lerr := lreq.Unmarshal(b); lerr != nil { 97 http.Error(w, "error unmarshalling request", http.StatusBadRequest) 98 return 99 } 100 select { 101 case <-h.waitch(): 102 case <-time.After(applyTimeout): 103 http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout) 104 return 105 } 106 l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID)) 107 if l == nil { 108 http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound) 109 return 110 } 111 // TODO: fill out ResponseHeader 112 resp := &leasepb.LeaseInternalResponse{ 113 LeaseTimeToLiveResponse: &pb.LeaseTimeToLiveResponse{ 114 Header: &pb.ResponseHeader{}, 115 ID: lreq.LeaseTimeToLiveRequest.ID, 116 TTL: int64(l.Remaining().Seconds()), 117 GrantedTTL: l.TTL(), 118 }, 119 } 120 if lreq.LeaseTimeToLiveRequest.Keys { 121 ks := l.Keys() 122 kbs := make([][]byte, len(ks)) 123 for i := range ks { 124 kbs[i] = []byte(ks[i]) 125 } 126 resp.LeaseTimeToLiveResponse.Keys = kbs 127 } 128 129 v, err = resp.Marshal() 130 if err != nil { 131 http.Error(w, err.Error(), http.StatusInternalServerError) 132 return 133 } 134 135 default: 136 http.Error(w, fmt.Sprintf("unknown request path %q", r.URL.Path), http.StatusBadRequest) 137 return 138 } 139 140 w.Header().Set("Content-Type", "application/protobuf") 141 w.Write(v) 142} 143 144// RenewHTTP renews a lease at a given primary server. 145// TODO: Batch request in future? 146func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) { 147 // will post lreq protobuf to leader 148 lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal() 149 if err != nil { 150 return -1, err 151 } 152 153 cc := &http.Client{Transport: rt} 154 req, err := http.NewRequest("POST", url, bytes.NewReader(lreq)) 155 if err != nil { 156 return -1, err 157 } 158 req.Header.Set("Content-Type", "application/protobuf") 159 req.Cancel = ctx.Done() 160 161 resp, err := cc.Do(req) 162 if err != nil { 163 return -1, err 164 } 165 b, err := readResponse(resp) 166 if err != nil { 167 return -1, err 168 } 169 170 if resp.StatusCode == http.StatusRequestTimeout { 171 return -1, ErrLeaseHTTPTimeout 172 } 173 174 if resp.StatusCode == http.StatusNotFound { 175 return -1, lease.ErrLeaseNotFound 176 } 177 178 if resp.StatusCode != http.StatusOK { 179 return -1, fmt.Errorf("lease: unknown error(%s)", string(b)) 180 } 181 182 lresp := &pb.LeaseKeepAliveResponse{} 183 if err := lresp.Unmarshal(b); err != nil { 184 return -1, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b)) 185 } 186 if lresp.ID != int64(id) { 187 return -1, fmt.Errorf("lease: renew id mismatch") 188 } 189 return lresp.TTL, nil 190} 191 192// TimeToLiveHTTP retrieves lease information of the given lease ID. 193func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) { 194 // will post lreq protobuf to leader 195 lreq, err := (&leasepb.LeaseInternalRequest{ 196 LeaseTimeToLiveRequest: &pb.LeaseTimeToLiveRequest{ 197 ID: int64(id), 198 Keys: keys, 199 }, 200 }).Marshal() 201 if err != nil { 202 return nil, err 203 } 204 205 req, err := http.NewRequest("POST", url, bytes.NewReader(lreq)) 206 if err != nil { 207 return nil, err 208 } 209 req.Header.Set("Content-Type", "application/protobuf") 210 211 req = req.WithContext(ctx) 212 213 cc := &http.Client{Transport: rt} 214 var b []byte 215 // buffer errc channel so that errc don't block inside the go routinue 216 resp, err := cc.Do(req) 217 if err != nil { 218 return nil, err 219 } 220 b, err = readResponse(resp) 221 if err != nil { 222 return nil, err 223 } 224 if resp.StatusCode == http.StatusRequestTimeout { 225 return nil, ErrLeaseHTTPTimeout 226 } 227 if resp.StatusCode == http.StatusNotFound { 228 return nil, lease.ErrLeaseNotFound 229 } 230 if resp.StatusCode != http.StatusOK { 231 return nil, fmt.Errorf("lease: unknown error(%s)", string(b)) 232 } 233 234 lresp := &leasepb.LeaseInternalResponse{} 235 if err := lresp.Unmarshal(b); err != nil { 236 return nil, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b)) 237 } 238 if lresp.LeaseTimeToLiveResponse.ID != int64(id) { 239 return nil, fmt.Errorf("lease: renew id mismatch") 240 } 241 return lresp, nil 242} 243 244func readResponse(resp *http.Response) (b []byte, err error) { 245 b, err = ioutil.ReadAll(resp.Body) 246 httputil.GracefulClose(resp) 247 return 248} 249