// ==========================================================================
//                 SeqAn - The Library for Sequence Analysis
// ==========================================================================
// Copyright (c) 2006-2018, Knut Reinert, FU Berlin
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
//     * Redistributions of source code must retain the above copyright
//       notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//     * Neither the name of Knut Reinert or the FU Berlin nor the names of
//       its contributors may be used to endorse or promote products derived
//       from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL KNUT REINERT OR THE FU BERLIN BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
// OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
// DAMAGE.
//
// ==========================================================================
// Author: Rene Rahn <rene.rahn@fu-berlin.de>
// ==========================================================================

#ifndef INCLUDE_SEQAN_ALIGN_PARALLEL_PARALLEL_ALIGNMENT_SCHEDULER_H_
#define INCLUDE_SEQAN_ALIGN_PARALLEL_PARALLEL_ALIGNMENT_SCHEDULER_H_

namespace seqan
{

// ============================================================================
// Forwards
// ============================================================================

// ============================================================================
// Tags, Classes, Enums
// ============================================================================

// Internal class for now. Might need some redesign to make it truly generic.
/*
 * @class WavefrontAlignmentScheduler
 * @headerfile <align_parallel.h>
 * @brief A generic scheduler that executes callables stored in a bounded (ring-buffer-like) task queue.
 *
 * @signature class WavefrontAlignmentScheduler;
 *
 * This scheduler is currently only used for the wavefront alignment execution but could be generalized later.
 * It stores all scheduled callables in a @link ConcurrentSuspendableQueue @endlink, which can hold a user-defined
 * number of callables at the same time, and uses recyclable ids to fill up the queue with waiting jobs.
 * If the queue is full and a thread tries to add a new job, that thread is suspended until resources are freed by
 * the scheduler.
 */
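/*
 * Usage sketch (illustration only, not part of the library interface). It assumes only what this
 * header provides: the two-argument constructor and the free functions scheduleTask(), notify()
 * and wait() defined further below; runAlignment() stands in for hypothetical user code.
 *
 *     WavefrontAlignmentScheduler scheduler(4, 8);   // 4 parallel alignments, 8 parallel tasks.
 *     for (unsigned i = 0; i < 100u; ++i)
 *     {
 *         // The callable receives the recycled alignment id it was assigned to.
 *         scheduleTask(scheduler, [] (uint16_t id) { runAlignment(id); });
 *     }
 *     notify(scheduler);  // Signal that no more alignments will be scheduled.
 *     wait(scheduler);    // Block until all scheduled alignments have finished.
 */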
class WavefrontAlignmentScheduler
{
public:

    //-------------------------------------------------------------------------
    // Member Types.

    using TCallable         = std::function<void(uint16_t)>;
    using TAlignmentQueue   = ConcurrentQueue<TCallable, Suspendable<Limit>>;
    using TRecycleList      = std::list<uint16_t>;

    //-------------------------------------------------------------------------
    // Private Member Variables.

    WavefrontTaskScheduler  _taskScheduler;
    ThreadPool              _pool;
    TRecycleList            _recycableIds;
    TAlignmentQueue         _queue;
    bool                    _receivedEndSignal;

    std::mutex              _mutexRecycleId;
    unsigned                _numParallelAlignments;

    std::mutex                       _mutexPushException;
    std::vector<std::exception_ptr>  _exceptionPointers;

    std::atomic<bool>               _isValid{true};

    std::function<void()> job = [this] ()
    {
        while (true)
        {
            TCallable callable;
            if (!popFront(callable, _queue))
                break;  // End of thread => No writers and queue is empty.

            uint16_t id = -1;

            { // Receive id.
                std::lock_guard<std::mutex> lck(_mutexRecycleId);
                SEQAN_ASSERT_NOT(_recycableIds.empty());
                id = _recycableIds.front();
                _recycableIds.pop_front();
            }

            try
            {
                callable(id);  // invokes the alignment with assigned id.
            }
            catch (...)
            {  // Catch any exception thrown by the callable. Store the exception and set *this invalid.
               // We still keep running until the queue is empty. The thread is cleaned up either by
               // an explicit wait or by the destruction of *this.
                _isValid.store(false, std::memory_order_release);
                {
                    std::lock_guard<std::mutex> lck(_mutexPushException);
                    _exceptionPointers.push_back(std::current_exception());
                }
            }

            // Check if the task scheduler is still valid.
            // If not, something went wrong and we should not continue adding new tasks,
            // so we propagate the invalid state to *this and break the execution chain.
            if (!isValid(_taskScheduler))
            {
                _isValid.store(false, std::memory_order_release);
            }

            { // recycle id, when done.
                std::lock_guard<std::mutex> lck(_mutexRecycleId);
                _recycableIds.push_back(id);
            }
        }
        unlockReading(_queue);  // Notify that this reader is finished.
        unlockWriting(_taskScheduler);  // Notify that this writer is finished.
    };

    //-------------------------------------------------------------------------
    // Constructors.

    // implicitly deleted default constructor.

    WavefrontAlignmentScheduler(size_t const numParallelAlignments, size_t const numParallelTasks) :
        _taskScheduler(numParallelTasks),
        _queue(numParallelAlignments),
        _receivedEndSignal(false),
        _numParallelAlignments(numParallelAlignments)
    {
        SEQAN_ASSERT_GT(numParallelAlignments, 0u);  // A reader count of 0 would be invalid.

        // Set up recyclable ids.
        _recycableIds.resize(numParallelAlignments);
        std::iota(std::begin(_recycableIds), std::end(_recycableIds), 0);

        setReaderWriterCount(_queue, numParallelAlignments, 1);

        _exceptionPointers.resize(numParallelAlignments, nullptr);

        try
        { // Create the threads here; later we could switch to lazy thread creation.
            for (unsigned i = 0; i < numParallelAlignments; ++i)
            {
                spawn(_pool, job);
            }
        }
        catch (...)  // Make sure all the spawned threads are safely stopped before re-throwing the exception.
        {
            unlockWriting(_queue);
            waitForWriters(_taskScheduler);
            join(_pool);
            throw;
        }

        setWriterCount(_taskScheduler, numParallelAlignments);
        // Notify the task scheduler that everything was set up correctly.
        for (unsigned i = 0; i < numParallelAlignments; ++i)
        {
            lockWriting(_taskScheduler);
        }
        waitForWriters(_taskScheduler);  // Invoke task scheduler.
    }

    // Default constructor.
    WavefrontAlignmentScheduler() : WavefrontAlignmentScheduler(16, 8)
    {}

    // Copy & Move C'tor
    WavefrontAlignmentScheduler(WavefrontAlignmentScheduler const &) = delete;
    WavefrontAlignmentScheduler(WavefrontAlignmentScheduler &&)      = delete;

    //-------------------------------------------------------------------------
    // Destructor.

    ~WavefrontAlignmentScheduler()
    {
        // Signal that no more alignments will be added.
        if (!_receivedEndSignal)
            unlockWriting(_queue);

        SEQAN_ASSERT(_queue.writerCount == 0);

        // Wait until all remaining threads have finished their execution.
        join(_pool);

        // The destructor of the thread pool waits for the outstanding alignments to finish;
        // afterwards the remaining members are destroyed and the stack is cleaned up.
    }

    // ------------------------------------------------------------------------
    // Member Functions.

    // Copy & Move assignment
    WavefrontAlignmentScheduler& operator=(WavefrontAlignmentScheduler const &) = delete;
    WavefrontAlignmentScheduler& operator=(WavefrontAlignmentScheduler &&)      = delete;
};

// ============================================================================
// Metafunctions
// ============================================================================

template<>
struct SchedulerTraits<WavefrontAlignmentScheduler>
{
    using TTask = typename WavefrontAlignmentScheduler::TCallable;
};
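
// Illustrative sketch (not part of the library interface): the trait exposes the task type in a
// scheduler-agnostic way, so generic code does not have to name TCallable directly.
//
//     using TTask = typename SchedulerTraits<WavefrontAlignmentScheduler>::TTask;
//     static_assert(std::is_same<TTask, std::function<void(uint16_t)>>::value, "");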

// ============================================================================
// Functions
// ============================================================================

/*
 * @fn WavefrontAlignmentScheduler#isValid
 * @headerfile <align_parallel.h>
 * @brief Checks whether the scheduler is in a valid state, i.e. no callable has terminated with an exception.
 */
inline bool
isValid(WavefrontAlignmentScheduler const & me)
{
    return me._isValid.load(std::memory_order_acquire);
}

/*
 * @fn WavefrontAlignmentScheduler#scheduleTask
 * @headerfile <align_parallel.h>
 * @brief Adds a new task to the scheduler. Suspends until resources become available.
 * @throws std::runtime_error If the scheduler is in an invalid state or the task could not be enqueued.
 */
// Basic exception-safety guarantee.
// Throws if appendValue fails.
inline void
scheduleTask(WavefrontAlignmentScheduler & me,
             typename SchedulerTraits<WavefrontAlignmentScheduler>::TTask && callable)
{
    if (!isValid(me))
        throw std::runtime_error("Invalid alignment scheduler!");

    // Suspends until there is enough space to add to the queue.
    if (!appendValue(me._queue, std::forward<decltype(callable)>(callable)))
        throw std::runtime_error("Invalid alignment scheduler 2!");
}

inline void
scheduleTask(WavefrontAlignmentScheduler & me,
             typename SchedulerTraits<WavefrontAlignmentScheduler>::TTask & callable)
{
    if (!isValid(me))
        throw std::runtime_error("Invalid alignment scheduler!");
    // Suspends until there is enough space to add to the queue.
    if (!appendValue(me._queue, callable))
        throw std::runtime_error("Invalid alignment scheduler 2!");
}
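
// Usage sketch (illustration only; assumes a scheduler constructed as in the class-level example
// above and a hypothetical user function runAlignment()):
//
//     // rvalue overload: a temporary callable (e.g. a lambda) is passed directly.
//     scheduleTask(scheduler, [] (uint16_t id) { runAlignment(id); });
//
//     // lvalue overload: an already existing callable object is passed.
//     std::function<void(uint16_t)> task = [] (uint16_t id) { runAlignment(id); };
//     scheduleTask(scheduler, task);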

/*
 * @fn WavefrontAlignmentScheduler#notify
 * @headerfile <align_parallel.h>
 * @brief Notify the scheduler that no more jobs will follow.
 */
inline void
notify(WavefrontAlignmentScheduler & me)
{
    unlockWriting(me._queue);
    me._receivedEndSignal = true;
}

/*
 * @fn WavefrontAlignmentScheduler#wait
 * @headerfile <align_parallel.h>
 * @brief Explicit barrier on the scheduler. Suspends until all scheduled jobs have finished.
 *
 * Note: this can deadlock if notify is never called.
 */
// Only possible if some other thread signals the end of the job stream.
inline void
wait(WavefrontAlignmentScheduler & me)
{
    join(me._pool);
    wait(me._taskScheduler);
}
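
// Typical shutdown sequence (sketch): one thread signals the end of the job stream via notify()
// and then blocks in wait() until all workers have drained the queue. handleErrors() below is a
// hypothetical user function.
//
//     notify(scheduler);        // Without this, wait() would deadlock.
//     wait(scheduler);
//     if (!isValid(scheduler))  // At least one callable terminated with an exception.
//         handleErrors(getExceptions(scheduler));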

/*
 * @fn WavefrontAlignmentScheduler#wait2
 * @headerfile <align_parallel.h>
 * @brief Explicit barrier on the scheduler. Suspends until all scheduled jobs have finished and notifies
 *        the given notifiable object in between.
 *
 * Note: this can deadlock if notify is never called.
 */
template <typename TNotifiable>
inline void
wait2(WavefrontAlignmentScheduler & me, TNotifiable & notifiable)
{
    join(me._pool);
    notify(notifiable);
    wait(me._taskScheduler);
}

/*
 * @fn WavefrontAlignmentScheduler#getExceptions
 * @headerfile <align_parallel.h>
 * @brief Returns the vector of captured exceptions, if any were thrown by the scheduled callables.
 */
inline auto
getExceptions(WavefrontAlignmentScheduler & me)
{
    auto vec = me._exceptionPointers;
    auto innerExceptions = getExceptions(me._taskScheduler);
    std::copy(std::begin(innerExceptions), std::end(innerExceptions), std::back_inserter(vec));
    return vec;
}
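
// Sketch for surfacing captured errors after wait() (illustration only):
//
//     if (!isValid(scheduler))
//     {
//         for (std::exception_ptr excPtr : getExceptions(scheduler))
//         {
//             if (excPtr != nullptr)               // Unused slots stay default-initialized.
//                 std::rethrow_exception(excPtr);  // Rethrows the first captured exception.
//         }
//     }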

/*
 * @fn WavefrontAlignmentScheduler#taskScheduler
 * @headerfile <align_parallel.h>
 * @brief Returns an lvalue reference to the underlying task scheduler.
 */
inline auto&
taskScheduler(WavefrontAlignmentScheduler & me)
{
    return me._taskScheduler;
}

}  // namespace seqan

#endif  // #ifndef INCLUDE_SEQAN_ALIGN_PARALLEL_PARALLEL_ALIGNMENT_SCHEDULER_H_