1/*
2Copyright 2018 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package buffered
18
19import (
20	"fmt"
21	"sync"
22	"time"
23
24	"k8s.io/apimachinery/pkg/util/runtime"
25	"k8s.io/apimachinery/pkg/util/wait"
26	auditinternal "k8s.io/apiserver/pkg/apis/audit"
27	"k8s.io/apiserver/pkg/audit"
28	"k8s.io/client-go/util/flowcontrol"
29)
30
const (
	// PluginName identifies this backend. It is the plugin name passed to
	// audit.HandlePluginError (and thus reported in error metrics) and the
	// prefix of the backend's String representation.
	PluginName = "buffered"
)
33
// BatchConfig represents batching delegate audit backend configuration.
type BatchConfig struct {
	// BufferSize defines a size of the buffering queue, i.e. how many events
	// can be waiting to be batched. Events that arrive while the queue is full
	// are dropped and reported as plugin errors.
	BufferSize int
	// MaxBatchSize defines maximum size of a batch. It should be positive; a
	// value of 1 sends each event on its own and disables MaxBatchWait.
	MaxBatchSize int
	// MaxBatchWait is the maximum time spent collecting a single batch: once it
	// elapses, the events accumulated so far are sent even if there are fewer
	// than MaxBatchSize of them. It is only used when MaxBatchSize > 1.
	MaxBatchWait time.Duration

	// ThrottleEnable defines whether throttling will be applied to the batching process.
	ThrottleEnable bool
	// ThrottleQPS defines the allowed rate of batches per second sent to the delegate backend.
	// It is ignored unless ThrottleEnable is true.
	ThrottleQPS float32
	// ThrottleBurst defines the maximum number of requests sent to the delegate backend at the same moment in case
	// the capacity defined by ThrottleQPS was not utilized.
	// It is ignored unless ThrottleEnable is true.
	ThrottleBurst int

	// Whether the delegate backend should be called asynchronously.
	// When true, each batch is handed to the delegate in its own goroutine so
	// batching can continue meanwhile; Shutdown waits for those goroutines.
	AsyncDelegate bool
}
54
// bufferedBackend is an audit.Backend that queues incoming events in a
// buffered channel and forwards them to a delegate backend in batches from a
// batching goroutine started by Run.
type bufferedBackend struct {
	// The delegate backend that actually exports events.
	delegateBackend audit.Backend

	// Channel to buffer events before sending to the delegate backend.
	// Its capacity is BatchConfig.BufferSize; ProcessEvents drops events when
	// it is full. It is closed once the batching loop observes stopCh closed.
	buffer chan *auditinternal.Event
	// Maximum number of events in a batch sent to the delegate backend.
	maxBatchSize int
	// Maximum amount of time spent collecting a single batch. The timer is
	// restarted before each batch is collected; when it fires, whatever has
	// accumulated so far is sent.
	//
	// Receiving maxBatchSize events will always trigger sending a batch, regardless of the amount of time passed.
	// The timer is only used when maxBatchSize > 1.
	maxBatchWait time.Duration

	// Whether the delegate backend should be called asynchronously.
	asyncDelegate bool

	// Channel to signal that the batching routine has processed all remaining events and exited.
	// Once `shutdownCh` is closed no new events will be sent to the delegate backend.
	shutdownCh chan struct{}

	// WaitGroup to control the concurrency of sending batches to the delegate backend.
	// Worker routine calls Add before sending a batch and
	// then spawns a routine that calls Done after batch was processed by the delegate backend.
	// This WaitGroup is used to wait for all sending routines to finish before shutting down audit backend.
	// It is only used when asyncDelegate is true.
	wg sync.WaitGroup

	// Limits the number of batches sent to the delegate backend per second.
	// It is nil when throttling is disabled.
	throttle flowcontrol.RateLimiter
}
84
85var _ audit.Backend = &bufferedBackend{}
86
87// NewBackend returns a buffered audit backend that wraps delegate backend.
88// Buffered backend automatically runs and shuts down the delegate backend.
89func NewBackend(delegate audit.Backend, config BatchConfig) audit.Backend {
90	var throttle flowcontrol.RateLimiter
91	if config.ThrottleEnable {
92		throttle = flowcontrol.NewTokenBucketRateLimiter(config.ThrottleQPS, config.ThrottleBurst)
93	}
94	return &bufferedBackend{
95		delegateBackend: delegate,
96		buffer:          make(chan *auditinternal.Event, config.BufferSize),
97		maxBatchSize:    config.MaxBatchSize,
98		maxBatchWait:    config.MaxBatchWait,
99		asyncDelegate:   config.AsyncDelegate,
100		shutdownCh:      make(chan struct{}),
101		wg:              sync.WaitGroup{},
102		throttle:        throttle,
103	}
104}
105
106func (b *bufferedBackend) Run(stopCh <-chan struct{}) error {
107	go func() {
108		// Signal that the working routine has exited.
109		defer close(b.shutdownCh)
110
111		b.processIncomingEvents(stopCh)
112
113		// Handle the events that were received after the last buffer
114		// scraping and before this line. Since the buffer is closed, no new
115		// events will come through.
116		allEventsProcessed := false
117		timer := make(chan time.Time)
118		for !allEventsProcessed {
119			allEventsProcessed = func() bool {
120				// Recover from any panic in order to try to process all remaining events.
121				// Note, that in case of a panic, the return value will be false and
122				// the loop execution will continue.
123				defer runtime.HandleCrash()
124
125				events := b.collectEvents(timer, wait.NeverStop)
126				b.processEvents(events)
127				return len(events) == 0
128			}()
129		}
130	}()
131	return b.delegateBackend.Run(stopCh)
132}
133
// Shutdown blocks until stopCh passed to the Run method is closed and all
// events added prior to that moment are batched and sent to the delegate backend.
//
// It waits, in order, for the batching goroutine started by Run to exit (that
// goroutine closes b.shutdownCh), then for all asynchronous delegate calls
// tracked by b.wg, and finally shuts down the delegate backend. Shutdown must
// only be called after Run: b.shutdownCh is closed solely by the goroutine Run
// starts, so without Run this call blocks forever.
func (b *bufferedBackend) Shutdown() {
	// Wait until the routine spawned in Run method exits.
	<-b.shutdownCh

	// Wait until all sending routines exit.
	//
	// - When b.shutdownCh is closed, we know that the goroutine in Run has terminated.
	// - This means that processIncomingEvents has terminated.
	// - Which means that b.buffer is closed and cannot accept any new events anymore.
	// - Because processEvents is called synchronously from the Run goroutine, the waitgroup has its final value.
	// Hence wg.Wait will not miss any more outgoing batches.
	b.wg.Wait()

	// No further batches can reach the delegate at this point.
	b.delegateBackend.Shutdown()
}
151
152// processIncomingEvents runs a loop that collects events from the buffer. When
153// b.stopCh is closed, processIncomingEvents stops and closes the buffer.
154func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) {
155	defer close(b.buffer)
156
157	var (
158		maxWaitChan  <-chan time.Time
159		maxWaitTimer *time.Timer
160	)
161	// Only use max wait batching if batching is enabled.
162	if b.maxBatchSize > 1 {
163		maxWaitTimer = time.NewTimer(b.maxBatchWait)
164		maxWaitChan = maxWaitTimer.C
165		defer maxWaitTimer.Stop()
166	}
167
168	for {
169		func() {
170			// Recover from any panics caused by this function so a panic in the
171			// goroutine can't bring down the main routine.
172			defer runtime.HandleCrash()
173
174			if b.maxBatchSize > 1 {
175				maxWaitTimer.Reset(b.maxBatchWait)
176			}
177			b.processEvents(b.collectEvents(maxWaitChan, stopCh))
178		}()
179
180		select {
181		case <-stopCh:
182			return
183		default:
184		}
185	}
186}
187
188// collectEvents attempts to collect some number of events in a batch.
189//
190// The following things can cause collectEvents to stop and return the list
191// of events:
192//
193//   * Maximum number of events for a batch.
194//   * Timer has passed.
195//   * Buffer channel is closed and empty.
196//   * stopCh is closed.
197func (b *bufferedBackend) collectEvents(timer <-chan time.Time, stopCh <-chan struct{}) []*auditinternal.Event {
198	var events []*auditinternal.Event
199
200L:
201	for i := 0; i < b.maxBatchSize; i++ {
202		select {
203		case ev, ok := <-b.buffer:
204			// Buffer channel was closed and no new events will follow.
205			if !ok {
206				break L
207			}
208			events = append(events, ev)
209		case <-timer:
210			// Timer has expired. Send currently accumulated batch.
211			break L
212		case <-stopCh:
213			// Backend has been stopped. Send currently accumulated batch.
214			break L
215		}
216	}
217
218	return events
219}
220
221// processEvents process the batch events in a goroutine using delegateBackend's ProcessEvents.
222func (b *bufferedBackend) processEvents(events []*auditinternal.Event) {
223	if len(events) == 0 {
224		return
225	}
226
227	// TODO(audit): Should control the number of active goroutines
228	// if one goroutine takes 5 seconds to finish, the number of goroutines can be 5 * defaultBatchThrottleQPS
229	if b.throttle != nil {
230		b.throttle.Accept()
231	}
232
233	if b.asyncDelegate {
234		b.wg.Add(1)
235		go func() {
236			defer b.wg.Done()
237			defer runtime.HandleCrash()
238
239			// Execute the real processing in a goroutine to keep it from blocking.
240			// This lets the batching routine continue draining the queue immediately.
241			b.delegateBackend.ProcessEvents(events...)
242		}()
243	} else {
244		func() {
245			defer runtime.HandleCrash()
246
247			// Execute the real processing in a goroutine to keep it from blocking.
248			// This lets the batching routine continue draining the queue immediately.
249			b.delegateBackend.ProcessEvents(events...)
250		}()
251	}
252}
253
// ProcessEvents implements audit.Backend by enqueueing deep copies of ev into
// the buffer without blocking; the batching routine forwards them to the
// delegate backend later.
//
// Events that cannot be enqueued are not delivered. The first such event and
// every event after it are passed to audit.HandlePluginError:
//
//   - If the buffer is full, the error is "audit buffer queue blocked" and
//     ProcessEvents still returns true.
//   - If enqueueing panics (for example because the buffer was already closed
//     after stopCh was closed), the panic is recovered, the error is
//     "audit backend shut down", and ProcessEvents returns false, the zero
//     value of its unnamed result.
//
// In all other cases ProcessEvents returns true.
func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) bool {
	// The following mechanism is in place to support the situation when audit
	// events are still coming after the backend was stopped.
	var sendErr error
	var evIndex int

	// If the delegateBackend was shutdown and the buffer channel was closed, an
	// attempt to add an event to it will result in panic that we should
	// recover from.
	defer func() {
		if err := recover(); err != nil {
			sendErr = fmt.Errorf("audit backend shut down")
		}
		if sendErr != nil {
			// Report the event that failed and all events after it.
			audit.HandlePluginError(PluginName, sendErr, ev[evIndex:]...)
		}
	}()

	for i, e := range ev {
		// Record the position first so that, if anything below fails, the
		// deferred handler reports this event and all later ones.
		evIndex = i
		// Per the audit.Backend interface these events are reused after being
		// sent to the Sink. Deep copy and send the copy to the queue.
		event := e.DeepCopy()

		select {
		case b.buffer <- event:
		default:
			// Queue full: the remaining events are dropped and reported by the
			// deferred handler, but the call still reports success.
			sendErr = fmt.Errorf("audit buffer queue blocked")
			return true
		}
	}
	return true
}
287
288func (b *bufferedBackend) String() string {
289	return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend)
290}
291