1 /*
2  * barrier -- mouse and keyboard sharing utility
3  * Copyright (C) 2012-2016 Symless Ltd.
4  * Copyright (C) 2004 Chris Schoeneman
5  *
6  * This package is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * found in the file LICENSE that should have accompanied this file.
9  *
10  * This package is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "net/SocketMultiplexer.h"
20 
21 #include "net/ISocketMultiplexerJob.h"
22 #include "mt/CondVar.h"
23 #include "mt/Lock.h"
24 #include "mt/Mutex.h"
25 #include "mt/Thread.h"
26 #include "arch/Arch.h"
27 #include "arch/XArch.h"
28 #include "base/Log.h"
29 #include "base/TMethodJob.h"
30 #include "common/stdvector.h"
31 
32 //
33 // SocketMultiplexer
34 //
35 
36 class CursorMultiplexerJob : public ISocketMultiplexerJob {
37 public:
run(bool readable,bool writable,bool error)38     MultiplexerJobStatus run(bool readable, bool writable, bool error) override
39     {
40         return {false, {}};
41     }
42 
getSocket() const43     ArchSocket getSocket() const override { return {}; }
isReadable() const44     bool isReadable() const override { return false; }
isWritable() const45     bool isWritable() const override { return false; }
isCursor() const46     bool isCursor() const override { return true; }
47 };
48 
49 
SocketMultiplexer()50 SocketMultiplexer::SocketMultiplexer() :
51     m_mutex(new Mutex),
52     m_thread(NULL),
53     m_update(false),
54     m_jobsReady(new CondVar<bool>(m_mutex, false)),
55     m_jobListLock(new CondVar<bool>(m_mutex, false)),
56     m_jobListLockLocked(new CondVar<bool>(m_mutex, false)),
57     m_jobListLocker(NULL),
58     m_jobListLockLocker(NULL)
59 {
60     // start thread
61     m_thread = new Thread(new TMethodJob<SocketMultiplexer>(
62                                 this, &SocketMultiplexer::serviceThread));
63 }
64 
~SocketMultiplexer()65 SocketMultiplexer::~SocketMultiplexer()
66 {
67     m_thread->cancel();
68     m_thread->unblockPollSocket();
69     m_thread->wait();
70     delete m_thread;
71     delete m_jobsReady;
72     delete m_jobListLock;
73     delete m_jobListLockLocked;
74     delete m_jobListLocker;
75     delete m_jobListLockLocker;
76     delete m_mutex;
77 }
78 
addSocket(ISocket * socket,std::unique_ptr<ISocketMultiplexerJob> && job)79 void SocketMultiplexer::addSocket(ISocket* socket, std::unique_ptr<ISocketMultiplexerJob>&& job)
80 {
81     assert(socket != NULL);
82     assert(job    != NULL);
83 
84     // prevent other threads from locking the job list
85     lockJobListLock();
86 
87     // break thread out of poll
88     m_thread->unblockPollSocket();
89 
90     // lock the job list
91     lockJobList();
92 
93     // insert/replace job
94     SocketJobMap::iterator i = m_socketJobMap.find(socket);
95     if (i == m_socketJobMap.end()) {
96         // we *must* put the job at the end so the order of jobs in
97         // the list continue to match the order of jobs in pfds in
98         // serviceThread().
99         JobCursor j = m_socketJobs.insert(m_socketJobs.end(), std::move(job));
100         m_update     = true;
101         m_socketJobMap.insert(std::make_pair(socket, j));
102     }
103     else {
104         *(i->second) = std::move(job);
105         m_update = true;
106     }
107 
108     // unlock the job list
109     unlockJobList();
110 }
111 
112 void
removeSocket(ISocket * socket)113 SocketMultiplexer::removeSocket(ISocket* socket)
114 {
115     assert(socket != NULL);
116 
117     // prevent other threads from locking the job list
118     lockJobListLock();
119 
120     // break thread out of poll
121     m_thread->unblockPollSocket();
122 
123     // lock the job list
124     lockJobList();
125 
126     // remove job.  rather than removing it from the map we put NULL
127     // in the list instead so the order of jobs in the list continues
128     // to match the order of jobs in pfds in serviceThread().
129     SocketJobMap::iterator i = m_socketJobMap.find(socket);
130     if (i != m_socketJobMap.end()) {
131         if (*(i->second)) {
132             i->second->reset();
133             m_update = true;
134         }
135     }
136 
137     // unlock the job list
138     unlockJobList();
139 }
140 
141 void
serviceThread(void *)142 SocketMultiplexer::serviceThread(void*)
143 {
144     std::vector<IArchNetwork::PollEntry> pfds;
145     IArchNetwork::PollEntry pfd;
146 
147     // service the connections
148     for (;;) {
149         Thread::testCancel();
150 
151         // wait until there are jobs to handle
152         {
153             Lock lock(m_mutex);
154             while (!(bool)*m_jobsReady) {
155                 m_jobsReady->wait();
156             }
157         }
158 
159         // lock the job list
160         lockJobListLock();
161         lockJobList();
162 
163         // collect poll entries
164         if (m_update) {
165             m_update = false;
166             pfds.clear();
167             pfds.reserve(m_socketJobMap.size());
168 
169             JobCursor cursor    = newCursor();
170             JobCursor jobCursor = nextCursor(cursor);
171             while (jobCursor != m_socketJobs.end()) {
172                 if (*jobCursor) {
173                     pfd.m_socket = (*jobCursor)->getSocket();
174                     pfd.m_events = 0;
175                     if ((*jobCursor)->isReadable()) {
176                         pfd.m_events |= IArchNetwork::kPOLLIN;
177                     }
178                     if ((*jobCursor)->isWritable()) {
179                         pfd.m_events |= IArchNetwork::kPOLLOUT;
180                     }
181                     pfds.push_back(pfd);
182                 }
183                 jobCursor = nextCursor(cursor);
184             }
185             deleteCursor(cursor);
186         }
187 
188         int status;
189         try {
190             // check for status
191             if (!pfds.empty()) {
192                 status = ARCH->pollSocket(&pfds[0], (int)pfds.size(), -1);
193             }
194             else {
195                 status = 0;
196             }
197         }
198         catch (XArchNetwork& e) {
199             LOG((CLOG_WARN "error in socket multiplexer: %s", e.what()));
200             status = 0;
201         }
202 
203         if (status != 0) {
204             // iterate over socket jobs, invoking each and saving the
205             // new job.
206             UInt32 i             = 0;
207             JobCursor cursor    = newCursor();
208             JobCursor jobCursor = nextCursor(cursor);
209             while (i < pfds.size() && jobCursor != m_socketJobs.end()) {
210                 if (*jobCursor != NULL) {
211                     // get poll state
212                     unsigned short revents = pfds[i].m_revents;
213                     bool read  = ((revents & IArchNetwork::kPOLLIN) != 0);
214                     bool write = ((revents & IArchNetwork::kPOLLOUT) != 0);
215                     bool error = ((revents & (IArchNetwork::kPOLLERR |
216                                               IArchNetwork::kPOLLNVAL)) != 0);
217 
218                     // run job
219                     MultiplexerJobStatus status = (*jobCursor)->run(read, write, error);
220 
221                     if (!status.continue_servicing) {
222                         Lock lock(m_mutex);
223                         jobCursor->reset();
224                         m_update = true;
225                     } else if (status.new_job) {
226                         Lock lock(m_mutex);
227                         *jobCursor = std::move(status.new_job);
228                         m_update = true;
229                     }
230                     ++i;
231                 }
232 
233                 // next job
234                 jobCursor = nextCursor(cursor);
235             }
236             deleteCursor(cursor);
237         }
238 
239         // delete any removed socket jobs
240         for (SocketJobMap::iterator i = m_socketJobMap.begin();
241                             i != m_socketJobMap.end();) {
242             if (*(i->second) == NULL) {
243                 m_socketJobs.erase(i->second);
244                 m_socketJobMap.erase(i++);
245                 m_update = true;
246             }
247             else {
248                 ++i;
249             }
250         }
251 
252         // unlock the job list
253         unlockJobList();
254     }
255 }
256 
257 SocketMultiplexer::JobCursor
newCursor()258 SocketMultiplexer::newCursor()
259 {
260     Lock lock(m_mutex);
261     return m_socketJobs.insert(m_socketJobs.begin(), std::make_unique<CursorMultiplexerJob>());
262 }
263 
264 SocketMultiplexer::JobCursor
nextCursor(JobCursor cursor)265 SocketMultiplexer::nextCursor(JobCursor cursor)
266 {
267     Lock lock(m_mutex);
268     JobCursor j = m_socketJobs.end();
269     JobCursor i = cursor;
270     while (++i != m_socketJobs.end()) {
271         if (*i && !(*i)->isCursor()) {
272             // found a real job (as opposed to a cursor)
273             j = i;
274 
275             // move our cursor just past the job
276             m_socketJobs.splice(++i, m_socketJobs, cursor);
277             break;
278         }
279     }
280     return j;
281 }
282 
283 void
deleteCursor(JobCursor cursor)284 SocketMultiplexer::deleteCursor(JobCursor cursor)
285 {
286     Lock lock(m_mutex);
287     m_socketJobs.erase(cursor);
288 }
289 
290 void
lockJobListLock()291 SocketMultiplexer::lockJobListLock()
292 {
293     Lock lock(m_mutex);
294 
295     // wait for the lock on the lock
296     while (*m_jobListLockLocked) {
297         m_jobListLockLocked->wait();
298     }
299 
300     // take ownership of the lock on the lock
301     *m_jobListLockLocked = true;
302     m_jobListLockLocker  = new Thread(Thread::getCurrentThread());
303 }
304 
305 void
lockJobList()306 SocketMultiplexer::lockJobList()
307 {
308     Lock lock(m_mutex);
309 
310     // make sure we're the one that called lockJobListLock()
311     assert(*m_jobListLockLocker == Thread::getCurrentThread());
312 
313     // wait for the job list lock
314     while (*m_jobListLock) {
315         m_jobListLock->wait();
316     }
317 
318     // take ownership of the lock
319     *m_jobListLock      = true;
320     m_jobListLocker     = m_jobListLockLocker;
321     m_jobListLockLocker = NULL;
322 
323     // release the lock on the lock
324     *m_jobListLockLocked = false;
325     m_jobListLockLocked->broadcast();
326 }
327 
328 void
unlockJobList()329 SocketMultiplexer::unlockJobList()
330 {
331     Lock lock(m_mutex);
332 
333     // make sure we're the one that called lockJobList()
334     assert(*m_jobListLocker == Thread::getCurrentThread());
335 
336     // release the lock
337     delete m_jobListLocker;
338     m_jobListLocker = NULL;
339     *m_jobListLock  = false;
340     m_jobListLock->signal();
341 
342     // set new jobs ready state
343     bool isReady = !m_socketJobMap.empty();
344     if (*m_jobsReady != isReady) {
345         *m_jobsReady = isReady;
346         m_jobsReady->signal();
347     }
348 }
349