1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package workqueue
18
19import (
20	"sync"
21	"time"
22
23	"k8s.io/apimachinery/pkg/util/clock"
24)
25
// Interface is the set of operations a work queue offers to the code that
// feeds it and to the workers that drain it. *Type provides these methods.
type Interface interface {
	// Add marks item as needing processing.
	Add(item interface{})
	// Get returns the next item to work on. When shutdown is true the
	// caller should stop working.
	Get() (item interface{}, shutdown bool)
	// Done marks item as finished processing.
	Done(item interface{})

	// Len returns the current queue length, for informational purposes.
	Len() int

	// ShutDown tells the queue to stop accepting new items.
	ShutDown()
	// ShuttingDown reports whether ShutDown has been called.
	ShuttingDown() bool
}
34
35// New constructs a new work queue (see the package comment).
36func New() *Type {
37	return NewNamed("")
38}
39
40func NewNamed(name string) *Type {
41	rc := clock.RealClock{}
42	return newQueue(
43		rc,
44		globalMetricsFactory.newQueueMetrics(name, rc),
45		defaultUnfinishedWorkUpdatePeriod,
46	)
47}
48
49func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
50	t := &Type{
51		clock:                      c,
52		dirty:                      set{},
53		processing:                 set{},
54		cond:                       sync.NewCond(&sync.Mutex{}),
55		metrics:                    metrics,
56		unfinishedWorkUpdatePeriod: updatePeriod,
57	}
58	go t.updateUnfinishedWorkLoop()
59	return t
60}
61
// defaultUnfinishedWorkUpdatePeriod is the update period NewNamed hands to
// newQueue, i.e. how often the background loop refreshes unfinished-work
// metrics for queues built through the public constructors.
const defaultUnfinishedWorkUpdatePeriod time.Duration = 500 * time.Millisecond
63
64// Type is a work queue (see the package comment).
65type Type struct {
66	// queue defines the order in which we will work on items. Every
67	// element of queue should be in the dirty set and not in the
68	// processing set.
69	queue []t
70
71	// dirty defines all of the items that need to be processed.
72	dirty set
73
74	// Things that are currently being processed are in the processing set.
75	// These things may be simultaneously in the dirty set. When we finish
76	// processing something and remove it from this set, we'll check if
77	// it's in the dirty set, and if so, add it to the queue.
78	processing set
79
80	cond *sync.Cond
81
82	shuttingDown bool
83
84	metrics queueMetrics
85
86	unfinishedWorkUpdatePeriod time.Duration
87	clock                      clock.Clock
88}
89
type (
	// empty is the zero-size value stored for every member of a set.
	empty struct{}
	// t is the type of the items held by the queue and its sets.
	t interface{}
	// set is a collection of distinct items, stored as map keys.
	set map[t]empty
)

// has reports whether item is a member of s.
func (s set) has(item t) bool {
	_, ok := s[item]
	return ok
}

// insert adds item to s; inserting an existing member changes nothing.
func (s set) insert(item t) { s[item] = empty{} }

// delete removes item from s; removing a non-member is a no-op.
func (s set) delete(item t) { delete(s, item) }
106
107// Add marks item as needing processing.
108func (q *Type) Add(item interface{}) {
109	q.cond.L.Lock()
110	defer q.cond.L.Unlock()
111	if q.shuttingDown {
112		return
113	}
114	if q.dirty.has(item) {
115		return
116	}
117
118	q.metrics.add(item)
119
120	q.dirty.insert(item)
121	if q.processing.has(item) {
122		return
123	}
124
125	q.queue = append(q.queue, item)
126	q.cond.Signal()
127}
128
129// Len returns the current queue length, for informational purposes only. You
130// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
131// value, that can't be synchronized properly.
132func (q *Type) Len() int {
133	q.cond.L.Lock()
134	defer q.cond.L.Unlock()
135	return len(q.queue)
136}
137
138// Get blocks until it can return an item to be processed. If shutdown = true,
139// the caller should end their goroutine. You must call Done with item when you
140// have finished processing it.
141func (q *Type) Get() (item interface{}, shutdown bool) {
142	q.cond.L.Lock()
143	defer q.cond.L.Unlock()
144	for len(q.queue) == 0 && !q.shuttingDown {
145		q.cond.Wait()
146	}
147	if len(q.queue) == 0 {
148		// We must be shutting down.
149		return nil, true
150	}
151
152	item, q.queue = q.queue[0], q.queue[1:]
153
154	q.metrics.get(item)
155
156	q.processing.insert(item)
157	q.dirty.delete(item)
158
159	return item, false
160}
161
162// Done marks item as done processing, and if it has been marked as dirty again
163// while it was being processed, it will be re-added to the queue for
164// re-processing.
165func (q *Type) Done(item interface{}) {
166	q.cond.L.Lock()
167	defer q.cond.L.Unlock()
168
169	q.metrics.done(item)
170
171	q.processing.delete(item)
172	if q.dirty.has(item) {
173		q.queue = append(q.queue, item)
174		q.cond.Signal()
175	}
176}
177
178// ShutDown will cause q to ignore all new items added to it. As soon as the
179// worker goroutines have drained the existing items in the queue, they will be
180// instructed to exit.
181func (q *Type) ShutDown() {
182	q.cond.L.Lock()
183	defer q.cond.L.Unlock()
184	q.shuttingDown = true
185	q.cond.Broadcast()
186}
187
188func (q *Type) ShuttingDown() bool {
189	q.cond.L.Lock()
190	defer q.cond.L.Unlock()
191
192	return q.shuttingDown
193}
194
195func (q *Type) updateUnfinishedWorkLoop() {
196	t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
197	defer t.Stop()
198	for range t.C() {
199		if !func() bool {
200			q.cond.L.Lock()
201			defer q.cond.L.Unlock()
202			if !q.shuttingDown {
203				q.metrics.updateUnfinishedWork()
204				return true
205			}
206			return false
207
208		}() {
209			return
210		}
211	}
212}
213