1/* 2Copyright 2016 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package workqueue 18 19import ( 20 "container/heap" 21 "sync" 22 "time" 23 24 "k8s.io/apimachinery/pkg/util/clock" 25 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 26) 27 28// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to 29// requeue items after failures without ending up in a hot-loop. 30type DelayingInterface interface { 31 Interface 32 // AddAfter adds an item to the workqueue after the indicated duration has passed 33 AddAfter(item interface{}, duration time.Duration) 34} 35 36// NewDelayingQueue constructs a new workqueue with delayed queuing ability 37func NewDelayingQueue() DelayingInterface { 38 return NewDelayingQueueWithCustomClock(clock.RealClock{}, "") 39} 40 41// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability 42func NewNamedDelayingQueue(name string) DelayingInterface { 43 return NewDelayingQueueWithCustomClock(clock.RealClock{}, name) 44} 45 46// NewDelayingQueueWithCustomClock constructs a new named workqueue 47// with ability to inject real or fake clock for testing purposes 48func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface { 49 ret := &delayingType{ 50 Interface: NewNamed(name), 51 clock: clock, 52 heartbeat: clock.NewTicker(maxWait), 53 stopCh: make(chan struct{}), 54 waitingForAddCh: make(chan *waitFor, 1000), 55 metrics: newRetryMetrics(name), 56 } 57 58 go ret.waitingLoop() 59 60 return ret 61} 62 63// delayingType wraps an Interface and provides delayed re-enquing 64type delayingType struct { 65 Interface 66 67 // clock tracks time for delayed firing 68 clock clock.Clock 69 70 // stopCh lets us signal a shutdown to the waiting loop 71 stopCh chan struct{} 72 // stopOnce guarantees we only signal shutdown a single time 73 stopOnce sync.Once 74 75 // heartbeat ensures we wait no more than maxWait before firing 76 heartbeat clock.Ticker 77 78 // waitingForAddCh is a buffered channel that feeds waitingForAdd 79 waitingForAddCh chan *waitFor 80 81 // metrics counts the number of retries 82 metrics retryMetrics 83} 84 85// waitFor holds the data to add and the time it should be added 86type waitFor struct { 87 data t 88 readyAt time.Time 89 // index in the priority queue (heap) 90 index int 91} 92 93// waitForPriorityQueue implements a priority queue for waitFor items. 94// 95// waitForPriorityQueue implements heap.Interface. The item occurring next in 96// time (i.e., the item with the smallest readyAt) is at the root (index 0). 97// Peek returns this minimum item at index 0. Pop returns the minimum item after 98// it has been removed from the queue and placed at index Len()-1 by 99// container/heap. Push adds an item at index Len(), and container/heap 100// percolates it into the correct location. 101type waitForPriorityQueue []*waitFor 102 103func (pq waitForPriorityQueue) Len() int { 104 return len(pq) 105} 106func (pq waitForPriorityQueue) Less(i, j int) bool { 107 return pq[i].readyAt.Before(pq[j].readyAt) 108} 109func (pq waitForPriorityQueue) Swap(i, j int) { 110 pq[i], pq[j] = pq[j], pq[i] 111 pq[i].index = i 112 pq[j].index = j 113} 114 115// Push adds an item to the queue. Push should not be called directly; instead, 116// use `heap.Push`. 117func (pq *waitForPriorityQueue) Push(x interface{}) { 118 n := len(*pq) 119 item := x.(*waitFor) 120 item.index = n 121 *pq = append(*pq, item) 122} 123 124// Pop removes an item from the queue. Pop should not be called directly; 125// instead, use `heap.Pop`. 126func (pq *waitForPriorityQueue) Pop() interface{} { 127 n := len(*pq) 128 item := (*pq)[n-1] 129 item.index = -1 130 *pq = (*pq)[0:(n - 1)] 131 return item 132} 133 134// Peek returns the item at the beginning of the queue, without removing the 135// item or otherwise mutating the queue. It is safe to call directly. 136func (pq waitForPriorityQueue) Peek() interface{} { 137 return pq[0] 138} 139 140// ShutDown stops the queue. After the queue drains, the returned shutdown bool 141// on Get() will be true. This method may be invoked more than once. 142func (q *delayingType) ShutDown() { 143 q.stopOnce.Do(func() { 144 q.Interface.ShutDown() 145 close(q.stopCh) 146 q.heartbeat.Stop() 147 }) 148} 149 150// AddAfter adds the given item to the work queue after the given delay 151func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { 152 // don't add if we're already shutting down 153 if q.ShuttingDown() { 154 return 155 } 156 157 q.metrics.retry() 158 159 // immediately add things with no delay 160 if duration <= 0 { 161 q.Add(item) 162 return 163 } 164 165 select { 166 case <-q.stopCh: 167 // unblock if ShutDown() is called 168 case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: 169 } 170} 171 172// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. 173// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an 174// expired item sitting for more than 10 seconds. 175const maxWait = 10 * time.Second 176 177// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. 178func (q *delayingType) waitingLoop() { 179 defer utilruntime.HandleCrash() 180 181 // Make a placeholder channel to use when there are no items in our list 182 never := make(<-chan time.Time) 183 184 // Make a timer that expires when the item at the head of the waiting queue is ready 185 var nextReadyAtTimer clock.Timer 186 187 waitingForQueue := &waitForPriorityQueue{} 188 heap.Init(waitingForQueue) 189 190 waitingEntryByData := map[t]*waitFor{} 191 192 for { 193 if q.Interface.ShuttingDown() { 194 return 195 } 196 197 now := q.clock.Now() 198 199 // Add ready entries 200 for waitingForQueue.Len() > 0 { 201 entry := waitingForQueue.Peek().(*waitFor) 202 if entry.readyAt.After(now) { 203 break 204 } 205 206 entry = heap.Pop(waitingForQueue).(*waitFor) 207 q.Add(entry.data) 208 delete(waitingEntryByData, entry.data) 209 } 210 211 // Set up a wait for the first item's readyAt (if one exists) 212 nextReadyAt := never 213 if waitingForQueue.Len() > 0 { 214 if nextReadyAtTimer != nil { 215 nextReadyAtTimer.Stop() 216 } 217 entry := waitingForQueue.Peek().(*waitFor) 218 nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) 219 nextReadyAt = nextReadyAtTimer.C() 220 } 221 222 select { 223 case <-q.stopCh: 224 return 225 226 case <-q.heartbeat.C(): 227 // continue the loop, which will add ready items 228 229 case <-nextReadyAt: 230 // continue the loop, which will add ready items 231 232 case waitEntry := <-q.waitingForAddCh: 233 if waitEntry.readyAt.After(q.clock.Now()) { 234 insert(waitingForQueue, waitingEntryByData, waitEntry) 235 } else { 236 q.Add(waitEntry.data) 237 } 238 239 drained := false 240 for !drained { 241 select { 242 case waitEntry := <-q.waitingForAddCh: 243 if waitEntry.readyAt.After(q.clock.Now()) { 244 insert(waitingForQueue, waitingEntryByData, waitEntry) 245 } else { 246 q.Add(waitEntry.data) 247 } 248 default: 249 drained = true 250 } 251 } 252 } 253 } 254} 255 256// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue 257func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) { 258 // if the entry already exists, update the time only if it would cause the item to be queued sooner 259 existing, exists := knownEntries[entry.data] 260 if exists { 261 if existing.readyAt.After(entry.readyAt) { 262 existing.readyAt = entry.readyAt 263 heap.Fix(q, existing.index) 264 } 265 266 return 267 } 268 269 heap.Push(q, entry) 270 knownEntries[entry.data] = entry 271} 272