1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package schedule
16
17import (
18	"context"
19	"sync"
20)
21
22type Job func(context.Context)
23
24// Scheduler can schedule jobs.
25type Scheduler interface {
26	// Schedule asks the scheduler to schedule a job defined by the given func.
27	// Schedule to a stopped scheduler might panic.
28	Schedule(j Job)
29
30	// Pending returns number of pending jobs
31	Pending() int
32
33	// Scheduled returns the number of scheduled jobs (excluding pending jobs)
34	Scheduled() int
35
36	// Finished returns the number of finished jobs
37	Finished() int
38
39	// WaitFinish waits until at least n job are finished and all pending jobs are finished.
40	WaitFinish(n int)
41
42	// Stop stops the scheduler.
43	Stop()
44}
45
46type fifo struct {
47	mu sync.Mutex
48
49	resume    chan struct{}
50	scheduled int
51	finished  int
52	pendings  []Job
53
54	ctx    context.Context
55	cancel context.CancelFunc
56
57	finishCond *sync.Cond
58	donec      chan struct{}
59}
60
61// NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO
62// order sequentially
63func NewFIFOScheduler() Scheduler {
64	f := &fifo{
65		resume: make(chan struct{}, 1),
66		donec:  make(chan struct{}, 1),
67	}
68	f.finishCond = sync.NewCond(&f.mu)
69	f.ctx, f.cancel = context.WithCancel(context.Background())
70	go f.run()
71	return f
72}
73
74// Schedule schedules a job that will be ran in FIFO order sequentially.
75func (f *fifo) Schedule(j Job) {
76	f.mu.Lock()
77	defer f.mu.Unlock()
78
79	if f.cancel == nil {
80		panic("schedule: schedule to stopped scheduler")
81	}
82
83	if len(f.pendings) == 0 {
84		select {
85		case f.resume <- struct{}{}:
86		default:
87		}
88	}
89	f.pendings = append(f.pendings, j)
90}
91
92func (f *fifo) Pending() int {
93	f.mu.Lock()
94	defer f.mu.Unlock()
95	return len(f.pendings)
96}
97
98func (f *fifo) Scheduled() int {
99	f.mu.Lock()
100	defer f.mu.Unlock()
101	return f.scheduled
102}
103
104func (f *fifo) Finished() int {
105	f.finishCond.L.Lock()
106	defer f.finishCond.L.Unlock()
107	return f.finished
108}
109
110func (f *fifo) WaitFinish(n int) {
111	f.finishCond.L.Lock()
112	for f.finished < n || len(f.pendings) != 0 {
113		f.finishCond.Wait()
114	}
115	f.finishCond.L.Unlock()
116}
117
118// Stop stops the scheduler and cancels all pending jobs.
119func (f *fifo) Stop() {
120	f.mu.Lock()
121	f.cancel()
122	f.cancel = nil
123	f.mu.Unlock()
124	<-f.donec
125}
126
127func (f *fifo) run() {
128	// TODO: recover from job panic?
129	defer func() {
130		close(f.donec)
131		close(f.resume)
132	}()
133
134	for {
135		var todo Job
136		f.mu.Lock()
137		if len(f.pendings) != 0 {
138			f.scheduled++
139			todo = f.pendings[0]
140		}
141		f.mu.Unlock()
142		if todo == nil {
143			select {
144			case <-f.resume:
145			case <-f.ctx.Done():
146				f.mu.Lock()
147				pendings := f.pendings
148				f.pendings = nil
149				f.mu.Unlock()
150				// clean up pending jobs
151				for _, todo := range pendings {
152					todo(f.ctx)
153				}
154				return
155			}
156		} else {
157			todo(f.ctx)
158			f.finishCond.L.Lock()
159			f.finished++
160			f.pendings = f.pendings[1:]
161			f.finishCond.Broadcast()
162			f.finishCond.L.Unlock()
163		}
164	}
165}
166