//Copyright 2010 Microsoft Corporation
//
//Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and limitations under the License.


namespace System.Data.Services.Client
{
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.IO;
#if !ASTORIA_LIGHT
    using System.Net;
#else
    using System.Data.Services.Http;
#endif

    /// <summary>
    /// Async result for a single query request. It issues the <see cref="HttpWebRequest"/>,
    /// copies the response body into a stream held by this object, and exposes the
    /// response's status code, content type and content length.
    /// </summary>
    internal class QueryResult : BaseAsyncResult
    {
        /// <summary>The data service request this result was created for.</summary>
        internal readonly DataServiceRequest ServiceRequest;

        /// <summary>The underlying HTTP request that is executed.</summary>
        internal readonly HttpWebRequest Request;

        /// <summary>
        /// Single shared copy buffer. It is taken with Interlocked.Exchange in
        /// <see cref="GetAsyncResponseStreamCopyBuffer"/> and handed back in
        /// <see cref="PutAsyncResponseStreamCopyBuffer"/>.
        /// </summary>
        private static byte[] reusableAsyncCopyBuffer;

        /// <summary>Request body content; only assigned by the disabled (#if false) upload path, disposed in CompletedRequest.</summary>
        private MemoryStream requestStreamContent;

        /// <summary>Request stream; only assigned by the disabled (#if false) upload path, disposed in CompletedRequest.</summary>
        private Stream requestStream;

        /// <summary>The HTTP response, set by <see cref="SetHttpWebResponse"/>.</summary>
        private HttpWebResponse httpWebResponse;

        /// <summary>The network response stream that is read asynchronously.</summary>
        private Stream asyncResponseStream;

        /// <summary>Buffer the async reads are issued against.</summary>
        private byte[] asyncStreamCopyBuffer;

        /// <summary>The stream the response body is copied into; returned by <see cref="GetResponseStream"/>.</summary>
        private Stream responseStream;

        /// <summary>Content type captured from the response.</summary>
        private string contentType;

        /// <summary>Content length captured from the response.</summary>
        private long contentLength;

        /// <summary>Status code captured from the response.</summary>
        private HttpStatusCode statusCode;

        /// <summary>Set to true by <see cref="GetAsyncResponseStreamCopy"/>, i.e. when this object created the response stream copy.</summary>
        private bool responseStreamOwner;

        /// <summary>
        /// True between issuing BeginRead and the matching EndRead; while set,
        /// CompletedRequest does not return the copy buffer to the shared slot.
        /// </summary>
        private bool usingBuffer;

#if StreamContainsBuffer
        /// <summary>True when the response MemoryStream is backed directly by the copy buffer.</summary>
        private bool responseStreamIsCopyBuffer;
#endif

        /// <summary>Creates a query result for the given request.</summary>
        /// <param name="source">Source object passed to the base async result.</param>
        /// <param name="method">Method name passed to the base async result.</param>
        /// <param name="serviceRequest">The originating data service request.</param>
        /// <param name="request">The HTTP request to execute; asserted to be non-null.</param>
        /// <param name="callback">User callback passed to the base async result.</param>
        /// <param name="state">User state passed to the base async result.</param>
        internal QueryResult(object source, string method, DataServiceRequest serviceRequest, HttpWebRequest request, AsyncCallback callback, object state)
            : base(source, method, callback, state)
        {
            Debug.Assert(null != request, "null request");
            this.ServiceRequest = serviceRequest;
            this.Request = request;
            this.Abortable = request;
        }

        #region HttpResponse wrapper - ContentLength, ContentType, StatusCode

        /// <summary>Content length captured from the HTTP response.</summary>
        internal long ContentLength
        {
            get { return this.contentLength; }
        }

        /// <summary>Content type captured from the HTTP response.</summary>
        internal string ContentType
        {
            get { return this.contentType; }
        }

