1 // -*- Mode: C++; -*-
2 //                            Package   : omniORB
3 // SocketCollection.h         Created on: 23 Jul 2001
4 //                            Author    : Sai Lai Lo (sll)
5 //                            Author    : Duncan Grisby
6 //
7 //    Copyright (C) 2005-2013 Apasphere Ltd.
8 //    Copyright (C) 2001      AT&T Laboratories Cambridge
9 //
10 //    This file is part of the omniORB library
11 //
12 //    The omniORB library is free software; you can redistribute it and/or
13 //    modify it under the terms of the GNU Lesser General Public
14 //    License as published by the Free Software Foundation; either
15 //    version 2.1 of the License, or (at your option) any later version.
16 //
17 //    This library is distributed in the hope that it will be useful,
18 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
19 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20 //    Lesser General Public License for more details.
21 //
22 //    You should have received a copy of the GNU Lesser General Public
23 //    License along with this library. If not, see http://www.gnu.org/licenses/
24 //
25 //
26 // Description:
27 //	*** PROPRIETARY INTERFACE ***
28 //
29 
30 #ifndef __SOCKETCOLLECTION_H__
31 #define __SOCKETCOLLECTION_H__
32 
33 #include <tcpSocket.h>
34 
35 
OMNI_NAMESPACE_BEGIN(omni)36 OMNI_NAMESPACE_BEGIN(omni)
37 
38 class SocketCollection;
39 
40 
41 //
42 // Class SocketHolder holds a socket inside a collection. It contains
43 // flags to indicate whether the socket is selectable, and so on.
44 // Connection classes (e.g. tcpConnection) derive from this class.
45 
46 class SocketHolder {
47 
48 public:
49   SocketHolder(SocketHandle_t s)
50     : pd_socket(s),
51       pd_belong_to(0),
52       pd_shutdown(0),
53       pd_selectable(0),
54       pd_data_in_buffer(0),
55       pd_peeking(0),
56       pd_peek_go(0),
57       pd_nonblocking(0),
58       pd_peek_cond(0),
59       pd_fd_index(-1),
60       pd_next(0),
61       pd_prev(0) { }
62 
63   virtual ~SocketHolder();
64 
65   void setSelectable(int now,
66 		     CORBA::Boolean data_in_buffer,
67 		     CORBA::Boolean deprecated_hold_lock=0);
68   // Indicate that this socket should be watched for readability.
69   //
70   // If now is 1, immediately make the socket selectable (if the
71   // platform permits it), rather than waiting until the select loop
72   // rescans.
73   //
74   // If now is 2, immediately make the socket selectable (if the
75   // platform permits it), but only if it is already marked
76   // selectable.
77   //
78   // If data_in_buffer is true, the socket is considered to already
79   // have data available to read.
80   //
81   // deprecated_hold_lock used to be used to indicate that the caller
82   // already held the associated SocketCollection's lock during a
83   // notifyReadable callback. The lock is no longer held in callbacks,
84   // but the parameter is retained for backwards compatibility.
85 
86   void clearSelectable();
87   // Indicate that this socket should not be watched any more.
88 
89   CORBA::Boolean Peek();
90   // Watch the socket for a while to see if any data arrives. If the
91   // socket is not already selectable, wait for a bit in case it
92   // becomes selectable. Mark the socket as no longer selectable and
93   // return true if the socket becomes readable, otherwise return
94   // false.
95 
96   inline void
97   setBlocking()
98   {
99     if (pd_nonblocking) {
100       tcpSocket::setBlocking(pd_socket);
101       pd_nonblocking = 0;
102     }
103   }
104 
105   inline void
106   setNonBlocking()
107   {
108     if (!pd_nonblocking) {
109       tcpSocket::setNonBlocking(pd_socket);
110       pd_nonblocking = 1;
111     }
112   }
113 
114 
115   friend class SocketCollection;
116 
117 protected:
118   SocketHandle_t       	pd_socket;
119   SocketCollection*    	pd_belong_to;
120   CORBA::Boolean       	pd_shutdown;
121 
122 private:
123   CORBA::Boolean       	pd_selectable;     // True if socket is selectable
124   CORBA::Boolean       	pd_data_in_buffer; // True if data already available
125   CORBA::Boolean       	pd_peeking;        // True if a thread is currently
126 					   // peeking
127   CORBA::Boolean        pd_peek_go;        // True if the peeking thread
128 					   // should return true, even if it
129 					   // did not see data to read
130   CORBA::Boolean        pd_nonblocking;    // True if the socket is nonblocking
131   omni_tracedcondition* pd_peek_cond;      // Condition to signal a waiting
132 					   // peeker
133   int                  	pd_fd_index;       // -1 if select thread is not
134 					   // watching; otherwise, index of
135 					   // the fd within the poll / select
136 					   // list.
137   SocketHolder*        	pd_next;
138   SocketHolder**       	pd_prev;
139 };
140 
141 typedef omnivector<SocketHolder*> SocketHolderVec;
142 
143 
144 //
145 // SocketCollection manages a collection of sockets.
146 
147 class SocketCollection {
148 public:
149 
150   SocketCollection();
151 
152   virtual ~SocketCollection();
153 
154   virtual CORBA::Boolean notifyReadable(SocketHolder*) = 0;
155   // Callback used by Select(). If it returns false, something has
156   // gone very wrong with the collection and exits the Select loop.
157   // No locks are held during the call.
158 
159   CORBA::Boolean isSelectable(SocketHandle_t sock);
160   // Indicates whether the given socket can be selected upon.
161 
162   CORBA::Boolean Select();
163   // Returns true if the Select() has successfully done a scan.
164   // otherwise returns false to indicate that an error has been
165   // detected and this function should not be called again.
166   //
167   // For each of the sockets that has been marked watchable and indeed
168   // has become readable, call notifyReadable() with the socket as the
169   // argument.
170 
171   void wakeUp();
172   // On platforms where is is possible, immediately wake up a thread
173   // blocked in Select().
174 
175   void incrRefCount();
176   void decrRefCount();
177 
178   void addSocket(SocketHolder* sock);
179   // Add this socket to the collection. Increments this collection's
180   // refcount.
181 
182   void removeSocket(SocketHolder* sock);
183   // Remove the socket from this collection. Returns the socket which
184   // has been removed. Decrements this collection's refcount.
185 
186   static omni_time_t   scan_interval;
187   static unsigned      idle_scans;
188 
189 private:
190   int                  pd_refcount;
191   omni_tracedmutex     pd_collection_lock;
192 
193   // Absolute time at which we scan through the socket list choosing
194   // the selectable ones.
195   omni_time_t          pd_abs_time;
196 
197   // On platforms that support it, we use a pipe to wake up the select.
198   int                  pd_pipe_read;
199   int                  pd_pipe_write;
200   CORBA::Boolean       pd_pipe_full;
201 
202   // On platforms that support pipes, after scanning a while with no
203   // activity, we poll / select with an infinite timeout to prevent
204   // unnecessary processing.
205   unsigned             pd_idle_count;
206 
207 #if defined(USE_POLL)
208   // On platforms where we use poll(), we maintain a pre-allocated
209   // array of pollfd structures and a parallel array of pointers to
210   // SocketHolders. pd_pollfd_n is the number of populated pollfds.
211   // pd_pollfd_len is the current length of both arrays.
212   struct pollfd*       pd_pollfds;
213   SocketHolder**       pd_pollsockets;
214   unsigned             pd_pollfd_n;
215   unsigned             pd_pollfd_len;
216 
217   void growPollLists();
218   // Expand the pd_pollfds and pd_pollsockets to fit more values.
219 
220 #elif defined(__WIN32__)
221   // Windows has select() but its fd_sets are more like pollfds, just
222   // less convenient...
223   omni_tracedcondition pd_select_cond; // timedwait on if nothing to select
224   fd_set               pd_fd_set;
225   SocketHolder*        pd_fd_sockets[FD_SETSIZE];
226 
227 #else
228   // On other platforms we use select(). We maintain an fd_set and the
229   // highest socket number set in it plus 1.
230   fd_set               pd_fd_set;
231   int                  pd_fd_set_n;
232 #endif
233 
234   // Linked list of registered sockets.
235   SocketHolder*        pd_collection;
236   CORBA::Boolean       pd_changed;
237 
sendNotifications(SocketHolderVec & to_notify)238   inline void sendNotifications(SocketHolderVec& to_notify)
239   {
240     SocketHolderVec::iterator it  = to_notify.begin();
241     SocketHolderVec::iterator end = to_notify.end();
242 
243     for (; it != end; ++it)
244       notifyReadable(*it);
245 
246     to_notify.clear();
247   }
248 
249   friend class SocketHolder;
250 };
251 
252 OMNI_NAMESPACE_END(omni)
253 
254 #endif // __SOCKETCOLLECTION_H__
255