1/* 2 Copyright The containerd Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15*/ 16 17package v2 18 19import ( 20 "context" 21 "io" 22 "io/ioutil" 23 "os" 24 "path/filepath" 25 "time" 26 27 eventstypes "github.com/containerd/containerd/api/events" 28 "github.com/containerd/containerd/api/types" 29 tasktypes "github.com/containerd/containerd/api/types/task" 30 "github.com/containerd/containerd/errdefs" 31 "github.com/containerd/containerd/events/exchange" 32 "github.com/containerd/containerd/identifiers" 33 "github.com/containerd/containerd/log" 34 "github.com/containerd/containerd/namespaces" 35 "github.com/containerd/containerd/pkg/timeout" 36 "github.com/containerd/containerd/runtime" 37 client "github.com/containerd/containerd/runtime/v2/shim" 38 "github.com/containerd/containerd/runtime/v2/task" 39 "github.com/containerd/ttrpc" 40 ptypes "github.com/gogo/protobuf/types" 41 "github.com/pkg/errors" 42 "github.com/sirupsen/logrus" 43) 44 45const ( 46 loadTimeout = "io.containerd.timeout.shim.load" 47 cleanupTimeout = "io.containerd.timeout.shim.cleanup" 48 shutdownTimeout = "io.containerd.timeout.shim.shutdown" 49) 50 51func init() { 52 timeout.Set(loadTimeout, 5*time.Second) 53 timeout.Set(cleanupTimeout, 5*time.Second) 54 timeout.Set(shutdownTimeout, 3*time.Second) 55} 56 57func loadAddress(path string) (string, error) { 58 data, err := ioutil.ReadFile(path) 59 if err != nil { 60 return "", err 61 } 62 return string(data), nil 63} 64 65func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) { 66 address, err := loadAddress(filepath.Join(bundle.Path, "address")) 67 if err != nil { 68 return nil, err 69 } 70 conn, err := client.Connect(address, client.AnonReconnectDialer) 71 if err != nil { 72 return nil, err 73 } 74 defer func() { 75 if err != nil { 76 conn.Close() 77 } 78 }() 79 f, err := openShimLog(ctx, bundle, client.AnonReconnectDialer) 80 if err != nil { 81 return nil, errors.Wrap(err, "open shim log pipe") 82 } 83 defer func() { 84 if err != nil { 85 f.Close() 86 } 87 }() 88 // open the log pipe and block until the writer is ready 89 // this helps with synchronization of the shim 90 // copy the shim's logs to containerd's output 91 go func() { 92 defer f.Close() 93 if _, err := io.Copy(os.Stderr, f); err != nil { 94 // When using a multi-container shim the 2nd to Nth container in the 95 // shim will not have a separate log pipe. Ignore the failure log 96 // message here when the shim connect times out. 97 if !errors.Is(err, os.ErrNotExist) { 98 log.G(ctx).WithError(err).Error("copy shim log") 99 } 100 } 101 }() 102 103 client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)) 104 defer func() { 105 if err != nil { 106 client.Close() 107 } 108 }() 109 s := &shim{ 110 client: client, 111 task: task.NewTaskClient(client), 112 bundle: bundle, 113 events: events, 114 rtTasks: rt, 115 } 116 ctx, cancel := timeout.WithContext(ctx, loadTimeout) 117 defer cancel() 118 if err := s.Connect(ctx); err != nil { 119 return nil, err 120 } 121 return s, nil 122} 123 124func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) { 125 ctx = namespaces.WithNamespace(ctx, ns) 126 ctx, cancel := timeout.WithContext(ctx, cleanupTimeout) 127 defer cancel() 128 129 log.G(ctx).WithFields(logrus.Fields{ 130 "id": id, 131 "namespace": ns, 132 }).Warn("cleaning up after shim disconnected") 133 response, err := binaryCall.Delete(ctx) 134 if err != nil { 135 log.G(ctx).WithError(err).WithFields(logrus.Fields{ 136 "id": id, 137 "namespace": ns, 138 }).Warn("failed to clean up after shim disconnected") 139 } 140 141 var ( 142 pid uint32 143 exitStatus uint32 144 exitedAt time.Time 145 ) 146 if response != nil { 147 pid = response.Pid 148 exitStatus = response.Status 149 exitedAt = response.Timestamp 150 } else { 151 exitStatus = 255 152 exitedAt = time.Now() 153 } 154 events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{ 155 ContainerID: id, 156 ID: id, 157 Pid: pid, 158 ExitStatus: exitStatus, 159 ExitedAt: exitedAt, 160 }) 161 162 events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{ 163 ContainerID: id, 164 Pid: pid, 165 ExitStatus: exitStatus, 166 ExitedAt: exitedAt, 167 }) 168} 169 170type shim struct { 171 bundle *Bundle 172 client *ttrpc.Client 173 task task.TaskService 174 taskPid int 175 events *exchange.Exchange 176 rtTasks *runtime.TaskList 177} 178 179func (s *shim) Connect(ctx context.Context) error { 180 response, err := s.task.Connect(ctx, &task.ConnectRequest{ 181 ID: s.ID(), 182 }) 183 if err != nil { 184 return err 185 } 186 s.taskPid = int(response.TaskPid) 187 return nil 188} 189 190func (s *shim) Shutdown(ctx context.Context) error { 191 _, err := s.task.Shutdown(ctx, &task.ShutdownRequest{ 192 ID: s.ID(), 193 }) 194 if err != nil && !errors.Is(err, ttrpc.ErrClosed) { 195 return errdefs.FromGRPC(err) 196 } 197 return nil 198} 199 200func (s *shim) waitShutdown(ctx context.Context) error { 201 ctx, cancel := timeout.WithContext(ctx, shutdownTimeout) 202 defer cancel() 203 return s.Shutdown(ctx) 204} 205 206// ID of the shim/task 207func (s *shim) ID() string { 208 return s.bundle.ID 209} 210 211// PID of the task 212func (s *shim) PID() uint32 { 213 return uint32(s.taskPid) 214} 215 216func (s *shim) Namespace() string { 217 return s.bundle.Namespace 218} 219 220func (s *shim) Close() error { 221 return s.client.Close() 222} 223 224func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) { 225 response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{ 226 ID: s.ID(), 227 }) 228 if shimErr != nil { 229 log.G(ctx).WithField("id", s.ID()).WithError(shimErr).Debug("failed to delete task") 230 if !errors.Is(shimErr, ttrpc.ErrClosed) { 231 shimErr = errdefs.FromGRPC(shimErr) 232 if !errdefs.IsNotFound(shimErr) { 233 return nil, shimErr 234 } 235 } 236 } 237 // remove self from the runtime task list 238 // this seems dirty but it cleans up the API across runtimes, tasks, and the service 239 s.rtTasks.Delete(ctx, s.ID()) 240 if err := s.waitShutdown(ctx); err != nil { 241 log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim") 242 } 243 s.Close() 244 if err := s.bundle.Delete(); err != nil { 245 log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") 246 } 247 if shimErr != nil { 248 return nil, shimErr 249 } 250 return &runtime.Exit{ 251 Status: response.ExitStatus, 252 Timestamp: response.ExitedAt, 253 Pid: response.Pid, 254 }, nil 255} 256 257func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) { 258 topts := opts.TaskOptions 259 if topts == nil { 260 topts = opts.RuntimeOptions 261 } 262 request := &task.CreateTaskRequest{ 263 ID: s.ID(), 264 Bundle: s.bundle.Path, 265 Stdin: opts.IO.Stdin, 266 Stdout: opts.IO.Stdout, 267 Stderr: opts.IO.Stderr, 268 Terminal: opts.IO.Terminal, 269 Checkpoint: opts.Checkpoint, 270 Options: topts, 271 } 272 for _, m := range opts.Rootfs { 273 request.Rootfs = append(request.Rootfs, &types.Mount{ 274 Type: m.Type, 275 Source: m.Source, 276 Options: m.Options, 277 }) 278 } 279 response, err := s.task.Create(ctx, request) 280 if err != nil { 281 return nil, errdefs.FromGRPC(err) 282 } 283 s.taskPid = int(response.Pid) 284 return s, nil 285} 286 287func (s *shim) Pause(ctx context.Context) error { 288 if _, err := s.task.Pause(ctx, &task.PauseRequest{ 289 ID: s.ID(), 290 }); err != nil { 291 return errdefs.FromGRPC(err) 292 } 293 return nil 294} 295 296func (s *shim) Resume(ctx context.Context) error { 297 if _, err := s.task.Resume(ctx, &task.ResumeRequest{ 298 ID: s.ID(), 299 }); err != nil { 300 return errdefs.FromGRPC(err) 301 } 302 return nil 303} 304 305func (s *shim) Start(ctx context.Context) error { 306 response, err := s.task.Start(ctx, &task.StartRequest{ 307 ID: s.ID(), 308 }) 309 if err != nil { 310 return errdefs.FromGRPC(err) 311 } 312 s.taskPid = int(response.Pid) 313 return nil 314} 315 316func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error { 317 if _, err := s.task.Kill(ctx, &task.KillRequest{ 318 ID: s.ID(), 319 Signal: signal, 320 All: all, 321 }); err != nil { 322 return errdefs.FromGRPC(err) 323 } 324 return nil 325} 326 327func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { 328 if err := identifiers.Validate(id); err != nil { 329 return nil, errors.Wrapf(err, "invalid exec id %s", id) 330 } 331 request := &task.ExecProcessRequest{ 332 ID: s.ID(), 333 ExecID: id, 334 Stdin: opts.IO.Stdin, 335 Stdout: opts.IO.Stdout, 336 Stderr: opts.IO.Stderr, 337 Terminal: opts.IO.Terminal, 338 Spec: opts.Spec, 339 } 340 if _, err := s.task.Exec(ctx, request); err != nil { 341 return nil, errdefs.FromGRPC(err) 342 } 343 return &process{ 344 id: id, 345 shim: s, 346 }, nil 347} 348 349func (s *shim) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { 350 resp, err := s.task.Pids(ctx, &task.PidsRequest{ 351 ID: s.ID(), 352 }) 353 if err != nil { 354 return nil, errdefs.FromGRPC(err) 355 } 356 var processList []runtime.ProcessInfo 357 for _, p := range resp.Processes { 358 processList = append(processList, runtime.ProcessInfo{ 359 Pid: p.Pid, 360 Info: p.Info, 361 }) 362 } 363 return processList, nil 364} 365 366func (s *shim) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { 367 _, err := s.task.ResizePty(ctx, &task.ResizePtyRequest{ 368 ID: s.ID(), 369 Width: size.Width, 370 Height: size.Height, 371 }) 372 if err != nil { 373 return errdefs.FromGRPC(err) 374 } 375 return nil 376} 377 378func (s *shim) CloseIO(ctx context.Context) error { 379 _, err := s.task.CloseIO(ctx, &task.CloseIORequest{ 380 ID: s.ID(), 381 Stdin: true, 382 }) 383 if err != nil { 384 return errdefs.FromGRPC(err) 385 } 386 return nil 387} 388 389func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) { 390 response, err := s.task.Wait(ctx, &task.WaitRequest{ 391 ID: s.ID(), 392 }) 393 if err != nil { 394 return nil, errdefs.FromGRPC(err) 395 } 396 return &runtime.Exit{ 397 Pid: uint32(s.taskPid), 398 Timestamp: response.ExitedAt, 399 Status: response.ExitStatus, 400 }, nil 401} 402 403func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error { 404 request := &task.CheckpointTaskRequest{ 405 ID: s.ID(), 406 Path: path, 407 Options: options, 408 } 409 if _, err := s.task.Checkpoint(ctx, request); err != nil { 410 return errdefs.FromGRPC(err) 411 } 412 return nil 413} 414 415func (s *shim) Update(ctx context.Context, resources *ptypes.Any) error { 416 if _, err := s.task.Update(ctx, &task.UpdateTaskRequest{ 417 ID: s.ID(), 418 Resources: resources, 419 }); err != nil { 420 return errdefs.FromGRPC(err) 421 } 422 return nil 423} 424 425func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) { 426 response, err := s.task.Stats(ctx, &task.StatsRequest{ 427 ID: s.ID(), 428 }) 429 if err != nil { 430 return nil, errdefs.FromGRPC(err) 431 } 432 return response.Stats, nil 433} 434 435func (s *shim) Process(ctx context.Context, id string) (runtime.Process, error) { 436 p := &process{ 437 id: id, 438 shim: s, 439 } 440 if _, err := p.State(ctx); err != nil { 441 return nil, err 442 } 443 return p, nil 444} 445 446func (s *shim) State(ctx context.Context) (runtime.State, error) { 447 response, err := s.task.State(ctx, &task.StateRequest{ 448 ID: s.ID(), 449 }) 450 if err != nil { 451 if !errors.Is(err, ttrpc.ErrClosed) { 452 return runtime.State{}, errdefs.FromGRPC(err) 453 } 454 return runtime.State{}, errdefs.ErrNotFound 455 } 456 var status runtime.Status 457 switch response.Status { 458 case tasktypes.StatusCreated: 459 status = runtime.CreatedStatus 460 case tasktypes.StatusRunning: 461 status = runtime.RunningStatus 462 case tasktypes.StatusStopped: 463 status = runtime.StoppedStatus 464 case tasktypes.StatusPaused: 465 status = runtime.PausedStatus 466 case tasktypes.StatusPausing: 467 status = runtime.PausingStatus 468 } 469 return runtime.State{ 470 Pid: response.Pid, 471 Status: status, 472 Stdin: response.Stdin, 473 Stdout: response.Stdout, 474 Stderr: response.Stderr, 475 Terminal: response.Terminal, 476 ExitStatus: response.ExitStatus, 477 ExitedAt: response.ExitedAt, 478 }, nil 479} 480