        /// <summary>Status code captured from the HTTP response.</summary>
        internal HttpStatusCode StatusCode
        {
            get { return this.statusCode; }
        }

        #endregion

        /// <summary>
        /// Ends an asynchronous "Execute" operation. An <see cref="InvalidOperationException"/>
        /// is wrapped in a <see cref="DataServiceQueryException"/> when an operation response
        /// can be built from the HTTP response; otherwise it is rethrown unchanged.
        /// </summary>
        /// <typeparam name="TElement">Element type of the operation response.</typeparam>
        /// <param name="source">Source object the operation was started on.</param>
        /// <param name="asyncResult">The async result being ended.</param>
        /// <returns>The completed query result.</returns>
        internal static QueryResult EndExecute<TElement>(object source, IAsyncResult asyncResult)
        {
            QueryResult response = null;

            try
            {
                response = BaseAsyncResult.EndExecute<QueryResult>(source, "Execute", asyncResult);
            }
            catch (InvalidOperationException ex)
            {
                response = asyncResult as QueryResult;
                Debug.Assert(response != null, "response != null, BaseAsyncResult.EndExecute() would have thrown a different exception otherwise.");

                // Guard the dereference so that, in release builds, an unexpected
                // null does not replace the original exception with a NullReferenceException.
                QueryOperationResponse operationResponse = null;
                if (null != response)
                {
                    operationResponse = response.GetResponse<TElement>(MaterializeAtom.EmptyResults);
                }

                if (operationResponse != null)
                {
                    operationResponse.Error = ex;
                    throw new DataServiceQueryException(Strings.DataServiceException_GeneralError, ex, operationResponse);
                }

                throw;
            }

            return response;
        }

        /// <summary>Returns the stream the response body was copied into (may be null).</summary>
        /// <returns>The buffered response stream.</returns>
        internal Stream GetResponseStream()
        {
            return this.responseStream;
        }

        /// <summary>
        /// Starts the asynchronous request by invoking BeginGetResponse with
        /// <see cref="AsyncEndGetResponse"/> as the callback. Failures are recorded via
        /// HandleFailure and rethrown; HandleCompleted always runs.
        /// </summary>
        internal void BeginExecute()
        {
            try
            {
                IAsyncResult asyncResult;
#if false
                if ((null != requestContent) && (0 < requestContent.Length))
                {
                    requestContent.Position = 0;
                    this.requestStreamContent = requestContent;
                    this.Request.ContentLength = requestContent.Length;
                    asyncResult = this.Request.BeginGetRequestStream(QueryAsyncResult.AsyncEndGetRequestStream, this);
                }
                else
#endif
                {
                    asyncResult = BaseAsyncResult.InvokeAsync(this.Request.BeginGetResponse, QueryResult.AsyncEndGetResponse, this);
                }

                this.CompletedSynchronously &= asyncResult.CompletedSynchronously;
            }
            catch (Exception e)
            {
                this.HandleFailure(e);
                throw;
            }
            finally
            {
                this.HandleCompleted();
            }

            Debug.Assert(!this.CompletedSynchronously || this.IsCompleted, "if CompletedSynchronously then MUST IsCompleted");
        }

#if !ASTORIA_LIGHT
        /// <summary>
        /// Executes the request synchronously and copies the response body into the
        /// response stream. A <see cref="WebException"/> that carries a response is treated
        /// as a response rather than a failure. Any recorded failure is thrown at the end.
        /// </summary>
        internal void Execute()
        {
            try
            {
#if false
                if ((null != requestContent) && (0 < requestContent.Length))
                {
                    using (System.IO.Stream stream = Util.NullCheck(this.Request.GetRequestStream(), InternalError.InvalidGetRequestStream))
                    {
                        byte[] buffer = requestContent.GetBuffer();
                        int bufferOffset = checked((int)requestContent.Position);
                        int bufferLength = checked((int)requestContent.Length) - bufferOffset;

                        stream.Write(buffer, bufferOffset, bufferLength);
                    }
                }
#endif

                HttpWebResponse response = null;
                try
                {
                    response = (HttpWebResponse)this.Request.GetResponse();
                }
                catch (WebException ex)
                {
                    // A WebException with a response still carries status and body to process.
                    response = (HttpWebResponse)ex.Response;
                    if (null == response)
                    {
                        throw;
                    }
                }

                this.SetHttpWebResponse(Util.NullCheck(response, InternalError.InvalidGetResponse));

                if (HttpStatusCode.NoContent != this.StatusCode)
                {
                    using (Stream stream = this.httpWebResponse.GetResponseStream())
                    {
                        if (null != stream)
                        {
                            Stream copy = this.GetAsyncResponseStreamCopy();
                            this.responseStream = copy;

                            byte[] buffer = this.GetAsyncResponseStreamCopyBuffer();

                            long copied = WebUtil.CopyStream(stream, copy, ref buffer);
                            if (this.responseStreamOwner)
                            {
                                if (0 == copied)
                                {
                                    // Nothing was read: expose no response stream at all.
                                    this.responseStream = null;
                                }
                                else if (copy.Position < copy.Length)
                                {
                                    // Trim the copy to the number of bytes actually written.
                                    ((MemoryStream)copy).SetLength(copy.Position);
                                }
                            }

                            this.PutAsyncResponseStreamCopyBuffer(buffer);
                        }
                    }
                }
            }
            catch (Exception e)
            {
                this.HandleFailure(e);
                throw;
            }
            finally
            {
                this.SetCompleted();
                this.CompletedRequest();
            }

            if (null != this.Failure)
            {
                throw this.Failure;
            }
        }
#endif

