1 //Copyright 2010 Microsoft Corporation
2 //
3 //Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
4 //You may obtain a copy of the License at
5 //
6 //http://www.apache.org/licenses/LICENSE-2.0
7 //
8 //Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
9 //"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 //See the License for the specific language governing permissions and limitations under the License.
11 
12 
13 namespace System.Data.Services.Client
14 {
15     using System;
16     using System.Collections;
17     using System.Collections.Generic;
18     using System.Diagnostics;
19     using System.IO;
20 #if !ASTORIA_LIGHT
21     using System.Net;
22 #else
23     using System.Data.Services.Http;
24 #endif
25 
26     internal class QueryResult : BaseAsyncResult
27     {
        /// <summary>Service request supplied to the constructor; its QueryComponents are used when materializing the response.</summary>
        internal readonly DataServiceRequest ServiceRequest;

        /// <summary>HTTP request supplied to the constructor; the same object is assigned to Abortable.</summary>
        internal readonly HttpWebRequest Request;

        /// <summary>
        /// Static scratch buffer shared by all instances: taken (and cleared) by GetAsyncResponseStreamCopyBuffer,
        /// handed back by PutAsyncResponseStreamCopyBuffer.
        /// </summary>
        private static byte[] reusableAsyncCopyBuffer;

        /// <summary>Request body content; only assigned by the disabled (#if false) request-stream path, disposed in CompletedRequest.</summary>
        private MemoryStream requestStreamContent;

        /// <summary>Request stream; only assigned by the disabled (#if false) request-stream path, disposed in CompletedRequest.</summary>
        private Stream requestStream;

        /// <summary>Response recorded by SetHttpWebResponse; closed in CompletedRequest.</summary>
        private HttpWebResponse httpWebResponse;

        /// <summary>Stream obtained from the response in AsyncEndGetResponse and read asynchronously; disposed in CompletedRequest.</summary>
        private Stream asyncResponseStream;

        /// <summary>Buffer that asynchronous reads fill before the bytes are written to responseStream.</summary>
        private byte[] asyncStreamCopyBuffer;

        /// <summary>Copy of the response body; this is what GetResponseStream returns.</summary>
        private Stream responseStream;

        /// <summary>ContentType cached from the response by SetHttpWebResponse.</summary>
        private string contentType;

        /// <summary>ContentLength cached from the response by SetHttpWebResponse.</summary>
        private long contentLength;

        /// <summary>StatusCode cached from the response by SetHttpWebResponse.</summary>
        private HttpStatusCode statusCode;

        /// <summary>Set to true by GetAsyncResponseStreamCopy; when true, CompletedRequest rewinds responseStream.</summary>
        private bool responseStreamOwner;

        /// <summary>
        /// Set just before BeginRead is issued against asyncStreamCopyBuffer and cleared after EndRead;
        /// CompletedRequest does not recycle the buffer while this is true.
        /// </summary>
        private bool usingBuffer;

#if StreamContainsBuffer
        /// <summary>Set by GetAsyncResponseStreamCopy when the response MemoryStream wraps asyncStreamCopyBuffer directly.</summary>
        private bool responseStreamIsCopyBuffer;
#endif
59 
QueryResult(object source, string method, DataServiceRequest serviceRequest, HttpWebRequest request, AsyncCallback callback, object state)60         internal QueryResult(object source, string method, DataServiceRequest serviceRequest, HttpWebRequest request, AsyncCallback callback, object state)
61             : base(source, method, callback, state)
62         {
63             Debug.Assert(null != request, "null request");
64             this.ServiceRequest = serviceRequest;
65             this.Request = request;
66             this.Abortable = request;
67         }
68 
69         #region HttpResponse wrapper - ContentLength, ContentType, StatusCode
70 
71         internal long ContentLength
72         {
73             get { return this.contentLength; }
74         }
75 
76         internal string ContentType
77         {
78             get { return this.contentType; }
79         }
80 
81         internal HttpStatusCode StatusCode
82         {
83             get { return this.statusCode; }
84         }
85 
86         #endregion
87 
EndExecute(object source, IAsyncResult asyncResult)88         internal static QueryResult EndExecute<TElement>(object source, IAsyncResult asyncResult)
89         {
90             QueryResult response = null;
91 
92             try
93             {
94                 response = BaseAsyncResult.EndExecute<QueryResult>(source, "Execute", asyncResult);
95             }
96             catch (InvalidOperationException ex)
97             {
98                 response = asyncResult as QueryResult;
99                 Debug.Assert(response != null, "response != null, BaseAsyncResult.EndExecute() would have thrown a different exception otherwise.");
100 
101                 QueryOperationResponse operationResponse = response.GetResponse<TElement>(MaterializeAtom.EmptyResults);
102                 if (operationResponse != null)
103                 {
104                     operationResponse.Error = ex;
105                     throw new DataServiceQueryException(Strings.DataServiceException_GeneralError, ex, operationResponse);
106                 }
107 
108                 throw;
109             }
110 
111             return response;
112         }
113 
GetResponseStream()114         internal Stream GetResponseStream()
115         {
116             return this.responseStream;
117         }
118 
BeginExecute()119         internal void BeginExecute()
120         {
121             try
122             {
123                 IAsyncResult asyncResult;
124 #if false
125                 if ((null != requestContent) && (0 < requestContent.Length))
126                 {
127                     requestContent.Position = 0;
128                     this.requestStreamContent = requestContent;
129                     this.Request.ContentLength = requestContent.Length;
130                     asyncResult = this.Request.BeginGetRequestStream(QueryAsyncResult.AsyncEndGetRequestStream, this);
131                 }
132                 else
133 #endif
134                 {
135                     asyncResult = BaseAsyncResult.InvokeAsync(this.Request.BeginGetResponse, QueryResult.AsyncEndGetResponse, this);
136                 }
137 
138                 this.CompletedSynchronously &= asyncResult.CompletedSynchronously;
139             }
140             catch (Exception e)
141             {
142                 this.HandleFailure(e);
143                 throw;
144             }
145             finally
146             {
147                 this.HandleCompleted();
148             }
149 
150             Debug.Assert(!this.CompletedSynchronously || this.IsCompleted, "if CompletedSynchronously then MUST IsCompleted");
151         }
152 
153 #if !ASTORIA_LIGHT
Execute()154         internal void Execute()
155         {
156             try
157             {
158 #if false
159                 if ((null != requestContent) && (0 < requestContent.Length))
160                 {
161                     using (System.IO.Stream stream = Util.NullCheck(this.Request.GetRequestStream(), InternalError.InvalidGetRequestStream))
162                     {
163                         byte[] buffer = requestContent.GetBuffer();
164                         int bufferOffset = checked((int)requestContent.Position);
165                         int bufferLength = checked((int)requestContent.Length) - bufferOffset;
166 
167                         stream.Write(buffer, bufferOffset, bufferLength);
168                     }
169                 }
170 #endif
171 
172                 HttpWebResponse response = null;
173                 try
174                 {
175                     response = (HttpWebResponse)this.Request.GetResponse();
176                 }
177                 catch (WebException ex)
178                 {
179                     response = (HttpWebResponse)ex.Response;
180                     if (null == response)
181                     {
182                         throw;
183                     }
184                 }
185 
186                 this.SetHttpWebResponse(Util.NullCheck(response, InternalError.InvalidGetResponse));
187 
188                 if (HttpStatusCode.NoContent != this.StatusCode)
189                 {
190                     using (Stream stream = this.httpWebResponse.GetResponseStream())
191                     {
192                         if (null != stream)
193                         {
194                             Stream copy = this.GetAsyncResponseStreamCopy();
195                             this.responseStream = copy;
196 
197                             Byte[] buffer = this.GetAsyncResponseStreamCopyBuffer();
198 
199                             long copied = WebUtil.CopyStream(stream, copy, ref buffer);
200                             if (this.responseStreamOwner)
201                             {
202                                 if (0 == copied)
203                                 {
204                                     this.responseStream = null;
205                                 }
206                                 else if (copy.Position < copy.Length)
207                                 {                                    ((MemoryStream)copy).SetLength(copy.Position);
208                                 }
209                             }
210 
211                             this.PutAsyncResponseStreamCopyBuffer(buffer);
212                         }
213                     }
214                 }
215             }
216             catch (Exception e)
217             {
218                 this.HandleFailure(e);
219                 throw;
220             }
221             finally
222             {
223                 this.SetCompleted();
224                 this.CompletedRequest();
225             }
226 
227             if (null != this.Failure)
228             {
229                 throw this.Failure;
230             }
231         }
232 #endif
233 
GetResponse(MaterializeAtom results)234         internal QueryOperationResponse<TElement> GetResponse<TElement>(MaterializeAtom results)
235         {
236             if (this.httpWebResponse != null)
237             {
238                 Dictionary<string, string> headers = WebUtil.WrapResponseHeaders(this.httpWebResponse);
239                 QueryOperationResponse<TElement> response = new QueryOperationResponse<TElement>(headers, this.ServiceRequest, results);
240                 response.StatusCode = (int)this.httpWebResponse.StatusCode;
241                 return response;
242             }
243 
244             return null;
245         }
246 
GetResponseWithType(MaterializeAtom results, Type elementType)247         internal QueryOperationResponse GetResponseWithType(MaterializeAtom results, Type elementType)
248         {
249             if (this.httpWebResponse != null)
250             {
251                 Dictionary<string, string> headers = WebUtil.WrapResponseHeaders(this.httpWebResponse);
252                 QueryOperationResponse response = QueryOperationResponse.GetInstance(elementType, headers, this.ServiceRequest, results);
253                 response.StatusCode = (int)this.httpWebResponse.StatusCode;
254                 return response;
255             }
256 
257             return null;
258         }
259 
GetMaterializer(DataServiceContext context, ProjectionPlan plan)260         internal MaterializeAtom GetMaterializer(DataServiceContext context, ProjectionPlan plan)
261         {
262             Debug.Assert(this.IsCompletedInternally, "request hasn't completed yet");
263 
264             MaterializeAtom materializer;
265             if (HttpStatusCode.NoContent != this.StatusCode)
266             {
267                 materializer = DataServiceRequest.Materialize(context, this.ServiceRequest.QueryComponents, plan, this.ContentType, this.GetResponseStream());
268             }
269             else
270             {
271                 materializer = MaterializeAtom.EmptyResults;
272             }
273 
274             return materializer;
275         }
276 
ProcessResult(DataServiceContext context, ProjectionPlan plan)277         internal QueryOperationResponse<TElement> ProcessResult<TElement>(DataServiceContext context, ProjectionPlan plan)
278         {
279             MaterializeAtom materializeAtom = DataServiceRequest.Materialize(context, this.ServiceRequest.QueryComponents, plan, this.ContentType, this.GetResponseStream());
280             return this.GetResponse<TElement>(materializeAtom);
281         }
282 
CompletedRequest()283         protected override void CompletedRequest()
284         {
285             Util.Dispose(ref this.asyncResponseStream);
286             Util.Dispose(ref this.requestStream);
287             Util.Dispose(ref this.requestStreamContent);
288 
289             byte[] buffer = this.asyncStreamCopyBuffer;
290             this.asyncStreamCopyBuffer = null;
291 #if StreamContainsBuffer
292             if (!this.responseStreamIsCopyBuffer)
293 #endif
294             if ((null != buffer) && !this.usingBuffer)
295             {
296                 this.PutAsyncResponseStreamCopyBuffer(buffer);
297             }
298 
299             if (this.responseStreamOwner)
300             {
301                 if (null != this.responseStream)
302                 {
303                     this.responseStream.Position = 0;
304                 }
305             }
306 
307             Debug.Assert(null != this.httpWebResponse || null != this.Failure, "should have response or exception");
308             if (null != this.httpWebResponse)
309             {
310                 this.httpWebResponse.Close();
311 
312                 Exception ex = DataServiceContext.HandleResponse(this.StatusCode, this.httpWebResponse.Headers[XmlConstants.HttpDataServiceVersion], this.GetResponseStream, false);
313                 if (null != ex)
314                 {
315                     this.HandleFailure(ex);
316                 }
317             }
318         }
319 
GetAsyncResponseStreamCopy()320         protected virtual Stream GetAsyncResponseStreamCopy()
321         {
322             this.responseStreamOwner = true;
323 
324             long length = this.contentLength;
325             if ((0 < length) && (length <= Int32.MaxValue))
326             {
327                 Debug.Assert(null == this.asyncStreamCopyBuffer, "not expecting buffer");
328 
329 #if StreamContainsBuffer
330                 byte[] buffer = new byte[(int)length];
331                 if (length < UInt16.MaxValue)
332                 {                    responseStreamIsCopyBuffer = true;
333                     this.asyncStreamCopyBuffer = buffer;
334                 }
335                 return new MemoryStream(buffer, 0, buffer.Length, true, true);
336 #else
337                 return new MemoryStream((int)length);
338 #endif
339             }
340 
341             return new MemoryStream();
342         }
343 
GetAsyncResponseStreamCopyBuffer()344         protected virtual byte[] GetAsyncResponseStreamCopyBuffer()
345         {            Debug.Assert(null == this.asyncStreamCopyBuffer, "non-null this.asyncStreamCopyBuffer");
346             return System.Threading.Interlocked.Exchange(ref reusableAsyncCopyBuffer, null) ?? new byte[8000];
347         }
348 
PutAsyncResponseStreamCopyBuffer(byte[] buffer)349         protected virtual void PutAsyncResponseStreamCopyBuffer(byte[] buffer)
350         {
351             reusableAsyncCopyBuffer = buffer;
352         }
353 
SetHttpWebResponse(HttpWebResponse response)354         protected virtual void SetHttpWebResponse(HttpWebResponse response)
355         {
356             this.httpWebResponse = response;
357             this.statusCode = response.StatusCode;
358             this.contentLength = response.ContentLength;
359             this.contentType = response.ContentType;
360         }
361 
#if false
        // NOTE(review): everything up to the matching #endif is compiled out. It is the request-body
        // path (get request stream -> write content -> get response) and no longer matches the visible
        // class: it refers to QueryAsyncResult and state.asyncCompleteStep, and expects CompleteCheck to
        // return an int, whereas the live CompleteCheck returns void. Treat it as reference material only.

        // Callback for BeginGetRequestStream: ends the call, then starts writing the buffered request content.
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "required for this feature")]
        private static void AsyncEndGetRequestStream(IAsyncResult asyncResult)
        {
            QueryAsyncResult state = asyncResult.AsyncState as QueryAsyncResult;
            try
            {
                int step = CompleteCheck(state, InternalError.InvalidEndGetRequestCompleted);
                state.CompletedSynchronously &= asyncResult.CompletedSynchronously;
                HttpWebRequest httpWebRequest = Util.NullCheck(state.Request, InternalError.InvalidEndGetRequestStreamRequest);

                Stream stream = Util.NullCheck(httpWebRequest.EndGetRequestStream(asyncResult), InternalError.InvalidEndGetRequestStreamStream);
                state.requestStream = stream;

                MemoryStream memoryStream = Util.NullCheck(state.requestStreamContent, InternalError.InvalidEndGetRequestStreamContent);
                byte[] buffer = memoryStream.GetBuffer();
                int bufferOffset = checked((int)memoryStream.Position);
                int bufferLength = checked((int)memoryStream.Length) - bufferOffset;
                if ((null == buffer) || (0 == bufferLength))
                {
                    Error.ThrowInternalError(InternalError.InvalidEndGetRequestStreamContentLength);
                }

                asyncResult = stream.BeginWrite(buffer, bufferOffset, bufferLength, QueryAsyncResult.AsyncEndWrite, state);

                bool reallyCompletedSynchronously = asyncResult.CompletedSynchronously && (step < state.asyncCompleteStep);
                state.CompletedSynchronously &= reallyCompletedSynchronously;            }
            catch (Exception e)
            {
                if (state.HandleFailure(e))
                {
                    throw;
                }
            }
            finally
            {
                state.HandleCompleted();
            }
        }

        // Callback for BeginWrite: ends the write, disposes the request stream and content, then starts BeginGetResponse.
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "required for this feature")]
        private static void AsyncEndWrite(IAsyncResult asyncResult)
        {
            QueryAsyncResult state = asyncResult.AsyncState as QueryAsyncResult;
            try
            {
                int step = CompleteCheck(state, InternalError.InvalidEndWriteCompleted);
                state.CompletedSynchronously &= asyncResult.CompletedSynchronously;
                HttpWebRequest httpWebRequest = Util.NullCheck(state.Request, InternalError.InvalidEndWriteRequest);

                Stream stream = Util.NullCheck(state.requestStream, InternalError.InvalidEndWriteStream);
                stream.EndWrite(asyncResult);

                state.requestStream = null;
                stream.Dispose();

                stream = state.requestStreamContent;
                if (null != stream)
                {
                    state.requestStreamContent = null;
                    stream.Dispose();
                }

                asyncResult = httpWebRequest.BeginGetResponse(QueryAsyncResult.AsyncEndGetResponse, state);

                bool reallyCompletedSynchronously = asyncResult.CompletedSynchronously && (step < state.asyncCompleteStep);
                state.CompletedSynchronously &= reallyCompletedSynchronously;            }
            catch (Exception e)
            {
                if (state.HandleFailure(e))
                {
                    throw;
                }
            }
            finally
            {
                state.HandleCompleted();
            }
        }
