1 /*******************************************************************************
2  * thrill/net/dispatcher_thread.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #include <thrill/net/dispatcher.hpp>
12 #include <thrill/net/dispatcher_thread.hpp>
13 #include <thrill/net/group.hpp>
14 
15 #include <deque>
16 #include <string>
17 #include <vector>
18 
19 namespace thrill {
20 namespace net {
21 
DispatcherThread(std::unique_ptr<class Dispatcher> dispatcher,size_t host_rank)22 DispatcherThread::DispatcherThread(
23     std::unique_ptr<class Dispatcher> dispatcher, size_t host_rank)
24     : dispatcher_(std::move(dispatcher)),
25       host_rank_(host_rank) {
26     // start thread
27     thread_ = std::thread(&DispatcherThread::Work, this);
28 }
29 
~DispatcherThread()30 DispatcherThread::~DispatcherThread() {
31     Terminate();
32 }
33 
Terminate()34 void DispatcherThread::Terminate() {
35     if (terminate_) return;
36 
37     // set termination flags.
38     terminate_ = true;
39     // interrupt select().
40     WakeUpThread();
41     // wait for last round to finish.
42     thread_.join();
43 }
44 
RunInThread(const AsyncDispatcherThreadCallback & cb)45 void DispatcherThread::RunInThread(const AsyncDispatcherThreadCallback& cb) {
46     Enqueue([this, cb = std::move(cb)]() {
47                 cb(*dispatcher_);
48             });
49     WakeUpThread();
50 }
51 
AddTimer(std::chrono::milliseconds timeout,const TimerCallback & cb)52 void DispatcherThread::AddTimer(
53     std::chrono::milliseconds timeout, const TimerCallback& cb) {
54     Enqueue([=]() {
55                 dispatcher_->AddTimer(timeout, cb);
56             });
57     WakeUpThread();
58 }
59 
AddRead(Connection & c,const AsyncCallback & read_cb)60 void DispatcherThread::AddRead(Connection& c, const AsyncCallback& read_cb) {
61     Enqueue([=, &c]() {
62                 dispatcher_->AddRead(c, read_cb);
63             });
64     WakeUpThread();
65 }
66 
AddWrite(Connection & c,const AsyncCallback & write_cb)67 void DispatcherThread::AddWrite(Connection& c, const AsyncCallback& write_cb) {
68     Enqueue([=, &c]() {
69                 dispatcher_->AddWrite(c, write_cb);
70             });
71     WakeUpThread();
72 }
73 
Cancel(Connection & c)74 void DispatcherThread::Cancel(Connection& c) {
75     Enqueue([=, &c]() {
76                 dispatcher_->Cancel(c);
77             });
78     WakeUpThread();
79 }
80 
AsyncRead(Connection & c,uint32_t seq,size_t size,const AsyncReadCallback & done_cb)81 void DispatcherThread::AsyncRead(
82     Connection& c, uint32_t seq, size_t size,
83     const AsyncReadCallback& done_cb) {
84     Enqueue([=, &c]() {
85                 dispatcher_->AsyncRead(c, seq, size, done_cb);
86             });
87     WakeUpThread();
88 }
89 
AsyncRead(Connection & c,uint32_t seq,size_t size,data::PinnedByteBlockPtr && block,const AsyncReadByteBlockCallback & done_cb)90 void DispatcherThread::AsyncRead(
91     Connection& c, uint32_t seq, size_t size, data::PinnedByteBlockPtr&& block,
92     const AsyncReadByteBlockCallback& done_cb) {
93     assert(block.valid());
94     Enqueue([=, &c, b = std::move(block)]() mutable {
95                 dispatcher_->AsyncRead(c, seq, size, std::move(b), done_cb);
96             });
97     WakeUpThread();
98 }
99 
AsyncWrite(Connection & c,uint32_t seq,Buffer && buffer,const AsyncWriteCallback & done_cb)100 void DispatcherThread::AsyncWrite(
101     Connection& c, uint32_t seq, Buffer&& buffer, const AsyncWriteCallback& done_cb) {
102     // the following captures the move-only buffer in a lambda.
103     Enqueue([=, &c, b = std::move(buffer)]() mutable {
104                 dispatcher_->AsyncWrite(c, seq, std::move(b), done_cb);
105             });
106     WakeUpThread();
107 }
108 
AsyncWrite(Connection & c,uint32_t seq,Buffer && buffer,data::PinnedBlock && block,const AsyncWriteCallback & done_cb)109 void DispatcherThread::AsyncWrite(
110     Connection& c, uint32_t seq, Buffer&& buffer, data::PinnedBlock&& block,
111     const AsyncWriteCallback& done_cb) {
112     assert(block.IsValid());
113     // the following captures the move-only buffer in a lambda.
114     Enqueue([=, &c,
115              b1 = std::move(buffer), b2 = std::move(block)]() mutable {
116                 dispatcher_->AsyncWrite(c, seq, std::move(b1));
117                 dispatcher_->AsyncWrite(c, seq + 1, std::move(b2), done_cb);
118             });
119     WakeUpThread();
120 }
121 
AsyncWriteCopy(Connection & c,uint32_t seq,const void * buffer,size_t size,const AsyncWriteCallback & done_cb)122 void DispatcherThread::AsyncWriteCopy(
123     Connection& c, uint32_t seq, const void* buffer, size_t size,
124     const AsyncWriteCallback& done_cb) {
125     return AsyncWrite(c, seq, Buffer(buffer, size), done_cb);
126 }
127 
AsyncWriteCopy(Connection & c,uint32_t seq,const std::string & str,const AsyncWriteCallback & done_cb)128 void DispatcherThread::AsyncWriteCopy(
129     Connection& c, uint32_t seq,
130     const std::string& str, const AsyncWriteCallback& done_cb) {
131     return AsyncWriteCopy(c, seq, str.data(), str.size(), done_cb);
132 }
133 
Enqueue(Job && job)134 void DispatcherThread::Enqueue(Job&& job) {
135     return jobqueue_.push(std::move(job));
136 }
137 
Work()138 void DispatcherThread::Work() {
139     common::NameThisThread(
140         "host " + std::to_string(host_rank_) + " dispatcher");
141     // pin DispatcherThread to last core
142     common::SetCpuAffinity(std::thread::hardware_concurrency() - 1);
143 
144     while (!terminate_ ||
145            dispatcher_->HasAsyncWrites() || !jobqueue_.empty())
146     {
147         // process jobs in jobqueue_
148         {
149             Job job;
150             while (jobqueue_.try_pop(job))
151                 job();
152         }
153 
154         // set busy flag, but check once again for jobs.
155         busy_ = true;
156         {
157             Job job;
158             if (jobqueue_.try_pop(job)) {
159                 busy_ = false;
160                 job();
161                 continue;
162             }
163         }
164 
165         // run one dispatch
166         dispatcher_->Dispatch();
167 
168         busy_ = false;
169     }
170 
171     LOG << "DispatcherThread finished.";
172 }
173 
WakeUpThread()174 void DispatcherThread::WakeUpThread() {
175     if (busy_)
176         dispatcher_->Interrupt();
177 }
178 
179 } // namespace net
180 } // namespace thrill
181 
182 /******************************************************************************/
183