        /// <summary>
        /// Builds a typed operation response from the HTTP response headers and status code.
        /// </summary>
        /// <typeparam name="TElement">Element type of the response.</typeparam>
        /// <param name="results">Materializer providing the results.</param>
        /// <returns>The operation response, or null when no HTTP response has been set.</returns>
        internal QueryOperationResponse<TElement> GetResponse<TElement>(MaterializeAtom results)
        {
            if (this.httpWebResponse != null)
            {
                Dictionary<string, string> headers = WebUtil.WrapResponseHeaders(this.httpWebResponse);
                QueryOperationResponse<TElement> response = new QueryOperationResponse<TElement>(headers, this.ServiceRequest, results);
                response.StatusCode = (int)this.httpWebResponse.StatusCode;
                return response;
            }

            return null;
        }

        /// <summary>
        /// Builds an operation response for a runtime element type from the HTTP response
        /// headers and status code.
        /// </summary>
        /// <param name="results">Materializer providing the results.</param>
        /// <param name="elementType">Element type passed to QueryOperationResponse.GetInstance.</param>
        /// <returns>The operation response, or null when no HTTP response has been set.</returns>
        internal QueryOperationResponse GetResponseWithType(MaterializeAtom results, Type elementType)
        {
            if (this.httpWebResponse != null)
            {
                Dictionary<string, string> headers = WebUtil.WrapResponseHeaders(this.httpWebResponse);
                QueryOperationResponse response = QueryOperationResponse.GetInstance(elementType, headers, this.ServiceRequest, results);
                response.StatusCode = (int)this.httpWebResponse.StatusCode;
                return response;
            }

            return null;
        }

        /// <summary>
        /// Returns a materializer over the response stream, or MaterializeAtom.EmptyResults
        /// when the status code is NoContent.
        /// </summary>
        /// <param name="context">Context passed to DataServiceRequest.Materialize.</param>
        /// <param name="plan">Projection plan passed to DataServiceRequest.Materialize.</param>
        /// <returns>The materializer for this result.</returns>
        internal MaterializeAtom GetMaterializer(DataServiceContext context, ProjectionPlan plan)
        {
            Debug.Assert(this.IsCompletedInternally, "request hasn't completed yet");

            MaterializeAtom materializer;
            if (HttpStatusCode.NoContent != this.StatusCode)
            {
                materializer = DataServiceRequest.Materialize(context, this.ServiceRequest.QueryComponents, plan, this.ContentType, this.GetResponseStream());
            }
            else
            {
                materializer = MaterializeAtom.EmptyResults;
            }

            return materializer;
        }

