1 /******************************************************************************* 2 * thrill/net/dispatcher_thread.hpp 3 * 4 * Asynchronous callback wrapper around select(), epoll(), or other kernel-level 5 * dispatchers. 6 * 7 * Part of Project Thrill - http://project-thrill.org 8 * 9 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net> 10 * 11 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 12 ******************************************************************************/ 13 14 #pragma once 15 #ifndef THRILL_NET_DISPATCHER_THREAD_HEADER 16 #define THRILL_NET_DISPATCHER_THREAD_HEADER 17 18 #include <thrill/common/concurrent_queue.hpp> 19 #include <thrill/data/block.hpp> 20 #include <thrill/mem/allocator.hpp> 21 #include <thrill/net/buffer.hpp> 22 #include <thrill/net/connection.hpp> 23 #include <tlx/delegate.hpp> 24 25 #include <string> 26 27 namespace thrill { 28 namespace net { 29 30 //! \addtogroup net_layer 31 //! \{ 32 33 //! Signature of timer callbacks. 34 using TimerCallback = tlx::delegate<bool (), mem::GPoolAllocator<char> >; 35 36 //! Signature of async connection readability/writability callbacks. 37 using AsyncCallback = tlx::delegate<bool (), mem::GPoolAllocator<char> >; 38 39 //! Signature of async read callbacks. 40 using AsyncReadCallback = tlx::delegate< 41 void (Connection& c, Buffer&& buffer), mem::GPoolAllocator<char> >; 42 43 //! Signature of async read ByteBlock callbacks. 44 using AsyncReadByteBlockCallback = tlx::delegate< 45 void (Connection& c, data::PinnedByteBlockPtr&& block), 46 mem::GPoolAllocator<char> >; 47 48 //! Signature of async write callbacks. 49 using AsyncWriteCallback = tlx::delegate< 50 void (Connection&), mem::GPoolAllocator<char> >; 51 52 //! Signature of generic dispatcher callback. 53 using AsyncDispatcherThreadCallback = tlx::delegate< 54 void (class Dispatcher&), mem::GPoolAllocator<char> >; 55 56 /*! 57 * DispatcherThread contains a net::Dispatcher object and an associated thread 58 * that runs in the dispatching loop. 59 */ 60 class DispatcherThread 61 { 62 static constexpr bool debug = false; 63 64 public: 65 //! Signature of async jobs to be run by the dispatcher thread. 66 using Job = tlx::delegate<void (), mem::GPoolAllocator<char> >; 67 68 DispatcherThread( 69 std::unique_ptr<class Dispatcher> dispatcher, 70 size_t host_rank); 71 72 ~DispatcherThread(); 73 74 //! non-copyable: delete copy-constructor 75 DispatcherThread(const DispatcherThread&) = delete; 76 //! non-copyable: delete assignment operator 77 DispatcherThread& operator = (const DispatcherThread&) = delete; 78 79 //! Terminate the dispatcher thread (if now already done). 80 void Terminate(); 81 82 //! Run generic callback in dispatcher thread to enqueue stuff. 83 void RunInThread(const AsyncDispatcherThreadCallback& cb); 84 85 //! \name Timeout Callbacks 86 //! \{ 87 88 //! Register a relative timeout callback 89 void AddTimer(std::chrono::milliseconds timeout, const TimerCallback& cb); 90 91 //! \} 92 93 //! \name Connection Callbacks 94 //! \{ 95 96 //! Register a buffered read callback and a default exception callback. 97 void AddRead(Connection& c, const AsyncCallback& read_cb); 98 99 //! Register a buffered write callback and a default exception callback. 100 void AddWrite(Connection& c, const AsyncCallback& write_cb); 101 102 //! Cancel all callbacks on a given connection. 103 void Cancel(Connection& c); 104 105 //! \} 106 107 //! \name Asynchronous Data Reader/Writer Callbacks 108 //! \{ 109 110 //! asynchronously read n bytes and deliver them to the callback 111 void AsyncRead(Connection& c, uint32_t seq, size_t size, 112 const AsyncReadCallback& done_cb); 113 114 //! asynchronously read the full ByteBlock and deliver it to the callback 115 void AsyncRead(Connection& c, uint32_t seq, size_t size, 116 data::PinnedByteBlockPtr&& block, 117 const AsyncReadByteBlockCallback& done_cb); 118 119 //! asynchronously write byte and block and callback when delivered. The 120 //! block is reference counted by the async writer. 121 void AsyncWrite(Connection& c, uint32_t seq, Buffer&& buffer, 122 const AsyncWriteCallback& done_cb = AsyncWriteCallback()); 123 124 //! asynchronously write TWO buffers and callback when delivered. The 125 //! buffer2 are MOVED into the async writer. This is most useful to write a 126 //! header and a payload Buffers that are hereby guaranteed to be written in 127 //! order. 128 void AsyncWrite(Connection& c, uint32_t seq, 129 Buffer&& buffer, data::PinnedBlock&& block, 130 const AsyncWriteCallback& done_cb = AsyncWriteCallback()); 131 132 //! asynchronously write buffer and callback when delivered. COPIES the data 133 //! into a Buffer! 134 void AsyncWriteCopy( 135 Connection& c, uint32_t seq, const void* buffer, size_t size, 136 const AsyncWriteCallback& done_cb = AsyncWriteCallback()); 137 138 //! asynchronously write buffer and callback when delivered. COPIES the data 139 //! into a Buffer! 140 void AsyncWriteCopy( 141 Connection& c, uint32_t seq, const std::string& str, 142 const AsyncWriteCallback& done_cb = AsyncWriteCallback()); 143 144 //! \} 145 146 private: 147 //! Enqueue job in queue for dispatching thread to run at its discretion. 148 void Enqueue(Job&& job); 149 150 //! What happens in the dispatcher thread 151 void Work(); 152 153 //! wake up select() in dispatching thread. 154 void WakeUpThread(); 155 156 private: 157 //! Queue of jobs to be run by dispatching thread at its discretion. 158 common::ConcurrentQueue<Job, mem::GPoolAllocator<Job> > jobqueue_; 159 160 //! thread of dispatcher 161 std::thread thread_; 162 163 //! enclosed dispatcher. 164 std::unique_ptr<class Dispatcher> dispatcher_; 165 166 //! termination flag 167 std::atomic<bool> terminate_ { false }; 168 169 //! whether to call Interrupt() in WakeUpThread() 170 std::atomic<bool> busy_ { false }; 171 172 //! for thread name for logging 173 size_t host_rank_; 174 }; 175 176 //! \} 177 178 } // namespace net 179 } // namespace thrill 180 181 #endif // !THRILL_NET_DISPATCHER_THREAD_HEADER 182 183 /******************************************************************************/ 184