1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package workqueue
18
19import (
20	"sync"
21	"time"
22
23	"k8s.io/apimachinery/pkg/util/clock"
24)
25
// Interface is the set of operations a work queue offers to the producers
// that feed it and the workers that drain it. Type in this package provides
// methods with these signatures.
type Interface interface {
	// Add marks item as needing processing.
	Add(item interface{})

	// Get returns the next item to process. A true shutdown result tells
	// the caller to stop asking for more work.
	Get() (item interface{}, shutdown bool)
	// Done marks item as finished processing.
	Done(item interface{})

	// Len returns the current queue length, for informational purposes
	// only.
	Len() int

	// ShutDown tells the queue to stop accepting new items.
	ShutDown()
	// ShuttingDown reports whether the queue is shutting down.
	ShuttingDown() bool
}
34
35// New constructs a new work queue (see the package comment).
36func New() *Type {
37	return NewNamed("")
38}
39
40func NewNamed(name string) *Type {
41	rc := clock.RealClock{}
42	return newQueue(
43		rc,
44		globalMetricsFactory.newQueueMetrics(name, rc),
45		defaultUnfinishedWorkUpdatePeriod,
46	)
47}
48
49func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
50	t := &Type{
51		clock:                      c,
52		dirty:                      set{},
53		processing:                 set{},
54		cond:                       sync.NewCond(&sync.Mutex{}),
55		metrics:                    metrics,
56		unfinishedWorkUpdatePeriod: updatePeriod,
57	}
58
59	// Don't start the goroutine for a type of noMetrics so we don't consume
60	// resources unnecessarily
61	if _, ok := metrics.(noMetrics); !ok {
62		go t.updateUnfinishedWorkLoop()
63	}
64
65	return t
66}
67
// defaultUnfinishedWorkUpdatePeriod is the refresh period NewNamed passes to
// newQueue for the unfinished-work metric.
const defaultUnfinishedWorkUpdatePeriod time.Duration = 500 * time.Millisecond
69
70// Type is a work queue (see the package comment).
71type Type struct {
72	// queue defines the order in which we will work on items. Every
73	// element of queue should be in the dirty set and not in the
74	// processing set.
75	queue []t
76
77	// dirty defines all of the items that need to be processed.
78	dirty set
79
80	// Things that are currently being processed are in the processing set.
81	// These things may be simultaneously in the dirty set. When we finish
82	// processing something and remove it from this set, we'll check if
83	// it's in the dirty set, and if so, add it to the queue.
84	processing set
85
86	cond *sync.Cond
87
88	shuttingDown bool
89
90	metrics queueMetrics
91
92	unfinishedWorkUpdatePeriod time.Duration
93	clock                      clock.Clock
94}
95
// Building blocks for the queue's bookkeeping sets.
type (
	// empty is the zero-width value stored for each set member.
	empty struct{}
	// t is the element type held by the queue.
	t interface{}
	// set is a collection of distinct items, keyed by the item itself.
	set map[t]empty
)

// has reports whether item is a member of s.
func (s set) has(item t) bool {
	_, ok := s[item]
	return ok
}

// insert adds item to s; inserting an existing member changes nothing.
func (s set) insert(item t) {
	s[item] = empty{}
}

// delete removes item from s; removing a non-member changes nothing.
func (s set) delete(item t) {
	delete(s, item)
}
112
113// Add marks item as needing processing.
114func (q *Type) Add(item interface{}) {
115	q.cond.L.Lock()
116	defer q.cond.L.Unlock()
117	if q.shuttingDown {
118		return
119	}
120	if q.dirty.has(item) {
121		return
122	}
123
124	q.metrics.add(item)
125
126	q.dirty.insert(item)
127	if q.processing.has(item) {
128		return
129	}
130
131	q.queue = append(q.queue, item)
132	q.cond.Signal()
133}
134
135// Len returns the current queue length, for informational purposes only. You
136// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
137// value, that can't be synchronized properly.
138func (q *Type) Len() int {
139	q.cond.L.Lock()
140	defer q.cond.L.Unlock()
141	return len(q.queue)
142}
143
144// Get blocks until it can return an item to be processed. If shutdown = true,
145// the caller should end their goroutine. You must call Done with item when you
146// have finished processing it.
147func (q *Type) Get() (item interface{}, shutdown bool) {
148	q.cond.L.Lock()
149	defer q.cond.L.Unlock()
150	for len(q.queue) == 0 && !q.shuttingDown {
151		q.cond.Wait()
152	}
153	if len(q.queue) == 0 {
154		// We must be shutting down.
155		return nil, true
156	}
157
158	item, q.queue = q.queue[0], q.queue[1:]
159
160	q.metrics.get(item)
161
162	q.processing.insert(item)
163	q.dirty.delete(item)
164
165	return item, false
166}
167
168// Done marks item as done processing, and if it has been marked as dirty again
169// while it was being processed, it will be re-added to the queue for
170// re-processing.
171func (q *Type) Done(item interface{}) {
172	q.cond.L.Lock()
173	defer q.cond.L.Unlock()
174
175	q.metrics.done(item)
176
177	q.processing.delete(item)
178	if q.dirty.has(item) {
179		q.queue = append(q.queue, item)
180		q.cond.Signal()
181	}
182}
183
184// ShutDown will cause q to ignore all new items added to it. As soon as the
185// worker goroutines have drained the existing items in the queue, they will be
186// instructed to exit.
187func (q *Type) ShutDown() {
188	q.cond.L.Lock()
189	defer q.cond.L.Unlock()
190	q.shuttingDown = true
191	q.cond.Broadcast()
192}
193
194func (q *Type) ShuttingDown() bool {
195	q.cond.L.Lock()
196	defer q.cond.L.Unlock()
197
198	return q.shuttingDown
199}
200
201func (q *Type) updateUnfinishedWorkLoop() {
202	t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
203	defer t.Stop()
204	for range t.C() {
205		if !func() bool {
206			q.cond.L.Lock()
207			defer q.cond.L.Unlock()
208			if !q.shuttingDown {
209				q.metrics.updateUnfinishedWork()
210				return true
211			}
212			return false
213
214		}() {
215			return
216		}
217	}
218}
219