        /// <summary>
        /// Materializes the response stream and wraps it in a typed operation response.
        /// </summary>
        /// <typeparam name="TElement">Element type of the response.</typeparam>
        /// <param name="context">Context passed to DataServiceRequest.Materialize.</param>
        /// <param name="plan">Projection plan passed to DataServiceRequest.Materialize.</param>
        /// <returns>The operation response, or null when no HTTP response has been set.</returns>
        internal QueryOperationResponse<TElement> ProcessResult<TElement>(DataServiceContext context, ProjectionPlan plan)
        {
            MaterializeAtom materializeAtom = DataServiceRequest.Materialize(context, this.ServiceRequest.QueryComponents, plan, this.ContentType, this.GetResponseStream());
            return this.GetResponse<TElement>(materializeAtom);
        }

        /// <summary>
        /// Cleans up after the request: disposes the network and request streams, hands the
        /// copy buffer back when no read is outstanding, rewinds the owned response stream,
        /// closes the HTTP response and records any error reported by
        /// DataServiceContext.HandleResponse.
        /// </summary>
        protected override void CompletedRequest()
        {
            Util.Dispose(ref this.asyncResponseStream);
            Util.Dispose(ref this.requestStream);
            Util.Dispose(ref this.requestStreamContent);

            byte[] buffer = this.asyncStreamCopyBuffer;
            this.asyncStreamCopyBuffer = null;
#if StreamContainsBuffer
            if (!this.responseStreamIsCopyBuffer)
#endif
            if ((null != buffer) && !this.usingBuffer)
            {
                // Only recycle the buffer when no BeginRead is still outstanding against it.
                this.PutAsyncResponseStreamCopyBuffer(buffer);
            }

            if (this.responseStreamOwner)
            {
                if (null != this.responseStream)
                {
                    // Rewind so consumers read the copied body from the start.
                    this.responseStream.Position = 0;
                }
            }

            Debug.Assert(null != this.httpWebResponse || null != this.Failure, "should have response or exception");
            if (null != this.httpWebResponse)
            {
                this.httpWebResponse.Close();

                Exception ex = DataServiceContext.HandleResponse(this.StatusCode, this.httpWebResponse.Headers[XmlConstants.HttpDataServiceVersion], this.GetResponseStream, false);
                if (null != ex)
                {
                    this.HandleFailure(ex);
                }
            }
        }

        /// <summary>
        /// Creates the stream the response body is copied into and marks this object as its
        /// owner. The stream is pre-sized to the content length when that is positive and
        /// fits in an Int32.
        /// </summary>
        /// <returns>A new <see cref="MemoryStream"/>.</returns>
        protected virtual Stream GetAsyncResponseStreamCopy()
        {
            this.responseStreamOwner = true;

            long length = this.contentLength;
            if ((0 < length) && (length <= Int32.MaxValue))
            {
                Debug.Assert(null == this.asyncStreamCopyBuffer, "not expecting buffer");

#if StreamContainsBuffer
                byte[] buffer = new byte[(int)length];
                if (length < UInt16.MaxValue)
                {
                    responseStreamIsCopyBuffer = true;
                    this.asyncStreamCopyBuffer = buffer;
                }

                return new MemoryStream(buffer, 0, buffer.Length, true, true);
#else
                return new MemoryStream((int)length);
#endif
            }

            return new MemoryStream();
        }

