1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package wrr
19
20import (
21	"container/heap"
22	"sync"
23)
24
25// edfWrr is a struct for EDF weighted round robin implementation.
26type edfWrr struct {
27	lock  sync.Mutex
28	items edfPriorityQueue
29}
30
31// NewEDF creates Earliest Deadline First (EDF)
32// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) implementation for weighted round robin.
33// Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set
34// at current time + 1 / weight, providing weighted round robin behavior with O(log n) pick time.
35func NewEDF() WRR {
36	return &edfWrr{}
37}
38
39// edfEntry is an internal wrapper for item that also stores weight and relative position in the queue.
40type edfEntry struct {
41	deadline float64
42	weight   int64
43	item     interface{}
44}
45
46// edfPriorityQueue is a heap.Interface implementation for edfEntry elements.
47type edfPriorityQueue []*edfEntry
48
49func (pq edfPriorityQueue) Len() int           { return len(pq) }
50func (pq edfPriorityQueue) Less(i, j int) bool { return pq[i].deadline < pq[j].deadline }
51func (pq edfPriorityQueue) Swap(i, j int)      { pq[i], pq[j] = pq[j], pq[i] }
52
53func (pq *edfPriorityQueue) Push(x interface{}) {
54	*pq = append(*pq, x.(*edfEntry))
55}
56
57func (pq *edfPriorityQueue) Pop() interface{} {
58	old := *pq
59	*pq = old[0 : len(old)-1]
60	return old[len(old)-1]
61}
62
63// Current time in EDF scheduler.
64func (edf *edfWrr) currentTime() float64 {
65	if len(edf.items) == 0 {
66		return 0.0
67	}
68	return edf.items[0].deadline
69}
70
71func (edf *edfWrr) Add(item interface{}, weight int64) {
72	edf.lock.Lock()
73	defer edf.lock.Unlock()
74	entry := edfEntry{
75		deadline: edf.currentTime() + 1.0/float64(weight),
76		weight:   weight,
77		item:     item,
78	}
79	heap.Push(&edf.items, &entry)
80}
81
82func (edf *edfWrr) Next() interface{} {
83	edf.lock.Lock()
84	defer edf.lock.Unlock()
85	if len(edf.items) == 0 {
86		return nil
87	}
88	item := edf.items[0]
89	item.deadline = edf.currentTime() + 1.0/float64(item.weight)
90	heap.Fix(&edf.items, 0)
91	return item.item
92}
93