1/* 2 Copyright The containerd Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15*/ 16 17/* 18Copyright 2016 The Kubernetes Authors. 19 20Licensed under the Apache License, Version 2.0 (the "License"); 21you may not use this file except in compliance with the License. 22You may obtain a copy of the License at 23 24 http://www.apache.org/licenses/LICENSE-2.0 25 26Unless required by applicable law or agreed to in writing, software 27distributed under the License is distributed on an "AS IS" BASIS, 28WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 29See the License for the specific language governing permissions and 30limitations under the License. 31*/ 32 33package streaming 34 35import ( 36 "crypto/tls" 37 "errors" 38 "io" 39 "net" 40 "net/http" 41 "net/url" 42 "path" 43 "time" 44 45 "google.golang.org/grpc/codes" 46 "google.golang.org/grpc/status" 47 48 restful "github.com/emicklei/go-restful" 49 50 "k8s.io/apimachinery/pkg/types" 51 remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" 52 "k8s.io/client-go/tools/remotecommand" 53 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" 54 55 "github.com/containerd/containerd/pkg/cri/streaming/portforward" 56 remotecommandserver "github.com/containerd/containerd/pkg/cri/streaming/remotecommand" 57) 58 59// Server is the library interface to serve the stream requests. 60type Server interface { 61 http.Handler 62 63 // Get the serving URL for the requests. 64 // Requests must not be nil. Responses may be nil iff an error is returned. 65 GetExec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) 66 GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) 67 GetPortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) 68 69 // Start the server. 70 // addr is the address to serve on (address:port) stayUp indicates whether the server should 71 // listen until Stop() is called, or automatically stop after all expected connections are 72 // closed. Calling Get{Exec,Attach,PortForward} increments the expected connection count. 73 // Function does not return until the server is stopped. 74 Start(stayUp bool) error 75 // Stop the server, and terminate any open connections. 76 Stop() error 77} 78 79// Runtime is the interface to execute the commands and provide the streams. 80type Runtime interface { 81 Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error 82 Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error 83 PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error 84} 85 86// Config defines the options used for running the stream server. 87type Config struct { 88 // The host:port address the server will listen on. 89 Addr string 90 // The optional base URL for constructing streaming URLs. If empty, the baseURL will be 91 // constructed from the serve address. 92 // Note that for port "0", the URL port will be set to actual port in use. 93 BaseURL *url.URL 94 95 // How long to leave idle connections open for. 96 StreamIdleTimeout time.Duration 97 // How long to wait for clients to create streams. Only used for SPDY streaming. 98 StreamCreationTimeout time.Duration 99 100 // The streaming protocols the server supports (understands and permits). See 101 // k8s.io/kubernetes/pkg/kubelet/server/remotecommand/constants.go for available protocols. 102 // Only used for SPDY streaming. 103 SupportedRemoteCommandProtocols []string 104 105 // The streaming protocols the server supports (understands and permits). See 106 // k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go for available protocols. 107 // Only used for SPDY streaming. 108 SupportedPortForwardProtocols []string 109 110 // The config for serving over TLS. If nil, TLS will not be used. 111 TLSConfig *tls.Config 112} 113 114// DefaultConfig provides default values for server Config. The DefaultConfig is partial, so 115// some fields like Addr must still be provided. 116var DefaultConfig = Config{ 117 StreamIdleTimeout: 4 * time.Hour, 118 StreamCreationTimeout: remotecommandconsts.DefaultStreamCreationTimeout, 119 SupportedRemoteCommandProtocols: remotecommandconsts.SupportedStreamingProtocols, 120 SupportedPortForwardProtocols: portforward.SupportedProtocols, 121} 122 123// NewServer creates a new Server for stream requests. 124// TODO(tallclair): Add auth(n/z) interface & handling. 125func NewServer(config Config, runtime Runtime) (Server, error) { 126 s := &server{ 127 config: config, 128 runtime: &criAdapter{runtime}, 129 cache: newRequestCache(), 130 } 131 132 if s.config.BaseURL == nil { 133 s.config.BaseURL = &url.URL{ 134 Scheme: "http", 135 Host: s.config.Addr, 136 } 137 if s.config.TLSConfig != nil { 138 s.config.BaseURL.Scheme = "https" 139 } 140 } 141 142 ws := &restful.WebService{} 143 endpoints := []struct { 144 path string 145 handler restful.RouteFunction 146 }{ 147 {"/exec/{token}", s.serveExec}, 148 {"/attach/{token}", s.serveAttach}, 149 {"/portforward/{token}", s.servePortForward}, 150 } 151 // If serving relative to a base path, set that here. 152 pathPrefix := path.Dir(s.config.BaseURL.Path) 153 for _, e := range endpoints { 154 for _, method := range []string{"GET", "POST"} { 155 ws.Route(ws. 156 Method(method). 157 Path(path.Join(pathPrefix, e.path)). 158 To(e.handler)) 159 } 160 } 161 handler := restful.NewContainer() 162 handler.Add(ws) 163 s.handler = handler 164 s.server = &http.Server{ 165 Addr: s.config.Addr, 166 Handler: s.handler, 167 TLSConfig: s.config.TLSConfig, 168 } 169 170 return s, nil 171} 172 173type server struct { 174 config Config 175 runtime *criAdapter 176 handler http.Handler 177 cache *requestCache 178 server *http.Server 179} 180 181func validateExecRequest(req *runtimeapi.ExecRequest) error { 182 if req.ContainerId == "" { 183 return status.Errorf(codes.InvalidArgument, "missing required container_id") 184 } 185 if req.Tty && req.Stderr { 186 // If TTY is set, stderr cannot be true because multiplexing is not 187 // supported. 188 return status.Errorf(codes.InvalidArgument, "tty and stderr cannot both be true") 189 } 190 if !req.Stdin && !req.Stdout && !req.Stderr { 191 return status.Errorf(codes.InvalidArgument, "one of stdin, stdout, or stderr must be set") 192 } 193 return nil 194} 195 196func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { 197 if err := validateExecRequest(req); err != nil { 198 return nil, err 199 } 200 token, err := s.cache.Insert(req) 201 if err != nil { 202 return nil, err 203 } 204 return &runtimeapi.ExecResponse{ 205 Url: s.buildURL("exec", token), 206 }, nil 207} 208 209func validateAttachRequest(req *runtimeapi.AttachRequest) error { 210 if req.ContainerId == "" { 211 return status.Errorf(codes.InvalidArgument, "missing required container_id") 212 } 213 if req.Tty && req.Stderr { 214 // If TTY is set, stderr cannot be true because multiplexing is not 215 // supported. 216 return status.Errorf(codes.InvalidArgument, "tty and stderr cannot both be true") 217 } 218 if !req.Stdin && !req.Stdout && !req.Stderr { 219 return status.Errorf(codes.InvalidArgument, "one of stdin, stdout, and stderr must be set") 220 } 221 return nil 222} 223 224func (s *server) GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { 225 if err := validateAttachRequest(req); err != nil { 226 return nil, err 227 } 228 token, err := s.cache.Insert(req) 229 if err != nil { 230 return nil, err 231 } 232 return &runtimeapi.AttachResponse{ 233 Url: s.buildURL("attach", token), 234 }, nil 235} 236 237func (s *server) GetPortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { 238 if req.PodSandboxId == "" { 239 return nil, status.Errorf(codes.InvalidArgument, "missing required pod_sandbox_id") 240 } 241 token, err := s.cache.Insert(req) 242 if err != nil { 243 return nil, err 244 } 245 return &runtimeapi.PortForwardResponse{ 246 Url: s.buildURL("portforward", token), 247 }, nil 248} 249 250func (s *server) Start(stayUp bool) error { 251 if !stayUp { 252 // TODO(tallclair): Implement this. 253 return errors.New("stayUp=false is not yet implemented") 254 } 255 256 listener, err := net.Listen("tcp", s.config.Addr) 257 if err != nil { 258 return err 259 } 260 // Use the actual address as baseURL host. This handles the "0" port case. 261 s.config.BaseURL.Host = listener.Addr().String() 262 if s.config.TLSConfig != nil { 263 return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig. 264 } 265 return s.server.Serve(listener) 266} 267 268func (s *server) Stop() error { 269 return s.server.Close() 270} 271 272func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 273 s.handler.ServeHTTP(w, r) 274} 275 276func (s *server) buildURL(method, token string) string { 277 return s.config.BaseURL.ResolveReference(&url.URL{ 278 Path: path.Join(method, token), 279 }).String() 280} 281 282func (s *server) serveExec(req *restful.Request, resp *restful.Response) { 283 token := req.PathParameter("token") 284 cachedRequest, ok := s.cache.Consume(token) 285 if !ok { 286 http.NotFound(resp.ResponseWriter, req.Request) 287 return 288 } 289 exec, ok := cachedRequest.(*runtimeapi.ExecRequest) 290 if !ok { 291 http.NotFound(resp.ResponseWriter, req.Request) 292 return 293 } 294 295 streamOpts := &remotecommandserver.Options{ 296 Stdin: exec.Stdin, 297 Stdout: exec.Stdout, 298 Stderr: exec.Stderr, 299 TTY: exec.Tty, 300 } 301 302 remotecommandserver.ServeExec( 303 resp.ResponseWriter, 304 req.Request, 305 s.runtime, 306 "", // unused: podName 307 "", // unusued: podUID 308 exec.ContainerId, 309 exec.Cmd, 310 streamOpts, 311 s.config.StreamIdleTimeout, 312 s.config.StreamCreationTimeout, 313 s.config.SupportedRemoteCommandProtocols) 314} 315 316func (s *server) serveAttach(req *restful.Request, resp *restful.Response) { 317 token := req.PathParameter("token") 318 cachedRequest, ok := s.cache.Consume(token) 319 if !ok { 320 http.NotFound(resp.ResponseWriter, req.Request) 321 return 322 } 323 attach, ok := cachedRequest.(*runtimeapi.AttachRequest) 324 if !ok { 325 http.NotFound(resp.ResponseWriter, req.Request) 326 return 327 } 328 329 streamOpts := &remotecommandserver.Options{ 330 Stdin: attach.Stdin, 331 Stdout: attach.Stdout, 332 Stderr: attach.Stderr, 333 TTY: attach.Tty, 334 } 335 remotecommandserver.ServeAttach( 336 resp.ResponseWriter, 337 req.Request, 338 s.runtime, 339 "", // unused: podName 340 "", // unusued: podUID 341 attach.ContainerId, 342 streamOpts, 343 s.config.StreamIdleTimeout, 344 s.config.StreamCreationTimeout, 345 s.config.SupportedRemoteCommandProtocols) 346} 347 348func (s *server) servePortForward(req *restful.Request, resp *restful.Response) { 349 token := req.PathParameter("token") 350 cachedRequest, ok := s.cache.Consume(token) 351 if !ok { 352 http.NotFound(resp.ResponseWriter, req.Request) 353 return 354 } 355 pf, ok := cachedRequest.(*runtimeapi.PortForwardRequest) 356 if !ok { 357 http.NotFound(resp.ResponseWriter, req.Request) 358 return 359 } 360 361 portForwardOptions, err := portforward.BuildV4Options(pf.Port) 362 if err != nil { 363 resp.WriteError(http.StatusBadRequest, err) 364 return 365 } 366 367 portforward.ServePortForward( 368 resp.ResponseWriter, 369 req.Request, 370 s.runtime, 371 pf.PodSandboxId, 372 "", // unused: podUID 373 portForwardOptions, 374 s.config.StreamIdleTimeout, 375 s.config.StreamCreationTimeout, 376 s.config.SupportedPortForwardProtocols) 377} 378 379// criAdapter wraps the Runtime functions to conform to the remotecommand interfaces. 380// The adapter binds the container ID to the container name argument, and the pod sandbox ID to the pod name. 381type criAdapter struct { 382 Runtime 383} 384 385var _ remotecommandserver.Executor = &criAdapter{} 386var _ remotecommandserver.Attacher = &criAdapter{} 387var _ portforward.PortForwarder = &criAdapter{} 388 389func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { 390 return a.Runtime.Exec(container, cmd, in, out, err, tty, resize) 391} 392 393func (a *criAdapter) AttachContainer(podName string, podUID types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { 394 return a.Runtime.Attach(container, in, out, err, tty, resize) 395} 396 397func (a *criAdapter) PortForward(podName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error { 398 return a.Runtime.PortForward(podName, port, stream) 399} 400