1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package workqueue 18 19import ( 20 "sync" 21 "time" 22 23 "k8s.io/apimachinery/pkg/util/clock" 24) 25 26type Interface interface { 27 Add(item interface{}) 28 Len() int 29 Get() (item interface{}, shutdown bool) 30 Done(item interface{}) 31 ShutDown() 32 ShuttingDown() bool 33} 34 35// New constructs a new work queue (see the package comment). 36func New() *Type { 37 return NewNamed("") 38} 39 40func NewNamed(name string) *Type { 41 rc := clock.RealClock{} 42 return newQueue( 43 rc, 44 globalMetricsFactory.newQueueMetrics(name, rc), 45 defaultUnfinishedWorkUpdatePeriod, 46 ) 47} 48 49func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type { 50 t := &Type{ 51 clock: c, 52 dirty: set{}, 53 processing: set{}, 54 cond: sync.NewCond(&sync.Mutex{}), 55 metrics: metrics, 56 unfinishedWorkUpdatePeriod: updatePeriod, 57 } 58 go t.updateUnfinishedWorkLoop() 59 return t 60} 61 62const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond 63 64// Type is a work queue (see the package comment). 65type Type struct { 66 // queue defines the order in which we will work on items. Every 67 // element of queue should be in the dirty set and not in the 68 // processing set. 69 queue []t 70 71 // dirty defines all of the items that need to be processed. 72 dirty set 73 74 // Things that are currently being processed are in the processing set. 75 // These things may be simultaneously in the dirty set. When we finish 76 // processing something and remove it from this set, we'll check if 77 // it's in the dirty set, and if so, add it to the queue. 78 processing set 79 80 cond *sync.Cond 81 82 shuttingDown bool 83 84 metrics queueMetrics 85 86 unfinishedWorkUpdatePeriod time.Duration 87 clock clock.Clock 88} 89 90type empty struct{} 91type t interface{} 92type set map[t]empty 93 94func (s set) has(item t) bool { 95 _, exists := s[item] 96 return exists 97} 98 99func (s set) insert(item t) { 100 s[item] = empty{} 101} 102 103func (s set) delete(item t) { 104 delete(s, item) 105} 106 107// Add marks item as needing processing. 108func (q *Type) Add(item interface{}) { 109 q.cond.L.Lock() 110 defer q.cond.L.Unlock() 111 if q.shuttingDown { 112 return 113 } 114 if q.dirty.has(item) { 115 return 116 } 117 118 q.metrics.add(item) 119 120 q.dirty.insert(item) 121 if q.processing.has(item) { 122 return 123 } 124 125 q.queue = append(q.queue, item) 126 q.cond.Signal() 127} 128 129// Len returns the current queue length, for informational purposes only. You 130// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular 131// value, that can't be synchronized properly. 132func (q *Type) Len() int { 133 q.cond.L.Lock() 134 defer q.cond.L.Unlock() 135 return len(q.queue) 136} 137 138// Get blocks until it can return an item to be processed. If shutdown = true, 139// the caller should end their goroutine. You must call Done with item when you 140// have finished processing it. 141func (q *Type) Get() (item interface{}, shutdown bool) { 142 q.cond.L.Lock() 143 defer q.cond.L.Unlock() 144 for len(q.queue) == 0 && !q.shuttingDown { 145 q.cond.Wait() 146 } 147 if len(q.queue) == 0 { 148 // We must be shutting down. 149 return nil, true 150 } 151 152 item, q.queue = q.queue[0], q.queue[1:] 153 154 q.metrics.get(item) 155 156 q.processing.insert(item) 157 q.dirty.delete(item) 158 159 return item, false 160} 161 162// Done marks item as done processing, and if it has been marked as dirty again 163// while it was being processed, it will be re-added to the queue for 164// re-processing. 165func (q *Type) Done(item interface{}) { 166 q.cond.L.Lock() 167 defer q.cond.L.Unlock() 168 169 q.metrics.done(item) 170 171 q.processing.delete(item) 172 if q.dirty.has(item) { 173 q.queue = append(q.queue, item) 174 q.cond.Signal() 175 } 176} 177 178// ShutDown will cause q to ignore all new items added to it. As soon as the 179// worker goroutines have drained the existing items in the queue, they will be 180// instructed to exit. 181func (q *Type) ShutDown() { 182 q.cond.L.Lock() 183 defer q.cond.L.Unlock() 184 q.shuttingDown = true 185 q.cond.Broadcast() 186} 187 188func (q *Type) ShuttingDown() bool { 189 q.cond.L.Lock() 190 defer q.cond.L.Unlock() 191 192 return q.shuttingDown 193} 194 195func (q *Type) updateUnfinishedWorkLoop() { 196 t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod) 197 defer t.Stop() 198 for range t.C() { 199 if !func() bool { 200 q.cond.L.Lock() 201 defer q.cond.L.Unlock() 202 if !q.shuttingDown { 203 q.metrics.updateUnfinishedWork() 204 return true 205 } 206 return false 207 208 }() { 209 return 210 } 211 } 212} 213