1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 namespace IceInternal
6 {
7     using System;
8     using System.Diagnostics;
9     using System.Net;
10     using System.Net.Sockets;
11 
12     public sealed class StreamSocket
13     {
StreamSocket(ProtocolInstance instance, NetworkProxy proxy, EndPoint addr, EndPoint sourceAddr)14         public StreamSocket(ProtocolInstance instance, NetworkProxy proxy, EndPoint addr, EndPoint sourceAddr)
15         {
16             _instance = instance;
17             _proxy = proxy;
18             _addr = addr;
19             _sourceAddr = sourceAddr;
20             _fd = Network.createSocket(false, (_proxy != null ? _proxy.getAddress() : _addr).AddressFamily);
21             _state = StateNeedConnect;
22 
23             init();
24         }
25 
StreamSocket(ProtocolInstance instance, Socket fd)26         public StreamSocket(ProtocolInstance instance, Socket fd)
27         {
28             _instance = instance;
29             _fd = fd;
30             _state = StateConnected;
31             try
32             {
33                 _desc = Network.fdToString(_fd);
34             }
35             catch(Exception)
36             {
37                 Network.closeSocketNoThrow(_fd);
38                 throw;
39             }
40             init();
41         }
42 
setBlock(bool block)43         public void setBlock(bool block)
44         {
45             Network.setBlock(_fd, block);
46         }
47 
connect(Buffer readBuffer, Buffer writeBuffer, ref bool moreData)48         public int connect(Buffer readBuffer, Buffer writeBuffer, ref bool moreData)
49         {
50             if(_state == StateNeedConnect)
51             {
52                 _state = StateConnectPending;
53                 return SocketOperation.Connect;
54             }
55             else if(_state <= StateConnectPending)
56             {
57                 if(_writeEventArgs.SocketError != SocketError.Success)
58                 {
59                     SocketException ex = new SocketException((int)_writeEventArgs.SocketError);
60                     if(Network.connectionRefused(ex))
61                     {
62                         throw new Ice.ConnectionRefusedException(ex);
63                     }
64                     else
65                     {
66                         throw new Ice.ConnectFailedException(ex);
67                     }
68                 }
69                 _desc = Network.fdToString(_fd, _proxy, _addr);
70                 _state = _proxy != null ? StateProxyWrite : StateConnected;
71             }
72 
73             if(_state == StateProxyWrite)
74             {
75                 _proxy.beginWrite(_addr, writeBuffer);
76                 return SocketOperation.Write;
77             }
78             else if(_state == StateProxyRead)
79             {
80                 _proxy.beginRead(readBuffer);
81                 return SocketOperation.Read;
82             }
83             else if(_state == StateProxyConnected)
84             {
85                 _proxy.finish(readBuffer, writeBuffer);
86 
87                 readBuffer.clear();
88                 writeBuffer.clear();
89 
90                 _state = StateConnected;
91             }
92 
93             Debug.Assert(_state == StateConnected);
94             return SocketOperation.None;
95         }
96 
isConnected()97         public bool isConnected()
98         {
99             return _state == StateConnected && _fd != null;
100         }
101 
fd()102         public Socket fd()
103         {
104             return _fd;
105         }
106 
getSendPacketSize(int length)107         public int getSendPacketSize(int length)
108         {
109             return _maxSendPacketSize > 0 ? Math.Min(length, _maxSendPacketSize) : length;
110         }
111 
getRecvPacketSize(int length)112         public int getRecvPacketSize(int length)
113         {
114             return _maxRecvPacketSize > 0 ? Math.Min(length, _maxRecvPacketSize) : length;
115         }
116 
setBufferSize(int rcvSize, int sndSize)117         public void setBufferSize(int rcvSize, int sndSize)
118         {
119             Network.setTcpBufSize(_fd, rcvSize, sndSize, _instance);
120         }
121 
read(Buffer buf)122         public int read(Buffer buf)
123         {
124             if(_state == StateProxyRead)
125             {
126                 while(true)
127                 {
128                     int ret = read(buf.b);
129                     if(ret == 0)
130                     {
131                         return SocketOperation.Read;
132                     }
133 
134                     _state = toState(_proxy.endRead(buf));
135                     if(_state != StateProxyRead)
136                     {
137                         return SocketOperation.None;
138                     }
139                 }
140             }
141             read(buf.b);
142             return buf.b.hasRemaining() ? SocketOperation.Read : SocketOperation.None;
143         }
144 
write(Buffer buf)145         public int write(Buffer buf)
146         {
147             if(_state == StateProxyWrite)
148             {
149                 while(true)
150                 {
151                     int ret = write(buf.b);
152                     if(ret == 0)
153                     {
154                         return SocketOperation.Write;
155                     }
156                     _state = toState(_proxy.endWrite(buf));
157                     if(_state != StateProxyWrite)
158                     {
159                         return SocketOperation.None;
160                     }
161                 }
162             }
163             write(buf.b);
164             return buf.b.hasRemaining() ? SocketOperation.Write : SocketOperation.None;
165         }
166 
startRead(Buffer buf, AsyncCallback callback, object state)167         public bool startRead(Buffer buf, AsyncCallback callback, object state)
168         {
169             Debug.Assert(_fd != null && _readEventArgs != null);
170 
171             int packetSize = getRecvPacketSize(buf.b.remaining());
172             try
173             {
174                 _readCallback = callback;
175                 _readEventArgs.UserToken = state;
176                 _readEventArgs.SetBuffer(buf.b.rawBytes(), buf.b.position(), packetSize);
177                 return !_fd.ReceiveAsync(_readEventArgs);
178             }
179             catch(SocketException ex)
180             {
181                 if(Network.connectionLost(ex))
182                 {
183                     throw new Ice.ConnectionLostException(ex);
184                 }
185                 throw new Ice.SocketException(ex);
186             }
187         }
188 
finishRead(Buffer buf)189         public void finishRead(Buffer buf)
190         {
191             if(_fd == null) // Transceiver was closed
192             {
193                 return;
194             }
195 
196             Debug.Assert(_fd != null && _readEventArgs != null);
197             try
198             {
199                 if(_readEventArgs.SocketError != SocketError.Success)
200                 {
201                     throw new SocketException((int)_readEventArgs.SocketError);
202                 }
203                 int ret = _readEventArgs.BytesTransferred;
204                 _readEventArgs.SetBuffer(null, 0, 0);
205 
206                 if(ret == 0)
207                 {
208                     throw new Ice.ConnectionLostException();
209                 }
210 
211                 Debug.Assert(ret > 0);
212                 buf.b.position(buf.b.position() + ret);
213 
214                 if(_state == StateProxyRead)
215                 {
216                     _state = toState(_proxy.endRead(buf));
217                 }
218             }
219             catch(SocketException ex)
220             {
221                 if(Network.connectionLost(ex))
222                 {
223                     throw new Ice.ConnectionLostException(ex);
224                 }
225                 throw new Ice.SocketException(ex);
226             }
227             catch(ObjectDisposedException ex)
228             {
229                 throw new Ice.ConnectionLostException(ex);
230             }
231         }
232 
startWrite(Buffer buf, AsyncCallback callback, object state, out bool completed)233         public bool startWrite(Buffer buf, AsyncCallback callback, object state, out bool completed)
234         {
235             Debug.Assert(_fd != null && _writeEventArgs != null);
236             if(_state == StateConnectPending)
237             {
238                 completed = false;
239                 _writeCallback = callback;
240                 try
241                 {
242                     EndPoint addr = _proxy != null ? _proxy.getAddress() : _addr;
243                     _writeEventArgs.RemoteEndPoint = addr;
244                     _writeEventArgs.UserToken = state;
245                     return !_fd.ConnectAsync(_writeEventArgs);
246                 }
247                 catch(Exception ex)
248                 {
249                     throw new Ice.SocketException(ex);
250                 }
251             }
252 
253             int packetSize = getSendPacketSize(buf.b.remaining());
254             try
255             {
256                 _writeCallback = callback;
257                 _writeEventArgs.UserToken = state;
258                 _writeEventArgs.SetBuffer(buf.b.rawBytes(), buf.b.position(), packetSize);
259                 bool completedSynchronously = !_fd.SendAsync(_writeEventArgs);
260                 completed = packetSize == buf.b.remaining();
261                 return completedSynchronously;
262             }
263             catch(SocketException ex)
264             {
265                 if(Network.connectionLost(ex))
266                 {
267                     throw new Ice.ConnectionLostException(ex);
268                 }
269                 throw new Ice.SocketException(ex);
270             }
271             catch(ObjectDisposedException ex)
272             {
273                 throw new Ice.ConnectionLostException(ex);
274             }
275         }
276 
finishWrite(Buffer buf)277         public void finishWrite(Buffer buf)
278         {
279             if(_fd == null) // Transceiver was closed
280             {
281                 if(buf.size() - buf.b.position() < _maxSendPacketSize)
282                 {
283                     buf.b.position(buf.b.limit()); // Assume all the data was sent for at-most-once semantics.
284                 }
285                 return;
286             }
287 
288             Debug.Assert(_fd != null && _writeEventArgs != null);
289 
290             if(_state < StateConnected && _state != StateProxyWrite)
291             {
292                 return;
293             }
294 
295             try
296             {
297                 if(_writeEventArgs.SocketError != SocketError.Success)
298                 {
299                     throw new SocketException((int)_writeEventArgs.SocketError);
300                 }
301                 int ret = _writeEventArgs.BytesTransferred;
302                 _writeEventArgs.SetBuffer(null, 0, 0);
303                 if(ret == 0)
304                 {
305                     throw new Ice.ConnectionLostException();
306                 }
307 
308                 Debug.Assert(ret > 0);
309                 buf.b.position(buf.b.position() + ret);
310 
311                 if(_state == StateProxyWrite)
312                 {
313                     _state = toState(_proxy.endWrite(buf));
314                 }
315             }
316             catch(SocketException ex)
317             {
318                 if(Network.connectionLost(ex))
319                 {
320                     throw new Ice.ConnectionLostException(ex);
321                 }
322 
323                 throw new Ice.SocketException(ex);
324             }
325             catch(ObjectDisposedException ex)
326             {
327                 throw new Ice.ConnectionLostException(ex);
328             }
329         }
330 
close()331         public void close()
332         {
333             Debug.Assert(_fd != null);
334             try
335             {
336                 Network.closeSocket(_fd);
337             }
338             finally
339             {
340                 _fd = null;
341             }
342         }
343 
destroy()344         public void destroy()
345         {
346             Debug.Assert(_readEventArgs != null && _writeEventArgs != null);
347             _readEventArgs.Dispose();
348             _writeEventArgs.Dispose();
349         }
350 
ToString()351         public override string ToString()
352         {
353             return _desc;
354         }
355 
read(ByteBuffer buf)356         private int read(ByteBuffer buf)
357         {
358             Debug.Assert(_fd != null);
359             if(AssemblyUtil.isMono)
360             {
361                 //
362                 // Mono on Android and iOS don't support the use of synchronous socket
363                 // operations on a non-blocking socket. Returning 0 here forces the caller to schedule
364                 // an asynchronous operation.
365                 //
366                 return 0;
367             }
368             int read = 0;
369             while(buf.hasRemaining())
370             {
371                 try
372                 {
373                     int ret = _fd.Receive(buf.rawBytes(), buf.position(), buf.remaining(), SocketFlags.None);
374                     if(ret == 0)
375                     {
376                         throw new Ice.ConnectionLostException();
377                     }
378                     read += ret;
379                     buf.position(buf.position() + ret);
380                 }
381                 catch(SocketException ex)
382                 {
383                     if(Network.wouldBlock(ex))
384                     {
385                         return read;
386                     }
387                     else if(Network.interrupted(ex))
388                     {
389                         continue;
390                     }
391                     else if(Network.connectionLost(ex))
392                     {
393                         throw new Ice.ConnectionLostException(ex);
394                     }
395 
396                     throw new Ice.SocketException(ex);
397                 }
398             }
399             return read;
400         }
401 
write(ByteBuffer buf)402         private int write(ByteBuffer buf)
403         {
404             Debug.Assert(_fd != null);
405             if(AssemblyUtil.isMono)
406             {
407                 //
408                 // Mono on Android and iOS don't support the use of synchronous socket
409                 // operations on a non-blocking socket. Returning 0 here forces the caller to schedule
410                 // an asynchronous operation.
411                 //
412                 return 0;
413             }
414             int packetSize = buf.remaining();
415             if(AssemblyUtil.isWindows)
416             {
417                 //
418                 // On Windows, limiting the buffer size is important to prevent
419                 // poor throughput performances when transfering large amount of
420                 // data. See Microsoft KB article KB823764.
421                 //
422                 if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize / 2)
423                 {
424                     packetSize = _maxSendPacketSize / 2;
425                 }
426             }
427 
428             int sent = 0;
429             while(buf.hasRemaining())
430             {
431                 try
432                 {
433                     int ret = _fd.Send(buf.rawBytes(), buf.position(), packetSize, SocketFlags.None);
434                     Debug.Assert(ret > 0);
435 
436                     sent += ret;
437                     buf.position(buf.position() + ret);
438                     if(packetSize > buf.remaining())
439                     {
440                         packetSize = buf.remaining();
441                     }
442                 }
443                 catch(SocketException ex)
444                 {
445                     if(Network.wouldBlock(ex))
446                     {
447                         return sent;
448                     }
449                     else if(Network.connectionLost(ex))
450                     {
451                         throw new Ice.ConnectionLostException(ex);
452                     }
453                     throw new Ice.SocketException(ex);
454                 }
455             }
456             return sent;
457         }
ioCompleted(object sender, SocketAsyncEventArgs e)458         private void ioCompleted(object sender, SocketAsyncEventArgs e)
459         {
460             switch (e.LastOperation)
461             {
462             case SocketAsyncOperation.Receive:
463                 _readCallback(e.UserToken);
464                 break;
465             case SocketAsyncOperation.Send:
466             case SocketAsyncOperation.Connect:
467                 _writeCallback(e.UserToken);
468                 break;
469             default:
470                 throw new ArgumentException("The last operation completed on the socket was not a receive or send");
471             }
472         }
473 
init()474         private void init()
475         {
476             Network.setBlock(_fd, false);
477             Network.setTcpBufSize(_fd, _instance);
478 
479             _readEventArgs = new SocketAsyncEventArgs();
480             _readEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(ioCompleted);
481 
482             _writeEventArgs = new SocketAsyncEventArgs();
483             _writeEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(ioCompleted);
484 
485             //
486             // For timeouts to work properly, we need to receive/send
487             // the data in several chunks. Otherwise, we would only be
488             // notified when all the data is received/written. The
489             // connection timeout could easily be triggered when
490             // receiging/sending large messages.
491             //
492             _maxSendPacketSize = Math.Max(512, Network.getSendBufferSize(_fd));
493             _maxRecvPacketSize = Math.Max(512, Network.getRecvBufferSize(_fd));
494         }
495 
toState(int operation)496         private int toState(int operation)
497         {
498             switch(operation)
499             {
500             case SocketOperation.Read:
501                 return StateProxyRead;
502             case SocketOperation.Write:
503                 return StateProxyWrite;
504             default:
505                 return StateProxyConnected;
506             }
507         }
508 
509         private readonly ProtocolInstance _instance;
510         private readonly NetworkProxy _proxy;
511         private readonly EndPoint _addr;
512         private readonly EndPoint _sourceAddr;
513 
514         private Socket _fd;
515         private int _maxSendPacketSize;
516         private int _maxRecvPacketSize;
517         private int _state;
518         private string _desc;
519 
520         private SocketAsyncEventArgs _writeEventArgs;
521         private SocketAsyncEventArgs _readEventArgs;
522 
523         AsyncCallback _writeCallback;
524         AsyncCallback _readCallback;
525 
526         private const int StateNeedConnect = 0;
527         private const int StateConnectPending = 1;
528         private const int StateProxyWrite = 2;
529         private const int StateProxyRead = 3;
530         private const int StateProxyConnected = 4;
531         private const int StateConnected = 5;
532     }
533 
534 }
535