1 /*******************************************************************************
2 * thrill/net/dispatcher_thread.cpp
3 *
4 * Part of Project Thrill - http://project-thrill.org
5 *
6 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7 *
8 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9 ******************************************************************************/
10
11 #include <thrill/net/dispatcher.hpp>
12 #include <thrill/net/dispatcher_thread.hpp>
13 #include <thrill/net/group.hpp>
14
15 #include <deque>
16 #include <string>
17 #include <vector>
18
19 namespace thrill {
20 namespace net {
21
DispatcherThread(std::unique_ptr<class Dispatcher> dispatcher,size_t host_rank)22 DispatcherThread::DispatcherThread(
23 std::unique_ptr<class Dispatcher> dispatcher, size_t host_rank)
24 : dispatcher_(std::move(dispatcher)),
25 host_rank_(host_rank) {
26 // start thread
27 thread_ = std::thread(&DispatcherThread::Work, this);
28 }
29
~DispatcherThread()30 DispatcherThread::~DispatcherThread() {
31 Terminate();
32 }
33
Terminate()34 void DispatcherThread::Terminate() {
35 if (terminate_) return;
36
37 // set termination flags.
38 terminate_ = true;
39 // interrupt select().
40 WakeUpThread();
41 // wait for last round to finish.
42 thread_.join();
43 }
44
RunInThread(const AsyncDispatcherThreadCallback & cb)45 void DispatcherThread::RunInThread(const AsyncDispatcherThreadCallback& cb) {
46 Enqueue([this, cb = std::move(cb)]() {
47 cb(*dispatcher_);
48 });
49 WakeUpThread();
50 }
51
AddTimer(std::chrono::milliseconds timeout,const TimerCallback & cb)52 void DispatcherThread::AddTimer(
53 std::chrono::milliseconds timeout, const TimerCallback& cb) {
54 Enqueue([=]() {
55 dispatcher_->AddTimer(timeout, cb);
56 });
57 WakeUpThread();
58 }
59
AddRead(Connection & c,const AsyncCallback & read_cb)60 void DispatcherThread::AddRead(Connection& c, const AsyncCallback& read_cb) {
61 Enqueue([=, &c]() {
62 dispatcher_->AddRead(c, read_cb);
63 });
64 WakeUpThread();
65 }
66
AddWrite(Connection & c,const AsyncCallback & write_cb)67 void DispatcherThread::AddWrite(Connection& c, const AsyncCallback& write_cb) {
68 Enqueue([=, &c]() {
69 dispatcher_->AddWrite(c, write_cb);
70 });
71 WakeUpThread();
72 }
73
Cancel(Connection & c)74 void DispatcherThread::Cancel(Connection& c) {
75 Enqueue([=, &c]() {
76 dispatcher_->Cancel(c);
77 });
78 WakeUpThread();
79 }
80
AsyncRead(Connection & c,uint32_t seq,size_t size,const AsyncReadCallback & done_cb)81 void DispatcherThread::AsyncRead(
82 Connection& c, uint32_t seq, size_t size,
83 const AsyncReadCallback& done_cb) {
84 Enqueue([=, &c]() {
85 dispatcher_->AsyncRead(c, seq, size, done_cb);
86 });
87 WakeUpThread();
88 }
89
AsyncRead(Connection & c,uint32_t seq,size_t size,data::PinnedByteBlockPtr && block,const AsyncReadByteBlockCallback & done_cb)90 void DispatcherThread::AsyncRead(
91 Connection& c, uint32_t seq, size_t size, data::PinnedByteBlockPtr&& block,
92 const AsyncReadByteBlockCallback& done_cb) {
93 assert(block.valid());
94 Enqueue([=, &c, b = std::move(block)]() mutable {
95 dispatcher_->AsyncRead(c, seq, size, std::move(b), done_cb);
96 });
97 WakeUpThread();
98 }
99
AsyncWrite(Connection & c,uint32_t seq,Buffer && buffer,const AsyncWriteCallback & done_cb)100 void DispatcherThread::AsyncWrite(
101 Connection& c, uint32_t seq, Buffer&& buffer, const AsyncWriteCallback& done_cb) {
102 // the following captures the move-only buffer in a lambda.
103 Enqueue([=, &c, b = std::move(buffer)]() mutable {
104 dispatcher_->AsyncWrite(c, seq, std::move(b), done_cb);
105 });
106 WakeUpThread();
107 }
108
AsyncWrite(Connection & c,uint32_t seq,Buffer && buffer,data::PinnedBlock && block,const AsyncWriteCallback & done_cb)109 void DispatcherThread::AsyncWrite(
110 Connection& c, uint32_t seq, Buffer&& buffer, data::PinnedBlock&& block,
111 const AsyncWriteCallback& done_cb) {
112 assert(block.IsValid());
113 // the following captures the move-only buffer in a lambda.
114 Enqueue([=, &c,
115 b1 = std::move(buffer), b2 = std::move(block)]() mutable {
116 dispatcher_->AsyncWrite(c, seq, std::move(b1));
117 dispatcher_->AsyncWrite(c, seq + 1, std::move(b2), done_cb);
118 });
119 WakeUpThread();
120 }
121
AsyncWriteCopy(Connection & c,uint32_t seq,const void * buffer,size_t size,const AsyncWriteCallback & done_cb)122 void DispatcherThread::AsyncWriteCopy(
123 Connection& c, uint32_t seq, const void* buffer, size_t size,
124 const AsyncWriteCallback& done_cb) {
125 return AsyncWrite(c, seq, Buffer(buffer, size), done_cb);
126 }
127
AsyncWriteCopy(Connection & c,uint32_t seq,const std::string & str,const AsyncWriteCallback & done_cb)128 void DispatcherThread::AsyncWriteCopy(
129 Connection& c, uint32_t seq,
130 const std::string& str, const AsyncWriteCallback& done_cb) {
131 return AsyncWriteCopy(c, seq, str.data(), str.size(), done_cb);
132 }
133
Enqueue(Job && job)134 void DispatcherThread::Enqueue(Job&& job) {
135 return jobqueue_.push(std::move(job));
136 }
137
Work()138 void DispatcherThread::Work() {
139 common::NameThisThread(
140 "host " + std::to_string(host_rank_) + " dispatcher");
141 // pin DispatcherThread to last core
142 common::SetCpuAffinity(std::thread::hardware_concurrency() - 1);
143
144 while (!terminate_ ||
145 dispatcher_->HasAsyncWrites() || !jobqueue_.empty())
146 {
147 // process jobs in jobqueue_
148 {
149 Job job;
150 while (jobqueue_.try_pop(job))
151 job();
152 }
153
154 // set busy flag, but check once again for jobs.
155 busy_ = true;
156 {
157 Job job;
158 if (jobqueue_.try_pop(job)) {
159 busy_ = false;
160 job();
161 continue;
162 }
163 }
164
165 // run one dispatch
166 dispatcher_->Dispatch();
167
168 busy_ = false;
169 }
170
171 LOG << "DispatcherThread finished.";
172 }
173
WakeUpThread()174 void DispatcherThread::WakeUpThread() {
175 if (busy_)
176 dispatcher_->Interrupt();
177 }
178
179 } // namespace net
180 } // namespace thrill
181
182 /******************************************************************************/
183