1 /*******************************************************************************
2 * thrill/net/tcp/select_dispatcher.cpp
3 *
4 * Lightweight wrapper around BSD socket API.
5 *
6 * Part of Project Thrill - http://project-thrill.org
7 *
8 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
9 *
10 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11 ******************************************************************************/
12
13 #include <thrill/net/tcp/select_dispatcher.hpp>
14
15 #include <sstream>
16
17 namespace thrill {
18 namespace net {
19 namespace tcp {
20
21 //! Run one iteration of dispatching select().
DispatchOne(const std::chrono::milliseconds & timeout)22 void SelectDispatcher::DispatchOne(const std::chrono::milliseconds& timeout) {
23
24 // copy select fdset
25 Select fdset = select_;
26
27 if (self_verify_)
28 {
29 for (int fd = 3; fd < static_cast<int>(watch_.size()); ++fd) {
30 Watch& w = watch_[fd];
31
32 if (!w.active) continue;
33
34 assert((w.read_cb.size() == 0) != select_.InRead(fd));
35 assert((w.write_cb.size() == 0) != select_.InWrite(fd));
36 }
37 }
38
39 if (debug)
40 {
41 std::ostringstream oss;
42 oss << "| ";
43
44 for (int fd = 3; fd < static_cast<int>(watch_.size()); ++fd) {
45 Watch& w = watch_[fd];
46
47 if (!w.active) continue;
48
49 if (select_.InRead(fd))
50 oss << "r" << fd << " ";
51 if (select_.InWrite(fd))
52 oss << "w" << fd << " ";
53 if (select_.InException(fd))
54 oss << "e" << fd << " ";
55 }
56
57 LOG << "Performing select() on " << oss.str();
58 }
59
60 int r = fdset.select_timeout(static_cast<double>(timeout.count()));
61
62 if (r < 0) {
63 // if we caught a signal, this is intended to interrupt a select().
64 if (errno == EINTR) {
65 LOG << "Dispatch(): select() was interrupted due to a signal.";
66 return;
67 }
68
69 throw Exception("Dispatch::Select() failed!", errno);
70 }
71 if (r == 0) return;
72
73 // start running through the table at fd 3. 0 = stdin, 1 = stdout, 2 =
74 // stderr.
75
76 for (int fd = 3; fd < static_cast<int>(watch_.size()); ++fd)
77 {
78 // we use a pointer into the watch_ table. however, since the
79 // std::vector may regrow when callback handlers are called, this
80 // pointer is reset a lot of times.
81 Watch* w = &watch_[fd];
82
83 if (!w->active) continue;
84
85 if (fdset.InRead(fd))
86 {
87 if (w->read_cb.size()) {
88 // run read callbacks until one returns true (in which case
89 // it wants to be called again), or the read_cb list is
90 // empty.
91 while (w->read_cb.size() && w->read_cb.front()() == false) {
92 w = &watch_[fd];
93 w->read_cb.pop_front();
94 }
95 w = &watch_[fd];
96
97 if (w->read_cb.size() == 0) {
98 // if all read callbacks are done, listen no longer.
99 select_.ClearRead(fd);
100 if (w->write_cb.size() == 0 && !w->except_cb) {
101 // if also all write callbacks are done, stop
102 // listening.
103 select_.ClearWrite(fd);
104 select_.ClearException(fd);
105 w->active = false;
106 }
107 }
108 }
109 else {
110 LOG << "SelectDispatcher: got read event for fd "
111 << fd << " without a read handler.";
112
113 select_.ClearRead(fd);
114 }
115 }
116
117 if (fdset.InWrite(fd))
118 {
119 if (w->write_cb.size()) {
120 // run write callbacks until one returns true (in which case
121 // it wants to be called again), or the write_cb list is
122 // empty.
123 while (w->write_cb.size() && w->write_cb.front()() == false) {
124 w = &watch_[fd];
125 w->write_cb.pop_front();
126 }
127 w = &watch_[fd];
128
129 if (w->write_cb.size() == 0) {
130 // if all write callbacks are done, listen no longer.
131 select_.ClearWrite(fd);
132 if (w->read_cb.size() == 0 && !w->except_cb) {
133 // if also all write callbacks are done, stop
134 // listening.
135 select_.ClearRead(fd);
136 select_.ClearException(fd);
137 w->active = false;
138 }
139 }
140 }
141 else {
142 LOG << "SelectDispatcher: got write event for fd "
143 << fd << " without a write handler.";
144
145 select_.ClearWrite(fd);
146 }
147 }
148
149 if (fdset.InException(fd))
150 {
151 if (w->except_cb) {
152 if (!w->except_cb()) {
153 // callback returned false: remove fd from set
154 select_.ClearException(fd);
155 }
156 }
157 else {
158 DefaultExceptionCallback();
159 }
160 }
161 }
162 }
163
Interrupt()164 void SelectDispatcher::Interrupt() {
165 // there are multiple very platform-dependent ways to do this. we'll try
166 // to use the self-pipe trick for now. The select() method waits on
167 // another fd, which we write one byte to when we need to interrupt the
168 // select().
169
170 // another method would be to send a signal() via pthread_kill() to the
171 // select thread, but that had a race condition for waking up the other
172 // thread. -tb
173
174 // send one byte to wake up the select() handler.
175 ssize_t wb;
176 while ((wb = write(self_pipe_[1], this, 1)) == 0) {
177 LOG1 << "WakeUp: error sending to self-pipe: " << errno;
178 }
179 die_unless(wb == 1);
180 }
181
SelfPipeCallback()182 bool SelectDispatcher::SelfPipeCallback() {
183 while (read(self_pipe_[0],
184 self_pipe_buffer_, sizeof(self_pipe_buffer_)) > 0) {
185 /* repeat, until empty pipe */
186 }
187 return true;
188 }
189
190 } // namespace tcp
191 } // namespace net
192 } // namespace thrill
193
194 /******************************************************************************/
195