1 /*
2 * barrier -- mouse and keyboard sharing utility
3 * Copyright (C) 2012-2016 Symless Ltd.
4 * Copyright (C) 2004 Chris Schoeneman
5 *
6 * This package is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * found in the file LICENSE that should have accompanied this file.
9 *
10 * This package is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "net/SocketMultiplexer.h"
20
21 #include "net/ISocketMultiplexerJob.h"
22 #include "mt/CondVar.h"
23 #include "mt/Lock.h"
24 #include "mt/Mutex.h"
25 #include "mt/Thread.h"
26 #include "arch/Arch.h"
27 #include "arch/XArch.h"
28 #include "base/Log.h"
29 #include "base/TMethodJob.h"
30 #include "common/stdvector.h"
31
32 //
33 // SocketMultiplexer
34 //
35
36 class CursorMultiplexerJob : public ISocketMultiplexerJob {
37 public:
run(bool readable,bool writable,bool error)38 MultiplexerJobStatus run(bool readable, bool writable, bool error) override
39 {
40 return {false, {}};
41 }
42
getSocket() const43 ArchSocket getSocket() const override { return {}; }
isReadable() const44 bool isReadable() const override { return false; }
isWritable() const45 bool isWritable() const override { return false; }
isCursor() const46 bool isCursor() const override { return true; }
47 };
48
49
SocketMultiplexer()50 SocketMultiplexer::SocketMultiplexer() :
51 m_mutex(new Mutex),
52 m_thread(NULL),
53 m_update(false),
54 m_jobsReady(new CondVar<bool>(m_mutex, false)),
55 m_jobListLock(new CondVar<bool>(m_mutex, false)),
56 m_jobListLockLocked(new CondVar<bool>(m_mutex, false)),
57 m_jobListLocker(NULL),
58 m_jobListLockLocker(NULL)
59 {
60 // start thread
61 m_thread = new Thread(new TMethodJob<SocketMultiplexer>(
62 this, &SocketMultiplexer::serviceThread));
63 }
64
~SocketMultiplexer()65 SocketMultiplexer::~SocketMultiplexer()
66 {
67 m_thread->cancel();
68 m_thread->unblockPollSocket();
69 m_thread->wait();
70 delete m_thread;
71 delete m_jobsReady;
72 delete m_jobListLock;
73 delete m_jobListLockLocked;
74 delete m_jobListLocker;
75 delete m_jobListLockLocker;
76 delete m_mutex;
77 }
78
addSocket(ISocket * socket,std::unique_ptr<ISocketMultiplexerJob> && job)79 void SocketMultiplexer::addSocket(ISocket* socket, std::unique_ptr<ISocketMultiplexerJob>&& job)
80 {
81 assert(socket != NULL);
82 assert(job != NULL);
83
84 // prevent other threads from locking the job list
85 lockJobListLock();
86
87 // break thread out of poll
88 m_thread->unblockPollSocket();
89
90 // lock the job list
91 lockJobList();
92
93 // insert/replace job
94 SocketJobMap::iterator i = m_socketJobMap.find(socket);
95 if (i == m_socketJobMap.end()) {
96 // we *must* put the job at the end so the order of jobs in
97 // the list continue to match the order of jobs in pfds in
98 // serviceThread().
99 JobCursor j = m_socketJobs.insert(m_socketJobs.end(), std::move(job));
100 m_update = true;
101 m_socketJobMap.insert(std::make_pair(socket, j));
102 }
103 else {
104 *(i->second) = std::move(job);
105 m_update = true;
106 }
107
108 // unlock the job list
109 unlockJobList();
110 }
111
112 void
removeSocket(ISocket * socket)113 SocketMultiplexer::removeSocket(ISocket* socket)
114 {
115 assert(socket != NULL);
116
117 // prevent other threads from locking the job list
118 lockJobListLock();
119
120 // break thread out of poll
121 m_thread->unblockPollSocket();
122
123 // lock the job list
124 lockJobList();
125
126 // remove job. rather than removing it from the map we put NULL
127 // in the list instead so the order of jobs in the list continues
128 // to match the order of jobs in pfds in serviceThread().
129 SocketJobMap::iterator i = m_socketJobMap.find(socket);
130 if (i != m_socketJobMap.end()) {
131 if (*(i->second)) {
132 i->second->reset();
133 m_update = true;
134 }
135 }
136
137 // unlock the job list
138 unlockJobList();
139 }
140
141 void
serviceThread(void *)142 SocketMultiplexer::serviceThread(void*)
143 {
144 std::vector<IArchNetwork::PollEntry> pfds;
145 IArchNetwork::PollEntry pfd;
146
147 // service the connections
148 for (;;) {
149 Thread::testCancel();
150
151 // wait until there are jobs to handle
152 {
153 Lock lock(m_mutex);
154 while (!(bool)*m_jobsReady) {
155 m_jobsReady->wait();
156 }
157 }
158
159 // lock the job list
160 lockJobListLock();
161 lockJobList();
162
163 // collect poll entries
164 if (m_update) {
165 m_update = false;
166 pfds.clear();
167 pfds.reserve(m_socketJobMap.size());
168
169 JobCursor cursor = newCursor();
170 JobCursor jobCursor = nextCursor(cursor);
171 while (jobCursor != m_socketJobs.end()) {
172 if (*jobCursor) {
173 pfd.m_socket = (*jobCursor)->getSocket();
174 pfd.m_events = 0;
175 if ((*jobCursor)->isReadable()) {
176 pfd.m_events |= IArchNetwork::kPOLLIN;
177 }
178 if ((*jobCursor)->isWritable()) {
179 pfd.m_events |= IArchNetwork::kPOLLOUT;
180 }
181 pfds.push_back(pfd);
182 }
183 jobCursor = nextCursor(cursor);
184 }
185 deleteCursor(cursor);
186 }
187
188 int status;
189 try {
190 // check for status
191 if (!pfds.empty()) {
192 status = ARCH->pollSocket(&pfds[0], (int)pfds.size(), -1);
193 }
194 else {
195 status = 0;
196 }
197 }
198 catch (XArchNetwork& e) {
199 LOG((CLOG_WARN "error in socket multiplexer: %s", e.what()));
200 status = 0;
201 }
202
203 if (status != 0) {
204 // iterate over socket jobs, invoking each and saving the
205 // new job.
206 UInt32 i = 0;
207 JobCursor cursor = newCursor();
208 JobCursor jobCursor = nextCursor(cursor);
209 while (i < pfds.size() && jobCursor != m_socketJobs.end()) {
210 if (*jobCursor != NULL) {
211 // get poll state
212 unsigned short revents = pfds[i].m_revents;
213 bool read = ((revents & IArchNetwork::kPOLLIN) != 0);
214 bool write = ((revents & IArchNetwork::kPOLLOUT) != 0);
215 bool error = ((revents & (IArchNetwork::kPOLLERR |
216 IArchNetwork::kPOLLNVAL)) != 0);
217
218 // run job
219 MultiplexerJobStatus status = (*jobCursor)->run(read, write, error);
220
221 if (!status.continue_servicing) {
222 Lock lock(m_mutex);
223 jobCursor->reset();
224 m_update = true;
225 } else if (status.new_job) {
226 Lock lock(m_mutex);
227 *jobCursor = std::move(status.new_job);
228 m_update = true;
229 }
230 ++i;
231 }
232
233 // next job
234 jobCursor = nextCursor(cursor);
235 }
236 deleteCursor(cursor);
237 }
238
239 // delete any removed socket jobs
240 for (SocketJobMap::iterator i = m_socketJobMap.begin();
241 i != m_socketJobMap.end();) {
242 if (*(i->second) == NULL) {
243 m_socketJobs.erase(i->second);
244 m_socketJobMap.erase(i++);
245 m_update = true;
246 }
247 else {
248 ++i;
249 }
250 }
251
252 // unlock the job list
253 unlockJobList();
254 }
255 }
256
257 SocketMultiplexer::JobCursor
newCursor()258 SocketMultiplexer::newCursor()
259 {
260 Lock lock(m_mutex);
261 return m_socketJobs.insert(m_socketJobs.begin(), std::make_unique<CursorMultiplexerJob>());
262 }
263
264 SocketMultiplexer::JobCursor
nextCursor(JobCursor cursor)265 SocketMultiplexer::nextCursor(JobCursor cursor)
266 {
267 Lock lock(m_mutex);
268 JobCursor j = m_socketJobs.end();
269 JobCursor i = cursor;
270 while (++i != m_socketJobs.end()) {
271 if (*i && !(*i)->isCursor()) {
272 // found a real job (as opposed to a cursor)
273 j = i;
274
275 // move our cursor just past the job
276 m_socketJobs.splice(++i, m_socketJobs, cursor);
277 break;
278 }
279 }
280 return j;
281 }
282
283 void
deleteCursor(JobCursor cursor)284 SocketMultiplexer::deleteCursor(JobCursor cursor)
285 {
286 Lock lock(m_mutex);
287 m_socketJobs.erase(cursor);
288 }
289
290 void
lockJobListLock()291 SocketMultiplexer::lockJobListLock()
292 {
293 Lock lock(m_mutex);
294
295 // wait for the lock on the lock
296 while (*m_jobListLockLocked) {
297 m_jobListLockLocked->wait();
298 }
299
300 // take ownership of the lock on the lock
301 *m_jobListLockLocked = true;
302 m_jobListLockLocker = new Thread(Thread::getCurrentThread());
303 }
304
305 void
lockJobList()306 SocketMultiplexer::lockJobList()
307 {
308 Lock lock(m_mutex);
309
310 // make sure we're the one that called lockJobListLock()
311 assert(*m_jobListLockLocker == Thread::getCurrentThread());
312
313 // wait for the job list lock
314 while (*m_jobListLock) {
315 m_jobListLock->wait();
316 }
317
318 // take ownership of the lock
319 *m_jobListLock = true;
320 m_jobListLocker = m_jobListLockLocker;
321 m_jobListLockLocker = NULL;
322
323 // release the lock on the lock
324 *m_jobListLockLocked = false;
325 m_jobListLockLocked->broadcast();
326 }
327
328 void
unlockJobList()329 SocketMultiplexer::unlockJobList()
330 {
331 Lock lock(m_mutex);
332
333 // make sure we're the one that called lockJobList()
334 assert(*m_jobListLocker == Thread::getCurrentThread());
335
336 // release the lock
337 delete m_jobListLocker;
338 m_jobListLocker = NULL;
339 *m_jobListLock = false;
340 m_jobListLock->signal();
341
342 // set new jobs ready state
343 bool isReady = !m_socketJobMap.empty();
344 if (*m_jobsReady != isReady) {
345 *m_jobsReady = isReady;
346 m_jobsReady->signal();
347 }
348 }
349