1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package leasehttp
16
17import (
18	"bytes"
19	"context"
20	"errors"
21	"fmt"
22	"io/ioutil"
23	"net/http"
24	"time"
25
26	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
27	"go.etcd.io/etcd/lease"
28	"go.etcd.io/etcd/lease/leasepb"
29	"go.etcd.io/etcd/pkg/httputil"
30)
31
32var (
33	LeasePrefix         = "/leases"
34	LeaseInternalPrefix = "/leases/internal"
35	applyTimeout        = time.Second
36	ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
37)
38
39// NewHandler returns an http Handler for lease renewals
40func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
41	return &leaseHandler{l, waitch}
42}
43
44type leaseHandler struct {
45	l      lease.Lessor
46	waitch func() <-chan struct{}
47}
48
49func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
50	if r.Method != "POST" {
51		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
52		return
53	}
54
55	defer r.Body.Close()
56	b, err := ioutil.ReadAll(r.Body)
57	if err != nil {
58		http.Error(w, "error reading body", http.StatusBadRequest)
59		return
60	}
61
62	var v []byte
63	switch r.URL.Path {
64	case LeasePrefix:
65		lreq := pb.LeaseKeepAliveRequest{}
66		if uerr := lreq.Unmarshal(b); uerr != nil {
67			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
68			return
69		}
70		select {
71		case <-h.waitch():
72		case <-time.After(applyTimeout):
73			http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
74			return
75		}
76		ttl, rerr := h.l.Renew(lease.LeaseID(lreq.ID))
77		if rerr != nil {
78			if rerr == lease.ErrLeaseNotFound {
79				http.Error(w, rerr.Error(), http.StatusNotFound)
80				return
81			}
82
83			http.Error(w, rerr.Error(), http.StatusBadRequest)
84			return
85		}
86		// TODO: fill out ResponseHeader
87		resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
88		v, err = resp.Marshal()
89		if err != nil {
90			http.Error(w, err.Error(), http.StatusInternalServerError)
91			return
92		}
93
94	case LeaseInternalPrefix:
95		lreq := leasepb.LeaseInternalRequest{}
96		if lerr := lreq.Unmarshal(b); lerr != nil {
97			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
98			return
99		}
100		select {
101		case <-h.waitch():
102		case <-time.After(applyTimeout):
103			http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
104			return
105		}
106		l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
107		if l == nil {
108			http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
109			return
110		}
111		// TODO: fill out ResponseHeader
112		resp := &leasepb.LeaseInternalResponse{
113			LeaseTimeToLiveResponse: &pb.LeaseTimeToLiveResponse{
114				Header:     &pb.ResponseHeader{},
115				ID:         lreq.LeaseTimeToLiveRequest.ID,
116				TTL:        int64(l.Remaining().Seconds()),
117				GrantedTTL: l.TTL(),
118			},
119		}
120		if lreq.LeaseTimeToLiveRequest.Keys {
121			ks := l.Keys()
122			kbs := make([][]byte, len(ks))
123			for i := range ks {
124				kbs[i] = []byte(ks[i])
125			}
126			resp.LeaseTimeToLiveResponse.Keys = kbs
127		}
128
129		v, err = resp.Marshal()
130		if err != nil {
131			http.Error(w, err.Error(), http.StatusInternalServerError)
132			return
133		}
134
135	default:
136		http.Error(w, fmt.Sprintf("unknown request path %q", r.URL.Path), http.StatusBadRequest)
137		return
138	}
139
140	w.Header().Set("Content-Type", "application/protobuf")
141	w.Write(v)
142}
143
144// RenewHTTP renews a lease at a given primary server.
145// TODO: Batch request in future?
146func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
147	// will post lreq protobuf to leader
148	lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
149	if err != nil {
150		return -1, err
151	}
152
153	cc := &http.Client{Transport: rt}
154	req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
155	if err != nil {
156		return -1, err
157	}
158	req.Header.Set("Content-Type", "application/protobuf")
159	req.Cancel = ctx.Done()
160
161	resp, err := cc.Do(req)
162	if err != nil {
163		return -1, err
164	}
165	b, err := readResponse(resp)
166	if err != nil {
167		return -1, err
168	}
169
170	if resp.StatusCode == http.StatusRequestTimeout {
171		return -1, ErrLeaseHTTPTimeout
172	}
173
174	if resp.StatusCode == http.StatusNotFound {
175		return -1, lease.ErrLeaseNotFound
176	}
177
178	if resp.StatusCode != http.StatusOK {
179		return -1, fmt.Errorf("lease: unknown error(%s)", string(b))
180	}
181
182	lresp := &pb.LeaseKeepAliveResponse{}
183	if err := lresp.Unmarshal(b); err != nil {
184		return -1, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
185	}
186	if lresp.ID != int64(id) {
187		return -1, fmt.Errorf("lease: renew id mismatch")
188	}
189	return lresp.TTL, nil
190}
191
192// TimeToLiveHTTP retrieves lease information of the given lease ID.
193func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) {
194	// will post lreq protobuf to leader
195	lreq, err := (&leasepb.LeaseInternalRequest{
196		LeaseTimeToLiveRequest: &pb.LeaseTimeToLiveRequest{
197			ID:   int64(id),
198			Keys: keys,
199		},
200	}).Marshal()
201	if err != nil {
202		return nil, err
203	}
204
205	req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
206	if err != nil {
207		return nil, err
208	}
209	req.Header.Set("Content-Type", "application/protobuf")
210
211	req = req.WithContext(ctx)
212
213	cc := &http.Client{Transport: rt}
214	var b []byte
215	// buffer errc channel so that errc don't block inside the go routinue
216	resp, err := cc.Do(req)
217	if err != nil {
218		return nil, err
219	}
220	b, err = readResponse(resp)
221	if err != nil {
222		return nil, err
223	}
224	if resp.StatusCode == http.StatusRequestTimeout {
225		return nil, ErrLeaseHTTPTimeout
226	}
227	if resp.StatusCode == http.StatusNotFound {
228		return nil, lease.ErrLeaseNotFound
229	}
230	if resp.StatusCode != http.StatusOK {
231		return nil, fmt.Errorf("lease: unknown error(%s)", string(b))
232	}
233
234	lresp := &leasepb.LeaseInternalResponse{}
235	if err := lresp.Unmarshal(b); err != nil {
236		return nil, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
237	}
238	if lresp.LeaseTimeToLiveResponse.ID != int64(id) {
239		return nil, fmt.Errorf("lease: renew id mismatch")
240	}
241	return lresp, nil
242}
243
244func readResponse(resp *http.Response) (b []byte, err error) {
245	b, err = ioutil.ReadAll(resp.Body)
246	httputil.GracefulClose(resp)
247	return
248}
249