1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
26 #include "SocketMonitor.h"
27 #include "Utility.h"
28 #include <exception>
29 #include <set>
30 #include <algorithm>
31 #include <iostream>
32 
33 namespace FIX
34 {
SocketMonitor(int timeout)35 SocketMonitor::SocketMonitor( int timeout )
36 : m_timeout( timeout )
37 {
38   socket_init();
39 
40   std::pair<int, int> sockets = socket_createpair();
41   m_signal = sockets.first;
42   m_interrupt = sockets.second;
43   socket_setnonblock( m_signal );
44   socket_setnonblock( m_interrupt );
45   m_readSockets.insert( m_interrupt );
46 
47   m_timeval.tv_sec = 0;
48   m_timeval.tv_usec = 0;
49 #ifndef SELECT_DECREMENTS_TIME
50   m_ticks = clock();
51 #endif
52 }
53 
~SocketMonitor()54 SocketMonitor::~SocketMonitor()
55 {
56   Sockets::iterator i;
57   for ( i = m_readSockets.begin(); i != m_readSockets.end(); ++i ) {
58     socket_close( *i );
59   }
60 
61   socket_close( m_signal );
62   socket_term();
63 }
64 
addConnect(int s)65 bool SocketMonitor::addConnect( int s )
66 {
67   socket_setnonblock( s );
68   Sockets::iterator i = m_connectSockets.find( s );
69   if( i != m_connectSockets.end() ) return false;
70 
71   m_connectSockets.insert( s );
72   return true;
73 }
74 
addRead(int s)75 bool SocketMonitor::addRead( int s )
76 {
77   socket_setnonblock( s );
78   Sockets::iterator i = m_readSockets.find( s );
79   if( i != m_readSockets.end() ) return false;
80 
81   m_readSockets.insert( s );
82   return true;
83 }
84 
addWrite(int s)85 bool SocketMonitor::addWrite( int s )
86 {
87   if( m_readSockets.find(s) == m_readSockets.end() )
88     return false;
89 
90   socket_setnonblock( s );
91   Sockets::iterator i = m_writeSockets.find( s );
92   if( i != m_writeSockets.end() ) return false;
93 
94   m_writeSockets.insert( s );
95   return true;
96 }
97 
drop(int s)98 bool SocketMonitor::drop( int s )
99 {
100   Sockets::iterator i = m_readSockets.find( s );
101   Sockets::iterator j = m_writeSockets.find( s );
102   Sockets::iterator k = m_connectSockets.find( s );
103 
104   if ( i != m_readSockets.end() ||
105        j != m_writeSockets.end() ||
106        k != m_connectSockets.end() )
107   {
108     socket_close( s );
109     m_readSockets.erase( s );
110     m_writeSockets.erase( s );
111     m_connectSockets.erase( s );
112     m_dropped.push( s );
113     return true;
114   }
115   return false;
116 }
117 
getTimeval(bool poll,double timeout)118 inline timeval* SocketMonitor::getTimeval( bool poll, double timeout )
119 {
120   if ( poll )
121   {
122     m_timeval.tv_sec = 0;
123     m_timeval.tv_usec = 0;
124     return &m_timeval;
125   }
126 
127   timeout = m_timeout;
128 
129   if ( !timeout )
130     return 0;
131 #ifdef SELECT_MODIFIES_TIMEVAL
132   if ( !m_timeval.tv_sec && !m_timeval.tv_usec && timeout )
133     m_timeval.tv_sec = timeout;
134   return &m_timeval;
135 #else
136   double elapsed = ( double ) ( clock() - m_ticks ) / ( double ) CLOCKS_PER_SEC;
137   if ( elapsed >= timeout || elapsed == 0.0 )
138   {
139     m_ticks = clock();
140     m_timeval.tv_sec = 0;
141     m_timeval.tv_usec = (long)(timeout * 1000000);
142   }
143   else
144   {
145     m_timeval.tv_sec = 0;
146     m_timeval.tv_usec = (long)( ( timeout - elapsed ) * 1000000 );
147   }
148   return &m_timeval;
149 #endif
150 }
151 
sleepIfEmpty(bool poll)152 bool SocketMonitor::sleepIfEmpty( bool poll )
153 {
154   if( poll )
155     return false;
156 
157   if ( m_readSockets.empty() &&
158        m_writeSockets.empty() &&
159        m_connectSockets.empty() )
160   {
161     process_sleep( m_timeout );
162     return true;
163   }
164   else
165     return false;
166 }
167 
signal(int socket)168 void SocketMonitor::signal( int socket )
169 {
170   socket_send( m_signal, (char*)&socket, sizeof(socket) );
171 }
172 
unsignal(int s)173 void SocketMonitor::unsignal( int s )
174 {
175   Sockets::iterator i = m_writeSockets.find( s );
176   if( i == m_writeSockets.end() ) return;
177 
178   m_writeSockets.erase( s );
179 }
180 
block(Strategy & strategy,bool poll,double timeout)181 void SocketMonitor::block( Strategy& strategy, bool poll, double timeout )
182 {
183   while ( m_dropped.size() )
184   {
185     strategy.onError( *this, m_dropped.front() );
186     m_dropped.pop();
187     if ( m_dropped.size() == 0 )
188       return ;
189   }
190 
191   fd_set readSet;
192   FD_ZERO( &readSet );
193   buildSet( m_readSockets, readSet );
194   fd_set writeSet;
195   FD_ZERO( &writeSet );
196   buildSet( m_connectSockets, writeSet );
197   buildSet( m_writeSockets, writeSet );
198   fd_set exceptSet;
199   FD_ZERO( &exceptSet );
200   buildSet( m_connectSockets, exceptSet );
201 
202   if ( sleepIfEmpty(poll) )
203   {
204     strategy.onTimeout( *this );
205     return;
206   }
207 
208   int result = select( FD_SETSIZE, &readSet, &writeSet, &exceptSet, getTimeval(poll, timeout) );
209 
210   if ( result == 0 )
211   {
212     strategy.onTimeout( *this );
213     return;
214   }
215   else if ( result > 0 )
216   {
217     processExceptSet( strategy, exceptSet );
218     processWriteSet( strategy, writeSet );
219     processReadSet( strategy, readSet );
220   }
221   else
222   {
223     strategy.onError( *this );
224   }
225 }
226 
processReadSet(Strategy & strategy,fd_set & readSet)227 void SocketMonitor::processReadSet( Strategy& strategy, fd_set& readSet )
228 {
229 #ifdef _MSC_VER
230   for ( unsigned i = 0; i < readSet.fd_count; ++i )
231   {
232     int s = readSet.fd_array[ i ];
233     if( s == m_interrupt )
234     {
235       int socket = 0;
236       socket_recv( s, (char*)&socket, sizeof(socket) );
237       addWrite( socket );
238     }
239     else
240     {
241       strategy.onEvent( *this, s );
242     }
243   }
244 #else
245     Sockets::iterator i;
246     Sockets sockets = m_readSockets;
247     for ( i = sockets.begin(); i != sockets.end(); ++i )
248     {
249       int s = *i;
250       if ( !FD_ISSET( *i, &readSet ) )
251         continue;
252       if( s == m_interrupt )
253       {
254         int socket = 0;
255         socket_recv( s, (char*)&socket, sizeof(socket) );
256         addWrite( socket );
257       }
258       else
259       {
260         strategy.onEvent( *this, s );
261       }
262     }
263 #endif
264 }
265 
processWriteSet(Strategy & strategy,fd_set & writeSet)266 void SocketMonitor::processWriteSet( Strategy& strategy, fd_set& writeSet )
267 {
268 #ifdef _MSC_VER
269   for ( unsigned i = 0; i < writeSet.fd_count; ++i )
270   {
271     int s = writeSet.fd_array[ i ];
272     if( m_connectSockets.find(s) != m_connectSockets.end() )
273     {
274       m_connectSockets.erase( s );
275       m_readSockets.insert( s );
276       strategy.onConnect( *this, s );
277     }
278     else
279     {
280       strategy.onWrite( *this, s );
281     }
282   }
283 #else
284   Sockets::iterator i;
285   Sockets sockets = m_connectSockets;
286   for( i = sockets.begin(); i != sockets.end(); ++i )
287   {
288     int s = *i;
289     if ( !FD_ISSET( *i, &writeSet ) )
290       continue;
291     m_connectSockets.erase( s );
292     m_readSockets.insert( s );
293     strategy.onConnect( *this, s );
294   }
295 
296   sockets = m_writeSockets;
297   for( i = sockets.begin(); i != sockets.end(); ++i )
298   {
299     int s = *i;
300     if ( !FD_ISSET( *i, &writeSet ) )
301       continue;
302     strategy.onWrite( *this, s );
303   }
304 #endif
305 }
306 
processExceptSet(Strategy & strategy,fd_set & exceptSet)307 void SocketMonitor::processExceptSet( Strategy& strategy, fd_set& exceptSet )
308 {
309 #ifdef _MSC_VER
310   for ( unsigned i = 0; i < exceptSet.fd_count; ++i )
311   {
312     int s = exceptSet.fd_array[ i ];
313     strategy.onError( *this, s );
314   }
315 #else
316     Sockets::iterator i;
317     Sockets sockets = m_connectSockets;
318     for ( i = sockets.begin(); i != sockets.end(); ++i )
319     {
320       int s = *i;
321       if ( !FD_ISSET( *i, &exceptSet ) )
322         continue;
323       strategy.onError( *this, s );
324     }
325 #endif
326 }
327 
buildSet(const Sockets & sockets,fd_set & watchSet)328 void SocketMonitor::buildSet( const Sockets& sockets, fd_set& watchSet )
329 {
330   Sockets::const_iterator iter;
331   for ( iter = sockets.begin(); iter != sockets.end(); ++iter ) {
332     FD_SET( *iter, &watchSet );
333   }
334 }
335 }
336