1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package embed
16
17import (
18	"context"
19	"fmt"
20	"io/ioutil"
21	defaultLog "log"
22	"math"
23	"net"
24	"net/http"
25	"strings"
26
27	etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw"
28	"go.etcd.io/etcd/client/pkg/v3/transport"
29	"go.etcd.io/etcd/client/v3/credentials"
30	"go.etcd.io/etcd/pkg/v3/debugutil"
31	"go.etcd.io/etcd/pkg/v3/httputil"
32	"go.etcd.io/etcd/server/v3/etcdserver"
33	"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
34	"go.etcd.io/etcd/server/v3/etcdserver/api/v3election"
35	"go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb"
36	v3electiongw "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb/gw"
37	"go.etcd.io/etcd/server/v3/etcdserver/api/v3lock"
38	"go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb"
39	v3lockgw "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb/gw"
40	"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
41
42	gw "github.com/grpc-ecosystem/grpc-gateway/runtime"
43	"github.com/soheilhy/cmux"
44	"github.com/tmc/grpc-websocket-proxy/wsproxy"
45	"go.uber.org/zap"
46	"golang.org/x/net/trace"
47	"google.golang.org/grpc"
48)
49
50type serveCtx struct {
51	lg       *zap.Logger
52	l        net.Listener
53	addr     string
54	network  string
55	secure   bool
56	insecure bool
57
58	ctx    context.Context
59	cancel context.CancelFunc
60
61	userHandlers    map[string]http.Handler
62	serviceRegister func(*grpc.Server)
63	serversC        chan *servers
64}
65
66type servers struct {
67	secure bool
68	grpc   *grpc.Server
69	http   *http.Server
70}
71
72func newServeCtx(lg *zap.Logger) *serveCtx {
73	ctx, cancel := context.WithCancel(context.Background())
74	if lg == nil {
75		lg = zap.NewNop()
76	}
77	return &serveCtx{
78		lg:           lg,
79		ctx:          ctx,
80		cancel:       cancel,
81		userHandlers: make(map[string]http.Handler),
82		serversC:     make(chan *servers, 2), // in case sctx.insecure,sctx.secure true
83	}
84}
85
86// serve accepts incoming connections on the listener l,
87// creating a new service goroutine for each. The service goroutines
88// read requests and then call handler to reply to them.
89func (sctx *serveCtx) serve(
90	s *etcdserver.EtcdServer,
91	tlsinfo *transport.TLSInfo,
92	handler http.Handler,
93	errHandler func(error),
94	gopts ...grpc.ServerOption) (err error) {
95	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
96	<-s.ReadyNotify()
97
98	sctx.lg.Info("ready to serve client requests")
99
100	m := cmux.New(sctx.l)
101	v3c := v3client.New(s)
102	servElection := v3election.NewElectionServer(v3c)
103	servLock := v3lock.NewLockServer(v3c)
104
105	var gs *grpc.Server
106	defer func() {
107		if err != nil && gs != nil {
108			gs.Stop()
109		}
110	}()
111
112	if sctx.insecure {
113		gs = v3rpc.Server(s, nil, gopts...)
114		v3electionpb.RegisterElectionServer(gs, servElection)
115		v3lockpb.RegisterLockServer(gs, servLock)
116		if sctx.serviceRegister != nil {
117			sctx.serviceRegister(gs)
118		}
119		grpcl := m.Match(cmux.HTTP2())
120		go func() { errHandler(gs.Serve(grpcl)) }()
121
122		var gwmux *gw.ServeMux
123		if s.Cfg.EnableGRPCGateway {
124			gwmux, err = sctx.registerGateway([]grpc.DialOption{grpc.WithInsecure()})
125			if err != nil {
126				return err
127			}
128		}
129
130		httpmux := sctx.createMux(gwmux, handler)
131
132		srvhttp := &http.Server{
133			Handler:  createAccessController(sctx.lg, s, httpmux),
134			ErrorLog: logger, // do not log user error
135		}
136		httpl := m.Match(cmux.HTTP1())
137		go func() { errHandler(srvhttp.Serve(httpl)) }()
138
139		sctx.serversC <- &servers{grpc: gs, http: srvhttp}
140		sctx.lg.Info(
141			"serving client traffic insecurely; this is strongly discouraged!",
142			zap.String("address", sctx.l.Addr().String()),
143		)
144	}
145
146	if sctx.secure {
147		tlscfg, tlsErr := tlsinfo.ServerConfig()
148		if tlsErr != nil {
149			return tlsErr
150		}
151		gs = v3rpc.Server(s, tlscfg, gopts...)
152		v3electionpb.RegisterElectionServer(gs, servElection)
153		v3lockpb.RegisterLockServer(gs, servLock)
154		if sctx.serviceRegister != nil {
155			sctx.serviceRegister(gs)
156		}
157		handler = grpcHandlerFunc(gs, handler)
158
159		var gwmux *gw.ServeMux
160		if s.Cfg.EnableGRPCGateway {
161			dtls := tlscfg.Clone()
162			// trust local server
163			dtls.InsecureSkipVerify = true
164			bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
165			opts := []grpc.DialOption{grpc.WithTransportCredentials(bundle.TransportCredentials())}
166			gwmux, err = sctx.registerGateway(opts)
167			if err != nil {
168				return err
169			}
170		}
171
172		var tlsl net.Listener
173		tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
174		if err != nil {
175			return err
176		}
177		// TODO: add debug flag; enable logging when debug flag is set
178		httpmux := sctx.createMux(gwmux, handler)
179
180		srv := &http.Server{
181			Handler:   createAccessController(sctx.lg, s, httpmux),
182			TLSConfig: tlscfg,
183			ErrorLog:  logger, // do not log user error
184		}
185		go func() { errHandler(srv.Serve(tlsl)) }()
186
187		sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
188		sctx.lg.Info(
189			"serving client traffic securely",
190			zap.String("address", sctx.l.Addr().String()),
191		)
192	}
193
194	close(sctx.serversC)
195	return m.Serve()
196}
197
198// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
199// connections or otherHandler otherwise. Given in gRPC docs.
200func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
201	if otherHandler == nil {
202		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
203			grpcServer.ServeHTTP(w, r)
204		})
205	}
206	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
207		if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
208			grpcServer.ServeHTTP(w, r)
209		} else {
210			otherHandler.ServeHTTP(w, r)
211		}
212	})
213}
214
215type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error
216
217func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) {
218	ctx := sctx.ctx
219
220	addr := sctx.addr
221	if network := sctx.network; network == "unix" {
222		// explicitly define unix network for gRPC socket support
223		addr = fmt.Sprintf("%s://%s", network, addr)
224	}
225
226	opts = append(opts, grpc.WithDefaultCallOptions([]grpc.CallOption{
227		grpc.MaxCallRecvMsgSize(math.MaxInt32),
228	}...))
229
230	conn, err := grpc.DialContext(ctx, addr, opts...)
231	if err != nil {
232		return nil, err
233	}
234	gwmux := gw.NewServeMux()
235
236	handlers := []registerHandlerFunc{
237		etcdservergw.RegisterKVHandler,
238		etcdservergw.RegisterWatchHandler,
239		etcdservergw.RegisterLeaseHandler,
240		etcdservergw.RegisterClusterHandler,
241		etcdservergw.RegisterMaintenanceHandler,
242		etcdservergw.RegisterAuthHandler,
243		v3lockgw.RegisterLockHandler,
244		v3electiongw.RegisterElectionHandler,
245	}
246	for _, h := range handlers {
247		if err := h(ctx, gwmux, conn); err != nil {
248			return nil, err
249		}
250	}
251	go func() {
252		<-ctx.Done()
253		if cerr := conn.Close(); cerr != nil {
254			sctx.lg.Warn(
255				"failed to close connection",
256				zap.String("address", sctx.l.Addr().String()),
257				zap.Error(cerr),
258			)
259		}
260	}()
261
262	return gwmux, nil
263}
264
265func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.ServeMux {
266	httpmux := http.NewServeMux()
267	for path, h := range sctx.userHandlers {
268		httpmux.Handle(path, h)
269	}
270
271	if gwmux != nil {
272		httpmux.Handle(
273			"/v3/",
274			wsproxy.WebsocketProxy(
275				gwmux,
276				wsproxy.WithRequestMutator(
277					// Default to the POST method for streams
278					func(_ *http.Request, outgoing *http.Request) *http.Request {
279						outgoing.Method = "POST"
280						return outgoing
281					},
282				),
283				wsproxy.WithMaxRespBodyBufferSize(0x7fffffff),
284			),
285		)
286	}
287	if handler != nil {
288		httpmux.Handle("/", handler)
289	}
290	return httpmux
291}
292
293// createAccessController wraps HTTP multiplexer:
294// - mutate gRPC gateway request paths
295// - check hostname whitelist
296// client HTTP requests goes here first
297func createAccessController(lg *zap.Logger, s *etcdserver.EtcdServer, mux *http.ServeMux) http.Handler {
298	if lg == nil {
299		lg = zap.NewNop()
300	}
301	return &accessController{lg: lg, s: s, mux: mux}
302}
303
304type accessController struct {
305	lg  *zap.Logger
306	s   *etcdserver.EtcdServer
307	mux *http.ServeMux
308}
309
310func (ac *accessController) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
311	if req == nil {
312		http.Error(rw, "Request is nil", http.StatusBadRequest)
313		return
314	}
315	// redirect for backward compatibilities
316	if req.URL != nil && strings.HasPrefix(req.URL.Path, "/v3beta/") {
317		req.URL.Path = strings.Replace(req.URL.Path, "/v3beta/", "/v3/", 1)
318	}
319
320	if req.TLS == nil { // check origin if client connection is not secure
321		host := httputil.GetHostname(req)
322		if !ac.s.AccessController.IsHostWhitelisted(host) {
323			ac.lg.Warn(
324				"rejecting HTTP request to prevent DNS rebinding attacks",
325				zap.String("host", host),
326			)
327			http.Error(rw, errCVE20185702(host), http.StatusMisdirectedRequest)
328			return
329		}
330	} else if ac.s.Cfg.ClientCertAuthEnabled && ac.s.Cfg.EnableGRPCGateway &&
331		ac.s.AuthStore().IsAuthEnabled() && strings.HasPrefix(req.URL.Path, "/v3/") {
332		for _, chains := range req.TLS.VerifiedChains {
333			if len(chains) < 1 {
334				continue
335			}
336			if len(chains[0].Subject.CommonName) != 0 {
337				http.Error(rw, "CommonName of client sending a request against gateway will be ignored and not used as expected", http.StatusBadRequest)
338				return
339			}
340		}
341	}
342
343	// Write CORS header.
344	if ac.s.AccessController.OriginAllowed("*") {
345		addCORSHeader(rw, "*")
346	} else if origin := req.Header.Get("Origin"); ac.s.OriginAllowed(origin) {
347		addCORSHeader(rw, origin)
348	}
349
350	if req.Method == "OPTIONS" {
351		rw.WriteHeader(http.StatusOK)
352		return
353	}
354
355	ac.mux.ServeHTTP(rw, req)
356}
357
// addCORSHeader appends the CORS response headers for origin to w: the
// allowed methods, the allowed origin, and the allowed request headers.
func addCORSHeader(w http.ResponseWriter, origin string) {
	hdr := w.Header()
	hdr.Add("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
	hdr.Add("Access-Control-Allow-Origin", origin)
	hdr.Add("Access-Control-Allow-Headers", "accept, content-type, authorization")
}
364
// errCVE20185702 renders the explanation sent to a client whose non-TLS
// request was rejected because of its Host header; host is embedded as a
// quoted string in the suggested --host-whitelist flag.
//
// https://github.com/transmission/transmission/pull/468
func errCVE20185702(host string) string {
	const tmpl = `
etcd received your request, but the Host header was unrecognized.

To fix this, choose one of the following options:
- Enable TLS, then any HTTPS request will be allowed.
- Add the hostname you want to use to the whitelist in settings.
  - e.g. etcd --host-whitelist %q

This requirement has been added to help prevent "DNS Rebinding" attacks (CVE-2018-5702).
`
	return fmt.Sprintf(tmpl, host)
}
378
379// WrapCORS wraps existing handler with CORS.
380// TODO: deprecate this after v2 proxy deprecate
381func WrapCORS(cors map[string]struct{}, h http.Handler) http.Handler {
382	return &corsHandler{
383		ac: &etcdserver.AccessController{CORS: cors},
384		h:  h,
385	}
386}
387
388type corsHandler struct {
389	ac *etcdserver.AccessController
390	h  http.Handler
391}
392
393func (ch *corsHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
394	if ch.ac.OriginAllowed("*") {
395		addCORSHeader(rw, "*")
396	} else if origin := req.Header.Get("Origin"); ch.ac.OriginAllowed(origin) {
397		addCORSHeader(rw, origin)
398	}
399
400	if req.Method == "OPTIONS" {
401		rw.WriteHeader(http.StatusOK)
402		return
403	}
404
405	ch.h.ServeHTTP(rw, req)
406}
407
408func (sctx *serveCtx) registerUserHandler(s string, h http.Handler) {
409	if sctx.userHandlers[s] != nil {
410		sctx.lg.Warn("path is already registered by user handler", zap.String("path", s))
411		return
412	}
413	sctx.userHandlers[s] = h
414}
415
416func (sctx *serveCtx) registerPprof() {
417	for p, h := range debugutil.PProfHandlers() {
418		sctx.registerUserHandler(p, h)
419	}
420}
421
422func (sctx *serveCtx) registerTrace() {
423	reqf := func(w http.ResponseWriter, r *http.Request) { trace.Render(w, r, true) }
424	sctx.registerUserHandler("/debug/requests", http.HandlerFunc(reqf))
425	evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
426	sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
427}
428