1 // Copyright (c) 2015-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_SCHEDULER_H
6 #define BITCOIN_SCHEDULER_H
7 
8 //
9 // NOTE:
10 // boost::thread should be ported to std::thread
11 // when we support C++11.
12 //
13 #include <condition_variable>
14 #include <functional>
15 #include <list>
16 #include <map>
17 
18 #include <sync.h>
19 
20 //
21 // Simple class for background tasks that should be run
22 // periodically or once "after a while"
23 //
24 // Usage:
25 //
26 // CScheduler* s = new CScheduler();
27 // s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { }
28 // s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3});
29 // boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s));
30 //
31 // ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
32 // s->stop();
33 // t->join();
34 // delete t;
35 // delete s; // Must be done after thread is interrupted/joined.
36 //
37 
38 class CScheduler
39 {
40 public:
41     CScheduler();
42     ~CScheduler();
43 
44     typedef std::function<void()> Function;
45 
46     // Call func at/after time t
47     void schedule(Function f, std::chrono::system_clock::time_point t);
48 
49     /** Call f once after the delta has passed */
scheduleFromNow(Function f,std::chrono::milliseconds delta)50     void scheduleFromNow(Function f, std::chrono::milliseconds delta)
51     {
52         schedule(std::move(f), std::chrono::system_clock::now() + delta);
53     }
54 
55     /**
56      * Repeat f until the scheduler is stopped. First run is after delta has passed once.
57      *
58      * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more
59      * accurate scheduling, don't use this method.
60      */
61     void scheduleEvery(Function f, std::chrono::milliseconds delta);
62 
63     /**
64      * Mock the scheduler to fast forward in time.
65      * Iterates through items on taskQueue and reschedules them
66      * to be delta_seconds sooner.
67      */
68     void MockForward(std::chrono::seconds delta_seconds);
69 
70     // To keep things as simple as possible, there is no unschedule.
71 
72     // Services the queue 'forever'. Should be run in a thread,
73     // and interrupted using boost::interrupt_thread
74     void serviceQueue();
75 
76     // Tell any threads running serviceQueue to stop as soon as they're
77     // done servicing whatever task they're currently servicing (drain=false)
78     // or when there is no work left to be done (drain=true)
79     void stop(bool drain=false);
80 
81     // Returns number of tasks waiting to be serviced,
82     // and first and last task times
83     size_t getQueueInfo(std::chrono::system_clock::time_point &first,
84                         std::chrono::system_clock::time_point &last) const;
85 
86     // Returns true if there are threads actively running in serviceQueue()
87     bool AreThreadsServicingQueue() const;
88 
89 private:
90     mutable Mutex newTaskMutex;
91     std::condition_variable newTaskScheduled;
92     std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex);
GUARDED_BY(newTaskMutex)93     int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0};
GUARDED_BY(newTaskMutex)94     bool stopRequested GUARDED_BY(newTaskMutex){false};
GUARDED_BY(newTaskMutex)95     bool stopWhenEmpty GUARDED_BY(newTaskMutex){false};
shouldStop()96     bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
97 };
98 
99 /**
100  * Class used by CScheduler clients which may schedule multiple jobs
101  * which are required to be run serially. Jobs may not be run on the
102  * same thread, but no two jobs will be executed
103  * at the same time and memory will be release-acquire consistent
104  * (the scheduler will internally do an acquire before invoking a callback
105  * as well as a release at the end). In practice this means that a callback
106  * B() will be able to observe all of the effects of callback A() which executed
107  * before it.
108  */
109 class SingleThreadedSchedulerClient {
110 private:
111     CScheduler *m_pscheduler;
112 
113     RecursiveMutex m_cs_callbacks_pending;
114     std::list<std::function<void ()>> m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending);
115     bool m_are_callbacks_running GUARDED_BY(m_cs_callbacks_pending) = false;
116 
117     void MaybeScheduleProcessQueue();
118     void ProcessQueue();
119 
120 public:
SingleThreadedSchedulerClient(CScheduler * pschedulerIn)121     explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
122 
123     /**
124      * Add a callback to be executed. Callbacks are executed serially
125      * and memory is release-acquire consistent between callback executions.
126      * Practically, this means that callbacks can behave as if they are executed
127      * in order by a single thread.
128      */
129     void AddToProcessQueue(std::function<void ()> func);
130 
131     // Processes all remaining queue members on the calling thread, blocking until queue is empty
132     // Must be called after the CScheduler has no remaining processing threads!
133     void EmptyQueue();
134 
135     size_t CallbacksPending();
136 };
137 
138 #endif
139