1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package workqueue 18 19import ( 20 "sync" 21 "time" 22 23 "k8s.io/apimachinery/pkg/util/clock" 24) 25 26type Interface interface { 27 Add(item interface{}) 28 Len() int 29 Get() (item interface{}, shutdown bool) 30 Done(item interface{}) 31 ShutDown() 32 ShuttingDown() bool 33} 34 35// New constructs a new work queue (see the package comment). 36func New() *Type { 37 return NewNamed("") 38} 39 40func NewNamed(name string) *Type { 41 rc := clock.RealClock{} 42 return newQueue( 43 rc, 44 globalMetricsFactory.newQueueMetrics(name, rc), 45 defaultUnfinishedWorkUpdatePeriod, 46 ) 47} 48 49func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type { 50 t := &Type{ 51 clock: c, 52 dirty: set{}, 53 processing: set{}, 54 cond: sync.NewCond(&sync.Mutex{}), 55 metrics: metrics, 56 unfinishedWorkUpdatePeriod: updatePeriod, 57 } 58 59 // Don't start the goroutine for a type of noMetrics so we don't consume 60 // resources unnecessarily 61 if _, ok := metrics.(noMetrics); !ok { 62 go t.updateUnfinishedWorkLoop() 63 } 64 65 return t 66} 67 68const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond 69 70// Type is a work queue (see the package comment). 71type Type struct { 72 // queue defines the order in which we will work on items. Every 73 // element of queue should be in the dirty set and not in the 74 // processing set. 75 queue []t 76 77 // dirty defines all of the items that need to be processed. 78 dirty set 79 80 // Things that are currently being processed are in the processing set. 81 // These things may be simultaneously in the dirty set. When we finish 82 // processing something and remove it from this set, we'll check if 83 // it's in the dirty set, and if so, add it to the queue. 84 processing set 85 86 cond *sync.Cond 87 88 shuttingDown bool 89 90 metrics queueMetrics 91 92 unfinishedWorkUpdatePeriod time.Duration 93 clock clock.Clock 94} 95 96type empty struct{} 97type t interface{} 98type set map[t]empty 99 100func (s set) has(item t) bool { 101 _, exists := s[item] 102 return exists 103} 104 105func (s set) insert(item t) { 106 s[item] = empty{} 107} 108 109func (s set) delete(item t) { 110 delete(s, item) 111} 112 113// Add marks item as needing processing. 114func (q *Type) Add(item interface{}) { 115 q.cond.L.Lock() 116 defer q.cond.L.Unlock() 117 if q.shuttingDown { 118 return 119 } 120 if q.dirty.has(item) { 121 return 122 } 123 124 q.metrics.add(item) 125 126 q.dirty.insert(item) 127 if q.processing.has(item) { 128 return 129 } 130 131 q.queue = append(q.queue, item) 132 q.cond.Signal() 133} 134 135// Len returns the current queue length, for informational purposes only. You 136// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular 137// value, that can't be synchronized properly. 138func (q *Type) Len() int { 139 q.cond.L.Lock() 140 defer q.cond.L.Unlock() 141 return len(q.queue) 142} 143 144// Get blocks until it can return an item to be processed. If shutdown = true, 145// the caller should end their goroutine. You must call Done with item when you 146// have finished processing it. 147func (q *Type) Get() (item interface{}, shutdown bool) { 148 q.cond.L.Lock() 149 defer q.cond.L.Unlock() 150 for len(q.queue) == 0 && !q.shuttingDown { 151 q.cond.Wait() 152 } 153 if len(q.queue) == 0 { 154 // We must be shutting down. 155 return nil, true 156 } 157 158 item, q.queue = q.queue[0], q.queue[1:] 159 160 q.metrics.get(item) 161 162 q.processing.insert(item) 163 q.dirty.delete(item) 164 165 return item, false 166} 167 168// Done marks item as done processing, and if it has been marked as dirty again 169// while it was being processed, it will be re-added to the queue for 170// re-processing. 171func (q *Type) Done(item interface{}) { 172 q.cond.L.Lock() 173 defer q.cond.L.Unlock() 174 175 q.metrics.done(item) 176 177 q.processing.delete(item) 178 if q.dirty.has(item) { 179 q.queue = append(q.queue, item) 180 q.cond.Signal() 181 } 182} 183 184// ShutDown will cause q to ignore all new items added to it. As soon as the 185// worker goroutines have drained the existing items in the queue, they will be 186// instructed to exit. 187func (q *Type) ShutDown() { 188 q.cond.L.Lock() 189 defer q.cond.L.Unlock() 190 q.shuttingDown = true 191 q.cond.Broadcast() 192} 193 194func (q *Type) ShuttingDown() bool { 195 q.cond.L.Lock() 196 defer q.cond.L.Unlock() 197 198 return q.shuttingDown 199} 200 201func (q *Type) updateUnfinishedWorkLoop() { 202 t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod) 203 defer t.Stop() 204 for range t.C() { 205 if !func() bool { 206 q.cond.L.Lock() 207 defer q.cond.L.Unlock() 208 if !q.shuttingDown { 209 q.metrics.updateUnfinishedWork() 210 return true 211 } 212 return false 213 214 }() { 215 return 216 } 217 } 218} 219