1 /*******************************************************************************
2  * thrill/net/tcp/select_dispatcher.cpp
3  *
4  * Lightweight wrapper around BSD socket API.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #include <thrill/net/tcp/select_dispatcher.hpp>
14 
15 #include <sstream>
16 
17 namespace thrill {
18 namespace net {
19 namespace tcp {
20 
21 //! Run one iteration of dispatching select().
DispatchOne(const std::chrono::milliseconds & timeout)22 void SelectDispatcher::DispatchOne(const std::chrono::milliseconds& timeout) {
23 
24     // copy select fdset
25     Select fdset = select_;
26 
27     if (self_verify_)
28     {
29         for (int fd = 3; fd < static_cast<int>(watch_.size()); ++fd) {
30             Watch& w = watch_[fd];
31 
32             if (!w.active) continue;
33 
34             assert((w.read_cb.size() == 0) != select_.InRead(fd));
35             assert((w.write_cb.size() == 0) != select_.InWrite(fd));
36         }
37     }
38 
39     if (debug)
40     {
41         std::ostringstream oss;
42         oss << "| ";
43 
44         for (int fd = 3; fd < static_cast<int>(watch_.size()); ++fd) {
45             Watch& w = watch_[fd];
46 
47             if (!w.active) continue;
48 
49             if (select_.InRead(fd))
50                 oss << "r" << fd << " ";
51             if (select_.InWrite(fd))
52                 oss << "w" << fd << " ";
53             if (select_.InException(fd))
54                 oss << "e" << fd << " ";
55         }
56 
57         LOG << "Performing select() on " << oss.str();
58     }
59 
60     int r = fdset.select_timeout(static_cast<double>(timeout.count()));
61 
62     if (r < 0) {
63         // if we caught a signal, this is intended to interrupt a select().
64         if (errno == EINTR) {
65             LOG << "Dispatch(): select() was interrupted due to a signal.";
66             return;
67         }
68 
69         throw Exception("Dispatch::Select() failed!", errno);
70     }
71     if (r == 0) return;
72 
73     // start running through the table at fd 3. 0 = stdin, 1 = stdout, 2 =
74     // stderr.
75 
76     for (int fd = 3; fd < static_cast<int>(watch_.size()); ++fd)
77     {
78         // we use a pointer into the watch_ table. however, since the
79         // std::vector may regrow when callback handlers are called, this
80         // pointer is reset a lot of times.
81         Watch* w = &watch_[fd];
82 
83         if (!w->active) continue;
84 
85         if (fdset.InRead(fd))
86         {
87             if (w->read_cb.size()) {
88                 // run read callbacks until one returns true (in which case
89                 // it wants to be called again), or the read_cb list is
90                 // empty.
91                 while (w->read_cb.size() && w->read_cb.front()() == false) {
92                     w = &watch_[fd];
93                     w->read_cb.pop_front();
94                 }
95                 w = &watch_[fd];
96 
97                 if (w->read_cb.size() == 0) {
98                     // if all read callbacks are done, listen no longer.
99                     select_.ClearRead(fd);
100                     if (w->write_cb.size() == 0 && !w->except_cb) {
101                         // if also all write callbacks are done, stop
102                         // listening.
103                         select_.ClearWrite(fd);
104                         select_.ClearException(fd);
105                         w->active = false;
106                     }
107                 }
108             }
109             else {
110                 LOG << "SelectDispatcher: got read event for fd "
111                     << fd << " without a read handler.";
112 
113                 select_.ClearRead(fd);
114             }
115         }
116 
117         if (fdset.InWrite(fd))
118         {
119             if (w->write_cb.size()) {
120                 // run write callbacks until one returns true (in which case
121                 // it wants to be called again), or the write_cb list is
122                 // empty.
123                 while (w->write_cb.size() && w->write_cb.front()() == false) {
124                     w = &watch_[fd];
125                     w->write_cb.pop_front();
126                 }
127                 w = &watch_[fd];
128 
129                 if (w->write_cb.size() == 0) {
130                     // if all write callbacks are done, listen no longer.
131                     select_.ClearWrite(fd);
132                     if (w->read_cb.size() == 0 && !w->except_cb) {
133                         // if also all write callbacks are done, stop
134                         // listening.
135                         select_.ClearRead(fd);
136                         select_.ClearException(fd);
137                         w->active = false;
138                     }
139                 }
140             }
141             else {
142                 LOG << "SelectDispatcher: got write event for fd "
143                     << fd << " without a write handler.";
144 
145                 select_.ClearWrite(fd);
146             }
147         }
148 
149         if (fdset.InException(fd))
150         {
151             if (w->except_cb) {
152                 if (!w->except_cb()) {
153                     // callback returned false: remove fd from set
154                     select_.ClearException(fd);
155                 }
156             }
157             else {
158                 DefaultExceptionCallback();
159             }
160         }
161     }
162 }
163 
Interrupt()164 void SelectDispatcher::Interrupt() {
165     // there are multiple very platform-dependent ways to do this. we'll try
166     // to use the self-pipe trick for now. The select() method waits on
167     // another fd, which we write one byte to when we need to interrupt the
168     // select().
169 
170     // another method would be to send a signal() via pthread_kill() to the
171     // select thread, but that had a race condition for waking up the other
172     // thread. -tb
173 
174     // send one byte to wake up the select() handler.
175     ssize_t wb;
176     while ((wb = write(self_pipe_[1], this, 1)) == 0) {
177         LOG1 << "WakeUp: error sending to self-pipe: " << errno;
178     }
179     die_unless(wb == 1);
180 }
181 
SelfPipeCallback()182 bool SelectDispatcher::SelfPipeCallback() {
183     while (read(self_pipe_[0],
184                 self_pipe_buffer_, sizeof(self_pipe_buffer_)) > 0) {
185         /* repeat, until empty pipe */
186     }
187     return true;
188 }
189 
190 } // namespace tcp
191 } // namespace net
192 } // namespace thrill
193 
194 /******************************************************************************/
195