1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package wrr 19 20import ( 21 "container/heap" 22 "sync" 23) 24 25// edfWrr is a struct for EDF weighted round robin implementation. 26type edfWrr struct { 27 lock sync.Mutex 28 items edfPriorityQueue 29} 30 31// NewEDF creates Earliest Deadline First (EDF) 32// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) implementation for weighted round robin. 33// Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set 34// at current time + 1 / weight, providing weighted round robin behavior with O(log n) pick time. 35func NewEDF() WRR { 36 return &edfWrr{} 37} 38 39// edfEntry is an internal wrapper for item that also stores weight and relative position in the queue. 40type edfEntry struct { 41 deadline float64 42 weight int64 43 item interface{} 44} 45 46// edfPriorityQueue is a heap.Interface implementation for edfEntry elements. 47type edfPriorityQueue []*edfEntry 48 49func (pq edfPriorityQueue) Len() int { return len(pq) } 50func (pq edfPriorityQueue) Less(i, j int) bool { return pq[i].deadline < pq[j].deadline } 51func (pq edfPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] } 52 53func (pq *edfPriorityQueue) Push(x interface{}) { 54 *pq = append(*pq, x.(*edfEntry)) 55} 56 57func (pq *edfPriorityQueue) Pop() interface{} { 58 old := *pq 59 *pq = old[0 : len(old)-1] 60 return old[len(old)-1] 61} 62 63// Current time in EDF scheduler. 64func (edf *edfWrr) currentTime() float64 { 65 if len(edf.items) == 0 { 66 return 0.0 67 } 68 return edf.items[0].deadline 69} 70 71func (edf *edfWrr) Add(item interface{}, weight int64) { 72 edf.lock.Lock() 73 defer edf.lock.Unlock() 74 entry := edfEntry{ 75 deadline: edf.currentTime() + 1.0/float64(weight), 76 weight: weight, 77 item: item, 78 } 79 heap.Push(&edf.items, &entry) 80} 81 82func (edf *edfWrr) Next() interface{} { 83 edf.lock.Lock() 84 defer edf.lock.Unlock() 85 if len(edf.items) == 0 { 86 return nil 87 } 88 item := edf.items[0] 89 item.deadline = edf.currentTime() + 1.0/float64(item.weight) 90 heap.Fix(&edf.items, 0) 91 return item.item 92} 93