#endif
442 
443         [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "required for this feature")]
AsyncEndGetResponse(IAsyncResult asyncResult)444         private static void AsyncEndGetResponse(IAsyncResult asyncResult)
445         {
446             Debug.Assert(asyncResult != null && asyncResult.IsCompleted, "asyncResult.IsCompleted");
447             QueryResult state = asyncResult.AsyncState as QueryResult;
448             try
449             {
450                 CompleteCheck(state, InternalError.InvalidEndGetResponseCompleted);
451                 state.CompletedSynchronously &= asyncResult.CompletedSynchronously;
452                 HttpWebRequest httpWebRequest = Util.NullCheck(state.Request, InternalError.InvalidEndGetResponseRequest);
453 
454                 HttpWebResponse response = null;
455                 try
456                 {
457                     response = (HttpWebResponse)httpWebRequest.EndGetResponse(asyncResult);
458                 }
459                 catch (WebException e)
460                 {
461                     response = (HttpWebResponse)e.Response;
462                     if (null == response)
463                     {
464                         throw;
465                     }
466                 }
467 
468                 state.SetHttpWebResponse(Util.NullCheck(response, InternalError.InvalidEndGetResponseResponse));
469                 Debug.Assert(null == state.asyncResponseStream, "non-null asyncResponseStream");
470 
471                 Stream stream = null;
472                 if (HttpStatusCode.NoContent != response.StatusCode)
473                 {
474                     stream = response.GetResponseStream();
475                     state.asyncResponseStream = stream;
476                 }
477 
478                 if ((null != stream) && stream.CanRead)
479                 {
480                     if (null == state.responseStream)
481                     {                        state.responseStream = Util.NullCheck(state.GetAsyncResponseStreamCopy(), InternalError.InvalidAsyncResponseStreamCopy);
482                     }
483 
484                     if (null == state.asyncStreamCopyBuffer)
485                     {                        state.asyncStreamCopyBuffer = Util.NullCheck(state.GetAsyncResponseStreamCopyBuffer(), InternalError.InvalidAsyncResponseStreamCopyBuffer);
486                     }
487 
488                     QueryResult.ReadResponseStream(state);
489                 }
490                 else
491                 {
492                     state.SetCompleted();
493                 }
494             }
495             catch (Exception e)
496             {
497                 if (state.HandleFailure(e))
498                 {
499                     throw;
500                 }
501             }
502             finally
503             {
504                 state.HandleCompleted();
505             }
506         }
507 
ReadResponseStream(QueryResult queryResult)508         private static void ReadResponseStream(QueryResult queryResult)
509         {
510             IAsyncResult asyncResult;
511 
512             byte[] buffer = queryResult.asyncStreamCopyBuffer;
513             Stream stream = queryResult.asyncResponseStream;
514             do
515             {
516                 int bufferOffset, bufferLength;
517 #if StreamContainsBuffer
518                 if (state.responseStreamIsCopyBuffer)
519                 {                    bufferOffset = checked((int)state.responseStream.Position);
520                     bufferLength = buffer.Length - bufferOffset;
521                 }
522                 else
523 #endif
524                 {
525                     bufferOffset = 0;
526                     bufferLength = buffer.Length;
527                 }
528 
529                 queryResult.usingBuffer = true;
530                 asyncResult = BaseAsyncResult.InvokeAsync(stream.BeginRead, buffer, bufferOffset, bufferLength, QueryResult.AsyncEndRead, queryResult);
531                 queryResult.CompletedSynchronously &= asyncResult.CompletedSynchronously;            }
532             while (asyncResult.CompletedSynchronously && !queryResult.IsCompletedInternally && stream.CanRead);
533 
534             Debug.Assert(!queryResult.CompletedSynchronously || queryResult.IsCompletedInternally, "AsyncEndGetResponse !IsCompleted");
535         }
536 
537         [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "required for this feature")]
AsyncEndRead(IAsyncResult asyncResult)538         private static void AsyncEndRead(IAsyncResult asyncResult)
539         {
540             Debug.Assert(asyncResult != null && asyncResult.IsCompleted, "asyncResult.IsCompleted");
541             QueryResult state = asyncResult.AsyncState as QueryResult;
542             int count = 0;
543             try
544             {
545                 CompleteCheck(state, InternalError.InvalidEndReadCompleted);
546                 state.CompletedSynchronously &= asyncResult.CompletedSynchronously;
547                 Stream stream = Util.NullCheck(state.asyncResponseStream, InternalError.InvalidEndReadStream);
548                 Stream outputResponse = Util.NullCheck(state.responseStream, InternalError.InvalidEndReadCopy);
549                 byte[] buffer = Util.NullCheck(state.asyncStreamCopyBuffer, InternalError.InvalidEndReadBuffer);
550 
551                 count = stream.EndRead(asyncResult);
552                 state.usingBuffer = false;
553                 if (0 < count)
554                 {
555 #if StreamContainsBuffer
556                     if (state.responseStreamIsCopyBuffer)
557                     {                        outputResponse.Position = outputResponse.Position + count;
558                     }
559                     else
560 #endif
561                     {
562                         outputResponse.Write(buffer, 0, count);
563                     }
564                 }
565 
566                 if (0 < count && 0 < buffer.Length && stream.CanRead)
567                 {
568                     if (!asyncResult.CompletedSynchronously)
569                     {
570                         QueryResult.ReadResponseStream(state);
571                     }
572                 }
573                 else
574                 {
575 #if StreamContainsBuffer
576                     Debug.Assert(!state.responseStreamIsCopyBuffer || (outputResponse.Position == outputResponse.Length), "didn't read expected count");
577 #endif
578                     if (outputResponse.Position < outputResponse.Length)
579                     {
580                         ((MemoryStream)outputResponse).SetLength(outputResponse.Position);
581                     }
582 
583                     state.SetCompleted();
584                 }
585             }
586             catch (Exception e)
587             {
588                 if (state.HandleFailure(e))
589                 {
590                     throw;
591                 }
592             }
593             finally
594             {
595                 state.HandleCompleted();
596             }
597         }
598 
CompleteCheck(QueryResult pereq, InternalError errorcode)599         private static void CompleteCheck(QueryResult pereq, InternalError errorcode)
600         {
601             if ((null == pereq) || (pereq.IsCompletedInternally && !pereq.IsAborted))
602             {
603                 Error.ThrowInternalError(errorcode);
604             }
605         }
606     }
607 }
608