        /// <summary>
        /// Takes the shared copy buffer if one is available, otherwise allocates a new
        /// 8000-byte buffer.
        /// </summary>
        /// <returns>A buffer for copying the response stream.</returns>
        protected virtual byte[] GetAsyncResponseStreamCopyBuffer()
        {
            Debug.Assert(null == this.asyncStreamCopyBuffer, "non-null this.asyncStreamCopyBuffer");
            return System.Threading.Interlocked.Exchange(ref reusableAsyncCopyBuffer, null) ?? new byte[8000];
        }

        /// <summary>Stores the buffer in the shared slot for reuse by a later request.</summary>
        /// <param name="buffer">Buffer to hand back.</param>
        protected virtual void PutAsyncResponseStreamCopyBuffer(byte[] buffer)
        {
            reusableAsyncCopyBuffer = buffer;
        }

        /// <summary>
        /// Stores the HTTP response and captures its status code, content length and
        /// content type.
        /// </summary>
        /// <param name="response">The HTTP response.</param>
        protected virtual void SetHttpWebResponse(HttpWebResponse response)
        {
            this.httpWebResponse = response;
            this.statusCode = response.StatusCode;
            this.contentLength = response.ContentLength;
            this.contentType = response.ContentType;
        }

        // Disabled request-body upload path, kept as found. It refers to QueryAsyncResult,
        // which is not defined in this file.
#if false
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "required for this feature")]
        private static void AsyncEndGetRequestStream(IAsyncResult asyncResult)
        {
            QueryAsyncResult state = asyncResult.AsyncState as QueryAsyncResult;
            try
            {
                int step = CompleteCheck(state, InternalError.InvalidEndGetRequestCompleted);
                state.CompletedSynchronously &= asyncResult.CompletedSynchronously;
                HttpWebRequest httpWebRequest = Util.NullCheck(state.Request, InternalError.InvalidEndGetRequestStreamRequest);

                Stream stream = Util.NullCheck(httpWebRequest.EndGetRequestStream(asyncResult), InternalError.InvalidEndGetRequestStreamStream);
                state.requestStream = stream;

                MemoryStream memoryStream = Util.NullCheck(state.requestStreamContent, InternalError.InvalidEndGetRequestStreamContent);
                byte[] buffer = memoryStream.GetBuffer();
                int bufferOffset = checked((int)memoryStream.Position);
                int bufferLength = checked((int)memoryStream.Length) - bufferOffset;
                if ((null == buffer) || (0 == bufferLength))
                {
                    Error.ThrowInternalError(InternalError.InvalidEndGetRequestStreamContentLength);
                }

                asyncResult = stream.BeginWrite(buffer, bufferOffset, bufferLength, QueryAsyncResult.AsyncEndWrite, state);

                bool reallyCompletedSynchronously = asyncResult.CompletedSynchronously && (step < state.asyncCompleteStep);
                state.CompletedSynchronously &= reallyCompletedSynchronously;
            }
            catch (Exception e)
            {
                if (state.HandleFailure(e))
                {
                    throw;
                }
            }
            finally
            {
                state.HandleCompleted();
            }
        }

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "required for this feature")]
        private static void AsyncEndWrite(IAsyncResult asyncResult)
        {
            QueryAsyncResult state = asyncResult.AsyncState as QueryAsyncResult;
            try
            {
                int step = CompleteCheck(state, InternalError.InvalidEndWriteCompleted);
                state.CompletedSynchronously &= asyncResult.CompletedSynchronously;
                HttpWebRequest httpWebRequest = Util.NullCheck(state.Request, InternalError.InvalidEndWriteRequest);

                Stream stream = Util.NullCheck(state.requestStream, InternalError.InvalidEndWriteStream);
                stream.EndWrite(asyncResult);

                state.requestStream = null;
                stream.Dispose();

                stream = state.requestStreamContent;
                if (null != stream)
                {
                    state.requestStreamContent = null;
                    stream.Dispose();
                }

                asyncResult = httpWebRequest.BeginGetResponse(QueryAsyncResult.AsyncEndGetResponse, state);

                bool reallyCompletedSynchronously = asyncResult.CompletedSynchronously && (step < state.asyncCompleteStep);
                state.CompletedSynchronously &= reallyCompletedSynchronously;
            }
            catch (Exception e)
            {
                if (state.HandleFailure(e))
                {
                    throw;
                }
            }
            finally
            {
                state.HandleCompleted();
            }
        }
