1 /*++
2 Copyright (c) Microsoft Corporation
3 
4 Module Name:
5 
6     _CacheStreams.cs
7 
8 Abstract:
9     The file contains two streams used in conjunction with caching.
10     The first class will combine two streams for reading into just one continued stream.
11     The second class will forward (as writes) to external stream all reads issued on a "this" stream.
12 
13 Author:
14 
15     Alexei Vopilov    21-Dec-2002
16 
17 Revision History:
18 
19 --*/
20 namespace System.Net.Cache {
21 	using System;
22 	using System.Net;
23 	using System.IO;
24 	using System.Threading;
25 	using System.Collections.Specialized;
26     using System.Diagnostics;
27 
28 
29     internal abstract class BaseWrapperStream : Stream, IRequestLifetimeTracker
30     {
31         private Stream m_WrappedStream;
32 
33         protected Stream WrappedStream
34         {
35             get { return m_WrappedStream; }
36         }
37 
BaseWrapperStream(Stream wrappedStream)38         public BaseWrapperStream(Stream wrappedStream)
39         {
40             Debug.Assert(wrappedStream != null);
41             m_WrappedStream = wrappedStream;
42         }
43 
TrackRequestLifetime(long requestStartTimestamp)44         public void TrackRequestLifetime(long requestStartTimestamp)
45         {
46             IRequestLifetimeTracker stream = m_WrappedStream as IRequestLifetimeTracker;
47             Debug.Assert(stream != null, "Wrapped stream must implement IRequestLifetimeTracker interface");
48             stream.TrackRequestLifetime(requestStartTimestamp);
49         }
50     }
51 
52     //
53     // This stream will take two Streams (head and tail) and combine them into a single stream
54     // Only read IO is supported!
55     //
56     internal class CombinedReadStream : BaseWrapperStream, ICloseEx {
57         private Stream  m_HeadStream;
58         private bool    m_HeadEOF;
59         private long    m_HeadLength;
60         private int     m_ReadNesting;
61         private AsyncCallback m_ReadCallback;   //lazy initialized
62 
63 
CombinedReadStream(Stream headStream, Stream tailStream)64         internal CombinedReadStream(Stream headStream, Stream tailStream)
65             : base(tailStream)
66         {
67             m_HeadStream = headStream;
68             m_HeadEOF = headStream == Stream.Null;
69         }
70 
71         public override bool CanRead {
72             get {return m_HeadEOF? WrappedStream.CanRead: m_HeadStream.CanRead;}
73         }
74 
75         // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
76         public override bool CanSeek {
77             get {return false;}
78         }
79 
80         public override bool CanWrite {
81             get {return false;}
82         }
83 
84         public override long Length {
85             get {
86                 return WrappedStream.Length + (m_HeadEOF? m_HeadLength: m_HeadStream.Length);
87             }
88         }
89 
90         public override long Position {
91             get {
92                 return WrappedStream.Position + (m_HeadEOF? m_HeadLength: m_HeadStream.Position);
93             }
94 
95             set {
96                 throw new NotSupportedException(SR.GetString(SR.net_noseek));
97             }
98         }
99 
Seek(long offset, SeekOrigin origin)100         public override long Seek(long offset, SeekOrigin origin) {
101             throw new NotSupportedException(SR.GetString(SR.net_noseek));
102         }
103 
SetLength(long value)104         public override void SetLength(long value) {
105             throw new NotSupportedException(SR.GetString(SR.net_noseek));
106         }
107 
Write(byte[] buffer, int offset, int count)108         public override void Write(byte[] buffer, int offset, int count) {
109             throw new NotSupportedException(SR.GetString(SR.net_noseek));
110         }
111 
BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)112         public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
113             throw new NotSupportedException(SR.GetString(SR.net_noseek));
114         }
115 
EndWrite(IAsyncResult asyncResult)116         public override void EndWrite(IAsyncResult asyncResult) {
117             throw new NotSupportedException(SR.GetString(SR.net_noseek));
118         }
119 
Flush()120         public override void Flush() {
121         }
122 
Read(byte[] buffer, int offset, int count)123         public override int Read(byte[] buffer, int offset, int count) {
124 
125             try {
126                 if (Interlocked.Increment(ref m_ReadNesting) != 1) {
127                     throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "Read", "read"));
128                 }
129 
130                 if (m_HeadEOF) {
131                     return WrappedStream.Read(buffer, offset, count);
132                 }
133                 else {
134                     int result = m_HeadStream.Read(buffer, offset, count);
135                     m_HeadLength += result;
136                     if (result == 0 && count != 0) {
137                         m_HeadEOF = true;
138                         m_HeadStream.Close();
139                         result = WrappedStream.Read(buffer, offset, count);
140                     }
141                     return result;
142                 }
143             }
144             finally {
145                 Interlocked.Decrement(ref m_ReadNesting);
146             }
147 
148         }
149 
150 
151         //
152         // This is a wrapper result used to substitue the AsyncResult returned from m_HeadStream IO
153         // Note that once seen a EOF on m_HeadStream we will stop using this wrapper.
154         //
155         private class InnerAsyncResult: LazyAsyncResult {
156             public byte[] Buffer;
157             public int    Offset;
158             public int    Count;
159 
InnerAsyncResult(object userState, AsyncCallback userCallback, byte[] buffer, int offset, int count)160             public InnerAsyncResult(object userState, AsyncCallback userCallback, byte[] buffer, int offset, int count)
161             :base (null, userState, userCallback) {
162 
163                 Buffer = buffer;
164                 Offset = offset;
165                 Count  = count;
166             }
167 
168         }
169 
ReadCallback(IAsyncResult transportResult)170         private void ReadCallback(IAsyncResult transportResult) {
171             GlobalLog.Assert(transportResult.AsyncState is InnerAsyncResult, "InnerAsyncResult::ReadCallback|The state expected to be of type InnerAsyncResult, received {0}.", transportResult.GetType().FullName);
172             if (transportResult.CompletedSynchronously)
173             {
174                 return;
175             }
176 
177             InnerAsyncResult userResult = transportResult.AsyncState as InnerAsyncResult;
178             try {
179                 // Complete transport IO, in this callback that is always the head stream
180                 int count;
181                 if (!m_HeadEOF) {
182                     count = m_HeadStream.EndRead(transportResult);
183                     m_HeadLength += count;
184                 }
185                 else {
186                     count = WrappedStream.EndRead(transportResult);
187                 }
188 
189 
190                 //check on EOF condition
191                 if (!m_HeadEOF && count == 0 && userResult.Count != 0) {
192                     //Got a first stream EOF
193                     m_HeadEOF = true;
194                     m_HeadStream.Close();
195                     IAsyncResult ar = WrappedStream.BeginRead(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult);
196                     if (!ar.CompletedSynchronously) {
197                         return;
198                     }
199                     count = WrappedStream.EndRead(ar);
200                 }
201                 // just complete user IO
202                 userResult.Buffer = null;
203                 userResult.InvokeCallback(count);
204             }
205             catch (Exception e) {
206                 //ASYNC: try to ignore even serious exceptions (nothing to loose?)
207                 if (userResult.InternalPeekCompleted)
208                     throw;
209 
210                 userResult.InvokeCallback(e);
211             }
212         }
213 
BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)214         public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
215             try {
216                 if (Interlocked.Increment(ref m_ReadNesting) != 1) {
217                     throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "BeginRead", "read"));
218                 }
219 
220                 if (m_ReadCallback == null) {
221                     m_ReadCallback = new AsyncCallback(ReadCallback);
222                 }
223 
224                 if (m_HeadEOF) {
225                     return WrappedStream.BeginRead(buffer, offset, count, callback, state);
226                 }
227                 else {
228                     InnerAsyncResult userResult = new InnerAsyncResult(state, callback, buffer, offset, count);
229                     IAsyncResult ar = m_HeadStream.BeginRead(buffer, offset, count, m_ReadCallback, userResult);
230 
231                     if (!ar.CompletedSynchronously)
232                     {
233                         return userResult;
234                     }
235 
236                     int bytes = m_HeadStream.EndRead(ar);
237                     m_HeadLength += bytes;
238 
239                     //check on EOF condition
240                     if (bytes == 0 && userResult.Count != 0) {
241                         //Got a first stream EOF
242                         m_HeadEOF = true;
243                         m_HeadStream.Close();
244                         return WrappedStream.BeginRead(buffer, offset, count, callback, state);
245                     }
246                     else {
247                         // just complete user IO
248                         userResult.Buffer = null;
249                         userResult.InvokeCallback(count);
250                         return userResult;
251                     }
252 
253                 }
254             }
255             catch {
256                 Interlocked.Decrement(ref m_ReadNesting);
257                 throw;
258             }
259         }
260 
EndRead(IAsyncResult asyncResult)261         public override int EndRead(IAsyncResult asyncResult) {
262 
263             if (Interlocked.Decrement(ref m_ReadNesting) != 0) {
264                 Interlocked.Increment(ref m_ReadNesting);
265                 throw new InvalidOperationException(SR.GetString(SR.net_io_invalidendcall, "EndRead"));
266             }
267 
268             if (asyncResult == null) {
269                 throw new ArgumentNullException("asyncResult");
270             }
271 
272             InnerAsyncResult myResult = asyncResult as InnerAsyncResult;
273 
274             if (myResult == null) {
275                 // We are just passing IO down, although m_HeadEOF should be always true here.
276                 GlobalLog.Assert(m_HeadEOF, "CombinedReadStream::EndRead|m_HeadEOF is false and asyncResult is not of InnerAsyncResult type {0).", asyncResult.GetType().FullName);
277                 return m_HeadEOF? WrappedStream.EndRead(asyncResult): m_HeadStream.EndRead(asyncResult);
278             }
279 
280             // this is our wrapped AsyncResult
281             myResult.InternalWaitForCompletion();
282 
283             // Exception?
284             if (myResult.Result is Exception) {
285                 throw (Exception)(myResult.Result);
286             }
287 
288             // Report the count read
289             return (int)myResult.Result;
290         }
291 
292         // Subclasses should use Dispose(bool, CloseExState)
Dispose(bool disposing)293         protected override sealed void Dispose(bool disposing) {
294             Dispose(disposing, CloseExState.Normal);
295         }
296 
ICloseEx.CloseEx(CloseExState closeState)297         void ICloseEx.CloseEx(CloseExState closeState) {
298             Dispose(true, closeState);
299         }
300 
Dispose(bool disposing, CloseExState closeState)301         protected virtual void Dispose(bool disposing, CloseExState closeState) {
302 
303             // All below calls should already be idempotent
304 
305             try {
306                 if (disposing) {
307                     try {
308                         if (!m_HeadEOF) {
309                             ICloseEx icloseEx = m_HeadStream as ICloseEx;
310                             if (icloseEx != null) {
311                                 icloseEx.CloseEx(closeState);
312                             }
313                             else {
314                                 m_HeadStream.Close();
315                             }
316                         }
317                     }
318                     finally {
319                         ICloseEx icloseEx = WrappedStream as ICloseEx;
320                         if (icloseEx != null) {
321                             icloseEx.CloseEx(closeState);
322                         }
323                         else {
324                             WrappedStream.Close();
325                         }
326                     }
327                 }
328             }
329             finally {
330                 base.Dispose(disposing);
331             }
332         }
333 
334         public override bool CanTimeout {
335             get {
336                 return WrappedStream.CanTimeout && m_HeadStream.CanTimeout;
337             }
338         }
339 
340         public override int ReadTimeout {
341             get {
342                 return (m_HeadEOF) ? WrappedStream.ReadTimeout : m_HeadStream.ReadTimeout;
343             }
344             set {
345                 WrappedStream.ReadTimeout = m_HeadStream.ReadTimeout = value;
346             }
347         }
348 
349         public override int WriteTimeout {
350             get {
351                 return (m_HeadEOF) ? WrappedStream.WriteTimeout : m_HeadStream.WriteTimeout;
352             }
353             set {
354                 WrappedStream.WriteTimeout = m_HeadStream.WriteTimeout = value;
355             }
356         }
357     }
358 
359     //
360     // This stream will plug into a stream and listen for all reads on it
361     // It is also constructed with yet another stream used for multiplexing IO to
362     //
363     // When it sees a read on this stream the result gets forwarded as write to a shadow stream.
364     // ONLY READ IO is supported!
365     //
366     internal class ForwardingReadStream : BaseWrapperStream, ICloseEx {
367         private Stream  m_ShadowStream;
368         private int     m_ReadNesting;
369         private bool    m_ShadowStreamIsDead;
370         private AsyncCallback m_ReadCallback;   // lazy initialized
371         private long    m_BytesToSkip;       // suppress from the read first number of bytes
372         private bool    m_ThrowOnWriteError;
373         private bool    m_SeenReadEOF;
374 
375 
ForwardingReadStream(Stream originalStream, Stream shadowStream, long bytesToSkip, bool throwOnWriteError)376         internal ForwardingReadStream(Stream originalStream, Stream shadowStream, long bytesToSkip, bool throwOnWriteError)
377             : base(originalStream)
378         {
379             if (!shadowStream.CanWrite) {
380                 throw new ArgumentException(SR.GetString(SR.net_cache_shadowstream_not_writable), "shadowStream");
381             }
382             m_ShadowStream = shadowStream;
383             m_BytesToSkip = bytesToSkip;
384             m_ThrowOnWriteError = throwOnWriteError;
385         }
386 
387         public override bool CanRead {
388             get {return WrappedStream.CanRead;}
389         }
390 
391         // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
392         public override bool CanSeek {
393             get {return false;}
394         }
395 
396         public override bool CanWrite {
397             get {return false;}
398         }
399 
400         public override long Length {
401             get {
402                 return WrappedStream.Length - m_BytesToSkip;
403             }
404         }
405 
406         public override long Position {
407             get {
408                 return WrappedStream.Position - m_BytesToSkip;
409             }
410 
411             set {
412                 throw new NotSupportedException(SR.GetString(SR.net_noseek));
413             }
414         }
415 
Seek(long offset, SeekOrigin origin)416         public override long Seek(long offset, SeekOrigin origin) {
417             throw new NotSupportedException(SR.GetString(SR.net_noseek));
418         }
419 
SetLength(long value)420         public override void SetLength(long value) {
421             throw new NotSupportedException(SR.GetString(SR.net_noseek));
422         }
423 
Write(byte[] buffer, int offset, int count)424         public override void Write(byte[] buffer, int offset, int count) {
425             throw new NotSupportedException(SR.GetString(SR.net_noseek));
426         }
427 
BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)428         public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
429             throw new NotSupportedException(SR.GetString(SR.net_noseek));
430         }
431 
EndWrite(IAsyncResult asyncResult)432         public override void EndWrite(IAsyncResult asyncResult) {
433             throw new NotSupportedException(SR.GetString(SR.net_noseek));
434         }
435 
Flush()436         public override void Flush() {
437         }
438 
Read(byte[] buffer, int offset, int count)439         public override int Read(byte[] buffer, int offset, int count) {
440 
441             bool isDoingWrite = false;
442             int result = -1;
443             if (Interlocked.Increment(ref m_ReadNesting) != 1) {
444                 throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "Read", "read"));
445             }
446 
447             try {
448 
449                 if (m_BytesToSkip != 0L) {
450                     // Sometime we want to combine cached + live stream AND the user requested explicit range starts from not 0
451                     byte[] tempBuffer = new byte[4096];
452                     while (m_BytesToSkip != 0L) {
453                         int bytes = WrappedStream.Read(tempBuffer, 0, (m_BytesToSkip < (long)tempBuffer.Length? (int)m_BytesToSkip: tempBuffer.Length));
454                         if (bytes == 0)
455                             m_SeenReadEOF = true;
456 
457                         m_BytesToSkip -= bytes;
458                         if (!m_ShadowStreamIsDead)
459                             m_ShadowStream.Write(tempBuffer, 0, bytes);
460                     }
461                 }
462 
463                 result = WrappedStream.Read(buffer, offset, count);
464                 if (result == 0)
465                     m_SeenReadEOF = true;
466 
467                 if (m_ShadowStreamIsDead) {
468                     return result;
469                 }
470                 isDoingWrite = true;
471                 m_ShadowStream.Write(buffer, offset, result);
472                 return result;
473             }
474             catch (Exception e) {
475                 if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException)
476                     throw;
477 
478                 GlobalLog.Print("ShadowReadStream::Read() Got Exception, disabling the shadow stream, stack trace = " + e.ToString());
479                 if (!m_ShadowStreamIsDead) {
480                     // try to ignore even serious exception, since got nothing to loose?
481                     m_ShadowStreamIsDead = true;
482                     try {
483                         if (m_ShadowStream is ICloseEx)
484                             ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent);
485                         else
486                             m_ShadowStream.Close();
487                     }
488                     catch (Exception ee) {
489                         if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException)
490                             throw;
491                         GlobalLog.Print("ShadowReadStream::Read() Got (ignoring) Exception, on shadow stream.Close, stack trace = " + ee.ToString());
492                     }
493                 }
494                 if (!isDoingWrite || m_ThrowOnWriteError)
495                     throw;
496 
497                 return result;
498             }
499             finally {
500                 Interlocked.Decrement(ref m_ReadNesting);
501             }
502         }
503 
504 
505         //
506         // This is a wrapper result used to substitue the AsyncResult returned from WrappedStream IO
507         // Note that once seen a m_ShadowStream error we will stop using this wrapper.
508         //
509         private class InnerAsyncResult: LazyAsyncResult {
510             public byte[] Buffer;
511             public int    Offset;
512             public int    Count;
513             public bool   IsWriteCompletion;
514 
InnerAsyncResult(object userState, AsyncCallback userCallback, byte[] buffer, int offset, int count)515             public InnerAsyncResult(object userState, AsyncCallback userCallback, byte[] buffer, int offset, int count)
516             :base (null, userState, userCallback) {
517 
518                 Buffer = buffer;
519                 Offset = offset;
520                 Count  = count;
521             }
522 
523         }
524 
ReadCallback(IAsyncResult transportResult)525         private void ReadCallback(IAsyncResult transportResult) {
526             GlobalLog.Assert(transportResult.AsyncState is InnerAsyncResult, "InnerAsyncResult::ReadCallback|The state expected to be of type InnerAsyncResult, received {0}.", transportResult.GetType().FullName);
527             if (transportResult.CompletedSynchronously)
528             {
529                 return;
530             }
531 
532             // Recover our asyncResult
533             InnerAsyncResult userResult = transportResult.AsyncState as InnerAsyncResult;
534 
535             ReadComplete(transportResult);
536         }
537 
ReadComplete(IAsyncResult transportResult)538         private void ReadComplete(IAsyncResult transportResult)
539         {
540             while(true)
541             {
542                 // Recover our asyncResult
543                 InnerAsyncResult userResult = transportResult.AsyncState as InnerAsyncResult;
544 
545                 try
546                 {
547                     if (!userResult.IsWriteCompletion)
548                     {
549                         userResult.Count = WrappedStream.EndRead(transportResult);
550                         if (userResult.Count == 0)
551                             m_SeenReadEOF = true;
552 
553 
554                         if (!m_ShadowStreamIsDead) {
555                             userResult.IsWriteCompletion = true;
556                             //Optionally charge notification write IO
557                             transportResult = m_ShadowStream.BeginWrite(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult);
558                             if (transportResult.CompletedSynchronously)
559                             {
560                                 continue;
561                             }
562                             return;
563                         }
564                     }
565                     else
566                     {
567                         GlobalLog.Assert(!m_ShadowStreamIsDead, "ForwardingReadStream::ReadComplete|ERROR: IsWriteCompletion && m_ShadowStreamIsDead");
568 
569                         m_ShadowStream.EndWrite(transportResult);
570                         userResult.IsWriteCompletion = false;
571                     }
572                 }
573                 catch (Exception e)
574                 {
575                     //ASYNC: try to ignore even serious exceptions (nothing to loose?)
576                     if (userResult.InternalPeekCompleted)
577                     {
578                         GlobalLog.Print("ShadowReadStream::ReadComplete() Rethrowing Exception (end), userResult.IsCompleted, stack trace = " + e.ToString());
579                         throw;
580                     }
581 
582                     try
583                     {
584                         m_ShadowStreamIsDead = true;
585                         if (m_ShadowStream is ICloseEx)
586                             ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent);
587                         else
588                             m_ShadowStream.Close();
589                     }
590                     catch (Exception ee)
591                     {
592                         //ASYNC: Again try to ignore even serious exceptions
593                         GlobalLog.Print("ShadowReadStream::ReadComplete() Got (ignoring) Exception, on shadow stream.Close, stack trace = " + ee.ToString());
594                     }
595 
596                     if (!userResult.IsWriteCompletion || m_ThrowOnWriteError)
597                     {
598                         if (transportResult.CompletedSynchronously)
599                         {
600                             throw;
601                         }
602 
603                         userResult.InvokeCallback(e);
604                         return;
605                     }
606                 }
607 
608                 // Need to process, re-issue the read.
609                 try
610                 {
611                     if (m_BytesToSkip != 0L) {
612                         m_BytesToSkip -= userResult.Count;
613                         userResult.Count = m_BytesToSkip < (long)userResult.Buffer.Length? (int)m_BytesToSkip: userResult.Buffer.Length;
614                         if (m_BytesToSkip == 0L) {
615                             // we did hide the original IO request in the outer iaresult state.
616                             // charge the real user operation now
617                             transportResult = userResult;
618                             userResult = userResult.AsyncState as InnerAsyncResult;
619                             GlobalLog.Assert(userResult != null, "ForwardingReadStream::ReadComplete|ERROR: Inner IAResult is null after stream FastForwarding.");
620                         }
621                         transportResult = WrappedStream.BeginRead(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult);
622                         if (transportResult.CompletedSynchronously)
623                         {
624                             continue;
625                         }
626                         return;
627                     }
628                     //if came to here, complete original user IO
629                     userResult.InvokeCallback(userResult.Count);
630                     return;
631                 }
632                 catch (Exception e)
633                 {
634                     //ASYNC: try to ignore even serious exceptions (nothing to loose?)
635                     if (userResult.InternalPeekCompleted)
636                     {
637                         GlobalLog.Print("ShadowReadStream::ReadComplete() Rethrowing Exception (begin), userResult.IsCompleted, stack trace = " + e.ToString());
638                         throw;
639                     }
640 
641                     try
642                     {
643                         m_ShadowStreamIsDead = true;
644                         if (m_ShadowStream is ICloseEx)
645                             ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent);
646                         else
647                             m_ShadowStream.Close();
648                     }
649                     catch (Exception ee)
650                     {
651                         //ASYNC: Again try to ignore even serious exceptions
652                         GlobalLog.Print("ShadowReadStream::ReadComplete() Got (ignoring) Exception, on shadow stream.Close (after begin), stack trace = " + ee.ToString());
653                     }
654 
655                     if (transportResult.CompletedSynchronously)
656                     {
657                         throw;
658                     }
659 
660                     // This will set the exception result first then try to execute a user callback
661                     userResult.InvokeCallback(e);
662                     return;
663                 }
664             }
665         }
666 
BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)667         public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
668             if (Interlocked.Increment(ref m_ReadNesting) != 1) {
669                 throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "BeginRead", "read"));
670             }
671 
672             try {
673 
674                 if (m_ReadCallback == null) {
675                     m_ReadCallback = new AsyncCallback(ReadCallback);
676                 }
677 
678                 if (m_ShadowStreamIsDead && m_BytesToSkip == 0L) {
679                     return WrappedStream.BeginRead(buffer, offset, count, callback, state);
680                 }
681                 else {
682                     InnerAsyncResult userResult = new InnerAsyncResult(state, callback, buffer, offset, count);
683                     if (m_BytesToSkip != 0L) {
684                         InnerAsyncResult temp = userResult;
685                         userResult = new InnerAsyncResult(temp, null, new byte[4096],
686                                                           0, m_BytesToSkip < (long) buffer.Length? (int)m_BytesToSkip: buffer.Length);
687                     }
688                     IAsyncResult result = WrappedStream.BeginRead(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult);
689                     if (result.CompletedSynchronously)
690                     {
691                         ReadComplete(result);
692                     }
693                     return userResult;
694                 }
695             }
696             catch {
697                 Interlocked.Decrement(ref m_ReadNesting);
698                 throw;
699             }
700         }
701 
EndRead(IAsyncResult asyncResult)702         public override int EndRead(IAsyncResult asyncResult) {
703 
704             if (Interlocked.Decrement(ref m_ReadNesting) != 0) {
705                 Interlocked.Increment(ref m_ReadNesting);
706                 throw new InvalidOperationException(SR.GetString(SR.net_io_invalidendcall, "EndRead"));
707             }
708 
709             if (asyncResult == null) {
710                 throw new ArgumentNullException("asyncResult");
711             }
712 
713             InnerAsyncResult myResult = asyncResult as InnerAsyncResult;
714 
715             if (myResult == null) {
716                 // We are just passing IO down, although the shadow stream should be dead for now.
717                 GlobalLog.Assert(m_ShadowStreamIsDead, "ForwardingReadStream::EndRead|m_ShadowStreamIsDead is false and asyncResult is not of InnerAsyncResult type {0}.", asyncResult.GetType().FullName);
718                 int bytes = WrappedStream.EndRead(asyncResult);
719                 if (bytes == 0)
720                     m_SeenReadEOF = true;
721             }
722 
723             // this is our wrapped AsyncResult
724             bool suceess = false;
725             try {
726                 myResult.InternalWaitForCompletion();
727                 // Exception?
728                 if (myResult.Result is Exception)
729                     throw (Exception)(myResult.Result);
730                 suceess = true;
731             }
732             finally {
733                 if (!suceess && !m_ShadowStreamIsDead) {
734                     m_ShadowStreamIsDead = true;
735                     if (m_ShadowStream is ICloseEx)
736                         ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent);
737                     else
738                         m_ShadowStream.Close();
739                 }
740             }
741 
742             // Report the read count
743             return (int)myResult.Result;
744         }
745 
746         // Subclasses should use Dispose(bool, CloseExState)
Dispose(bool disposing)747         protected sealed override void Dispose(bool disposing) {
748             Dispose(disposing, CloseExState.Normal);
749         }
750 
751         private int _Disposed;
ICloseEx.CloseEx(CloseExState closeState)752         void ICloseEx.CloseEx(CloseExState closeState) {
753 
754             if (Interlocked.Increment(ref _Disposed) == 1) {
755                 // This would allow us to cache the response stream that user throws away
756                 // Next time the cached version could save us from an extra roundtrip
757                 if (closeState == CloseExState.Silent) {
758                     try {
759                         int total = 0;
760                         int bytesRead;
761                         while (total < ConnectStream.s_DrainingBuffer.Length && (bytesRead = Read(ConnectStream.s_DrainingBuffer, 0, ConnectStream.s_DrainingBuffer.Length)) > 0) {
762                             total += bytesRead;
763                         }
764                     }
765                     catch (Exception exception) {
766                         //ATTN: this path will swalow errors regardless of m_IsThrowOnWriteError setting
767                         //      A "Silent" close is for an intermediate response that is to be ignored anyway
768                         if (exception is ThreadAbortException || exception is StackOverflowException || exception is OutOfMemoryException) {
769                             throw;
770                         }
771                     }
772                 }
773 
774                 Dispose(true, closeState);
775             }
776         }
777 
Dispose(bool disposing, CloseExState closeState)778         protected virtual void Dispose(bool disposing, CloseExState closeState) {
779 
780             // All below calls should already be idempotent
781 
782             try {
783                 if (disposing) {
784                     try {
785                         ICloseEx icloseEx = WrappedStream as ICloseEx;
786                         if (icloseEx != null) {
787                             icloseEx.CloseEx(closeState);
788                         }
789                         else {
790                             WrappedStream.Close();
791                         }
792                     }
793                     finally {
794 
795                         // Notify the wirte stream on a partial response if did not see EOF on read
796                         if (!m_SeenReadEOF)
797                             closeState |= CloseExState.Abort;
798 
799                         //
800                         // We don't want to touch m_ShadowStreamIsDead because Close() can be called from other thread while IO is in progress.
801                         // We assume that all streams used by this class are thread safe on Close().
802                         // m_ShadowStreamIsDead = true;
803 
804                         if (m_ShadowStream is ICloseEx)
805                             ((ICloseEx)m_ShadowStream).CloseEx(closeState);
806                         else
807                             m_ShadowStream.Close();
808                     }
809                 }
810             }
811             finally {
812                 base.Dispose(disposing);
813             }
814         }
815 
816         public override bool CanTimeout {
817             get {
818                 return WrappedStream.CanTimeout && m_ShadowStream.CanTimeout;
819             }
820         }
821 
822         public override int ReadTimeout {
823             get {
824                 return WrappedStream.ReadTimeout;
825             }
826             set {
827                 WrappedStream.ReadTimeout = m_ShadowStream.ReadTimeout = value;
828             }
829         }
830 
831         public override int WriteTimeout {
832             get {
833                 return m_ShadowStream.WriteTimeout;
834             }
835             set {
836                 WrappedStream.WriteTimeout = m_ShadowStream.WriteTimeout = value;
837             }
838         }
839     }
840 
841     //
842     // This stream will listen on the parent stream Close.
843     // Assuming the parent stream represents a READ stream such as CombinedReadStream or a response stream.
844     // When the paretn stream is closed this wrapper will update the metadata associated with the entry.
845     internal class MetadataUpdateStream : BaseWrapperStream, ICloseEx {
846         private RequestCache m_Cache;
847         private string      m_Key;
848         private DateTime    m_Expires;
849         private DateTime    m_LastModified;
850         private DateTime    m_LastSynchronized;
851         private TimeSpan    m_MaxStale;
852         private StringCollection m_EntryMetadata;
853         private StringCollection m_SystemMetadata;
854         private bool        m_CacheDestroy;
855         private bool        m_IsStrictCacheErrors;
856 
857 
MetadataUpdateStream( Stream parentStream, RequestCache cache, string key, DateTime expiresGMT, DateTime lastModifiedGMT, DateTime lastSynchronizedGMT, TimeSpan maxStale, StringCollection entryMetadata, StringCollection systemMetadata, bool isStrictCacheErrors)858         internal MetadataUpdateStream(  Stream parentStream,
859                                         RequestCache cache,
860                                         string      key,
861                                         DateTime    expiresGMT,
862                                         DateTime    lastModifiedGMT,
863                                         DateTime    lastSynchronizedGMT,
864                                         TimeSpan    maxStale,
865                                         StringCollection entryMetadata,
866                                         StringCollection systemMetadata,
867                                         bool        isStrictCacheErrors)
868             : base(parentStream)
869         {
870             m_Cache             = cache;
871             m_Key               = key;
872             m_Expires           = expiresGMT;
873             m_LastModified      = lastModifiedGMT;
874             m_LastSynchronized  = lastSynchronizedGMT;
875             m_MaxStale          = maxStale;
876             m_EntryMetadata     = entryMetadata;
877             m_SystemMetadata    = systemMetadata;
878             m_IsStrictCacheErrors = isStrictCacheErrors;
879         }
880 
881         //
882         // This constructor will result in removing a cache entry upon closure
883         //
MetadataUpdateStream(Stream parentStream, RequestCache cache, string key, bool isStrictCacheErrors)884         private MetadataUpdateStream(Stream parentStream, RequestCache cache, string key, bool isStrictCacheErrors)
885             : base(parentStream)
886         {
887             m_Cache             = cache;
888             m_Key               = key;
889             m_CacheDestroy      = true;
890             m_IsStrictCacheErrors = isStrictCacheErrors;
891         }
892         //
893         //
894         //
895         /*
896         // Consider removing.
897         public static Stream CreateEntryRemovalStream(  Stream parentStream, RequestCache cache, string key, bool isStrictCacheErrors)
898         {
899             return new MetadataUpdateStream(parentStream, cache, key, isStrictCacheErrors);
900         }
901         */
902         //
903         public override bool CanRead {
904             get {return WrappedStream.CanRead;}
905         }
906         //
907         // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
908         public override bool CanSeek {
909             get {return WrappedStream.CanSeek;}
910         }
911         //
912         public override bool CanWrite {
913             get {return WrappedStream.CanWrite;}
914         }
915         //
916         public override long Length {
917             get {return WrappedStream.Length;}
918         }
919         //
920         public override long Position {
921             get {return WrappedStream.Position;}
922 
923             set {WrappedStream.Position = value;}
924         }
925 
Seek(long offset, SeekOrigin origin)926         public override long Seek(long offset, SeekOrigin origin) {
927             return WrappedStream.Seek(offset, origin);
928         }
929 
SetLength(long value)930         public override void SetLength(long value) {
931             WrappedStream.SetLength(value);
932         }
933 
Write(byte[] buffer, int offset, int count)934         public override void Write(byte[] buffer, int offset, int count) {
935             WrappedStream.Write(buffer, offset, count);
936         }
937 
BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)938         public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
939             return WrappedStream.BeginWrite(buffer, offset, count, callback, state);
940         }
941 
EndWrite(IAsyncResult asyncResult)942         public override void EndWrite(IAsyncResult asyncResult) {
943             WrappedStream.EndWrite(asyncResult);
944         }
945 
Flush()946         public override void Flush() {
947             WrappedStream.Flush();
948         }
949 
Read(byte[] buffer, int offset, int count)950         public override int Read(byte[] buffer, int offset, int count) {
951             return WrappedStream.Read(buffer, offset,  count);
952         }
953 
BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)954         public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
955             return WrappedStream.BeginRead(buffer, offset, count, callback, state);
956         }
957 
EndRead(IAsyncResult asyncResult)958         public override int EndRead(IAsyncResult asyncResult) {
959             return WrappedStream.EndRead(asyncResult);
960         }
961 
962         // Subclasses should use Dispose(bool, CloseExState)
Dispose(bool disposing)963         protected sealed override void Dispose(bool disposing) {
964             Dispose(disposing, CloseExState.Normal);
965         }
966 
ICloseEx.CloseEx(CloseExState closeState)967         void ICloseEx.CloseEx(CloseExState closeState) {
968             Dispose(true, closeState);
969         }
970 
971         public override bool CanTimeout {
972             get {
973                 return WrappedStream.CanTimeout;
974             }
975         }
976 
977         public override int ReadTimeout {
978             get {
979                 return WrappedStream.ReadTimeout;
980             }
981             set {
982                 WrappedStream.ReadTimeout = value;
983             }
984         }
985 
986         public override int WriteTimeout {
987             get {
988                 return WrappedStream.WriteTimeout;
989             }
990             set {
991                 WrappedStream.WriteTimeout = value;
992             }
993         }
994 
995         private int _Disposed;
Dispose(bool disposing, CloseExState closeState)996         protected virtual void Dispose(bool disposing, CloseExState closeState) {
997 
998             try {
999                 if (Interlocked.Increment(ref _Disposed) == 1) {
1000                     if (disposing) {
1001                         ICloseEx icloseEx = WrappedStream as ICloseEx;
1002 
1003                         if (icloseEx != null) {
1004                             icloseEx.CloseEx(closeState);
1005                         }
1006                         else {
1007                             WrappedStream.Close();
1008                         }
1009 
1010                         if (m_CacheDestroy)
1011                         {
1012                             if (m_IsStrictCacheErrors)
1013                             {
1014                                 m_Cache.Remove(m_Key);
1015                             }
1016                             else
1017                             {
1018                                 m_Cache.TryRemove(m_Key);
1019                             }
1020                         }
1021                         else
1022                         {
1023                             if (m_IsStrictCacheErrors)
1024                             {
1025                                 m_Cache.Update(m_Key, m_Expires, m_LastModified, m_LastSynchronized, m_MaxStale, m_EntryMetadata, m_SystemMetadata);
1026                             }
1027                             else
1028                             {
1029                                 m_Cache.TryUpdate(m_Key, m_Expires, m_LastModified, m_LastSynchronized, m_MaxStale, m_EntryMetadata, m_SystemMetadata);
1030                             }
1031 
1032                         }
1033                     }
1034                 }
1035             }
1036             finally {
1037                 base.Dispose(disposing);
1038             }
1039         }
1040     }
1041 
1042     //
1043     // This stream is for Partial responses.
1044     // It will scroll to the given position and limit the original stream windows to given size
1045     internal class RangeStream : BaseWrapperStream, ICloseEx {
1046         long    m_Offset;
1047         long    m_Size;
1048         long    m_Position;
1049 
RangeStream(Stream parentStream, long offset, long size)1050         internal RangeStream (Stream parentStream, long offset, long size)
1051             : base(parentStream)
1052         {
1053             m_Offset            = offset;
1054             m_Size              = size;
1055             if (WrappedStream.CanSeek) {
1056                 WrappedStream.Position = offset;
1057                 m_Position = offset;
1058             }
1059             else {
1060                 // for now we expect a FileStream that is seekable.
1061                 throw new NotSupportedException(SR.GetString(SR.net_cache_non_seekable_stream_not_supported));
1062             }
1063         }
1064 
1065         public override bool CanRead {
1066             get {return WrappedStream.CanRead;}
1067         }
1068 
1069         // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
1070         public override bool CanSeek {
1071             get {return WrappedStream.CanSeek;}
1072         }
1073 
1074         public override bool CanWrite {
1075             get {return WrappedStream.CanWrite;}
1076         }
1077 
1078         public override long Length {
1079             get {
1080                 long dummy = WrappedStream.Length;
1081                 return m_Size;
1082             }
1083         }
1084 
1085         public override long Position {
1086             get {return WrappedStream.Position-m_Offset;}
1087 
1088             set {
1089                 value += m_Offset;
1090                 if (value > m_Offset + m_Size) {
1091                     value = m_Offset + m_Size;
1092                 }
1093                 WrappedStream.Position = value;
1094             }
1095         }
1096 
Seek(long offset, SeekOrigin origin)1097         public override long Seek(long offset, SeekOrigin origin) {
1098             switch (origin) {
1099             case SeekOrigin.Begin:
1100                         offset += m_Offset;
1101                         if (offset > m_Offset+m_Size) {
1102                             offset = m_Offset+m_Size;
1103                         }
1104                         if (offset < m_Offset) {
1105                             offset = m_Offset;
1106                         }
1107                         break;
1108             case SeekOrigin.End:
1109                         offset -= (m_Offset+m_Size);
1110                         if (offset > 0) {
1111                             offset = 0;
1112                         }
1113                         if (offset < -m_Size) {
1114                             offset = -m_Size;
1115                         }
1116                         break;
1117             default:
1118                         if (m_Position+offset > m_Offset+m_Size) {
1119                             offset = (m_Offset+m_Size) - m_Position;
1120                         }
1121                         if (m_Position+offset < m_Offset) {
1122                             offset = m_Offset-m_Position;
1123                         }
1124                         break;
1125             }
1126             m_Position=WrappedStream.Seek(offset, origin);
1127             return m_Position-m_Offset;
1128         }
1129 
SetLength(long value)1130         public override void SetLength(long value) {
1131             throw new NotSupportedException(SR.GetString(SR.net_cache_unsupported_partial_stream));
1132         }
1133 
Write(byte[] buffer, int offset, int count)1134         public override void Write(byte[] buffer, int offset, int count) {
1135             if (m_Position + count > m_Offset+m_Size) {
1136                 throw new NotSupportedException(SR.GetString(SR.net_cache_unsupported_partial_stream));
1137             }
1138             WrappedStream.Write(buffer, offset, count);
1139             m_Position += count;
1140         }
1141 
BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)1142         public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
1143             if (m_Position+offset > m_Offset+m_Size) {
1144                 throw new NotSupportedException(SR.GetString(SR.net_cache_unsupported_partial_stream));
1145             }
1146             return WrappedStream.BeginWrite(buffer, offset, count, callback, state);
1147         }
1148 
EndWrite(IAsyncResult asyncResult)1149         public override void EndWrite(IAsyncResult asyncResult) {
1150             WrappedStream.EndWrite(asyncResult);
1151             m_Position = WrappedStream.Position;
1152         }
1153 
Flush()1154         public override void Flush() {
1155             WrappedStream.Flush();
1156         }
1157 
Read(byte[] buffer, int offset, int count)1158         public override int Read(byte[] buffer, int offset, int count) {
1159             if (m_Position >= m_Offset+m_Size) {
1160                 return 0;
1161             }
1162             if (m_Position + count > m_Offset+m_Size) {
1163                 count = (int)(m_Offset + m_Size - m_Position);
1164             }
1165             int result = WrappedStream.Read(buffer, offset,  count);
1166             m_Position += result;
1167             return result;
1168         }
1169 
BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)1170         public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
1171             if (m_Position >= m_Offset+m_Size) {
1172                 count = 0;
1173             }
1174             else if (m_Position + count > m_Offset+m_Size) {
1175                 count = (int)(m_Offset + m_Size - m_Position);
1176             }
1177             return WrappedStream.BeginRead(buffer, offset, count, callback, state);
1178         }
1179 
EndRead(IAsyncResult asyncResult)1180         public override int EndRead(IAsyncResult asyncResult) {
1181             int result = WrappedStream.EndRead(asyncResult);
1182             m_Position += result;
1183             return result;
1184         }
1185 
1186         // Subclasses should use Dispose(bool, CloseExState)
Dispose(bool disposing)1187         protected sealed override void Dispose(bool disposing) {
1188             Dispose(disposing, CloseExState.Normal);
1189         }
1190 
ICloseEx.CloseEx(CloseExState closeState)1191         void ICloseEx.CloseEx(CloseExState closeState) {
1192             Dispose(true, closeState);
1193         }
1194 
1195         public override bool CanTimeout {
1196             get {
1197                 return WrappedStream.CanTimeout;
1198             }
1199         }
1200 
1201         public override int ReadTimeout {
1202             get {
1203                 return WrappedStream.ReadTimeout;
1204             }
1205             set {
1206                 WrappedStream.ReadTimeout = value;
1207             }
1208         }
1209 
1210         public override int WriteTimeout {
1211             get {
1212                 return WrappedStream.WriteTimeout;
1213             }
1214             set {
1215                 WrappedStream.WriteTimeout = value;
1216             }
1217         }
1218 
Dispose(bool disposing, CloseExState closeState)1219         protected virtual void Dispose(bool disposing, CloseExState closeState) {
1220 
1221             // All calls below should already be idempotent.
1222 
1223             try
1224             {
1225                 if (disposing) {
1226 
1227                     ICloseEx icloseEx = WrappedStream as ICloseEx;
1228 
1229                     if (icloseEx != null) {
1230                         icloseEx.CloseEx(closeState);
1231                     }
1232                     else {
1233                         WrappedStream.Close();
1234                     }
1235                 }
1236             }
1237             finally
1238             {
1239                 base.Dispose(disposing);
1240             }
1241 
1242         }
1243     }
1244 
1245 }
1246