1/*
2Copyright 2016 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package workqueue
18
19import (
20	"container/heap"
21	"sync"
22	"time"
23
24	"k8s.io/apimachinery/pkg/util/clock"
25	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26)
27
28// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
29// requeue items after failures without ending up in a hot-loop.
30type DelayingInterface interface {
31	Interface
32	// AddAfter adds an item to the workqueue after the indicated duration has passed
33	AddAfter(item interface{}, duration time.Duration)
34}
35
36// NewDelayingQueue constructs a new workqueue with delayed queuing ability
37func NewDelayingQueue() DelayingInterface {
38	return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
39}
40
41// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
42func NewNamedDelayingQueue(name string) DelayingInterface {
43	return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
44}
45
46// NewDelayingQueueWithCustomClock constructs a new named workqueue
47// with ability to inject real or fake clock for testing purposes
48func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
49	ret := &delayingType{
50		Interface:       NewNamed(name),
51		clock:           clock,
52		heartbeat:       clock.NewTicker(maxWait),
53		stopCh:          make(chan struct{}),
54		waitingForAddCh: make(chan *waitFor, 1000),
55		metrics:         newRetryMetrics(name),
56	}
57
58	go ret.waitingLoop()
59
60	return ret
61}
62
63// delayingType wraps an Interface and provides delayed re-enquing
64type delayingType struct {
65	Interface
66
67	// clock tracks time for delayed firing
68	clock clock.Clock
69
70	// stopCh lets us signal a shutdown to the waiting loop
71	stopCh chan struct{}
72	// stopOnce guarantees we only signal shutdown a single time
73	stopOnce sync.Once
74
75	// heartbeat ensures we wait no more than maxWait before firing
76	heartbeat clock.Ticker
77
78	// waitingForAddCh is a buffered channel that feeds waitingForAdd
79	waitingForAddCh chan *waitFor
80
81	// metrics counts the number of retries
82	metrics retryMetrics
83}
84
85// waitFor holds the data to add and the time it should be added
86type waitFor struct {
87	data    t
88	readyAt time.Time
89	// index in the priority queue (heap)
90	index int
91}
92
93// waitForPriorityQueue implements a priority queue for waitFor items.
94//
95// waitForPriorityQueue implements heap.Interface. The item occurring next in
96// time (i.e., the item with the smallest readyAt) is at the root (index 0).
97// Peek returns this minimum item at index 0. Pop returns the minimum item after
98// it has been removed from the queue and placed at index Len()-1 by
99// container/heap. Push adds an item at index Len(), and container/heap
100// percolates it into the correct location.
101type waitForPriorityQueue []*waitFor
102
103func (pq waitForPriorityQueue) Len() int {
104	return len(pq)
105}
106func (pq waitForPriorityQueue) Less(i, j int) bool {
107	return pq[i].readyAt.Before(pq[j].readyAt)
108}
109func (pq waitForPriorityQueue) Swap(i, j int) {
110	pq[i], pq[j] = pq[j], pq[i]
111	pq[i].index = i
112	pq[j].index = j
113}
114
115// Push adds an item to the queue. Push should not be called directly; instead,
116// use `heap.Push`.
117func (pq *waitForPriorityQueue) Push(x interface{}) {
118	n := len(*pq)
119	item := x.(*waitFor)
120	item.index = n
121	*pq = append(*pq, item)
122}
123
124// Pop removes an item from the queue. Pop should not be called directly;
125// instead, use `heap.Pop`.
126func (pq *waitForPriorityQueue) Pop() interface{} {
127	n := len(*pq)
128	item := (*pq)[n-1]
129	item.index = -1
130	*pq = (*pq)[0:(n - 1)]
131	return item
132}
133
134// Peek returns the item at the beginning of the queue, without removing the
135// item or otherwise mutating the queue. It is safe to call directly.
136func (pq waitForPriorityQueue) Peek() interface{} {
137	return pq[0]
138}
139
140// ShutDown stops the queue. After the queue drains, the returned shutdown bool
141// on Get() will be true. This method may be invoked more than once.
142func (q *delayingType) ShutDown() {
143	q.stopOnce.Do(func() {
144		q.Interface.ShutDown()
145		close(q.stopCh)
146		q.heartbeat.Stop()
147	})
148}
149
150// AddAfter adds the given item to the work queue after the given delay
151func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
152	// don't add if we're already shutting down
153	if q.ShuttingDown() {
154		return
155	}
156
157	q.metrics.retry()
158
159	// immediately add things with no delay
160	if duration <= 0 {
161		q.Add(item)
162		return
163	}
164
165	select {
166	case <-q.stopCh:
167		// unblock if ShutDown() is called
168	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
169	}
170}
171
172// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
173// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
174// expired item sitting for more than 10 seconds.
175const maxWait = 10 * time.Second
176
177// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
178func (q *delayingType) waitingLoop() {
179	defer utilruntime.HandleCrash()
180
181	// Make a placeholder channel to use when there are no items in our list
182	never := make(<-chan time.Time)
183
184	// Make a timer that expires when the item at the head of the waiting queue is ready
185	var nextReadyAtTimer clock.Timer
186
187	waitingForQueue := &waitForPriorityQueue{}
188	heap.Init(waitingForQueue)
189
190	waitingEntryByData := map[t]*waitFor{}
191
192	for {
193		if q.Interface.ShuttingDown() {
194			return
195		}
196
197		now := q.clock.Now()
198
199		// Add ready entries
200		for waitingForQueue.Len() > 0 {
201			entry := waitingForQueue.Peek().(*waitFor)
202			if entry.readyAt.After(now) {
203				break
204			}
205
206			entry = heap.Pop(waitingForQueue).(*waitFor)
207			q.Add(entry.data)
208			delete(waitingEntryByData, entry.data)
209		}
210
211		// Set up a wait for the first item's readyAt (if one exists)
212		nextReadyAt := never
213		if waitingForQueue.Len() > 0 {
214			if nextReadyAtTimer != nil {
215				nextReadyAtTimer.Stop()
216			}
217			entry := waitingForQueue.Peek().(*waitFor)
218			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
219			nextReadyAt = nextReadyAtTimer.C()
220		}
221
222		select {
223		case <-q.stopCh:
224			return
225
226		case <-q.heartbeat.C():
227			// continue the loop, which will add ready items
228
229		case <-nextReadyAt:
230			// continue the loop, which will add ready items
231
232		case waitEntry := <-q.waitingForAddCh:
233			if waitEntry.readyAt.After(q.clock.Now()) {
234				insert(waitingForQueue, waitingEntryByData, waitEntry)
235			} else {
236				q.Add(waitEntry.data)
237			}
238
239			drained := false
240			for !drained {
241				select {
242				case waitEntry := <-q.waitingForAddCh:
243					if waitEntry.readyAt.After(q.clock.Now()) {
244						insert(waitingForQueue, waitingEntryByData, waitEntry)
245					} else {
246						q.Add(waitEntry.data)
247					}
248				default:
249					drained = true
250				}
251			}
252		}
253	}
254}
255
256// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
257func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
258	// if the entry already exists, update the time only if it would cause the item to be queued sooner
259	existing, exists := knownEntries[entry.data]
260	if exists {
261		if existing.readyAt.After(entry.readyAt) {
262			existing.readyAt = entry.readyAt
263			heap.Fix(q, existing.index)
264		}
265
266		return
267	}
268
269	heap.Push(q, entry)
270	knownEntries[entry.data] = entry
271}
272