1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17/*
18Copyright 2016 The Kubernetes Authors.
19
20Licensed under the Apache License, Version 2.0 (the "License");
21you may not use this file except in compliance with the License.
22You may obtain a copy of the License at
23
24    http://www.apache.org/licenses/LICENSE-2.0
25
26Unless required by applicable law or agreed to in writing, software
27distributed under the License is distributed on an "AS IS" BASIS,
28WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29See the License for the specific language governing permissions and
30limitations under the License.
31*/
32
33package streaming
34
35import (
36	"crypto/tls"
37	"errors"
38	"io"
39	"net"
40	"net/http"
41	"net/url"
42	"path"
43	"time"
44
45	"google.golang.org/grpc/codes"
46	"google.golang.org/grpc/status"
47
48	restful "github.com/emicklei/go-restful"
49
50	"k8s.io/apimachinery/pkg/types"
51	remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
52	"k8s.io/client-go/tools/remotecommand"
53	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
54
55	"github.com/containerd/containerd/pkg/cri/streaming/portforward"
56	remotecommandserver "github.com/containerd/containerd/pkg/cri/streaming/remotecommand"
57)
58
59// Server is the library interface to serve the stream requests.
60type Server interface {
61	http.Handler
62
63	// Get the serving URL for the requests.
64	// Requests must not be nil. Responses may be nil iff an error is returned.
65	GetExec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
66	GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
67	GetPortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
68
69	// Start the server.
70	// addr is the address to serve on (address:port) stayUp indicates whether the server should
71	// listen until Stop() is called, or automatically stop after all expected connections are
72	// closed. Calling Get{Exec,Attach,PortForward} increments the expected connection count.
73	// Function does not return until the server is stopped.
74	Start(stayUp bool) error
75	// Stop the server, and terminate any open connections.
76	Stop() error
77}
78
79// Runtime is the interface to execute the commands and provide the streams.
80type Runtime interface {
81	Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
82	Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
83	PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error
84}
85
86// Config defines the options used for running the stream server.
87type Config struct {
88	// The host:port address the server will listen on.
89	Addr string
90	// The optional base URL for constructing streaming URLs. If empty, the baseURL will be
91	// constructed from the serve address.
92	// Note that for port "0", the URL port will be set to actual port in use.
93	BaseURL *url.URL
94
95	// How long to leave idle connections open for.
96	StreamIdleTimeout time.Duration
97	// How long to wait for clients to create streams. Only used for SPDY streaming.
98	StreamCreationTimeout time.Duration
99
100	// The streaming protocols the server supports (understands and permits).  See
101	// k8s.io/kubernetes/pkg/kubelet/server/remotecommand/constants.go for available protocols.
102	// Only used for SPDY streaming.
103	SupportedRemoteCommandProtocols []string
104
105	// The streaming protocols the server supports (understands and permits).  See
106	// k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go for available protocols.
107	// Only used for SPDY streaming.
108	SupportedPortForwardProtocols []string
109
110	// The config for serving over TLS. If nil, TLS will not be used.
111	TLSConfig *tls.Config
112}
113
114// DefaultConfig provides default values for server Config. The DefaultConfig is partial, so
115// some fields like Addr must still be provided.
116var DefaultConfig = Config{
117	StreamIdleTimeout:               4 * time.Hour,
118	StreamCreationTimeout:           remotecommandconsts.DefaultStreamCreationTimeout,
119	SupportedRemoteCommandProtocols: remotecommandconsts.SupportedStreamingProtocols,
120	SupportedPortForwardProtocols:   portforward.SupportedProtocols,
121}
122
123// NewServer creates a new Server for stream requests.
124// TODO(tallclair): Add auth(n/z) interface & handling.
125func NewServer(config Config, runtime Runtime) (Server, error) {
126	s := &server{
127		config:  config,
128		runtime: &criAdapter{runtime},
129		cache:   newRequestCache(),
130	}
131
132	if s.config.BaseURL == nil {
133		s.config.BaseURL = &url.URL{
134			Scheme: "http",
135			Host:   s.config.Addr,
136		}
137		if s.config.TLSConfig != nil {
138			s.config.BaseURL.Scheme = "https"
139		}
140	}
141
142	ws := &restful.WebService{}
143	endpoints := []struct {
144		path    string
145		handler restful.RouteFunction
146	}{
147		{"/exec/{token}", s.serveExec},
148		{"/attach/{token}", s.serveAttach},
149		{"/portforward/{token}", s.servePortForward},
150	}
151	// If serving relative to a base path, set that here.
152	pathPrefix := path.Dir(s.config.BaseURL.Path)
153	for _, e := range endpoints {
154		for _, method := range []string{"GET", "POST"} {
155			ws.Route(ws.
156				Method(method).
157				Path(path.Join(pathPrefix, e.path)).
158				To(e.handler))
159		}
160	}
161	handler := restful.NewContainer()
162	handler.Add(ws)
163	s.handler = handler
164	s.server = &http.Server{
165		Addr:      s.config.Addr,
166		Handler:   s.handler,
167		TLSConfig: s.config.TLSConfig,
168	}
169
170	return s, nil
171}
172
173type server struct {
174	config  Config
175	runtime *criAdapter
176	handler http.Handler
177	cache   *requestCache
178	server  *http.Server
179}
180
181func validateExecRequest(req *runtimeapi.ExecRequest) error {
182	if req.ContainerId == "" {
183		return status.Errorf(codes.InvalidArgument, "missing required container_id")
184	}
185	if req.Tty && req.Stderr {
186		// If TTY is set, stderr cannot be true because multiplexing is not
187		// supported.
188		return status.Errorf(codes.InvalidArgument, "tty and stderr cannot both be true")
189	}
190	if !req.Stdin && !req.Stdout && !req.Stderr {
191		return status.Errorf(codes.InvalidArgument, "one of stdin, stdout, or stderr must be set")
192	}
193	return nil
194}
195
196func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
197	if err := validateExecRequest(req); err != nil {
198		return nil, err
199	}
200	token, err := s.cache.Insert(req)
201	if err != nil {
202		return nil, err
203	}
204	return &runtimeapi.ExecResponse{
205		Url: s.buildURL("exec", token),
206	}, nil
207}
208
209func validateAttachRequest(req *runtimeapi.AttachRequest) error {
210	if req.ContainerId == "" {
211		return status.Errorf(codes.InvalidArgument, "missing required container_id")
212	}
213	if req.Tty && req.Stderr {
214		// If TTY is set, stderr cannot be true because multiplexing is not
215		// supported.
216		return status.Errorf(codes.InvalidArgument, "tty and stderr cannot both be true")
217	}
218	if !req.Stdin && !req.Stdout && !req.Stderr {
219		return status.Errorf(codes.InvalidArgument, "one of stdin, stdout, and stderr must be set")
220	}
221	return nil
222}
223
224func (s *server) GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
225	if err := validateAttachRequest(req); err != nil {
226		return nil, err
227	}
228	token, err := s.cache.Insert(req)
229	if err != nil {
230		return nil, err
231	}
232	return &runtimeapi.AttachResponse{
233		Url: s.buildURL("attach", token),
234	}, nil
235}
236
237func (s *server) GetPortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
238	if req.PodSandboxId == "" {
239		return nil, status.Errorf(codes.InvalidArgument, "missing required pod_sandbox_id")
240	}
241	token, err := s.cache.Insert(req)
242	if err != nil {
243		return nil, err
244	}
245	return &runtimeapi.PortForwardResponse{
246		Url: s.buildURL("portforward", token),
247	}, nil
248}
249
250func (s *server) Start(stayUp bool) error {
251	if !stayUp {
252		// TODO(tallclair): Implement this.
253		return errors.New("stayUp=false is not yet implemented")
254	}
255
256	listener, err := net.Listen("tcp", s.config.Addr)
257	if err != nil {
258		return err
259	}
260	// Use the actual address as baseURL host. This handles the "0" port case.
261	s.config.BaseURL.Host = listener.Addr().String()
262	if s.config.TLSConfig != nil {
263		return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig.
264	}
265	return s.server.Serve(listener)
266}
267
268func (s *server) Stop() error {
269	return s.server.Close()
270}
271
272func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
273	s.handler.ServeHTTP(w, r)
274}
275
276func (s *server) buildURL(method, token string) string {
277	return s.config.BaseURL.ResolveReference(&url.URL{
278		Path: path.Join(method, token),
279	}).String()
280}
281
282func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
283	token := req.PathParameter("token")
284	cachedRequest, ok := s.cache.Consume(token)
285	if !ok {
286		http.NotFound(resp.ResponseWriter, req.Request)
287		return
288	}
289	exec, ok := cachedRequest.(*runtimeapi.ExecRequest)
290	if !ok {
291		http.NotFound(resp.ResponseWriter, req.Request)
292		return
293	}
294
295	streamOpts := &remotecommandserver.Options{
296		Stdin:  exec.Stdin,
297		Stdout: exec.Stdout,
298		Stderr: exec.Stderr,
299		TTY:    exec.Tty,
300	}
301
302	remotecommandserver.ServeExec(
303		resp.ResponseWriter,
304		req.Request,
305		s.runtime,
306		"", // unused: podName
307		"", // unusued: podUID
308		exec.ContainerId,
309		exec.Cmd,
310		streamOpts,
311		s.config.StreamIdleTimeout,
312		s.config.StreamCreationTimeout,
313		s.config.SupportedRemoteCommandProtocols)
314}
315
316func (s *server) serveAttach(req *restful.Request, resp *restful.Response) {
317	token := req.PathParameter("token")
318	cachedRequest, ok := s.cache.Consume(token)
319	if !ok {
320		http.NotFound(resp.ResponseWriter, req.Request)
321		return
322	}
323	attach, ok := cachedRequest.(*runtimeapi.AttachRequest)
324	if !ok {
325		http.NotFound(resp.ResponseWriter, req.Request)
326		return
327	}
328
329	streamOpts := &remotecommandserver.Options{
330		Stdin:  attach.Stdin,
331		Stdout: attach.Stdout,
332		Stderr: attach.Stderr,
333		TTY:    attach.Tty,
334	}
335	remotecommandserver.ServeAttach(
336		resp.ResponseWriter,
337		req.Request,
338		s.runtime,
339		"", // unused: podName
340		"", // unusued: podUID
341		attach.ContainerId,
342		streamOpts,
343		s.config.StreamIdleTimeout,
344		s.config.StreamCreationTimeout,
345		s.config.SupportedRemoteCommandProtocols)
346}
347
348func (s *server) servePortForward(req *restful.Request, resp *restful.Response) {
349	token := req.PathParameter("token")
350	cachedRequest, ok := s.cache.Consume(token)
351	if !ok {
352		http.NotFound(resp.ResponseWriter, req.Request)
353		return
354	}
355	pf, ok := cachedRequest.(*runtimeapi.PortForwardRequest)
356	if !ok {
357		http.NotFound(resp.ResponseWriter, req.Request)
358		return
359	}
360
361	portForwardOptions, err := portforward.BuildV4Options(pf.Port)
362	if err != nil {
363		resp.WriteError(http.StatusBadRequest, err)
364		return
365	}
366
367	portforward.ServePortForward(
368		resp.ResponseWriter,
369		req.Request,
370		s.runtime,
371		pf.PodSandboxId,
372		"", // unused: podUID
373		portForwardOptions,
374		s.config.StreamIdleTimeout,
375		s.config.StreamCreationTimeout,
376		s.config.SupportedPortForwardProtocols)
377}
378
379// criAdapter wraps the Runtime functions to conform to the remotecommand interfaces.
380// The adapter binds the container ID to the container name argument, and the pod sandbox ID to the pod name.
381type criAdapter struct {
382	Runtime
383}
384
385var _ remotecommandserver.Executor = &criAdapter{}
386var _ remotecommandserver.Attacher = &criAdapter{}
387var _ portforward.PortForwarder = &criAdapter{}
388
389func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
390	return a.Runtime.Exec(container, cmd, in, out, err, tty, resize)
391}
392
393func (a *criAdapter) AttachContainer(podName string, podUID types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
394	return a.Runtime.Attach(container, in, out, err, tty, resize)
395}
396
397func (a *criAdapter) PortForward(podName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
398	return a.Runtime.PortForward(podName, port, stream)
399}
400