1 // Copyright (c) 2015-2020 The Bitcoin Core developers 2 // Distributed under the MIT software license, see the accompanying 3 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 5 #ifndef BITCOIN_SCHEDULER_H 6 #define BITCOIN_SCHEDULER_H 7 8 // 9 // NOTE: 10 // boost::thread should be ported to std::thread 11 // when we support C++11. 12 // 13 #include <condition_variable> 14 #include <functional> 15 #include <list> 16 #include <map> 17 18 #include <sync.h> 19 20 // 21 // Simple class for background tasks that should be run 22 // periodically or once "after a while" 23 // 24 // Usage: 25 // 26 // CScheduler* s = new CScheduler(); 27 // s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { } 28 // s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3}); 29 // boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); 30 // 31 // ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: 32 // s->stop(); 33 // t->join(); 34 // delete t; 35 // delete s; // Must be done after thread is interrupted/joined. 36 // 37 38 class CScheduler 39 { 40 public: 41 CScheduler(); 42 ~CScheduler(); 43 44 typedef std::function<void()> Function; 45 46 // Call func at/after time t 47 void schedule(Function f, std::chrono::system_clock::time_point t); 48 49 /** Call f once after the delta has passed */ scheduleFromNow(Function f,std::chrono::milliseconds delta)50 void scheduleFromNow(Function f, std::chrono::milliseconds delta) 51 { 52 schedule(std::move(f), std::chrono::system_clock::now() + delta); 53 } 54 55 /** 56 * Repeat f until the scheduler is stopped. First run is after delta has passed once. 57 * 58 * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more 59 * accurate scheduling, don't use this method. 60 */ 61 void scheduleEvery(Function f, std::chrono::milliseconds delta); 62 63 /** 64 * Mock the scheduler to fast forward in time. 65 * Iterates through items on taskQueue and reschedules them 66 * to be delta_seconds sooner. 67 */ 68 void MockForward(std::chrono::seconds delta_seconds); 69 70 // To keep things as simple as possible, there is no unschedule. 71 72 // Services the queue 'forever'. Should be run in a thread, 73 // and interrupted using boost::interrupt_thread 74 void serviceQueue(); 75 76 // Tell any threads running serviceQueue to stop as soon as they're 77 // done servicing whatever task they're currently servicing (drain=false) 78 // or when there is no work left to be done (drain=true) 79 void stop(bool drain=false); 80 81 // Returns number of tasks waiting to be serviced, 82 // and first and last task times 83 size_t getQueueInfo(std::chrono::system_clock::time_point &first, 84 std::chrono::system_clock::time_point &last) const; 85 86 // Returns true if there are threads actively running in serviceQueue() 87 bool AreThreadsServicingQueue() const; 88 89 private: 90 mutable Mutex newTaskMutex; 91 std::condition_variable newTaskScheduled; 92 std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex); GUARDED_BY(newTaskMutex)93 int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0}; GUARDED_BY(newTaskMutex)94 bool stopRequested GUARDED_BY(newTaskMutex){false}; GUARDED_BY(newTaskMutex)95 bool stopWhenEmpty GUARDED_BY(newTaskMutex){false}; shouldStop()96 bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } 97 }; 98 99 /** 100 * Class used by CScheduler clients which may schedule multiple jobs 101 * which are required to be run serially. Jobs may not be run on the 102 * same thread, but no two jobs will be executed 103 * at the same time and memory will be release-acquire consistent 104 * (the scheduler will internally do an acquire before invoking a callback 105 * as well as a release at the end). In practice this means that a callback 106 * B() will be able to observe all of the effects of callback A() which executed 107 * before it. 108 */ 109 class SingleThreadedSchedulerClient { 110 private: 111 CScheduler *m_pscheduler; 112 113 RecursiveMutex m_cs_callbacks_pending; 114 std::list<std::function<void ()>> m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending); 115 bool m_are_callbacks_running GUARDED_BY(m_cs_callbacks_pending) = false; 116 117 void MaybeScheduleProcessQueue(); 118 void ProcessQueue(); 119 120 public: SingleThreadedSchedulerClient(CScheduler * pschedulerIn)121 explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {} 122 123 /** 124 * Add a callback to be executed. Callbacks are executed serially 125 * and memory is release-acquire consistent between callback executions. 126 * Practically, this means that callbacks can behave as if they are executed 127 * in order by a single thread. 128 */ 129 void AddToProcessQueue(std::function<void ()> func); 130 131 // Processes all remaining queue members on the calling thread, blocking until queue is empty 132 // Must be called after the CScheduler has no remaining processing threads! 133 void EmptyQueue(); 134 135 size_t CallbacksPending(); 136 }; 137 138 #endif 139