1 /*++ 2 Copyright (c) Microsoft Corporation 3 4 Module Name: 5 6 _CacheStreams.cs 7 8 Abstract: 9 The file contains two streams used in conjunction with caching. 10 The first class will combine two streams for reading into just one continued stream. 11 The second class will forward (as writes) to external stream all reads issued on a "this" stream. 12 13 Author: 14 15 Alexei Vopilov 21-Dec-2002 16 17 Revision History: 18 19 --*/ 20 namespace System.Net.Cache { 21 using System; 22 using System.Net; 23 using System.IO; 24 using System.Threading; 25 using System.Collections.Specialized; 26 using System.Diagnostics; 27 28 29 internal abstract class BaseWrapperStream : Stream, IRequestLifetimeTracker 30 { 31 private Stream m_WrappedStream; 32 33 protected Stream WrappedStream 34 { 35 get { return m_WrappedStream; } 36 } 37 BaseWrapperStream(Stream wrappedStream)38 public BaseWrapperStream(Stream wrappedStream) 39 { 40 Debug.Assert(wrappedStream != null); 41 m_WrappedStream = wrappedStream; 42 } 43 TrackRequestLifetime(long requestStartTimestamp)44 public void TrackRequestLifetime(long requestStartTimestamp) 45 { 46 IRequestLifetimeTracker stream = m_WrappedStream as IRequestLifetimeTracker; 47 Debug.Assert(stream != null, "Wrapped stream must implement IRequestLifetimeTracker interface"); 48 stream.TrackRequestLifetime(requestStartTimestamp); 49 } 50 } 51 52 // 53 // This stream will take two Streams (head and tail) and combine them into a single stream 54 // Only read IO is supported! 55 // 56 internal class CombinedReadStream : BaseWrapperStream, ICloseEx { 57 private Stream m_HeadStream; 58 private bool m_HeadEOF; 59 private long m_HeadLength; 60 private int m_ReadNesting; 61 private AsyncCallback m_ReadCallback; //lazy initialized 62 63 CombinedReadStream(Stream headStream, Stream tailStream)64 internal CombinedReadStream(Stream headStream, Stream tailStream) 65 : base(tailStream) 66 { 67 m_HeadStream = headStream; 68 m_HeadEOF = headStream == Stream.Null; 69 } 70 71 public override bool CanRead { 72 get {return m_HeadEOF? WrappedStream.CanRead: m_HeadStream.CanRead;} 73 } 74 75 // If CanSeek is false, Position, Seek, Length, and SetLength should throw. 76 public override bool CanSeek { 77 get {return false;} 78 } 79 80 public override bool CanWrite { 81 get {return false;} 82 } 83 84 public override long Length { 85 get { 86 return WrappedStream.Length + (m_HeadEOF? m_HeadLength: m_HeadStream.Length); 87 } 88 } 89 90 public override long Position { 91 get { 92 return WrappedStream.Position + (m_HeadEOF? m_HeadLength: m_HeadStream.Position); 93 } 94 95 set { 96 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 97 } 98 } 99 Seek(long offset, SeekOrigin origin)100 public override long Seek(long offset, SeekOrigin origin) { 101 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 102 } 103 SetLength(long value)104 public override void SetLength(long value) { 105 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 106 } 107 Write(byte[] buffer, int offset, int count)108 public override void Write(byte[] buffer, int offset, int count) { 109 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 110 } 111 BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)112 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 113 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 114 } 115 EndWrite(IAsyncResult asyncResult)116 public override void EndWrite(IAsyncResult asyncResult) { 117 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 118 } 119 Flush()120 public override void Flush() { 121 } 122 Read(byte[] buffer, int offset, int count)123 public override int Read(byte[] buffer, int offset, int count) { 124 125 try { 126 if (Interlocked.Increment(ref m_ReadNesting) != 1) { 127 throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "Read", "read")); 128 } 129 130 if (m_HeadEOF) { 131 return WrappedStream.Read(buffer, offset, count); 132 } 133 else { 134 int result = m_HeadStream.Read(buffer, offset, count); 135 m_HeadLength += result; 136 if (result == 0 && count != 0) { 137 m_HeadEOF = true; 138 m_HeadStream.Close(); 139 result = WrappedStream.Read(buffer, offset, count); 140 } 141 return result; 142 } 143 } 144 finally { 145 Interlocked.Decrement(ref m_ReadNesting); 146 } 147 148 } 149 150 151 // 152 // This is a wrapper result used to substitue the AsyncResult returned from m_HeadStream IO 153 // Note that once seen a EOF on m_HeadStream we will stop using this wrapper. 154 // 155 private class InnerAsyncResult: LazyAsyncResult { 156 public byte[] Buffer; 157 public int Offset; 158 public int Count; 159 InnerAsyncResult(object userState, AsyncCallback userCallback, byte[] buffer, int offset, int count)160 public InnerAsyncResult(object userState, AsyncCallback userCallback, byte[] buffer, int offset, int count) 161 :base (null, userState, userCallback) { 162 163 Buffer = buffer; 164 Offset = offset; 165 Count = count; 166 } 167 168 } 169 ReadCallback(IAsyncResult transportResult)170 private void ReadCallback(IAsyncResult transportResult) { 171 GlobalLog.Assert(transportResult.AsyncState is InnerAsyncResult, "InnerAsyncResult::ReadCallback|The state expected to be of type InnerAsyncResult, received {0}.", transportResult.GetType().FullName); 172 if (transportResult.CompletedSynchronously) 173 { 174 return; 175 } 176 177 InnerAsyncResult userResult = transportResult.AsyncState as InnerAsyncResult; 178 try { 179 // Complete transport IO, in this callback that is always the head stream 180 int count; 181 if (!m_HeadEOF) { 182 count = m_HeadStream.EndRead(transportResult); 183 m_HeadLength += count; 184 } 185 else { 186 count = WrappedStream.EndRead(transportResult); 187 } 188 189 190 //check on EOF condition 191 if (!m_HeadEOF && count == 0 && userResult.Count != 0) { 192 //Got a first stream EOF 193 m_HeadEOF = true; 194 m_HeadStream.Close(); 195 IAsyncResult ar = WrappedStream.BeginRead(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult); 196 if (!ar.CompletedSynchronously) { 197 return; 198 } 199 count = WrappedStream.EndRead(ar); 200 } 201 // just complete user IO 202 userResult.Buffer = null; 203 userResult.InvokeCallback(count); 204 } 205 catch (Exception e) { 206 //ASYNC: try to ignore even serious exceptions (nothing to loose?) 207 if (userResult.InternalPeekCompleted) 208 throw; 209 210 userResult.InvokeCallback(e); 211 } 212 } 213 BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)214 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 215 try { 216 if (Interlocked.Increment(ref m_ReadNesting) != 1) { 217 throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "BeginRead", "read")); 218 } 219 220 if (m_ReadCallback == null) { 221 m_ReadCallback = new AsyncCallback(ReadCallback); 222 } 223 224 if (m_HeadEOF) { 225 return WrappedStream.BeginRead(buffer, offset, count, callback, state); 226 } 227 else { 228 InnerAsyncResult userResult = new InnerAsyncResult(state, callback, buffer, offset, count); 229 IAsyncResult ar = m_HeadStream.BeginRead(buffer, offset, count, m_ReadCallback, userResult); 230 231 if (!ar.CompletedSynchronously) 232 { 233 return userResult; 234 } 235 236 int bytes = m_HeadStream.EndRead(ar); 237 m_HeadLength += bytes; 238 239 //check on EOF condition 240 if (bytes == 0 && userResult.Count != 0) { 241 //Got a first stream EOF 242 m_HeadEOF = true; 243 m_HeadStream.Close(); 244 return WrappedStream.BeginRead(buffer, offset, count, callback, state); 245 } 246 else { 247 // just complete user IO 248 userResult.Buffer = null; 249 userResult.InvokeCallback(count); 250 return userResult; 251 } 252 253 } 254 } 255 catch { 256 Interlocked.Decrement(ref m_ReadNesting); 257 throw; 258 } 259 } 260 EndRead(IAsyncResult asyncResult)261 public override int EndRead(IAsyncResult asyncResult) { 262 263 if (Interlocked.Decrement(ref m_ReadNesting) != 0) { 264 Interlocked.Increment(ref m_ReadNesting); 265 throw new InvalidOperationException(SR.GetString(SR.net_io_invalidendcall, "EndRead")); 266 } 267 268 if (asyncResult == null) { 269 throw new ArgumentNullException("asyncResult"); 270 } 271 272 InnerAsyncResult myResult = asyncResult as InnerAsyncResult; 273 274 if (myResult == null) { 275 // We are just passing IO down, although m_HeadEOF should be always true here. 276 GlobalLog.Assert(m_HeadEOF, "CombinedReadStream::EndRead|m_HeadEOF is false and asyncResult is not of InnerAsyncResult type {0).", asyncResult.GetType().FullName); 277 return m_HeadEOF? WrappedStream.EndRead(asyncResult): m_HeadStream.EndRead(asyncResult); 278 } 279 280 // this is our wrapped AsyncResult 281 myResult.InternalWaitForCompletion(); 282 283 // Exception? 284 if (myResult.Result is Exception) { 285 throw (Exception)(myResult.Result); 286 } 287 288 // Report the count read 289 return (int)myResult.Result; 290 } 291 292 // Subclasses should use Dispose(bool, CloseExState) Dispose(bool disposing)293 protected override sealed void Dispose(bool disposing) { 294 Dispose(disposing, CloseExState.Normal); 295 } 296 ICloseEx.CloseEx(CloseExState closeState)297 void ICloseEx.CloseEx(CloseExState closeState) { 298 Dispose(true, closeState); 299 } 300 Dispose(bool disposing, CloseExState closeState)301 protected virtual void Dispose(bool disposing, CloseExState closeState) { 302 303 // All below calls should already be idempotent 304 305 try { 306 if (disposing) { 307 try { 308 if (!m_HeadEOF) { 309 ICloseEx icloseEx = m_HeadStream as ICloseEx; 310 if (icloseEx != null) { 311 icloseEx.CloseEx(closeState); 312 } 313 else { 314 m_HeadStream.Close(); 315 } 316 } 317 } 318 finally { 319 ICloseEx icloseEx = WrappedStream as ICloseEx; 320 if (icloseEx != null) { 321 icloseEx.CloseEx(closeState); 322 } 323 else { 324 WrappedStream.Close(); 325 } 326 } 327 } 328 } 329 finally { 330 base.Dispose(disposing); 331 } 332 } 333 334 public override bool CanTimeout { 335 get { 336 return WrappedStream.CanTimeout && m_HeadStream.CanTimeout; 337 } 338 } 339 340 public override int ReadTimeout { 341 get { 342 return (m_HeadEOF) ? WrappedStream.ReadTimeout : m_HeadStream.ReadTimeout; 343 } 344 set { 345 WrappedStream.ReadTimeout = m_HeadStream.ReadTimeout = value; 346 } 347 } 348 349 public override int WriteTimeout { 350 get { 351 return (m_HeadEOF) ? WrappedStream.WriteTimeout : m_HeadStream.WriteTimeout; 352 } 353 set { 354 WrappedStream.WriteTimeout = m_HeadStream.WriteTimeout = value; 355 } 356 } 357 } 358 359 // 360 // This stream will plug into a stream and listen for all reads on it 361 // It is also constructed with yet another stream used for multiplexing IO to 362 // 363 // When it sees a read on this stream the result gets forwarded as write to a shadow stream. 364 // ONLY READ IO is supported! 365 // 366 internal class ForwardingReadStream : BaseWrapperStream, ICloseEx { 367 private Stream m_ShadowStream; 368 private int m_ReadNesting; 369 private bool m_ShadowStreamIsDead; 370 private AsyncCallback m_ReadCallback; // lazy initialized 371 private long m_BytesToSkip; // suppress from the read first number of bytes 372 private bool m_ThrowOnWriteError; 373 private bool m_SeenReadEOF; 374 375 ForwardingReadStream(Stream originalStream, Stream shadowStream, long bytesToSkip, bool throwOnWriteError)376 internal ForwardingReadStream(Stream originalStream, Stream shadowStream, long bytesToSkip, bool throwOnWriteError) 377 : base(originalStream) 378 { 379 if (!shadowStream.CanWrite) { 380 throw new ArgumentException(SR.GetString(SR.net_cache_shadowstream_not_writable), "shadowStream"); 381 } 382 m_ShadowStream = shadowStream; 383 m_BytesToSkip = bytesToSkip; 384 m_ThrowOnWriteError = throwOnWriteError; 385 } 386 387 public override bool CanRead { 388 get {return WrappedStream.CanRead;} 389 } 390 391 // If CanSeek is false, Position, Seek, Length, and SetLength should throw. 392 public override bool CanSeek { 393 get {return false;} 394 } 395 396 public override bool CanWrite { 397 get {return false;} 398 } 399 400 public override long Length { 401 get { 402 return WrappedStream.Length - m_BytesToSkip; 403 } 404 } 405 406 public override long Position { 407 get { 408 return WrappedStream.Position - m_BytesToSkip; 409 } 410 411 set { 412 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 413 } 414 } 415 Seek(long offset, SeekOrigin origin)416 public override long Seek(long offset, SeekOrigin origin) { 417 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 418 } 419 SetLength(long value)420 public override void SetLength(long value) { 421 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 422 } 423 Write(byte[] buffer, int offset, int count)424 public override void Write(byte[] buffer, int offset, int count) { 425 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 426 } 427 BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)428 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 429 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 430 } 431 EndWrite(IAsyncResult asyncResult)432 public override void EndWrite(IAsyncResult asyncResult) { 433 throw new NotSupportedException(SR.GetString(SR.net_noseek)); 434 } 435 Flush()436 public override void Flush() { 437 } 438 Read(byte[] buffer, int offset, int count)439 public override int Read(byte[] buffer, int offset, int count) { 440 441 bool isDoingWrite = false; 442 int result = -1; 443 if (Interlocked.Increment(ref m_ReadNesting) != 1) { 444 throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "Read", "read")); 445 } 446 447 try { 448 449 if (m_BytesToSkip != 0L) { 450 // Sometime we want to combine cached + live stream AND the user requested explicit range starts from not 0 451 byte[] tempBuffer = new byte[4096]; 452 while (m_BytesToSkip != 0L) { 453 int bytes = WrappedStream.Read(tempBuffer, 0, (m_BytesToSkip < (long)tempBuffer.Length? (int)m_BytesToSkip: tempBuffer.Length)); 454 if (bytes == 0) 455 m_SeenReadEOF = true; 456 457 m_BytesToSkip -= bytes; 458 if (!m_ShadowStreamIsDead) 459 m_ShadowStream.Write(tempBuffer, 0, bytes); 460 } 461 } 462 463 result = WrappedStream.Read(buffer, offset, count); 464 if (result == 0) 465 m_SeenReadEOF = true; 466 467 if (m_ShadowStreamIsDead) { 468 return result; 469 } 470 isDoingWrite = true; 471 m_ShadowStream.Write(buffer, offset, result); 472 return result; 473 } 474 catch (Exception e) { 475 if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException) 476 throw; 477 478 GlobalLog.Print("ShadowReadStream::Read() Got Exception, disabling the shadow stream, stack trace = " + e.ToString()); 479 if (!m_ShadowStreamIsDead) { 480 // try to ignore even serious exception, since got nothing to loose? 481 m_ShadowStreamIsDead = true; 482 try { 483 if (m_ShadowStream is ICloseEx) 484 ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent); 485 else 486 m_ShadowStream.Close(); 487 } 488 catch (Exception ee) { 489 if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException) 490 throw; 491 GlobalLog.Print("ShadowReadStream::Read() Got (ignoring) Exception, on shadow stream.Close, stack trace = " + ee.ToString()); 492 } 493 } 494 if (!isDoingWrite || m_ThrowOnWriteError) 495 throw; 496 497 return result; 498 } 499 finally { 500 Interlocked.Decrement(ref m_ReadNesting); 501 } 502 } 503 504 505 // 506 // This is a wrapper result used to substitue the AsyncResult returned from WrappedStream IO 507 // Note that once seen a m_ShadowStream error we will stop using this wrapper. 508 // 509 private class InnerAsyncResult: LazyAsyncResult { 510 public byte[] Buffer; 511 public int Offset; 512 public int Count; 513 public bool IsWriteCompletion; 514 InnerAsyncResult(object userState, AsyncCallback userCallback, byte[] buffer, int offset, int count)515 public InnerAsyncResult(object userState, AsyncCallback userCallback, byte[] buffer, int offset, int count) 516 :base (null, userState, userCallback) { 517 518 Buffer = buffer; 519 Offset = offset; 520 Count = count; 521 } 522 523 } 524 ReadCallback(IAsyncResult transportResult)525 private void ReadCallback(IAsyncResult transportResult) { 526 GlobalLog.Assert(transportResult.AsyncState is InnerAsyncResult, "InnerAsyncResult::ReadCallback|The state expected to be of type InnerAsyncResult, received {0}.", transportResult.GetType().FullName); 527 if (transportResult.CompletedSynchronously) 528 { 529 return; 530 } 531 532 // Recover our asyncResult 533 InnerAsyncResult userResult = transportResult.AsyncState as InnerAsyncResult; 534 535 ReadComplete(transportResult); 536 } 537 ReadComplete(IAsyncResult transportResult)538 private void ReadComplete(IAsyncResult transportResult) 539 { 540 while(true) 541 { 542 // Recover our asyncResult 543 InnerAsyncResult userResult = transportResult.AsyncState as InnerAsyncResult; 544 545 try 546 { 547 if (!userResult.IsWriteCompletion) 548 { 549 userResult.Count = WrappedStream.EndRead(transportResult); 550 if (userResult.Count == 0) 551 m_SeenReadEOF = true; 552 553 554 if (!m_ShadowStreamIsDead) { 555 userResult.IsWriteCompletion = true; 556 //Optionally charge notification write IO 557 transportResult = m_ShadowStream.BeginWrite(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult); 558 if (transportResult.CompletedSynchronously) 559 { 560 continue; 561 } 562 return; 563 } 564 } 565 else 566 { 567 GlobalLog.Assert(!m_ShadowStreamIsDead, "ForwardingReadStream::ReadComplete|ERROR: IsWriteCompletion && m_ShadowStreamIsDead"); 568 569 m_ShadowStream.EndWrite(transportResult); 570 userResult.IsWriteCompletion = false; 571 } 572 } 573 catch (Exception e) 574 { 575 //ASYNC: try to ignore even serious exceptions (nothing to loose?) 576 if (userResult.InternalPeekCompleted) 577 { 578 GlobalLog.Print("ShadowReadStream::ReadComplete() Rethrowing Exception (end), userResult.IsCompleted, stack trace = " + e.ToString()); 579 throw; 580 } 581 582 try 583 { 584 m_ShadowStreamIsDead = true; 585 if (m_ShadowStream is ICloseEx) 586 ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent); 587 else 588 m_ShadowStream.Close(); 589 } 590 catch (Exception ee) 591 { 592 //ASYNC: Again try to ignore even serious exceptions 593 GlobalLog.Print("ShadowReadStream::ReadComplete() Got (ignoring) Exception, on shadow stream.Close, stack trace = " + ee.ToString()); 594 } 595 596 if (!userResult.IsWriteCompletion || m_ThrowOnWriteError) 597 { 598 if (transportResult.CompletedSynchronously) 599 { 600 throw; 601 } 602 603 userResult.InvokeCallback(e); 604 return; 605 } 606 } 607 608 // Need to process, re-issue the read. 609 try 610 { 611 if (m_BytesToSkip != 0L) { 612 m_BytesToSkip -= userResult.Count; 613 userResult.Count = m_BytesToSkip < (long)userResult.Buffer.Length? (int)m_BytesToSkip: userResult.Buffer.Length; 614 if (m_BytesToSkip == 0L) { 615 // we did hide the original IO request in the outer iaresult state. 616 // charge the real user operation now 617 transportResult = userResult; 618 userResult = userResult.AsyncState as InnerAsyncResult; 619 GlobalLog.Assert(userResult != null, "ForwardingReadStream::ReadComplete|ERROR: Inner IAResult is null after stream FastForwarding."); 620 } 621 transportResult = WrappedStream.BeginRead(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult); 622 if (transportResult.CompletedSynchronously) 623 { 624 continue; 625 } 626 return; 627 } 628 //if came to here, complete original user IO 629 userResult.InvokeCallback(userResult.Count); 630 return; 631 } 632 catch (Exception e) 633 { 634 //ASYNC: try to ignore even serious exceptions (nothing to loose?) 635 if (userResult.InternalPeekCompleted) 636 { 637 GlobalLog.Print("ShadowReadStream::ReadComplete() Rethrowing Exception (begin), userResult.IsCompleted, stack trace = " + e.ToString()); 638 throw; 639 } 640 641 try 642 { 643 m_ShadowStreamIsDead = true; 644 if (m_ShadowStream is ICloseEx) 645 ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent); 646 else 647 m_ShadowStream.Close(); 648 } 649 catch (Exception ee) 650 { 651 //ASYNC: Again try to ignore even serious exceptions 652 GlobalLog.Print("ShadowReadStream::ReadComplete() Got (ignoring) Exception, on shadow stream.Close (after begin), stack trace = " + ee.ToString()); 653 } 654 655 if (transportResult.CompletedSynchronously) 656 { 657 throw; 658 } 659 660 // This will set the exception result first then try to execute a user callback 661 userResult.InvokeCallback(e); 662 return; 663 } 664 } 665 } 666 BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)667 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 668 if (Interlocked.Increment(ref m_ReadNesting) != 1) { 669 throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "BeginRead", "read")); 670 } 671 672 try { 673 674 if (m_ReadCallback == null) { 675 m_ReadCallback = new AsyncCallback(ReadCallback); 676 } 677 678 if (m_ShadowStreamIsDead && m_BytesToSkip == 0L) { 679 return WrappedStream.BeginRead(buffer, offset, count, callback, state); 680 } 681 else { 682 InnerAsyncResult userResult = new InnerAsyncResult(state, callback, buffer, offset, count); 683 if (m_BytesToSkip != 0L) { 684 InnerAsyncResult temp = userResult; 685 userResult = new InnerAsyncResult(temp, null, new byte[4096], 686 0, m_BytesToSkip < (long) buffer.Length? (int)m_BytesToSkip: buffer.Length); 687 } 688 IAsyncResult result = WrappedStream.BeginRead(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult); 689 if (result.CompletedSynchronously) 690 { 691 ReadComplete(result); 692 } 693 return userResult; 694 } 695 } 696 catch { 697 Interlocked.Decrement(ref m_ReadNesting); 698 throw; 699 } 700 } 701 EndRead(IAsyncResult asyncResult)702 public override int EndRead(IAsyncResult asyncResult) { 703 704 if (Interlocked.Decrement(ref m_ReadNesting) != 0) { 705 Interlocked.Increment(ref m_ReadNesting); 706 throw new InvalidOperationException(SR.GetString(SR.net_io_invalidendcall, "EndRead")); 707 } 708 709 if (asyncResult == null) { 710 throw new ArgumentNullException("asyncResult"); 711 } 712 713 InnerAsyncResult myResult = asyncResult as InnerAsyncResult; 714 715 if (myResult == null) { 716 // We are just passing IO down, although the shadow stream should be dead for now. 717 GlobalLog.Assert(m_ShadowStreamIsDead, "ForwardingReadStream::EndRead|m_ShadowStreamIsDead is false and asyncResult is not of InnerAsyncResult type {0}.", asyncResult.GetType().FullName); 718 int bytes = WrappedStream.EndRead(asyncResult); 719 if (bytes == 0) 720 m_SeenReadEOF = true; 721 } 722 723 // this is our wrapped AsyncResult 724 bool suceess = false; 725 try { 726 myResult.InternalWaitForCompletion(); 727 // Exception? 728 if (myResult.Result is Exception) 729 throw (Exception)(myResult.Result); 730 suceess = true; 731 } 732 finally { 733 if (!suceess && !m_ShadowStreamIsDead) { 734 m_ShadowStreamIsDead = true; 735 if (m_ShadowStream is ICloseEx) 736 ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent); 737 else 738 m_ShadowStream.Close(); 739 } 740 } 741 742 // Report the read count 743 return (int)myResult.Result; 744 } 745 746 // Subclasses should use Dispose(bool, CloseExState) Dispose(bool disposing)747 protected sealed override void Dispose(bool disposing) { 748 Dispose(disposing, CloseExState.Normal); 749 } 750 751 private int _Disposed; ICloseEx.CloseEx(CloseExState closeState)752 void ICloseEx.CloseEx(CloseExState closeState) { 753 754 if (Interlocked.Increment(ref _Disposed) == 1) { 755 // This would allow us to cache the response stream that user throws away 756 // Next time the cached version could save us from an extra roundtrip 757 if (closeState == CloseExState.Silent) { 758 try { 759 int total = 0; 760 int bytesRead; 761 while (total < ConnectStream.s_DrainingBuffer.Length && (bytesRead = Read(ConnectStream.s_DrainingBuffer, 0, ConnectStream.s_DrainingBuffer.Length)) > 0) { 762 total += bytesRead; 763 } 764 } 765 catch (Exception exception) { 766 //ATTN: this path will swalow errors regardless of m_IsThrowOnWriteError setting 767 // A "Silent" close is for an intermediate response that is to be ignored anyway 768 if (exception is ThreadAbortException || exception is StackOverflowException || exception is OutOfMemoryException) { 769 throw; 770 } 771 } 772 } 773 774 Dispose(true, closeState); 775 } 776 } 777 Dispose(bool disposing, CloseExState closeState)778 protected virtual void Dispose(bool disposing, CloseExState closeState) { 779 780 // All below calls should already be idempotent 781 782 try { 783 if (disposing) { 784 try { 785 ICloseEx icloseEx = WrappedStream as ICloseEx; 786 if (icloseEx != null) { 787 icloseEx.CloseEx(closeState); 788 } 789 else { 790 WrappedStream.Close(); 791 } 792 } 793 finally { 794 795 // Notify the wirte stream on a partial response if did not see EOF on read 796 if (!m_SeenReadEOF) 797 closeState |= CloseExState.Abort; 798 799 // 800 // We don't want to touch m_ShadowStreamIsDead because Close() can be called from other thread while IO is in progress. 801 // We assume that all streams used by this class are thread safe on Close(). 802 // m_ShadowStreamIsDead = true; 803 804 if (m_ShadowStream is ICloseEx) 805 ((ICloseEx)m_ShadowStream).CloseEx(closeState); 806 else 807 m_ShadowStream.Close(); 808 } 809 } 810 } 811 finally { 812 base.Dispose(disposing); 813 } 814 } 815 816 public override bool CanTimeout { 817 get { 818 return WrappedStream.CanTimeout && m_ShadowStream.CanTimeout; 819 } 820 } 821 822 public override int ReadTimeout { 823 get { 824 return WrappedStream.ReadTimeout; 825 } 826 set { 827 WrappedStream.ReadTimeout = m_ShadowStream.ReadTimeout = value; 828 } 829 } 830 831 public override int WriteTimeout { 832 get { 833 return m_ShadowStream.WriteTimeout; 834 } 835 set { 836 WrappedStream.WriteTimeout = m_ShadowStream.WriteTimeout = value; 837 } 838 } 839 } 840 841 // 842 // This stream will listen on the parent stream Close. 843 // Assuming the parent stream represents a READ stream such as CombinedReadStream or a response stream. 844 // When the paretn stream is closed this wrapper will update the metadata associated with the entry. 845 internal class MetadataUpdateStream : BaseWrapperStream, ICloseEx { 846 private RequestCache m_Cache; 847 private string m_Key; 848 private DateTime m_Expires; 849 private DateTime m_LastModified; 850 private DateTime m_LastSynchronized; 851 private TimeSpan m_MaxStale; 852 private StringCollection m_EntryMetadata; 853 private StringCollection m_SystemMetadata; 854 private bool m_CacheDestroy; 855 private bool m_IsStrictCacheErrors; 856 857 MetadataUpdateStream( Stream parentStream, RequestCache cache, string key, DateTime expiresGMT, DateTime lastModifiedGMT, DateTime lastSynchronizedGMT, TimeSpan maxStale, StringCollection entryMetadata, StringCollection systemMetadata, bool isStrictCacheErrors)858 internal MetadataUpdateStream( Stream parentStream, 859 RequestCache cache, 860 string key, 861 DateTime expiresGMT, 862 DateTime lastModifiedGMT, 863 DateTime lastSynchronizedGMT, 864 TimeSpan maxStale, 865 StringCollection entryMetadata, 866 StringCollection systemMetadata, 867 bool isStrictCacheErrors) 868 : base(parentStream) 869 { 870 m_Cache = cache; 871 m_Key = key; 872 m_Expires = expiresGMT; 873 m_LastModified = lastModifiedGMT; 874 m_LastSynchronized = lastSynchronizedGMT; 875 m_MaxStale = maxStale; 876 m_EntryMetadata = entryMetadata; 877 m_SystemMetadata = systemMetadata; 878 m_IsStrictCacheErrors = isStrictCacheErrors; 879 } 880 881 // 882 // This constructor will result in removing a cache entry upon closure 883 // MetadataUpdateStream(Stream parentStream, RequestCache cache, string key, bool isStrictCacheErrors)884 private MetadataUpdateStream(Stream parentStream, RequestCache cache, string key, bool isStrictCacheErrors) 885 : base(parentStream) 886 { 887 m_Cache = cache; 888 m_Key = key; 889 m_CacheDestroy = true; 890 m_IsStrictCacheErrors = isStrictCacheErrors; 891 } 892 // 893 // 894 // 895 /* 896 // Consider removing. 897 public static Stream CreateEntryRemovalStream( Stream parentStream, RequestCache cache, string key, bool isStrictCacheErrors) 898 { 899 return new MetadataUpdateStream(parentStream, cache, key, isStrictCacheErrors); 900 } 901 */ 902 // 903 public override bool CanRead { 904 get {return WrappedStream.CanRead;} 905 } 906 // 907 // If CanSeek is false, Position, Seek, Length, and SetLength should throw. 908 public override bool CanSeek { 909 get {return WrappedStream.CanSeek;} 910 } 911 // 912 public override bool CanWrite { 913 get {return WrappedStream.CanWrite;} 914 } 915 // 916 public override long Length { 917 get {return WrappedStream.Length;} 918 } 919 // 920 public override long Position { 921 get {return WrappedStream.Position;} 922 923 set {WrappedStream.Position = value;} 924 } 925 Seek(long offset, SeekOrigin origin)926 public override long Seek(long offset, SeekOrigin origin) { 927 return WrappedStream.Seek(offset, origin); 928 } 929 SetLength(long value)930 public override void SetLength(long value) { 931 WrappedStream.SetLength(value); 932 } 933 Write(byte[] buffer, int offset, int count)934 public override void Write(byte[] buffer, int offset, int count) { 935 WrappedStream.Write(buffer, offset, count); 936 } 937 BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)938 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 939 return WrappedStream.BeginWrite(buffer, offset, count, callback, state); 940 } 941 EndWrite(IAsyncResult asyncResult)942 public override void EndWrite(IAsyncResult asyncResult) { 943 WrappedStream.EndWrite(asyncResult); 944 } 945 Flush()946 public override void Flush() { 947 WrappedStream.Flush(); 948 } 949 Read(byte[] buffer, int offset, int count)950 public override int Read(byte[] buffer, int offset, int count) { 951 return WrappedStream.Read(buffer, offset, count); 952 } 953 BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)954 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 955 return WrappedStream.BeginRead(buffer, offset, count, callback, state); 956 } 957 EndRead(IAsyncResult asyncResult)958 public override int EndRead(IAsyncResult asyncResult) { 959 return WrappedStream.EndRead(asyncResult); 960 } 961 962 // Subclasses should use Dispose(bool, CloseExState) Dispose(bool disposing)963 protected sealed override void Dispose(bool disposing) { 964 Dispose(disposing, CloseExState.Normal); 965 } 966 ICloseEx.CloseEx(CloseExState closeState)967 void ICloseEx.CloseEx(CloseExState closeState) { 968 Dispose(true, closeState); 969 } 970 971 public override bool CanTimeout { 972 get { 973 return WrappedStream.CanTimeout; 974 } 975 } 976 977 public override int ReadTimeout { 978 get { 979 return WrappedStream.ReadTimeout; 980 } 981 set { 982 WrappedStream.ReadTimeout = value; 983 } 984 } 985 986 public override int WriteTimeout { 987 get { 988 return WrappedStream.WriteTimeout; 989 } 990 set { 991 WrappedStream.WriteTimeout = value; 992 } 993 } 994 995 private int _Disposed; Dispose(bool disposing, CloseExState closeState)996 protected virtual void Dispose(bool disposing, CloseExState closeState) { 997 998 try { 999 if (Interlocked.Increment(ref _Disposed) == 1) { 1000 if (disposing) { 1001 ICloseEx icloseEx = WrappedStream as ICloseEx; 1002 1003 if (icloseEx != null) { 1004 icloseEx.CloseEx(closeState); 1005 } 1006 else { 1007 WrappedStream.Close(); 1008 } 1009 1010 if (m_CacheDestroy) 1011 { 1012 if (m_IsStrictCacheErrors) 1013 { 1014 m_Cache.Remove(m_Key); 1015 } 1016 else 1017 { 1018 m_Cache.TryRemove(m_Key); 1019 } 1020 } 1021 else 1022 { 1023 if (m_IsStrictCacheErrors) 1024 { 1025 m_Cache.Update(m_Key, m_Expires, m_LastModified, m_LastSynchronized, m_MaxStale, m_EntryMetadata, m_SystemMetadata); 1026 } 1027 else 1028 { 1029 m_Cache.TryUpdate(m_Key, m_Expires, m_LastModified, m_LastSynchronized, m_MaxStale, m_EntryMetadata, m_SystemMetadata); 1030 } 1031 1032 } 1033 } 1034 } 1035 } 1036 finally { 1037 base.Dispose(disposing); 1038 } 1039 } 1040 } 1041 1042 // 1043 // This stream is for Partial responses. 1044 // It will scroll to the given position and limit the original stream windows to given size 1045 internal class RangeStream : BaseWrapperStream, ICloseEx { 1046 long m_Offset; 1047 long m_Size; 1048 long m_Position; 1049 RangeStream(Stream parentStream, long offset, long size)1050 internal RangeStream (Stream parentStream, long offset, long size) 1051 : base(parentStream) 1052 { 1053 m_Offset = offset; 1054 m_Size = size; 1055 if (WrappedStream.CanSeek) { 1056 WrappedStream.Position = offset; 1057 m_Position = offset; 1058 } 1059 else { 1060 // for now we expect a FileStream that is seekable. 1061 throw new NotSupportedException(SR.GetString(SR.net_cache_non_seekable_stream_not_supported)); 1062 } 1063 } 1064 1065 public override bool CanRead { 1066 get {return WrappedStream.CanRead;} 1067 } 1068 1069 // If CanSeek is false, Position, Seek, Length, and SetLength should throw. 1070 public override bool CanSeek { 1071 get {return WrappedStream.CanSeek;} 1072 } 1073 1074 public override bool CanWrite { 1075 get {return WrappedStream.CanWrite;} 1076 } 1077 1078 public override long Length { 1079 get { 1080 long dummy = WrappedStream.Length; 1081 return m_Size; 1082 } 1083 } 1084 1085 public override long Position { 1086 get {return WrappedStream.Position-m_Offset;} 1087 1088 set { 1089 value += m_Offset; 1090 if (value > m_Offset + m_Size) { 1091 value = m_Offset + m_Size; 1092 } 1093 WrappedStream.Position = value; 1094 } 1095 } 1096 Seek(long offset, SeekOrigin origin)1097 public override long Seek(long offset, SeekOrigin origin) { 1098 switch (origin) { 1099 case SeekOrigin.Begin: 1100 offset += m_Offset; 1101 if (offset > m_Offset+m_Size) { 1102 offset = m_Offset+m_Size; 1103 } 1104 if (offset < m_Offset) { 1105 offset = m_Offset; 1106 } 1107 break; 1108 case SeekOrigin.End: 1109 offset -= (m_Offset+m_Size); 1110 if (offset > 0) { 1111 offset = 0; 1112 } 1113 if (offset < -m_Size) { 1114 offset = -m_Size; 1115 } 1116 break; 1117 default: 1118 if (m_Position+offset > m_Offset+m_Size) { 1119 offset = (m_Offset+m_Size) - m_Position; 1120 } 1121 if (m_Position+offset < m_Offset) { 1122 offset = m_Offset-m_Position; 1123 } 1124 break; 1125 } 1126 m_Position=WrappedStream.Seek(offset, origin); 1127 return m_Position-m_Offset; 1128 } 1129 SetLength(long value)1130 public override void SetLength(long value) { 1131 throw new NotSupportedException(SR.GetString(SR.net_cache_unsupported_partial_stream)); 1132 } 1133 Write(byte[] buffer, int offset, int count)1134 public override void Write(byte[] buffer, int offset, int count) { 1135 if (m_Position + count > m_Offset+m_Size) { 1136 throw new NotSupportedException(SR.GetString(SR.net_cache_unsupported_partial_stream)); 1137 } 1138 WrappedStream.Write(buffer, offset, count); 1139 m_Position += count; 1140 } 1141 BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)1142 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 1143 if (m_Position+offset > m_Offset+m_Size) { 1144 throw new NotSupportedException(SR.GetString(SR.net_cache_unsupported_partial_stream)); 1145 } 1146 return WrappedStream.BeginWrite(buffer, offset, count, callback, state); 1147 } 1148 EndWrite(IAsyncResult asyncResult)1149 public override void EndWrite(IAsyncResult asyncResult) { 1150 WrappedStream.EndWrite(asyncResult); 1151 m_Position = WrappedStream.Position; 1152 } 1153 Flush()1154 public override void Flush() { 1155 WrappedStream.Flush(); 1156 } 1157 Read(byte[] buffer, int offset, int count)1158 public override int Read(byte[] buffer, int offset, int count) { 1159 if (m_Position >= m_Offset+m_Size) { 1160 return 0; 1161 } 1162 if (m_Position + count > m_Offset+m_Size) { 1163 count = (int)(m_Offset + m_Size - m_Position); 1164 } 1165 int result = WrappedStream.Read(buffer, offset, count); 1166 m_Position += result; 1167 return result; 1168 } 1169 BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)1170 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 1171 if (m_Position >= m_Offset+m_Size) { 1172 count = 0; 1173 } 1174 else if (m_Position + count > m_Offset+m_Size) { 1175 count = (int)(m_Offset + m_Size - m_Position); 1176 } 1177 return WrappedStream.BeginRead(buffer, offset, count, callback, state); 1178 } 1179 EndRead(IAsyncResult asyncResult)1180 public override int EndRead(IAsyncResult asyncResult) { 1181 int result = WrappedStream.EndRead(asyncResult); 1182 m_Position += result; 1183 return result; 1184 } 1185 1186 // Subclasses should use Dispose(bool, CloseExState) Dispose(bool disposing)1187 protected sealed override void Dispose(bool disposing) { 1188 Dispose(disposing, CloseExState.Normal); 1189 } 1190 ICloseEx.CloseEx(CloseExState closeState)1191 void ICloseEx.CloseEx(CloseExState closeState) { 1192 Dispose(true, closeState); 1193 } 1194 1195 public override bool CanTimeout { 1196 get { 1197 return WrappedStream.CanTimeout; 1198 } 1199 } 1200 1201 public override int ReadTimeout { 1202 get { 1203 return WrappedStream.ReadTimeout; 1204 } 1205 set { 1206 WrappedStream.ReadTimeout = value; 1207 } 1208 } 1209 1210 public override int WriteTimeout { 1211 get { 1212 return WrappedStream.WriteTimeout; 1213 } 1214 set { 1215 WrappedStream.WriteTimeout = value; 1216 } 1217 } 1218 Dispose(bool disposing, CloseExState closeState)1219 protected virtual void Dispose(bool disposing, CloseExState closeState) { 1220 1221 // All calls below should already be idempotent. 1222 1223 try 1224 { 1225 if (disposing) { 1226 1227 ICloseEx icloseEx = WrappedStream as ICloseEx; 1228 1229 if (icloseEx != null) { 1230 icloseEx.CloseEx(closeState); 1231 } 1232 else { 1233 WrappedStream.Close(); 1234 } 1235 } 1236 } 1237 finally 1238 { 1239 base.Dispose(disposing); 1240 } 1241 1242 } 1243 } 1244 1245 } 1246