1 // ========================================================================== 2 // SeqAn - The Library for Sequence Analysis 3 // ========================================================================== 4 // Copyright (c) 2006-2018, Knut Reinert, FU Berlin 5 // All rights reserved. 6 // 7 // Redistribution and use in source and binary forms, with or without 8 // modification, are permitted provided that the following conditions are met: 9 // 10 // * Redistributions of source code must retain the above copyright 11 // notice, this list of conditions and the following disclaimer. 12 // * Redistributions in binary form must reproduce the above copyright 13 // notice, this list of conditions and the following disclaimer in the 14 // documentation and/or other materials provided with the distribution. 15 // * Neither the name of Knut Reinert or the FU Berlin nor the names of 16 // its contributors may be used to endorse or promote products derived 17 // from this software without specific prior written permission. 18 // 19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 20 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 21 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 22 // ARE DISCLAIMED. IN NO EVENT SHALL KNUT REINERT OR THE FU BERLIN BE LIABLE 23 // FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 24 // DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 25 // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 26 // CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 27 // LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 28 // OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 29 // DAMAGE. 30 // 31 // ========================================================================== 32 // Author: Rene Rahn <rene.rahn@fu-berlin.de> 33 // ========================================================================== 34 35 #ifndef INCLUDE_SEQAN_ALIGN_PARALLEL_PARALLEL_ALIGNMENT_SCHEDULER_H_ 36 #define INCLUDE_SEQAN_ALIGN_PARALLEL_PARALLEL_ALIGNMENT_SCHEDULER_H_ 37 38 namespace seqan 39 { 40 41 // ============================================================================ 42 // Forwards 43 // ============================================================================ 44 45 // ============================================================================ 46 // Tags, Classes, Enums 47 // ============================================================================ 48 49 // Yet internal class. Might need some redesign to make it truly generic. 50 /* 51 * @class WavefrontAlignmentScheduler 52 * @headerfile <align_parallel.h> 53 * @brief A generic scheduler allowing to execute callables with a ring buffer for the stored tasks. 54 * 55 * @signature class WavefrontAlignmentScheduler; 56 * 57 * This schedule is at the moment only used for the wave-front alignment execution but could be generalized later. 58 * It stores all scheduled callables in a @link ConcurrentSuspendableQueue @endlink which can hold a user defined 59 * number of callables at the same time. It then uses recycable ids to fill up the queue with waiting jobs. 60 * If the queue is full and a thread tries to add a new job, it will be suspended, until resources are freed by 61 * the scheduler. 62 */ 63 class WavefrontAlignmentScheduler 64 { 65 public: 66 67 //------------------------------------------------------------------------- 68 // Member Types. 69 70 using TCallable = std::function<void(uint16_t)>; 71 using TAlignmentQueue = ConcurrentQueue<TCallable, Suspendable<Limit>>; 72 using TRecycleList = std::list<uint16_t>; 73 74 //------------------------------------------------------------------------- 75 // Private Member Variables. 76 77 WavefrontTaskScheduler _taskScheduler; 78 ThreadPool _pool; 79 TRecycleList _recycableIds; 80 TAlignmentQueue _queue; 81 bool _receivedEndSignal; 82 83 std::mutex _mutexRecycleId; 84 unsigned _numParallelAlignments; 85 86 std::mutex _mutexPushException; 87 std::vector<std::exception_ptr> _exceptionPointers; 88 89 std::atomic<bool> _isValid{true}; 90 91 std::function<void()> job = [this] () 92 { 93 while (true) 94 { 95 TCallable callable; 96 if (!popFront(callable, _queue)) 97 break; // End of thread => No writers and queue is empty. 98 99 uint16_t id = -1; 100 101 { // Receive id. 102 std::lock_guard<std::mutex> lck(_mutexRecycleId); 103 SEQAN_ASSERT_NOT(_recycableIds.empty()); 104 id = _recycableIds.front(); 105 _recycableIds.pop_front(); 106 } 107 108 try 109 { 110 callable(id); // invokes the alignment with assigned id. 111 } catch(...)112 catch (...) 113 { // Catch any exception thrown by callable. Store exception, and set *this invalid. 114 // We still keep running until the queue is empty. The thread is cleaned either by, 115 // explicit wait or by destruction of *this. 116 _isValid.store(false, std::memory_order_release); 117 { 118 std::lock_guard<std::mutex> lck(_mutexPushException); 119 _exceptionPointers.push_back(std::current_exception()); 120 } 121 } 122 123 // Check if task scheduler is still valid. 124 // If not, something went wrong, and we should not continue adding new tasks. 125 // So we propagate the invalid state to *this and break exceution chain. 126 if (!isValid(_taskScheduler)) 127 { 128 _isValid.store(false, std::memory_order_release); 129 } 130 131 { // recycle id, when done. 132 std::lock_guard<std::mutex> lck(_mutexRecycleId); 133 _recycableIds.push_back(id); 134 } 135 } 136 unlockReading(_queue); // Notify that this reader is finished. 137 unlockWriting(_taskScheduler); // Notify that this writer is finished. 138 }; 139 140 //------------------------------------------------------------------------- 141 // Constructors. 142 143 // implicitly deleted default constructor. 144 WavefrontAlignmentScheduler(size_t const numParallelAlignments,size_t const numParallelTasks)145 WavefrontAlignmentScheduler(size_t const numParallelAlignments, size_t const numParallelTasks) : 146 _taskScheduler(numParallelTasks), 147 _queue(numParallelAlignments), 148 _receivedEndSignal(false), 149 _numParallelAlignments(numParallelAlignments) 150 { 151 SEQAN_ASSERT_GT(numParallelAlignments, 0u); // Bad if reader is 0. 152 153 // Setup recycable ids. 154 _recycableIds.resize(numParallelAlignments); 155 std::iota(std::begin(_recycableIds), std::end(_recycableIds), 0); 156 157 setReaderWriterCount(_queue, numParallelAlignments, 1); 158 159 _exceptionPointers.resize(numParallelAlignments, nullptr); 160 161 try 162 { // Create the threads here, later we can try to make lazy thread creation. 163 for (unsigned i = 0; i < numParallelAlignments; ++i) 164 { 165 spawn(_pool, job); 166 } 167 } 168 catch (...) // Make sure all the spawned threads are safely stopped before re-throwing the exception. 169 { 170 unlockWriting(_queue); 171 waitForWriters(_taskScheduler); 172 join(_pool); 173 throw; 174 } 175 176 setWriterCount(_taskScheduler, numParallelAlignments); 177 // Notify task scheduler, that everything was setup correctly. 178 for (unsigned i = 0; i < numParallelAlignments; ++i) 179 { 180 lockWriting(_taskScheduler); 181 } 182 waitForWriters(_taskScheduler); // Invoke task scheduler. 183 } 184 185 // Default constructor. WavefrontAlignmentScheduler()186 WavefrontAlignmentScheduler() : WavefrontAlignmentScheduler(16, 8) 187 {} 188 189 // Copy & Move C'tor 190 WavefrontAlignmentScheduler(WavefrontAlignmentScheduler const &) = delete; 191 WavefrontAlignmentScheduler(WavefrontAlignmentScheduler &&) = delete; 192 193 ///------------------------------------------------------------------------- 194 // Destructor. 195 ~WavefrontAlignmentScheduler()196 ~WavefrontAlignmentScheduler() 197 { 198 // Signal that no more alignments will be added. 199 if (!_receivedEndSignal) 200 unlockWriting(_queue); 201 202 SEQAN_ASSERT(_queue.writerCount == 0); 203 204 // Wait until all remaining threads are finished with their execution. 205 join(_pool); 206 207 // In destructor of thread pool we wait for the outstanding alignments to be finished 208 // and then continue destruction of the remaining members and cleaning up the stack. 209 } 210 211 // ------------------------------------------------------------------------ 212 // Member Functions. 213 214 // Copy & Move assignment 215 WavefrontAlignmentScheduler& operator=(WavefrontAlignmentScheduler const &) = delete; 216 WavefrontAlignmentScheduler& operator=(WavefrontAlignmentScheduler &&) = delete; 217 }; 218 219 // ============================================================================ 220 // Metafunctions 221 // ============================================================================ 222 223 template<> 224 struct SchedulerTraits<WavefrontAlignmentScheduler> 225 { 226 using TTask = typename WavefrontAlignmentScheduler::TCallable; 227 }; 228 229 // ============================================================================ 230 // Functions 231 // ============================================================================ 232 233 /* 234 * @fn WavefrontAlignmentScheduler#isValid 235 * @headerfile <align_parallel.h> 236 * @brief Checks if scheduler is in a valid state. This means that no callable has terminated with an exception. 237 */ 238 inline bool 239 isValid(WavefrontAlignmentScheduler const & me) 240 { 241 return me._isValid.load(std::memory_order_acquire); 242 } 243 244 /* 245 * @fn WavefrontAlignmentScheduler#scheduleTask 246 * @headerfile <align_parallel.h> 247 * @brief Adds a new task to the scheduler. Suspends until resources become available. 248 * @throws ExceptionType? 249 */ 250 // basic exception-safety guarantee. 251 // Throws if appendValue failed. 252 inline void 253 scheduleTask(WavefrontAlignmentScheduler & me, 254 typename SchedulerTraits<WavefrontAlignmentScheduler>::TTask && callable) 255 { 256 if (!isValid(me)) 257 throw std::runtime_error("Invalid alignment scheduler!"); 258 259 // Spins until there is enough space to add to the queue. 260 if (!appendValue(me._queue, std::forward<decltype(callable)>(callable))) 261 throw std::runtime_error("Invalid alignment scheduler 2!"); 262 } 263 264 inline void 265 scheduleTask(WavefrontAlignmentScheduler & me, 266 typename SchedulerTraits<WavefrontAlignmentScheduler>::TTask & callable) 267 { 268 if (!isValid(me)) 269 throw std::runtime_error("Invalid alignment scheduler!"); 270 // Spins until there is enough space to add to the queue. 271 if(!appendValue(me._queue, callable)) 272 throw std::runtime_error("Invalid alignment scheduler 2!"); 273 } 274 275 /* 276 * @fn WavefrontAlignmentScheduler#notify 277 * @headerfile <align_parallel.h> 278 * @brief Notify the scheduler that no more jobs will follow. 279 */ 280 inline void 281 notify(WavefrontAlignmentScheduler & me) 282 { 283 unlockWriting(me._queue); 284 me._receivedEndSignal = true; 285 } 286 287 /* 288 * @fn WavefrontAlignmentScheduler#wait 289 * @headerfile <align_parallel.h> 290 * @brief Explicit barrier on the scheduler. Suspends until all scheduled jobs have been finsihed. 291 * 292 * Note, can dead lock if notify is never called. 293 */ 294 // Only possible if some other thread is signaling the end of it. 295 inline void 296 wait(WavefrontAlignmentScheduler & me) 297 { 298 join(me._pool); 299 wait(me._taskScheduler); 300 } 301 302 /* 303 * @fn WavefrontAlignmentScheduler#wait2 304 * @headerfile <align_parallel.h> 305 * @brief Explicit barrier on the scheduler. Suspends until all scheduled jobs have been finsihed. 306 * 307 * Note, can dead lock if notify is never called. 308 */ 309 template <typename TNotifiable> 310 inline void 311 wait2(WavefrontAlignmentScheduler & me, TNotifiable & notifiable) 312 { 313 join(me._pool); 314 notify(notifiable); 315 wait(me._taskScheduler); 316 } 317 318 /* 319 * @fn WavefrontAlignmentScheduler#getExceptions 320 * @headerfile <align_parallel.h> 321 * @brief Returns vector of captured exceptions if any was thrown by the callable. 322 * 323 * Note, can dead lock if notify is never called. 324 */ 325 inline auto 326 getExceptions(WavefrontAlignmentScheduler & me) 327 { 328 auto vec = me._exceptionPointers; 329 auto innerExceptions = getExceptions(me._taskScheduler); 330 std::copy(std::begin(innerExceptions), std::end(innerExceptions), std::back_inserter(vec)); 331 return vec; 332 } 333 334 /* 335 * @fn WavefrontAlignmentScheduler#taskScheduler 336 * @headerfile <align_parallel.h> 337 * @brief Returns lvalue reference to the underlying task_scheduler. 338 */ 339 inline auto& 340 taskScheduler(WavefrontAlignmentScheduler & me) 341 { 342 return me._taskScheduler; 343 } 344 345 } // namespace seqan 346 347 #endif // #ifndef INCLUDE_SEQAN_ALIGN_PARALLEL_PARALLEL_ALIGNMENT_SCHEDULER_H_ 348