1 // 2 // Copyright (c) ZeroC, Inc. All rights reserved. 3 // 4 5 namespace IceInternal 6 { 7 using System; 8 using System.Diagnostics; 9 using System.Net; 10 using System.Net.Sockets; 11 12 public sealed class StreamSocket 13 { StreamSocket(ProtocolInstance instance, NetworkProxy proxy, EndPoint addr, EndPoint sourceAddr)14 public StreamSocket(ProtocolInstance instance, NetworkProxy proxy, EndPoint addr, EndPoint sourceAddr) 15 { 16 _instance = instance; 17 _proxy = proxy; 18 _addr = addr; 19 _sourceAddr = sourceAddr; 20 _fd = Network.createSocket(false, (_proxy != null ? _proxy.getAddress() : _addr).AddressFamily); 21 _state = StateNeedConnect; 22 23 init(); 24 } 25 StreamSocket(ProtocolInstance instance, Socket fd)26 public StreamSocket(ProtocolInstance instance, Socket fd) 27 { 28 _instance = instance; 29 _fd = fd; 30 _state = StateConnected; 31 try 32 { 33 _desc = Network.fdToString(_fd); 34 } 35 catch(Exception) 36 { 37 Network.closeSocketNoThrow(_fd); 38 throw; 39 } 40 init(); 41 } 42 setBlock(bool block)43 public void setBlock(bool block) 44 { 45 Network.setBlock(_fd, block); 46 } 47 connect(Buffer readBuffer, Buffer writeBuffer, ref bool moreData)48 public int connect(Buffer readBuffer, Buffer writeBuffer, ref bool moreData) 49 { 50 if(_state == StateNeedConnect) 51 { 52 _state = StateConnectPending; 53 return SocketOperation.Connect; 54 } 55 else if(_state <= StateConnectPending) 56 { 57 if(_writeEventArgs.SocketError != SocketError.Success) 58 { 59 SocketException ex = new SocketException((int)_writeEventArgs.SocketError); 60 if(Network.connectionRefused(ex)) 61 { 62 throw new Ice.ConnectionRefusedException(ex); 63 } 64 else 65 { 66 throw new Ice.ConnectFailedException(ex); 67 } 68 } 69 _desc = Network.fdToString(_fd, _proxy, _addr); 70 _state = _proxy != null ? StateProxyWrite : StateConnected; 71 } 72 73 if(_state == StateProxyWrite) 74 { 75 _proxy.beginWrite(_addr, writeBuffer); 76 return SocketOperation.Write; 77 } 78 else if(_state == StateProxyRead) 79 { 80 _proxy.beginRead(readBuffer); 81 return SocketOperation.Read; 82 } 83 else if(_state == StateProxyConnected) 84 { 85 _proxy.finish(readBuffer, writeBuffer); 86 87 readBuffer.clear(); 88 writeBuffer.clear(); 89 90 _state = StateConnected; 91 } 92 93 Debug.Assert(_state == StateConnected); 94 return SocketOperation.None; 95 } 96 isConnected()97 public bool isConnected() 98 { 99 return _state == StateConnected && _fd != null; 100 } 101 fd()102 public Socket fd() 103 { 104 return _fd; 105 } 106 getSendPacketSize(int length)107 public int getSendPacketSize(int length) 108 { 109 return _maxSendPacketSize > 0 ? Math.Min(length, _maxSendPacketSize) : length; 110 } 111 getRecvPacketSize(int length)112 public int getRecvPacketSize(int length) 113 { 114 return _maxRecvPacketSize > 0 ? Math.Min(length, _maxRecvPacketSize) : length; 115 } 116 setBufferSize(int rcvSize, int sndSize)117 public void setBufferSize(int rcvSize, int sndSize) 118 { 119 Network.setTcpBufSize(_fd, rcvSize, sndSize, _instance); 120 } 121 read(Buffer buf)122 public int read(Buffer buf) 123 { 124 if(_state == StateProxyRead) 125 { 126 while(true) 127 { 128 int ret = read(buf.b); 129 if(ret == 0) 130 { 131 return SocketOperation.Read; 132 } 133 134 _state = toState(_proxy.endRead(buf)); 135 if(_state != StateProxyRead) 136 { 137 return SocketOperation.None; 138 } 139 } 140 } 141 read(buf.b); 142 return buf.b.hasRemaining() ? SocketOperation.Read : SocketOperation.None; 143 } 144 write(Buffer buf)145 public int write(Buffer buf) 146 { 147 if(_state == StateProxyWrite) 148 { 149 while(true) 150 { 151 int ret = write(buf.b); 152 if(ret == 0) 153 { 154 return SocketOperation.Write; 155 } 156 _state = toState(_proxy.endWrite(buf)); 157 if(_state != StateProxyWrite) 158 { 159 return SocketOperation.None; 160 } 161 } 162 } 163 write(buf.b); 164 return buf.b.hasRemaining() ? SocketOperation.Write : SocketOperation.None; 165 } 166 startRead(Buffer buf, AsyncCallback callback, object state)167 public bool startRead(Buffer buf, AsyncCallback callback, object state) 168 { 169 Debug.Assert(_fd != null && _readEventArgs != null); 170 171 int packetSize = getRecvPacketSize(buf.b.remaining()); 172 try 173 { 174 _readCallback = callback; 175 _readEventArgs.UserToken = state; 176 _readEventArgs.SetBuffer(buf.b.rawBytes(), buf.b.position(), packetSize); 177 return !_fd.ReceiveAsync(_readEventArgs); 178 } 179 catch(SocketException ex) 180 { 181 if(Network.connectionLost(ex)) 182 { 183 throw new Ice.ConnectionLostException(ex); 184 } 185 throw new Ice.SocketException(ex); 186 } 187 } 188 finishRead(Buffer buf)189 public void finishRead(Buffer buf) 190 { 191 if(_fd == null) // Transceiver was closed 192 { 193 return; 194 } 195 196 Debug.Assert(_fd != null && _readEventArgs != null); 197 try 198 { 199 if(_readEventArgs.SocketError != SocketError.Success) 200 { 201 throw new SocketException((int)_readEventArgs.SocketError); 202 } 203 int ret = _readEventArgs.BytesTransferred; 204 _readEventArgs.SetBuffer(null, 0, 0); 205 206 if(ret == 0) 207 { 208 throw new Ice.ConnectionLostException(); 209 } 210 211 Debug.Assert(ret > 0); 212 buf.b.position(buf.b.position() + ret); 213 214 if(_state == StateProxyRead) 215 { 216 _state = toState(_proxy.endRead(buf)); 217 } 218 } 219 catch(SocketException ex) 220 { 221 if(Network.connectionLost(ex)) 222 { 223 throw new Ice.ConnectionLostException(ex); 224 } 225 throw new Ice.SocketException(ex); 226 } 227 catch(ObjectDisposedException ex) 228 { 229 throw new Ice.ConnectionLostException(ex); 230 } 231 } 232 startWrite(Buffer buf, AsyncCallback callback, object state, out bool completed)233 public bool startWrite(Buffer buf, AsyncCallback callback, object state, out bool completed) 234 { 235 Debug.Assert(_fd != null && _writeEventArgs != null); 236 if(_state == StateConnectPending) 237 { 238 completed = false; 239 _writeCallback = callback; 240 try 241 { 242 EndPoint addr = _proxy != null ? _proxy.getAddress() : _addr; 243 _writeEventArgs.RemoteEndPoint = addr; 244 _writeEventArgs.UserToken = state; 245 return !_fd.ConnectAsync(_writeEventArgs); 246 } 247 catch(Exception ex) 248 { 249 throw new Ice.SocketException(ex); 250 } 251 } 252 253 int packetSize = getSendPacketSize(buf.b.remaining()); 254 try 255 { 256 _writeCallback = callback; 257 _writeEventArgs.UserToken = state; 258 _writeEventArgs.SetBuffer(buf.b.rawBytes(), buf.b.position(), packetSize); 259 bool completedSynchronously = !_fd.SendAsync(_writeEventArgs); 260 completed = packetSize == buf.b.remaining(); 261 return completedSynchronously; 262 } 263 catch(SocketException ex) 264 { 265 if(Network.connectionLost(ex)) 266 { 267 throw new Ice.ConnectionLostException(ex); 268 } 269 throw new Ice.SocketException(ex); 270 } 271 catch(ObjectDisposedException ex) 272 { 273 throw new Ice.ConnectionLostException(ex); 274 } 275 } 276 finishWrite(Buffer buf)277 public void finishWrite(Buffer buf) 278 { 279 if(_fd == null) // Transceiver was closed 280 { 281 if(buf.size() - buf.b.position() < _maxSendPacketSize) 282 { 283 buf.b.position(buf.b.limit()); // Assume all the data was sent for at-most-once semantics. 284 } 285 return; 286 } 287 288 Debug.Assert(_fd != null && _writeEventArgs != null); 289 290 if(_state < StateConnected && _state != StateProxyWrite) 291 { 292 return; 293 } 294 295 try 296 { 297 if(_writeEventArgs.SocketError != SocketError.Success) 298 { 299 throw new SocketException((int)_writeEventArgs.SocketError); 300 } 301 int ret = _writeEventArgs.BytesTransferred; 302 _writeEventArgs.SetBuffer(null, 0, 0); 303 if(ret == 0) 304 { 305 throw new Ice.ConnectionLostException(); 306 } 307 308 Debug.Assert(ret > 0); 309 buf.b.position(buf.b.position() + ret); 310 311 if(_state == StateProxyWrite) 312 { 313 _state = toState(_proxy.endWrite(buf)); 314 } 315 } 316 catch(SocketException ex) 317 { 318 if(Network.connectionLost(ex)) 319 { 320 throw new Ice.ConnectionLostException(ex); 321 } 322 323 throw new Ice.SocketException(ex); 324 } 325 catch(ObjectDisposedException ex) 326 { 327 throw new Ice.ConnectionLostException(ex); 328 } 329 } 330 close()331 public void close() 332 { 333 Debug.Assert(_fd != null); 334 try 335 { 336 Network.closeSocket(_fd); 337 } 338 finally 339 { 340 _fd = null; 341 } 342 } 343 destroy()344 public void destroy() 345 { 346 Debug.Assert(_readEventArgs != null && _writeEventArgs != null); 347 _readEventArgs.Dispose(); 348 _writeEventArgs.Dispose(); 349 } 350 ToString()351 public override string ToString() 352 { 353 return _desc; 354 } 355 read(ByteBuffer buf)356 private int read(ByteBuffer buf) 357 { 358 Debug.Assert(_fd != null); 359 if(AssemblyUtil.isMono) 360 { 361 // 362 // Mono on Android and iOS don't support the use of synchronous socket 363 // operations on a non-blocking socket. Returning 0 here forces the caller to schedule 364 // an asynchronous operation. 365 // 366 return 0; 367 } 368 int read = 0; 369 while(buf.hasRemaining()) 370 { 371 try 372 { 373 int ret = _fd.Receive(buf.rawBytes(), buf.position(), buf.remaining(), SocketFlags.None); 374 if(ret == 0) 375 { 376 throw new Ice.ConnectionLostException(); 377 } 378 read += ret; 379 buf.position(buf.position() + ret); 380 } 381 catch(SocketException ex) 382 { 383 if(Network.wouldBlock(ex)) 384 { 385 return read; 386 } 387 else if(Network.interrupted(ex)) 388 { 389 continue; 390 } 391 else if(Network.connectionLost(ex)) 392 { 393 throw new Ice.ConnectionLostException(ex); 394 } 395 396 throw new Ice.SocketException(ex); 397 } 398 } 399 return read; 400 } 401 write(ByteBuffer buf)402 private int write(ByteBuffer buf) 403 { 404 Debug.Assert(_fd != null); 405 if(AssemblyUtil.isMono) 406 { 407 // 408 // Mono on Android and iOS don't support the use of synchronous socket 409 // operations on a non-blocking socket. Returning 0 here forces the caller to schedule 410 // an asynchronous operation. 411 // 412 return 0; 413 } 414 int packetSize = buf.remaining(); 415 if(AssemblyUtil.isWindows) 416 { 417 // 418 // On Windows, limiting the buffer size is important to prevent 419 // poor throughput performances when transfering large amount of 420 // data. See Microsoft KB article KB823764. 421 // 422 if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize / 2) 423 { 424 packetSize = _maxSendPacketSize / 2; 425 } 426 } 427 428 int sent = 0; 429 while(buf.hasRemaining()) 430 { 431 try 432 { 433 int ret = _fd.Send(buf.rawBytes(), buf.position(), packetSize, SocketFlags.None); 434 Debug.Assert(ret > 0); 435 436 sent += ret; 437 buf.position(buf.position() + ret); 438 if(packetSize > buf.remaining()) 439 { 440 packetSize = buf.remaining(); 441 } 442 } 443 catch(SocketException ex) 444 { 445 if(Network.wouldBlock(ex)) 446 { 447 return sent; 448 } 449 else if(Network.connectionLost(ex)) 450 { 451 throw new Ice.ConnectionLostException(ex); 452 } 453 throw new Ice.SocketException(ex); 454 } 455 } 456 return sent; 457 } ioCompleted(object sender, SocketAsyncEventArgs e)458 private void ioCompleted(object sender, SocketAsyncEventArgs e) 459 { 460 switch (e.LastOperation) 461 { 462 case SocketAsyncOperation.Receive: 463 _readCallback(e.UserToken); 464 break; 465 case SocketAsyncOperation.Send: 466 case SocketAsyncOperation.Connect: 467 _writeCallback(e.UserToken); 468 break; 469 default: 470 throw new ArgumentException("The last operation completed on the socket was not a receive or send"); 471 } 472 } 473 init()474 private void init() 475 { 476 Network.setBlock(_fd, false); 477 Network.setTcpBufSize(_fd, _instance); 478 479 _readEventArgs = new SocketAsyncEventArgs(); 480 _readEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(ioCompleted); 481 482 _writeEventArgs = new SocketAsyncEventArgs(); 483 _writeEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(ioCompleted); 484 485 // 486 // For timeouts to work properly, we need to receive/send 487 // the data in several chunks. Otherwise, we would only be 488 // notified when all the data is received/written. The 489 // connection timeout could easily be triggered when 490 // receiging/sending large messages. 491 // 492 _maxSendPacketSize = Math.Max(512, Network.getSendBufferSize(_fd)); 493 _maxRecvPacketSize = Math.Max(512, Network.getRecvBufferSize(_fd)); 494 } 495 toState(int operation)496 private int toState(int operation) 497 { 498 switch(operation) 499 { 500 case SocketOperation.Read: 501 return StateProxyRead; 502 case SocketOperation.Write: 503 return StateProxyWrite; 504 default: 505 return StateProxyConnected; 506 } 507 } 508 509 private readonly ProtocolInstance _instance; 510 private readonly NetworkProxy _proxy; 511 private readonly EndPoint _addr; 512 private readonly EndPoint _sourceAddr; 513 514 private Socket _fd; 515 private int _maxSendPacketSize; 516 private int _maxRecvPacketSize; 517 private int _state; 518 private string _desc; 519 520 private SocketAsyncEventArgs _writeEventArgs; 521 private SocketAsyncEventArgs _readEventArgs; 522 523 AsyncCallback _writeCallback; 524 AsyncCallback _readCallback; 525 526 private const int StateNeedConnect = 0; 527 private const int StateConnectPending = 1; 528 private const int StateProxyWrite = 2; 529 private const int StateProxyRead = 3; 530 private const int StateProxyConnected = 4; 531 private const int StateConnected = 5; 532 } 533 534 } 535