1// +build linux
2
3/*
4   Copyright The containerd Authors.
5
6   Licensed under the Apache License, Version 2.0 (the "License");
7   you may not use this file except in compliance with the License.
8   You may obtain a copy of the License at
9
10       http://www.apache.org/licenses/LICENSE-2.0
11
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17*/
18
19package v2
20
21import (
22	"context"
23
24	cgroupsv2 "github.com/containerd/cgroups/v2"
25	eventstypes "github.com/containerd/containerd/api/events"
26	"github.com/containerd/containerd/pkg/oom"
27	"github.com/containerd/containerd/runtime"
28	"github.com/containerd/containerd/runtime/v2/shim"
29	"github.com/pkg/errors"
30	"github.com/sirupsen/logrus"
31)
32
33// New returns an implementation that listens to OOM events
34// from a container's cgroups.
35func New(publisher shim.Publisher) (oom.Watcher, error) {
36	return &watcher{
37		itemCh:    make(chan item),
38		publisher: publisher,
39	}, nil
40}
41
42// watcher implementation for handling OOM events from a container's cgroup
43type watcher struct {
44	itemCh    chan item
45	publisher shim.Publisher
46}
47
48type item struct {
49	id  string
50	ev  cgroupsv2.Event
51	err error
52}
53
54// Close closes the watcher
55func (w *watcher) Close() error {
56	return nil
57}
58
59// Run the loop
60func (w *watcher) Run(ctx context.Context) {
61	lastOOMMap := make(map[string]uint64) // key: id, value: ev.OOM
62	for {
63		select {
64		case <-ctx.Done():
65			w.Close()
66			return
67		case i := <-w.itemCh:
68			if i.err != nil {
69				delete(lastOOMMap, i.id)
70				continue
71			}
72			lastOOM := lastOOMMap[i.id]
73			if i.ev.OOM > lastOOM {
74				if err := w.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{
75					ContainerID: i.id,
76				}); err != nil {
77					logrus.WithError(err).Error("publish OOM event")
78				}
79			}
80			if i.ev.OOM > 0 {
81				lastOOMMap[i.id] = i.ev.OOM
82			}
83		}
84	}
85}
86
87// Add cgroups.Cgroup to the epoll monitor
88func (w *watcher) Add(id string, cgx interface{}) error {
89	cg, ok := cgx.(*cgroupsv2.Manager)
90	if !ok {
91		return errors.Errorf("expected *cgroupsv2.Manager, got: %T", cgx)
92	}
93	// FIXME: cgroupsv2.Manager does not support closing eventCh routine currently.
94	// The routine shuts down when an error happens, mostly when the cgroup is deleted.
95	eventCh, errCh := cg.EventChan()
96	go func() {
97		for {
98			i := item{id: id}
99			select {
100			case ev := <-eventCh:
101				i.ev = ev
102				w.itemCh <- i
103			case err := <-errCh:
104				i.err = err
105				w.itemCh <- i
106				// we no longer get any event/err when we got an err
107				logrus.WithError(err).Warn("error from *cgroupsv2.Manager.EventChan")
108				return
109			}
110		}
111	}()
112	return nil
113}
114