#endif

        /// <summary>
        /// Callback for BeginGetResponse. Captures the response and, when it has a readable
        /// body, starts copying it via <see cref="ReadResponseStream"/>; otherwise marks the
        /// operation completed.
        /// </summary>
        /// <param name="asyncResult">Async result whose AsyncState is the <see cref="QueryResult"/>.</param>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "required for this feature")]
        private static void AsyncEndGetResponse(IAsyncResult asyncResult)
        {
            Debug.Assert(asyncResult != null && asyncResult.IsCompleted, "asyncResult.IsCompleted");
            QueryResult state = asyncResult.AsyncState as QueryResult;
            try
            {
                CompleteCheck(state, InternalError.InvalidEndGetResponseCompleted);
                state.CompletedSynchronously &= asyncResult.CompletedSynchronously;
                HttpWebRequest httpWebRequest = Util.NullCheck(state.Request, InternalError.InvalidEndGetResponseRequest);

                HttpWebResponse response = null;
                try
                {
                    response = (HttpWebResponse)httpWebRequest.EndGetResponse(asyncResult);
                }
                catch (WebException e)
                {
                    // A WebException with a response still carries status and body to process.
                    response = (HttpWebResponse)e.Response;
                    if (null == response)
                    {
                        throw;
                    }
                }

                state.SetHttpWebResponse(Util.NullCheck(response, InternalError.InvalidEndGetResponseResponse));
                Debug.Assert(null == state.asyncResponseStream, "non-null asyncResponseStream");

                Stream stream = null;
                if (HttpStatusCode.NoContent != response.StatusCode)
                {
                    stream = response.GetResponseStream();
                    state.asyncResponseStream = stream;
                }

                if ((null != stream) && stream.CanRead)
                {
                    if (null == state.responseStream)
                    {
                        state.responseStream = Util.NullCheck(state.GetAsyncResponseStreamCopy(), InternalError.InvalidAsyncResponseStreamCopy);
                    }

                    if (null == state.asyncStreamCopyBuffer)
                    {
                        state.asyncStreamCopyBuffer = Util.NullCheck(state.GetAsyncResponseStreamCopyBuffer(), InternalError.InvalidAsyncResponseStreamCopyBuffer);
                    }

                    QueryResult.ReadResponseStream(state);
                }
                else
                {
                    state.SetCompleted();
                }
            }
            catch (Exception e)
            {
                // CompleteCheck throws when state is null; rethrow that error rather than
                // masking it with a NullReferenceException from dereferencing state here.
                if ((null == state) || state.HandleFailure(e))
                {
                    throw;
                }
            }
            finally
            {
                if (null != state)
                {
                    state.HandleCompleted();
                }
            }
        }

        /// <summary>
        /// Issues BeginRead calls against the network stream, looping while reads complete
        /// synchronously, the operation is not completed and the stream remains readable.
        /// </summary>
        /// <param name="queryResult">The query result whose response is being read.</param>
        private static void ReadResponseStream(QueryResult queryResult)
        {
            IAsyncResult asyncResult;

            byte[] buffer = queryResult.asyncStreamCopyBuffer;
            Stream stream = queryResult.asyncResponseStream;
            do
            {
                int bufferOffset, bufferLength;
#if StreamContainsBuffer
                if (queryResult.responseStreamIsCopyBuffer)
                {
                    bufferOffset = checked((int)queryResult.responseStream.Position);
                    bufferLength = buffer.Length - bufferOffset;
                }
                else
#endif
                {
                    bufferOffset = 0;
                    bufferLength = buffer.Length;
                }

                // Mark the buffer busy so CompletedRequest does not recycle it mid-read.
                queryResult.usingBuffer = true;
                asyncResult = BaseAsyncResult.InvokeAsync(stream.BeginRead, buffer, bufferOffset, bufferLength, QueryResult.AsyncEndRead, queryResult);
                queryResult.CompletedSynchronously &= asyncResult.CompletedSynchronously;
            }
            while (asyncResult.CompletedSynchronously && !queryResult.IsCompletedInternally && stream.CanRead);

            Debug.Assert(!queryResult.CompletedSynchronously || queryResult.IsCompletedInternally, "AsyncEndGetResponse !IsCompleted");
        }

        /// <summary>
        /// Callback for BeginRead. Appends the bytes read to the response stream copy. It
        /// then either continues reading (only when this read completed asynchronously; the
        /// loop in <see cref="ReadResponseStream"/> handles synchronous completions) or,
        /// when no more data is available, trims the copy and marks the operation completed.
        /// </summary>
        /// <param name="asyncResult">Async result whose AsyncState is the <see cref="QueryResult"/>.</param>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "required for this feature")]
        private static void AsyncEndRead(IAsyncResult asyncResult)
        {
            Debug.Assert(asyncResult != null && asyncResult.IsCompleted, "asyncResult.IsCompleted");
            QueryResult state = asyncResult.AsyncState as QueryResult;
            int count = 0;
            try
            {
                CompleteCheck(state, InternalError.InvalidEndReadCompleted);
                state.CompletedSynchronously &= asyncResult.CompletedSynchronously;
                Stream stream = Util.NullCheck(state.asyncResponseStream, InternalError.InvalidEndReadStream);
                Stream outputResponse = Util.NullCheck(state.responseStream, InternalError.InvalidEndReadCopy);
                byte[] buffer = Util.NullCheck(state.asyncStreamCopyBuffer, InternalError.InvalidEndReadBuffer);

                count = stream.EndRead(asyncResult);
                state.usingBuffer = false;
                if (0 < count)
                {
#if StreamContainsBuffer
                    if (state.responseStreamIsCopyBuffer)
                    {
                        outputResponse.Position = outputResponse.Position + count;
                    }
                    else
#endif
                    {
                        outputResponse.Write(buffer, 0, count);
                    }
                }

                if (0 < count && 0 < buffer.Length && stream.CanRead)
                {
                    if (!asyncResult.CompletedSynchronously)
                    {
                        QueryResult.ReadResponseStream(state);
                    }
                }
                else
                {
#if StreamContainsBuffer
                    Debug.Assert(!state.responseStreamIsCopyBuffer || (outputResponse.Position == outputResponse.Length), "didn't read expected count");
#endif
                    if (outputResponse.Position < outputResponse.Length)
                    {
                        // Trim the copy to the number of bytes actually written.
                        ((MemoryStream)outputResponse).SetLength(outputResponse.Position);
                    }

                    state.SetCompleted();
                }
            }
            catch (Exception e)
            {
                // CompleteCheck throws when state is null; rethrow that error rather than
                // masking it with a NullReferenceException from dereferencing state here.
                if ((null == state) || state.HandleFailure(e))
                {
                    throw;
                }
            }
            finally
            {
                if (null != state)
                {
                    state.HandleCompleted();
                }
            }
        }

        /// <summary>
        /// Throws an internal error when the query result is null, or when it is already
        /// completed internally and was not aborted.
        /// </summary>
        /// <param name="pereq">The query result to validate.</param>
        /// <param name="errorcode">Internal error code to throw with.</param>
        private static void CompleteCheck(QueryResult pereq, InternalError errorcode)
        {
            if ((null == pereq) || (pereq.IsCompletedInternally && !pereq.IsAborted))
            {
                Error.ThrowInternalError(errorcode);
            }
        }
    }
}