1 /*******************************************************************************
2  * thrill/net/dispatcher_thread.hpp
3  *
4  * Asynchronous callback wrapper around select(), epoll(), or other kernel-level
5  * dispatchers.
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_NET_DISPATCHER_THREAD_HEADER
16 #define THRILL_NET_DISPATCHER_THREAD_HEADER
17 
18 #include <thrill/common/concurrent_queue.hpp>
19 #include <thrill/data/block.hpp>
20 #include <thrill/mem/allocator.hpp>
21 #include <thrill/net/buffer.hpp>
22 #include <thrill/net/connection.hpp>
23 #include <tlx/delegate.hpp>
24 
25 #include <string>
26 
27 namespace thrill {
28 namespace net {
29 
30 //! \addtogroup net_layer
31 //! \{
32 
33 //! Signature of timer callbacks.
34 using TimerCallback = tlx::delegate<bool (), mem::GPoolAllocator<char> >;
35 
36 //! Signature of async connection readability/writability callbacks.
37 using AsyncCallback = tlx::delegate<bool (), mem::GPoolAllocator<char> >;
38 
39 //! Signature of async read callbacks.
40 using AsyncReadCallback = tlx::delegate<
41     void (Connection& c, Buffer&& buffer), mem::GPoolAllocator<char> >;
42 
43 //! Signature of async read ByteBlock callbacks.
44 using AsyncReadByteBlockCallback = tlx::delegate<
45     void (Connection& c, data::PinnedByteBlockPtr&& block),
46     mem::GPoolAllocator<char> >;
47 
48 //! Signature of async write callbacks.
49 using AsyncWriteCallback = tlx::delegate<
50     void (Connection&), mem::GPoolAllocator<char> >;
51 
52 //! Signature of generic dispatcher callback.
53 using AsyncDispatcherThreadCallback = tlx::delegate<
54     void (class Dispatcher&), mem::GPoolAllocator<char> >;
55 
56 /*!
57  * DispatcherThread contains a net::Dispatcher object and an associated thread
58  * that runs in the dispatching loop.
59  */
60 class DispatcherThread
61 {
62     static constexpr bool debug = false;
63 
64 public:
65     //! Signature of async jobs to be run by the dispatcher thread.
66     using Job = tlx::delegate<void (), mem::GPoolAllocator<char> >;
67 
68     DispatcherThread(
69         std::unique_ptr<class Dispatcher> dispatcher,
70         size_t host_rank);
71 
72     ~DispatcherThread();
73 
74     //! non-copyable: delete copy-constructor
75     DispatcherThread(const DispatcherThread&) = delete;
76     //! non-copyable: delete assignment operator
77     DispatcherThread& operator = (const DispatcherThread&) = delete;
78 
79     //! Terminate the dispatcher thread (if now already done).
80     void Terminate();
81 
82     //! Run generic callback in dispatcher thread to enqueue stuff.
83     void RunInThread(const AsyncDispatcherThreadCallback& cb);
84 
85     //! \name Timeout Callbacks
86     //! \{
87 
88     //! Register a relative timeout callback
89     void AddTimer(std::chrono::milliseconds timeout, const TimerCallback& cb);
90 
91     //! \}
92 
93     //! \name Connection Callbacks
94     //! \{
95 
96     //! Register a buffered read callback and a default exception callback.
97     void AddRead(Connection& c, const AsyncCallback& read_cb);
98 
99     //! Register a buffered write callback and a default exception callback.
100     void AddWrite(Connection& c, const AsyncCallback& write_cb);
101 
102     //! Cancel all callbacks on a given connection.
103     void Cancel(Connection& c);
104 
105     //! \}
106 
107     //! \name Asynchronous Data Reader/Writer Callbacks
108     //! \{
109 
110     //! asynchronously read n bytes and deliver them to the callback
111     void AsyncRead(Connection& c, uint32_t seq, size_t size,
112                    const AsyncReadCallback& done_cb);
113 
114     //! asynchronously read the full ByteBlock and deliver it to the callback
115     void AsyncRead(Connection& c, uint32_t seq, size_t size,
116                    data::PinnedByteBlockPtr&& block,
117                    const AsyncReadByteBlockCallback& done_cb);
118 
119     //! asynchronously write byte and block and callback when delivered. The
120     //! block is reference counted by the async writer.
121     void AsyncWrite(Connection& c, uint32_t seq, Buffer&& buffer,
122                     const AsyncWriteCallback& done_cb = AsyncWriteCallback());
123 
124     //! asynchronously write TWO buffers and callback when delivered. The
125     //! buffer2 are MOVED into the async writer. This is most useful to write a
126     //! header and a payload Buffers that are hereby guaranteed to be written in
127     //! order.
128     void AsyncWrite(Connection& c, uint32_t seq,
129                     Buffer&& buffer, data::PinnedBlock&& block,
130                     const AsyncWriteCallback& done_cb = AsyncWriteCallback());
131 
132     //! asynchronously write buffer and callback when delivered. COPIES the data
133     //! into a Buffer!
134     void AsyncWriteCopy(
135         Connection& c, uint32_t seq, const void* buffer, size_t size,
136         const AsyncWriteCallback& done_cb = AsyncWriteCallback());
137 
138     //! asynchronously write buffer and callback when delivered. COPIES the data
139     //! into a Buffer!
140     void AsyncWriteCopy(
141         Connection& c, uint32_t seq, const std::string& str,
142         const AsyncWriteCallback& done_cb = AsyncWriteCallback());
143 
144     //! \}
145 
146 private:
147     //! Enqueue job in queue for dispatching thread to run at its discretion.
148     void Enqueue(Job&& job);
149 
150     //! What happens in the dispatcher thread
151     void Work();
152 
153     //! wake up select() in dispatching thread.
154     void WakeUpThread();
155 
156 private:
157     //! Queue of jobs to be run by dispatching thread at its discretion.
158     common::ConcurrentQueue<Job, mem::GPoolAllocator<Job> > jobqueue_;
159 
160     //! thread of dispatcher
161     std::thread thread_;
162 
163     //! enclosed dispatcher.
164     std::unique_ptr<class Dispatcher> dispatcher_;
165 
166     //! termination flag
167     std::atomic<bool> terminate_ { false };
168 
169     //! whether to call Interrupt() in WakeUpThread()
170     std::atomic<bool> busy_ { false };
171 
172     //! for thread name for logging
173     size_t host_rank_;
174 };
175 
176 //! \}
177 
178 } // namespace net
179 } // namespace thrill
180 
181 #endif // !THRILL_NET_DISPATCHER_THREAD_HEADER
182 
183 /******************************************************************************/
184