1 // Licensed to the .NET Foundation under one or more agreements. 2 // The .NET Foundation licenses this file to you under the MIT license. 3 // See the LICENSE file in the project root for more information. 4 5 using System.Collections.Generic; 6 using System.Diagnostics; 7 using System.IO; 8 using System.Runtime.CompilerServices; 9 using System.Runtime.ExceptionServices; 10 using System.Runtime.InteropServices; 11 using System.Threading; 12 using System.Threading.Tasks; 13 using Microsoft.Win32.SafeHandles; 14 15 using CURLcode = Interop.Http.CURLcode; 16 using CURLMcode = Interop.Http.CURLMcode; 17 using CURLINFO = Interop.Http.CURLINFO; 18 19 namespace System.Net.Http 20 { 21 internal partial class CurlHandler : HttpMessageHandler 22 { 23 /// <summary>Provides a multi handle and the associated processing for all requests on the handle.</summary> 24 private sealed class MultiAgent : IDisposable 25 { 26 /// <summary> 27 /// Amount of time in milliseconds to keep a multiagent worker alive when there's no work to be done. 28 /// Increasing this value makes it more likely that a worker will end up getting reused for subsequent 29 /// requests, saving on the costs associated with spinning up a new worker, but at the same time it 30 /// burns a thread for that period of time. 31 /// </summary> 32 private const int KeepAliveMilliseconds = 50; 33 34 private static readonly Interop.Http.ReadWriteCallback s_receiveHeadersCallback = CurlReceiveHeadersCallback; 35 private static readonly Interop.Http.ReadWriteCallback s_sendCallback = CurlSendCallback; 36 private static readonly Interop.Http.SeekCallback s_seekCallback = CurlSeekCallback; 37 private static readonly Interop.Http.ReadWriteCallback s_receiveBodyCallback = CurlReceiveBodyCallback; 38 private static readonly Interop.Http.DebugCallback s_debugCallback = CurlDebugFunction; 39 40 /// <summary>CurlHandler that created this MultiAgent. If null, this is a shared handler.</summary> 41 private readonly CurlHandler _creatingHandler; 42 43 /// <summary> 44 /// A collection of not-yet-processed incoming requests for work to be done 45 /// by this multi agent. This can include making new requests, canceling 46 /// active requests, or unpausing active requests. 47 /// Protected by a lock on <see cref="_incomingRequests"/>. 48 /// </summary> 49 private readonly Queue<IncomingRequest> _incomingRequests = new Queue<IncomingRequest>(); 50 51 /// <summary>Map of activeOperations, indexed by a GCHandle to a StrongToWeakReference{EasyRequest}.</summary> 52 private readonly Dictionary<IntPtr, ActiveRequest> _activeOperations = new Dictionary<IntPtr, ActiveRequest>(); 53 54 /// <summary> 55 /// Special file descriptor used to wake-up curl_multi_wait calls. This is the read 56 /// end of a pipe, with the write end written to when work is queued or when cancellation 57 /// is requested. This is only valid while the worker is executing. 58 /// </summary> 59 private SafeFileHandle _wakeupRequestedPipeFd; 60 61 /// <summary> 62 /// Write end of the pipe connected to <see cref="_wakeupRequestedPipeFd"/>. 63 /// This is only valid while the worker is executing. 64 /// </summary> 65 private SafeFileHandle _requestWakeupPipeFd; 66 67 /// <summary> 68 /// Task for the currently running worker, or null if there is no current worker. 69 /// Protected by a lock on <see cref="_incomingRequests"/>. 70 /// </summary> 71 private Task _runningWorker; 72 73 /// <summary> 74 /// Multi handle used to service all requests on this agent. It's lazily 75 /// created when it's first needed, so that it can utilize all of the settings 76 /// from the associated handler, and it's kept open for the duration of this 77 /// agent so that all of the resources it pools (connection pool, DNS cache, etc.) 78 /// can be used for all requests on this agent. 79 /// </summary> 80 private Interop.Http.SafeCurlMultiHandle _multiHandle; 81 82 /// <summary>Set when Dispose has been called.</summary> 83 private bool _disposed; 84 85 /// <summary>Initializes the MultiAgent.</summary> 86 /// <param name="handler">The handler that created this agent, or null if it's shared.</param> MultiAgent(CurlHandler handler)87 public MultiAgent(CurlHandler handler) 88 { 89 _creatingHandler = handler; 90 } 91 92 /// <summary>Disposes of the agent.</summary> Dispose()93 public void Dispose() 94 { 95 EventSourceTrace(null); 96 _disposed = true; 97 QueueIfRunning(new IncomingRequest { Type = IncomingRequestType.Shutdown }); 98 _multiHandle?.Dispose(); 99 } 100 101 /// <summary>Queues a request for the multi handle to process.</summary> Queue(IncomingRequest request)102 public void Queue(IncomingRequest request) 103 { 104 lock (_incomingRequests) 105 { 106 // Add the request, then initiate processing. 107 _incomingRequests.Enqueue(request); 108 EnsureWorkerIsRunning(); 109 } 110 } 111 112 /// <summary>Queues a request for the multi handle to process, but only if there's already an active worker running.</summary> QueueIfRunning(IncomingRequest request)113 public void QueueIfRunning(IncomingRequest request) 114 { 115 lock (_incomingRequests) 116 { 117 if (_runningWorker != null) 118 { 119 _incomingRequests.Enqueue(request); 120 if (_incomingRequests.Count == 1) 121 { 122 RequestWakeup(); 123 } 124 } 125 } 126 } 127 128 /// <summary>Gets the ID of the currently running worker, or null if there isn't one.</summary> 129 internal int? RunningWorkerId => _runningWorker?.Id; 130 131 /// <summary>Schedules the processing worker if one hasn't already been scheduled.</summary> EnsureWorkerIsRunning()132 private void EnsureWorkerIsRunning() 133 { 134 Debug.Assert(Monitor.IsEntered(_incomingRequests), "Needs to be called under _incomingRequests lock"); 135 136 if (_runningWorker == null) 137 { 138 EventSourceTrace("MultiAgent worker queueing"); 139 140 // Ensure we've created the multi handle for this agent. 141 if (_multiHandle == null) 142 { 143 _multiHandle = CreateAndConfigureMultiHandle(); 144 } 145 146 // Create pipe used to forcefully wake up curl_multi_wait calls when something important changes. 147 // This is created here so that the pipe is available immediately for subsequent queue calls to use. 148 Debug.Assert(_wakeupRequestedPipeFd == null, "Read pipe should have been cleared"); 149 Debug.Assert(_requestWakeupPipeFd == null, "Write pipe should have been cleared"); 150 unsafe 151 { 152 int* fds = stackalloc int[2]; 153 Interop.CheckIo(Interop.Sys.Pipe(fds)); 154 _wakeupRequestedPipeFd = new SafeFileHandle((IntPtr)fds[Interop.Sys.ReadEndOfPipe], true); 155 _requestWakeupPipeFd = new SafeFileHandle((IntPtr)fds[Interop.Sys.WriteEndOfPipe], true); 156 } 157 158 // Create the processing task. It's "DenyChildAttach" to avoid any surprises if 159 // code happens to create attached tasks, and it's LongRunning because this thread 160 // is likely going to sit around for a while in a wait loop (and the more requests 161 // are concurrently issued to the same agent, the longer the thread will be around). 162 _runningWorker = new Task(s => ((MultiAgent)s).WorkerBody(), this, 163 CancellationToken.None, TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning); 164 165 // We want to avoid situations where a Dispose occurs while we're in the middle 166 // of processing requests and causes us to tear out the multi handle while it's 167 // in active use. To avoid that, we add-ref it here, and release it at the end 168 // of the worker loop. 169 bool ignored = false; 170 _multiHandle.DangerousAddRef(ref ignored); 171 172 // Kick off the processing task. This is done after both setting _runningWorker 173 // to non-null and add-refing the handle, both to avoid race conditions. The worker 174 // body needs to see _runningWorker as non-null and assumes that it's free to use 175 // the multi handle, without fear of it having been disposed. 176 _runningWorker.Start(TaskScheduler.Default); 177 } 178 else // _runningWorker != null 179 { 180 // The worker is already running. If there are already queued requests, we're done. 181 // However, if there aren't any queued requests, Process could be blocked inside of 182 // curl_multi_wait, and we want to make sure it wakes up to see that there additional 183 // requests waiting to be handled. So we write to the wakeup pipe. 184 Debug.Assert(_incomingRequests.Count >= 1, "We just queued a request, so the count should be at least 1"); 185 if (_incomingRequests.Count == 1) 186 { 187 RequestWakeup(); 188 } 189 } 190 } 191 192 /// <summary>Write a byte to the wakeup pipe.</summary> RequestWakeup()193 private unsafe void RequestWakeup() 194 { 195 EventSourceTrace(null); 196 byte b = 1; 197 Interop.CheckIo(Interop.Sys.Write(_requestWakeupPipeFd, &b, 1)); 198 } 199 200 /// <summary>Clears data from the wakeup pipe.</summary> 201 /// <remarks> 202 /// This must only be called when we know there's data to be read. 203 /// The MultiAgent could easily deadlock if it's called when there's no data in the pipe. 204 /// </remarks> ReadFromWakeupPipeWhenKnownToContainData()205 private unsafe void ReadFromWakeupPipeWhenKnownToContainData() 206 { 207 // It's possible but unlikely that there will be tons of extra data in the pipe, 208 // more than we end up reading out here (it's unlikely because we only write a byte to the pipe when 209 // transitioning from 0 to 1 incoming request). In that unlikely event, the worst 210 // case will be that the next one or more waits will wake up immediately, with each one 211 // subsequently clearing out more of the pipe. 212 const int ClearBufferSize = 64; // sufficiently large to clear the pipe in any normal case 213 byte* clearBuf = stackalloc byte[ClearBufferSize]; 214 Interop.CheckIo(Interop.Sys.Read(_wakeupRequestedPipeFd, clearBuf, ClearBufferSize)); 215 } 216 217 /// <summary>Requests that libcurl unpause the connection associated with this request.</summary> RequestUnpause(EasyRequest easy)218 internal void RequestUnpause(EasyRequest easy) 219 { 220 EventSourceTrace(null, easy: easy); 221 QueueIfRunning(new IncomingRequest { Easy = easy, Type = IncomingRequestType.Unpause }); 222 } 223 224 /// <summary>Requests that the request associated with the easy operation be canceled.</summary> RequestCancel(EasyRequest easy)225 internal void RequestCancel(EasyRequest easy) 226 { 227 EventSourceTrace(null, easy: easy); 228 QueueIfRunning(new IncomingRequest { Easy = easy, Type = IncomingRequestType.Cancel }); 229 } 230 231 /// <summary>Creates and configures a new multi handle.</summary> CreateAndConfigureMultiHandle()232 private Interop.Http.SafeCurlMultiHandle CreateAndConfigureMultiHandle() 233 { 234 // Create the new handle 235 Interop.Http.SafeCurlMultiHandle multiHandle = Interop.Http.MultiCreate(); 236 if (multiHandle.IsInvalid) 237 { 238 throw CreateHttpRequestException(new CurlException((int)CURLcode.CURLE_FAILED_INIT, isMulti: false)); 239 } 240 241 // In support of HTTP/2, enable HTTP/2 connections to be multiplexed if possible. 242 // We must only do this if the version of libcurl being used supports HTTP/2 multiplexing. 243 // Due to a change in a libcurl signature, if we try to make this call on an older libcurl, 244 // we'll end up accidentally and unconditionally enabling HTTP 1.1 pipelining. 245 if (s_supportsHttp2Multiplexing) 246 { 247 ThrowIfCURLMError(Interop.Http.MultiSetOptionLong(multiHandle, 248 Interop.Http.CURLMoption.CURLMOPT_PIPELINING, 249 (long)Interop.Http.CurlPipe.CURLPIPE_MULTIPLEX)); 250 EventSourceTrace("Set multiplexing on multi handle"); 251 } 252 253 // Configure max connections per host if it was changed from the default. In shared mode, 254 // this will be pulled from the handler that first created the agent; the setting from subsequent 255 // handlers that use this will be ignored. 256 if (_creatingHandler != null) 257 { 258 int maxConnections = _creatingHandler.MaxConnectionsPerServer; 259 if (maxConnections < int.MaxValue) // int.MaxValue considered infinite, mapping to libcurl default of 0 260 { 261 CURLMcode code = Interop.Http.MultiSetOptionLong(multiHandle, Interop.Http.CURLMoption.CURLMOPT_MAX_HOST_CONNECTIONS, maxConnections); 262 switch (code) 263 { 264 case CURLMcode.CURLM_OK: 265 EventSourceTrace("Set max host connections to {0}", maxConnections); 266 break; 267 default: 268 // Treat failures as non-fatal in release; worst case is we employ more connections than desired. 269 EventSourceTrace("Setting CURLMOPT_MAX_HOST_CONNECTIONS failed: {0}. Ignoring option.", code); 270 break; 271 } 272 } 273 } 274 275 return multiHandle; 276 } 277 278 /// <summary>Thread work item entrypoint for a multiagent worker.</summary> WorkerBody()279 private void WorkerBody() 280 { 281 Debug.Assert(!Monitor.IsEntered(_incomingRequests), $"No locks should be held while invoking {nameof(WorkerBody)}"); 282 Debug.Assert(_runningWorker != null && _runningWorker.Id == Task.CurrentId, "This is the worker, so it must be running"); 283 284 EventSourceTrace("MultiAgent worker running"); 285 try 286 { 287 try 288 { 289 // Do the actual processing 290 WorkerBodyLoop(); 291 } 292 finally 293 { 294 EventSourceTrace("MultiAgent worker shutting down"); 295 296 // The multi handle's reference count was increased prior to launching 297 // this processing task. Release that reference; any Dispose operations 298 // that occurred during the worker's processing will now be allowed to 299 // proceed to clean up the multi handle. 300 _multiHandle.DangerousRelease(); 301 302 lock (_incomingRequests) 303 { 304 // Close our wakeup pipe (ignore close errors). 305 // This is done while holding the lock to prevent 306 // subsequent Queue calls to see an improperly configured 307 // set of descriptors. 308 _wakeupRequestedPipeFd.Dispose(); 309 _wakeupRequestedPipeFd = null; 310 _requestWakeupPipeFd.Dispose(); 311 _requestWakeupPipeFd = null; 312 313 // In the time between we stopped processing and taking the lock, 314 // more requests could have been added. If they were, 315 // kick off another processing loop. 316 _runningWorker = null; 317 if (_incomingRequests.Count > 0 && !_disposed) 318 { 319 EnsureWorkerIsRunning(); 320 } 321 } 322 } 323 } 324 catch (Exception exc) 325 { 326 // Something went very wrong. In general this should not happen. The only time it might reasonably 327 // happen is if CurlHandler is disposed of while it's actively processing, in which case we could 328 // get an ObjectDisposedException. 329 EventSourceTrace("Unexpected worker failure: {0}", exc); 330 Debug.Assert(exc is ObjectDisposedException, $"Unexpected exception from processing loop: {exc}"); 331 332 // At this point if there any queued requests but there's no worker, 333 // those queued requests are potentially going to sit there waiting forever, 334 // resulting in a hang. Instead, fail those requests. 335 lock (_incomingRequests) 336 { 337 if (_runningWorker == null) 338 { 339 while (_incomingRequests.Count > 0) 340 { 341 _incomingRequests.Dequeue().Easy.CleanupAndFailRequest(exc); 342 } 343 } 344 } 345 } 346 } 347 348 /// <summary>Main processing loop employed by the multiagent worker body.</summary> WorkerBodyLoop()349 private void WorkerBodyLoop() 350 { 351 Debug.Assert(_wakeupRequestedPipeFd != null, "Should have a non-null pipe for wake ups"); 352 Debug.Assert(!_wakeupRequestedPipeFd.IsInvalid, "Should have a valid pipe for wake ups"); 353 Debug.Assert(!_wakeupRequestedPipeFd.IsClosed, "Should have an open pipe for wake ups"); 354 355 Debug.Assert(_multiHandle != null, "Should have a non-null multi handle"); 356 Debug.Assert(!_multiHandle.IsInvalid, "Should have a valid multi handle"); 357 Debug.Assert(!_multiHandle.IsClosed, "Should have an open multi handle"); 358 359 // Clear our active operations table. This should already be clear, either because 360 // all previous operations completed without unexpected exception, or in the case of an 361 // unexpected exception we should have cleaned up gracefully anyway. But just in case... 362 Debug.Assert(_activeOperations.Count == 0, "We shouldn't have any active operations when starting processing."); 363 _activeOperations.Clear(); 364 365 Exception eventLoopError = null; 366 try 367 { 368 // Continue processing as long as there are any active operations 369 while (true) 370 { 371 // First handle any requests in the incoming requests queue. 372 // (This method is factored out mostly to keep this loop concise, but also partly 373 // to avoid keeping any references to EasyRequests rooted by the stack and thus 374 // preventing them from being GC'd and the response stream finalized. That's mainly 375 // a concern for debug builds, where the JIT may extend a local's lifetime. The same 376 // logic applies to some of the other helpers used later in this loop.) 377 HandleIncomingRequests(); 378 379 // If we have no active operations, there's no work to do right now. 380 if (_activeOperations.Count == 0) 381 { 382 // Setting up a MultiAgent processing loop involves creating a new MultiAgent, creating a task 383 // and a thread to process it, creating a new pipe, etc., which has non-trivial cost associated 384 // with it. Thus, to avoid repeatedly spinning up and down these workers, we can keep this worker 385 // alive for a little while longer in case another request comes in within some reasonably small 386 // amount of time. This helps with the case where a sequence of requests is made serially rather 387 // than in parallel, avoiding the likely case of spinning up a new multiagent for each. 388 Interop.Sys.PollEvents triggered; 389 Interop.Error pollRv = Interop.Sys.Poll(_wakeupRequestedPipeFd, Interop.Sys.PollEvents.POLLIN, KeepAliveMilliseconds, out triggered); 390 if (pollRv == Interop.Error.SUCCESS && (triggered & Interop.Sys.PollEvents.POLLIN) != 0) 391 { 392 // Another request came in while we were waiting. Clear the pipe and loop around to continue processing. 393 ReadFromWakeupPipeWhenKnownToContainData(); 394 continue; 395 } 396 397 // We're done. Exit the multiagent. 398 return; 399 } 400 401 // We have one or more active operations. Run any work that needs to be run. 402 PerformCurlWork(); 403 404 // Complete and remove any requests that have finished being processed. 405 Interop.Http.CURLMSG message; 406 IntPtr easyHandle; 407 CURLcode result; 408 while (Interop.Http.MultiInfoRead(_multiHandle, out message, out easyHandle, out result)) 409 { 410 HandleCurlMessage(message, easyHandle, result); 411 } 412 413 // If there are any active operations, wait for more things to do. 414 if (_activeOperations.Count > 0) 415 { 416 WaitForWork(); 417 } 418 } 419 } 420 catch (Exception exc) 421 { 422 eventLoopError = exc; 423 throw; 424 } 425 finally 426 { 427 // There may still be active operations, if an unexpected exception occurred. 428 // Make sure to clean up any remaining operations, failing them and releasing their resources. 429 if (_activeOperations.Count > 0) 430 { 431 CleanUpRemainingActiveOperations(eventLoopError); 432 } 433 } 434 } 435 436 /// <summary> 437 /// Drains the incoming requests queue, dequeuing each request and handling it according to its type. 438 /// </summary> HandleIncomingRequests()439 private void HandleIncomingRequests() 440 { 441 Debug.Assert(!Monitor.IsEntered(_incomingRequests), "Incoming requests lock should only be held while accessing the queue"); 442 EventSourceTrace(null); 443 444 while (true) 445 { 446 // Get the next request 447 IncomingRequest request; 448 lock (_incomingRequests) 449 { 450 if (_incomingRequests.Count == 0) 451 { 452 return; 453 } 454 455 request = _incomingRequests.Dequeue(); 456 } 457 458 // Process the request 459 EasyRequest easy = request.Easy; 460 EventSourceTrace("Type: {0}", request.Type, easy: easy); 461 switch (request.Type) 462 { 463 case IncomingRequestType.New: 464 ActivateNewRequest(easy); 465 break; 466 467 case IncomingRequestType.Cancel: 468 Debug.Assert(easy._associatedMultiAgent == this, "Should only cancel associated easy requests"); 469 FindFailAndCleanupActiveRequest(easy, new OperationCanceledException(easy._cancellationToken)); 470 break; 471 472 case IncomingRequestType.Unpause: 473 Debug.Assert(easy._associatedMultiAgent == this, "Should only unpause associated easy requests"); 474 if (!easy._easyHandle.IsClosed) 475 { 476 IntPtr gcHandlePtr; 477 ActiveRequest ar; 478 Debug.Assert(FindActiveRequest(easy, out gcHandlePtr, out ar), "Couldn't find active request for unpause"); 479 480 try 481 { 482 ThrowIfCURLEError(Interop.Http.EasyUnpause(easy._easyHandle)); 483 } 484 catch (Exception exc) 485 { 486 FindFailAndCleanupActiveRequest(easy, exc); 487 } 488 } 489 break; 490 491 case IncomingRequestType.Shutdown: 492 // When we get a shutdown request, we want to stop all operations that haven't had 493 // their response message published. Other operations may continue. 494 Debug.Assert(easy == null, "Expected null easy for a Shutdown request"); 495 CleanUpRemainingActiveOperations( 496 new OperationCanceledException(SR.net_http_unix_handler_disposed), 497 onlyIfResponseMessageNotPublished: true); 498 break; 499 500 default: 501 Debug.Fail("Invalid request type: " + request.Type); 502 break; 503 } 504 } 505 } 506 507 /// <summary>Tell libcurl to perform any available processing on the easy handles associated with this agent's multi handle.</summary> PerformCurlWork()508 private void PerformCurlWork() 509 { 510 CURLMcode performResult; 511 EventSourceTrace("Ask libcurl to perform any available work..."); 512 while ((performResult = Interop.Http.MultiPerform(_multiHandle)) == CURLMcode.CURLM_CALL_MULTI_PERFORM) ; 513 EventSourceTrace("...done performing work: {0}", performResult); 514 ThrowIfCURLMError(performResult); 515 } 516 517 /// <summary> 518 /// Tell libcurl to block waiting for work to be ready to handle. It'll return when there's work to be 519 /// performed, when a timeout has occurred, or when new requests have entered our incoming requests queue. 520 /// </summary> WaitForWork()521 private void WaitForWork() 522 { 523 // Ask libcurl to wait for more things to do. We pass in our wakeup-requested pipe handle so that libcurl 524 // will wait on that file descriptor as well and wake up if an incoming request arrived into our queue. 525 bool isWakeupRequestedPipeActive; 526 bool isTimeout; 527 ThrowIfCURLMError(Interop.Http.MultiWait(_multiHandle, _wakeupRequestedPipeFd, out isWakeupRequestedPipeActive, out isTimeout)); 528 529 if (isWakeupRequestedPipeActive) 530 { 531 // We woke up (at least in part) because a wake-up was requested. 532 // Read the data out of the pipe to clear it. 533 Debug.Assert(!isTimeout, $"Should not have timed out when {nameof(isWakeupRequestedPipeActive)} is true"); 534 EventSourceTrace("Wait wake-up"); 535 ReadFromWakeupPipeWhenKnownToContainData(); 536 } 537 538 if (isTimeout) 539 { 540 EventSourceTrace("Wait timeout"); 541 } 542 543 // PERF NOTE: curl_multi_wait uses poll (assuming it's available), which is O(N) in terms of the number of fds 544 // being waited on. If this ends up being a scalability bottleneck, we can look into using the curl_multi_socket_* 545 // APIs, which would let us switch to using epoll by being notified when sockets file descriptors are added or 546 // removed and configuring the epoll context with EPOLL_CTL_ADD/DEL, which at the expense of a lot of additional 547 // complexity would let us turn the O(N) operation into an O(1) operation. The additional complexity would come 548 // not only in the form of additional callbacks and managing the socket collection, but also in the form of timer 549 // management, which is necessary when using the curl_multi_socket_* APIs and which we avoid by using just 550 // curl_multi_wait/perform. 551 } 552 553 /// <summary>Handle a libcurl message received as part of processing work. This should signal a completed operation.</summary> HandleCurlMessage(Interop.Http.CURLMSG message, IntPtr easyHandle, CURLcode result)554 private void HandleCurlMessage(Interop.Http.CURLMSG message, IntPtr easyHandle, CURLcode result) 555 { 556 if (message != Interop.Http.CURLMSG.CURLMSG_DONE) 557 { 558 Debug.Fail($"CURLMSG_DONE is supposed to be the only message type, but got {message}"); 559 EventSourceTrace("Unexpected CURLMSG: {0}", message); 560 return; 561 } 562 563 // Get the GCHandle pointer from the easy handle's state 564 IntPtr gcHandlePtr; 565 CURLcode getInfoResult = Interop.Http.EasyGetInfoPointer(easyHandle, CURLINFO.CURLINFO_PRIVATE, out gcHandlePtr); 566 Debug.Assert(getInfoResult == CURLcode.CURLE_OK, $"Failed to get info on a completing easy handle: {getInfoResult}"); 567 if (getInfoResult == CURLcode.CURLE_OK) 568 { 569 // Use the GCHandle to look up the associated ActiveRequest 570 ActiveRequest completedOperation; 571 bool gotActiveOp = _activeOperations.TryGetValue(gcHandlePtr, out completedOperation); 572 Debug.Assert(gotActiveOp, "Expected to find GCHandle ptr in active operations table"); 573 574 // Deactivate the easy handle and finish all processing related to the request 575 DeactivateActiveRequest(completedOperation, gcHandlePtr); 576 FinishRequest(completedOperation.EasyWrapper, result); 577 } 578 } 579 580 /// <summary>When shutting down the multi agent worker, ensure any active operations are forcibly completed.</summary> 581 /// <param name="error">The error to use to complete any remaining operations.</param> 582 /// <param name="onlyIfResponseMessageNotPublished"> 583 /// true if the only active operations that should be canceled and cleaned up are those which have not 584 /// yet had their response message published. false if all active operations should be canceled regardless 585 /// of where they are in processing. 586 /// </param> CleanUpRemainingActiveOperations(Exception error, bool onlyIfResponseMessageNotPublished = false)587 private void CleanUpRemainingActiveOperations(Exception error, bool onlyIfResponseMessageNotPublished = false) 588 { 589 EventSourceTrace("Shutting down {0} active operations.", _activeOperations.Count); 590 try 591 { 592 // Copy the operations to a tmp array so that we don't try to modify the dictionary while enumerating it 593 var activeOps = new KeyValuePair<IntPtr, ActiveRequest>[_activeOperations.Count]; 594 ((IDictionary<IntPtr, ActiveRequest>)_activeOperations).CopyTo(activeOps, 0); 595 596 // Fail all active ops. 597 Exception lastError = null; 598 foreach (KeyValuePair<IntPtr, ActiveRequest> pair in activeOps) 599 { 600 try 601 { 602 IntPtr failingOperationGcHandle = pair.Key; 603 ActiveRequest failingActiveRequest = pair.Value; 604 EasyRequest easy = failingActiveRequest.EasyWrapper.Target; // may be null if the EasyRequest was already collected 605 if (!onlyIfResponseMessageNotPublished || (easy != null && !easy.Task.IsCompleted)) 606 { 607 // Deactivate the request, removing it from the multi handle and allowing it to be cleaned up 608 DeactivateActiveRequest(failingActiveRequest, failingOperationGcHandle); 609 610 // Complete the operation's task and clean up any of its resources, if it still exists. 611 easy?.CleanupAndFailRequest(CreateHttpRequestException(error)); 612 } 613 } 614 catch (Exception e) 615 { 616 // We don't want a spurious failure while cleaning up one request to prevent us from trying 617 // to clean up the rest of them. 618 lastError = e; 619 } 620 } 621 622 // Now propagate any failure that may have occurred while cleaning up 623 if (lastError != null) 624 { 625 ExceptionDispatchInfo.Throw(lastError); 626 } 627 } 628 finally 629 { 630 if (!onlyIfResponseMessageNotPublished) 631 { 632 // Ensure the table is now cleared. 633 _activeOperations.Clear(); 634 } 635 } 636 } 637 638 /// <summary> 639 /// Activates the request represented by the EasyRequest. This includes creating the libcurl easy handle, 640 /// configuring it, and associating it with the multi handle so that it may be processed. 641 /// </summary> ActivateNewRequest(EasyRequest easy)642 private void ActivateNewRequest(EasyRequest easy) 643 { 644 Debug.Assert(easy != null, "We should never get a null request"); 645 Debug.Assert(easy._associatedMultiAgent == this, "Request should be associated with this agent"); 646 647 // If cancellation has been requested, complete the request proactively 648 if (easy._cancellationToken.IsCancellationRequested) 649 { 650 easy.CleanupAndFailRequest(new OperationCanceledException(easy._cancellationToken)); 651 return; 652 } 653 654 // We need to create a GCHandle that we can pass to libcurl to let it keep associated managed 655 // state alive and help us to determine which state corresponds to the particular request. However, 656 // having a GCHandle that keeps an EasyRequest alive will prevent finalization of anything to do with 657 // that EasyRequest, which means we could end up in a situation where code creates and then drops a 658 // request, but then libcurl ends up keeping the state alive (until the reuest/response eventually times 659 // out, assuming the timeout wasn't set to infinite). To address this, we create a GCHandle to a wrapper 660 // object. At first, that wrapper object wraps a strong reference to the EasyRequest, since until a 661 // response comes back, the caller doesn't actually have a reference to anything related to the request. 662 // Then once a response comes back and the caller is responsible for keeping the request/response alive, 663 // we replace the wrapped state with a weak reference to the EasyRequest. That way, if the user then 664 // drops the response, we can allow it to be finalized and not keep it alive indefinitely by the 665 // native reference. Finalization of the response stream will cause all of the relevant state to be 666 // closed, including closing out the native easy session and then free'ing this GCHandle. 667 easy._selfStrongToWeakReference = new StrongToWeakReference<EasyRequest>(easy); // store wrapper onto the easy so that it can transition it to weak and then lose the ref 668 GCHandle gcHandle = GCHandle.Alloc(easy._selfStrongToWeakReference); 669 IntPtr gcHandlePtr = GCHandle.ToIntPtr(gcHandle); 670 671 // Configure the easy request and add it to the multi handle. 672 bool addedRef = false; 673 try 674 { 675 easy.InitializeCurl(); 676 677 easy.SetCurlOption(Interop.Http.CURLoption.CURLOPT_PRIVATE, gcHandlePtr); 678 easy.SetCurlCallbacks(gcHandlePtr, s_receiveHeadersCallback, s_sendCallback, s_seekCallback, s_receiveBodyCallback, s_debugCallback); 679 680 // Make sure that as long as the easy handle is referenced by the multi handle that 681 // it doesn't get finalized. Doing so can lead to serious problems like seg faults, 682 // for example if the multi handle is trying to access the easy handle on one thread 683 // while it's being finalized on another. 684 easy._easyHandle.DangerousAddRef(ref addedRef); 685 686 // Finally, register the easy handle with the multi handle 687 ThrowIfCURLMError(Interop.Http.MultiAddHandle(_multiHandle, easy._easyHandle)); 688 } 689 catch (Exception exc) 690 { 691 if (addedRef) 692 { 693 easy._easyHandle.DangerousRelease(); 694 } 695 gcHandle.Free(); 696 easy.CleanupAndFailRequest(exc); 697 return; 698 } 699 700 // And if cancellation can be requested, hook up a cancellation callback. 701 // This callback will put the easy request back into the queue, which will 702 // ensure that a wake-up request has been issued. 703 var cancellationReg = default(CancellationTokenRegistration); 704 if (easy._cancellationToken.CanBeCanceled) 705 { 706 // To avoid keeping the EasyRequest rooted in the associated CancellationTokenSource, 707 // the cancellation registration is given the wrapper rather than the object directly. 708 cancellationReg = easy._cancellationToken.Register(s => 709 { 710 var wrapper = (StrongToWeakReference<EasyRequest>)s; 711 EasyRequest e = wrapper.Target; // may be null if already collected 712 e?._associatedMultiAgent.RequestCancel(e); 713 }, easy._selfStrongToWeakReference); 714 } 715 716 // Finally, add it to our map. 717 _activeOperations.Add(gcHandlePtr, new ActiveRequest 718 { 719 EasyWrapper = easy._selfStrongToWeakReference, 720 EasyHandle = easy._easyHandle, 721 CancellationRegistration = cancellationReg, 722 }); 723 } 724 725 /// <summary>Extract the EasyRequest from the GCHandle pointer to it.</summary> TryGetEasyRequestFromGCHandle(IntPtr gcHandlePtr, out EasyRequest easy)726 internal static bool TryGetEasyRequestFromGCHandle(IntPtr gcHandlePtr, out EasyRequest easy) 727 { 728 // Get the EasyRequest from the context 729 try 730 { 731 GCHandle handle = GCHandle.FromIntPtr(gcHandlePtr); 732 easy = (handle.Target as StrongToWeakReference<EasyRequest>)?.Target; 733 return easy != null; 734 } 735 catch (Exception e) when (e is InvalidCastException || e is InvalidOperationException) 736 { 737 Debug.Fail($"Error accessing GCHandle: {e}"); 738 } 739 740 easy = null; 741 return false; 742 } 743 744 /// <summary> 745 /// Corresponding to ActivateNewRequest, removes the active request from the multi handle, frees the GCHandle, 746 /// removes the request from our tracking table, and ensures cancellation has been unregistered. 747 /// </summary> DeactivateActiveRequest(ActiveRequest activeRequest, IntPtr gcHandlePtr)748 private void DeactivateActiveRequest(ActiveRequest activeRequest, IntPtr gcHandlePtr) 749 { 750 try 751 { 752 // Remove the operation from the multi handle so we can shut down the multi handle cleanly 753 CURLMcode removeResult = Interop.Http.MultiRemoveHandle(_multiHandle, activeRequest.EasyHandle); 754 Debug.Assert(removeResult == CURLMcode.CURLM_OK, "Failed to remove easy handle"); // ignore cleanup errors in release 755 756 // Release the associated GCHandle so that it's not kept alive forever 757 if (gcHandlePtr != IntPtr.Zero) 758 { 759 try 760 { 761 GCHandle.FromIntPtr(gcHandlePtr).Free(); 762 bool removed = _activeOperations.Remove(gcHandlePtr); 763 Debug.Assert(removed, "Expected GCHandle to still be referenced by active operations table"); 764 } 765 catch (InvalidOperationException) 766 { 767 Debug.Fail("Couldn't get/free the GCHandle for an active operation while shutting down due to failure"); 768 } 769 } 770 771 // Undo cancellation registration 772 activeRequest.CancellationRegistration.Dispose(); 773 } 774 finally 775 { 776 // We previously AddRef'd the easy handle to ensure that it wasn't finalized 777 // while it was still registered with the multi handle. Now that it's been removed, 778 // we need to remove the reference. 779 activeRequest.EasyHandle.DangerousRelease(); 780 } 781 } 782 783 /// <summary> 784 /// Looks up an ActiveRequest in the active operations table by EasyRequest. This is a linear operation 785 /// and should not be used on hot paths. 786 /// </summary> FindActiveRequest(EasyRequest easy, out IntPtr gcHandlePtr, out ActiveRequest activeRequest)787 private bool FindActiveRequest(EasyRequest easy, out IntPtr gcHandlePtr, out ActiveRequest activeRequest) 788 { 789 // We maintain an IntPtr=>ActiveRequest mapping, which makes it cheap to look-up by GCHandle ptr but 790 // expensive to look up by EasyRequest. If we find this becoming a bottleneck, we can add a reverse 791 // map that stores the other direction as well. It should only be used on slow paths, such as when 792 // completing an operation due to failure. 793 foreach (KeyValuePair<IntPtr, ActiveRequest> pair in _activeOperations) 794 { 795 if (pair.Value.EasyWrapper.Target == easy) 796 { 797 gcHandlePtr = pair.Key; 798 activeRequest = pair.Value; 799 return true; 800 } 801 } 802 803 gcHandlePtr = IntPtr.Zero; 804 activeRequest = default(ActiveRequest); 805 return false; 806 } 807 808 /// <summary> 809 /// Finds in the active operations table the operation for the specified easy request, 810 /// and then assuming it's found, deactivates and fails it with the specified exception. 811 /// </summary> FindFailAndCleanupActiveRequest(EasyRequest easy, Exception error)812 private void FindFailAndCleanupActiveRequest(EasyRequest easy, Exception error) 813 { 814 EventSourceTrace("Error: {0}", error, easy: easy); 815 816 IntPtr gcHandlePtr; 817 ActiveRequest activeRequest; 818 if (FindActiveRequest(easy, out gcHandlePtr, out activeRequest)) 819 { 820 DeactivateActiveRequest(activeRequest, gcHandlePtr); 821 easy.CleanupAndFailRequest(error); 822 } 823 else 824 { 825 Debug.Assert(easy.Task.IsCompleted, "We should only not be able to find the request if it failed or we started to send back the response."); 826 } 827 } 828 829 /// <summary>Finishes the processing of a completed easy operation.</summary> FinishRequest(StrongToWeakReference<EasyRequest> easyWrapper, CURLcode messageResult)830 private void FinishRequest(StrongToWeakReference<EasyRequest> easyWrapper, CURLcode messageResult) 831 { 832 EasyRequest completedOperation = easyWrapper.Target; 833 EventSourceTrace("Curl result: {0}", messageResult, easy: completedOperation); 834 835 if (completedOperation == null) 836 { 837 // Already collected; nothing more to do. 838 return; 839 } 840 841 if (completedOperation._responseMessage.StatusCode != HttpStatusCode.Unauthorized) 842 { 843 // If preauthentication is enabled, then we want to transfer credentials to the handler's credential cache. 844 // That entails asking the easy operation which auth types are supported, and then giving that info to the 845 // handler, which along with the request URI and its server credentials will populate the cache appropriately. 846 if (completedOperation._handler.PreAuthenticate) 847 { 848 long authAvailable; 849 if (Interop.Http.EasyGetInfoLong(completedOperation._easyHandle, CURLINFO.CURLINFO_HTTPAUTH_AVAIL, out authAvailable) == CURLcode.CURLE_OK) 850 { 851 completedOperation._handler.TransferCredentialsToCache( 852 completedOperation._requestMessage.RequestUri, (Interop.Http.CURLAUTH)authAvailable); 853 } 854 // Ignore errors: no need to fail for the sake of putting the credentials into the cache 855 } 856 } 857 858 // Complete or fail the request 859 try 860 { 861 // At this point, we've completed processing the entire request, either due to error 862 // or due to completing the entire response. 863 completedOperation.Cleanup(); 864 865 // libcurl will return CURLE_UNSUPPORTED_PROTOCOL if the url it tried to go to had an unsupported protocol. 866 // This could be the original url provided or one provided in a Location header for a redirect. Since 867 // we vet the original url passed in, such an error here must be for a redirect, in which case we want to 868 // ignore it and treat such failures as successes, to match the Windows behavior. 869 if (messageResult != CURLcode.CURLE_UNSUPPORTED_PROTOCOL) 870 { 871 // libcurl will return CURLE_RECV_ERROR (56) if proxy authentication failed when connecting to a https server, 872 // whereas it returns CURLE_OK for a http server proxy authentication failure. We ignore this curl behavior error, 873 // and let the user rely on response message status code to match the Windows behavior. 874 if (messageResult != CURLcode.CURLE_RECV_ERROR || 875 completedOperation._responseMessage.StatusCode != HttpStatusCode.ProxyAuthenticationRequired) 876 { 877 ThrowIfCURLEError(messageResult); 878 } 879 } 880 881 // Make sure the response message is published, in case it wasn't already, and since we're done processing 882 // everything to do with this request, make sure the response stream is marked complete as well. 883 completedOperation.EnsureResponseMessagePublished(); 884 completedOperation._responseMessage.ResponseStream.SignalComplete(); 885 } 886 catch (Exception exc) 887 { 888 completedOperation.FailRequest(exc); 889 } 890 } 891 892 /// <summary>Callback invoked by libcurl when debug information is available.</summary> CurlDebugFunction(IntPtr curl, Interop.Http.CurlInfoType type, IntPtr data, ulong size, IntPtr context)893 private static void CurlDebugFunction(IntPtr curl, Interop.Http.CurlInfoType type, IntPtr data, ulong size, IntPtr context) 894 { 895 EasyRequest easy; 896 TryGetEasyRequestFromGCHandle(context, out easy); 897 // If we're unable to get an associated request, we simply trace without it. 898 899 try 900 { 901 switch (type) 902 { 903 case Interop.Http.CurlInfoType.CURLINFO_TEXT: 904 case Interop.Http.CurlInfoType.CURLINFO_HEADER_IN: 905 case Interop.Http.CurlInfoType.CURLINFO_HEADER_OUT: 906 string text = Marshal.PtrToStringAnsi(data, (int)size).Trim(); 907 if (text.Length > 0) 908 { 909 CurlHandler.EventSourceTrace("{0}: {1}", type, text, 0, easy: easy); 910 } 911 break; 912 913 default: 914 CurlHandler.EventSourceTrace("{0}: {1} bytes", type, size, 0, easy: easy); 915 break; 916 } 917 } 918 catch (Exception exc) 919 { 920 CurlHandler.EventSourceTrace("Error: {0}", exc, easy: easy); 921 } 922 } 923 924 /// <summary>Callback invoked by libcurl for each response header received.</summary> CurlReceiveHeadersCallback(IntPtr buffer, ulong size, ulong nitems, IntPtr context)925 private static ulong CurlReceiveHeadersCallback(IntPtr buffer, ulong size, ulong nitems, IntPtr context) 926 { 927 // The callback is invoked once per header; multi-line headers get merged into a single line. 928 929 size *= nitems; 930 Debug.Assert(size <= Interop.Http.CURL_MAX_HTTP_HEADER, $"Expected header size <= {Interop.Http.CURL_MAX_HTTP_HEADER}, got {size}"); 931 932 EasyRequest easy; 933 if (TryGetEasyRequestFromGCHandle(context, out easy)) 934 { 935 CurlHandler.EventSourceTrace("Size: {0}", size, easy: easy); 936 try 937 { 938 if (size == 0) 939 { 940 return 0; 941 } 942 943 // Make sure we've not yet published the response. This could happen with trailer headers, 944 // in which case we just ignore them (we don't want to add them to the response headers at 945 // this point, as it'd contribute to a race condition, both in terms of headers appearing 946 // "randomly" and in terms of accessing a non-thread-safe data structure from this thread 947 // while the consumer might be accessing / mutating it elsewhere.) 948 if (easy.Task.IsCompleted) 949 { 950 CurlHandler.EventSourceTrace("Response already published. Ignoring headers.", easy: easy); 951 return size; 952 } 953 954 CurlResponseMessage response = easy._responseMessage; 955 CurlResponseHeaderReader reader = new CurlResponseHeaderReader(buffer, size); 956 957 // Validate that we haven't received too much header data. 958 // MaxResponseHeadersLength property is in units in K (1024) bytes. 959 ulong headerBytesReceived = response._headerBytesReceived + size; 960 if (headerBytesReceived > (ulong)(easy._handler.MaxResponseHeadersLength * 1024)) 961 { 962 throw new HttpRequestException( 963 SR.Format(SR.net_http_response_headers_exceeded_length, easy._handler.MaxResponseHeadersLength)); 964 } 965 response._headerBytesReceived = (uint)headerBytesReceived; 966 967 // Parse the header 968 if (reader.ReadStatusLine(response)) 969 { 970 CurlHandler.EventSourceTrace("Received status line", easy: easy); 971 972 // Clear the headers when the status line is received. This may happen multiple times if there are multiple response headers (like in redirection). 973 response.Headers.Clear(); 974 response.Content.Headers.Clear(); 975 response._headerBytesReceived = (uint)size; 976 977 // Update the request message with the Uri 978 easy.StoreLastEffectiveUri(); 979 } 980 else 981 { 982 string headerName; 983 string headerValue; 984 985 if (reader.ReadHeader(out headerName, out headerValue)) 986 { 987 if (!response.Headers.TryAddWithoutValidation(headerName, headerValue)) 988 { 989 response.Content.Headers.TryAddWithoutValidation(headerName, headerValue); 990 } 991 else if ((int)response.StatusCode >= 300 && (int)response.StatusCode < 400 && 992 easy._handler.AllowAutoRedirect && 993 string.Equals(headerName, HttpKnownHeaderNames.Location, StringComparison.OrdinalIgnoreCase)) 994 { 995 // A "Location" header field can mean different things for different status codes. For 3xx status codes, 996 // it implies a redirect. As such, if we got a 3xx status code and we support automatically redirecting, 997 // reconfigure the easy handle under the assumption that libcurl will redirect. If it does redirect, we'll 998 // be prepared; if it doesn't (e.g. it doesn't treat some particular 3xx as a redirect, if we've reached 999 // our redirect limit, etc.), this will have been unnecessary work in reconfiguring the easy handle, but 1000 // nothing incorrect, as we'll tear down the handle once the request finishes, anyway, and all of the configuration 1001 // we're doing is about initiating a new request. 1002 easy.SetPossibleRedirectForLocationHeader(headerValue); 1003 } 1004 else if (string.Equals(headerName, HttpKnownHeaderNames.SetCookie, StringComparison.OrdinalIgnoreCase)) 1005 { 1006 easy._handler.AddResponseCookies(easy, headerValue); 1007 } 1008 } 1009 } 1010 1011 return size; 1012 } 1013 catch (Exception ex) 1014 { 1015 easy.FailRequest(ex); // cleanup will be handled by main processing loop 1016 } 1017 } 1018 1019 // Returning a value other than size fails the callback and forces 1020 // request completion with an error 1021 CurlHandler.EventSourceTrace("Aborting request", easy: easy); 1022 return size - 1; 1023 } 1024 1025 /// <summary>Callback invoked by libcurl for body data received.</summary> CurlReceiveBodyCallback( IntPtr buffer, ulong size, ulong nitems, IntPtr context)1026 private static ulong CurlReceiveBodyCallback( 1027 IntPtr buffer, ulong size, ulong nitems, IntPtr context) 1028 { 1029 size *= nitems; 1030 1031 EasyRequest easy; 1032 if (TryGetEasyRequestFromGCHandle(context, out easy)) 1033 { 1034 CurlHandler.EventSourceTrace("Size: {0}", size, easy: easy); 1035 try 1036 { 1037 if (!(easy.Task.IsCanceled || easy.Task.IsFaulted)) 1038 { 1039 // Complete the task if it hasn't already been. This will make the 1040 // stream available to consumers. A previous write callback 1041 // may have already completed the task to publish the response. 1042 easy.EnsureResponseMessagePublished(); 1043 1044 // Try to transfer the data to a reader. This will return either the 1045 // amount of data transferred (equal to the amount requested 1046 // to be transferred), or it will return a pause request. 1047 return easy._responseMessage.ResponseStream.TransferDataToResponseStream(buffer, (long)size); 1048 } 1049 } 1050 catch (Exception ex) 1051 { 1052 easy.FailRequest(ex); // cleanup will be handled by main processing loop 1053 } 1054 } 1055 1056 // Returning a value other than size fails the callback and forces 1057 // request completion with an error. 1058 CurlHandler.EventSourceTrace("Aborting request", easy: easy); 1059 return (size > 0) ? size - 1 : 1; 1060 } 1061 1062 /// <summary>Callback invoked by libcurl to read request data.</summary> CurlSendCallback(IntPtr buffer, ulong size, ulong nitems, IntPtr context)1063 private static ulong CurlSendCallback(IntPtr buffer, ulong size, ulong nitems, IntPtr context) 1064 { 1065 int length = checked((int)(size * nitems)); 1066 Debug.Assert(length <= MaxRequestBufferSize, $"length {length} should not be larger than RequestBufferSize {MaxRequestBufferSize}"); 1067 1068 EasyRequest easy; 1069 if (TryGetEasyRequestFromGCHandle(context, out easy)) 1070 { 1071 CurlHandler.EventSourceTrace("Size: {0}", length, easy: easy); 1072 1073 if (length == 0) 1074 { 1075 return 0; 1076 } 1077 1078 Debug.Assert(easy._requestMessage.Content != null, "We should only be in the send callback if we have request content"); 1079 Debug.Assert(easy._associatedMultiAgent != null, "The request should be associated with a multi agent."); 1080 1081 try 1082 { 1083 // Transfer data from the request's content stream to libcurl 1084 return TransferDataFromRequestStream(buffer, length, easy); 1085 } 1086 catch (Exception ex) 1087 { 1088 easy.FailRequest(ex); // cleanup will be handled by main processing loop 1089 } 1090 } 1091 1092 // Something went wrong. 1093 CurlHandler.EventSourceTrace("Aborting request", easy: easy); 1094 return Interop.Http.CURL_READFUNC_ABORT; 1095 } 1096 1097 /// <summary> 1098 /// Transfers up to <paramref name="length"/> data from the <paramref name="easy"/>'s 1099 /// request content (non-memory) stream to the buffer. 1100 /// </summary> 1101 /// <returns>The number of bytes transferred.</returns> TransferDataFromRequestStream(IntPtr buffer, int length, EasyRequest easy)1102 private static ulong TransferDataFromRequestStream(IntPtr buffer, int length, EasyRequest easy) 1103 { 1104 CurlHandler.EventSourceTrace("Length: {0}", length, easy: easy); 1105 1106 MultiAgent multi = easy._associatedMultiAgent; 1107 1108 // First check to see whether there's any data available from a previous asynchronous read request. 1109 // If there is, the transfer state's Task field will be non-null, with its Result representing 1110 // the number of bytes read. The Buffer will then contain all of that read data. If the Count 1111 // is 0, then this is the first time we're checking that Task, and so we populate the Count 1112 // from that read result. After that, we can transfer as much data remains between Offset and 1113 // Count. Multiple callbacks may pull from that one read. 1114 1115 EasyRequest.SendTransferState sts = easy._sendTransferState; 1116 if (sts != null) 1117 { 1118 // Is there a previous read that may still have data to be consumed? 1119 if (sts.Task != null) 1120 { 1121 if (!sts.Task.IsCompleted) 1122 { 1123 // We have a previous read that's not yet completed. This should be quite rare, but it can 1124 // happen when we're unpaused prematurely, potentially due to the request still finishing 1125 // being sent as the server starts to send a response. Since we still have the outstanding 1126 // read, we simply re-pause. When the task completes (which could have happened immediately 1127 // after the check). the continuation we previously created will fire and queue an unpause. 1128 // Since all of this processing is single-threaded on the current thread, that unpause request 1129 // is guaranteed to happen after this re-pause. 1130 multi.EventSourceTrace("Re-pausing reading after a spurious un-pause", easy: easy); 1131 return Interop.Http.CURL_READFUNC_PAUSE; 1132 } 1133 1134 // Determine how many bytes were read on the last asynchronous read. 1135 // If nothing was read, then we're done and can simply return 0 to indicate 1136 // the end of the stream. 1137 int bytesRead = sts.Task.GetAwaiter().GetResult(); // will throw if read failed 1138 Debug.Assert(bytesRead >= 0 && bytesRead <= sts.Buffer.Length, $"ReadAsync returned an invalid result length: {bytesRead}"); 1139 if (bytesRead == 0) 1140 { 1141 sts.SetTaskOffsetCount(null, 0, 0); 1142 return 0; 1143 } 1144 1145 // If Count is still 0, then this is the first time after the task completed 1146 // that we're examining the data: transfer the bytesRead to the Count. 1147 if (sts.Count == 0) 1148 { 1149 multi.EventSourceTrace("ReadAsync completed with bytes: {0}", bytesRead, easy: easy); 1150 sts.Count = bytesRead; 1151 } 1152 1153 // Now Offset and Count are both accurate. Determine how much data we can copy to libcurl... 1154 int availableData = sts.Count - sts.Offset; 1155 Debug.Assert(availableData > 0, "There must be some data still available."); 1156 1157 // ... and copy as much of that as libcurl will allow. 1158 int bytesToCopy = Math.Min(availableData, length); 1159 Marshal.Copy(sts.Buffer, sts.Offset, buffer, bytesToCopy); 1160 multi.EventSourceTrace("Copied {0} bytes from request stream", bytesToCopy, easy: easy); 1161 1162 // Update the offset. If we've gone through all of the data, reset the state 1163 // so that the next time we're called back we'll do a new read. 1164 sts.Offset += bytesToCopy; 1165 Debug.Assert(sts.Offset <= sts.Count, "Offset should never exceed count"); 1166 if (sts.Offset == sts.Count) 1167 { 1168 sts.SetTaskOffsetCount(null, 0, 0); 1169 } 1170 1171 // Return the amount of data copied 1172 Debug.Assert(bytesToCopy > 0, "We should never return 0 bytes here."); 1173 return (ulong)bytesToCopy; 1174 } 1175 1176 // sts was non-null but sts.Task was null, meaning there was no previous task/data 1177 // from which to satisfy any of this request. 1178 } 1179 else // sts == null 1180 { 1181 // Allocate a transfer state object to use for the remainder of this request. 1182 Debug.Assert(easy._requestMessage.Content != null, "Content shouldn't be null, since we already got a content request stream"); 1183 long bufferSize = easy._requestMessage.Content.Headers.ContentLength.GetValueOrDefault(); 1184 if (bufferSize <= 0 || bufferSize > MaxRequestBufferSize) 1185 { 1186 bufferSize = MaxRequestBufferSize; 1187 } 1188 easy._sendTransferState = sts = new EasyRequest.SendTransferState((int)bufferSize); 1189 } 1190 1191 Debug.Assert(sts != null, "By this point we should have a transfer object"); 1192 Debug.Assert(sts.Task == null, "There shouldn't be a task now."); 1193 Debug.Assert(sts.Count == 0, "Count should be zero."); 1194 Debug.Assert(sts.Offset == 0, "Offset should be zero."); 1195 1196 // If we get here, there was no previously read data available to copy. 1197 1198 // Make sure we actually have a stream to read from. This will be null if either 1199 // this is the first time we're reading it, or if the stream was reset as part 1200 // of curl trying to rewind. Then do the read. 1201 ValueTask<int> asyncRead; 1202 if (easy._requestContentStream == null) 1203 { 1204 multi.EventSourceTrace("Calling ReadAsStreamAsync to get new request stream", easy: easy); 1205 Task<Stream> readAsStreamTask = easy._requestMessage.Content.ReadAsStreamAsync(); 1206 asyncRead = readAsStreamTask.IsCompleted ? 1207 StoreRetrievedContentStreamAndReadAsync(readAsStreamTask, easy, sts, length) : 1208 new ValueTask<int>(easy._requestMessage.Content.ReadAsStreamAsync().ContinueWith((t, s) => 1209 { 1210 var stateAndRequest = (Tuple<int, EasyRequest.SendTransferState, EasyRequest>)s; 1211 return StoreRetrievedContentStreamAndReadAsync(t, 1212 stateAndRequest.Item3, stateAndRequest.Item2, stateAndRequest.Item1).AsTask(); 1213 }, Tuple.Create(length, sts, easy), CancellationToken.None, 1214 TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap()); 1215 } 1216 else 1217 { 1218 multi.EventSourceTrace("Starting async read", easy: easy); 1219 asyncRead = easy._requestContentStream.ReadAsync( 1220 new Memory<byte>(sts.Buffer, 0, Math.Min(sts.Buffer.Length, length)), easy._cancellationToken); 1221 } 1222 Debug.Assert(asyncRead != null, "Badly implemented stream returned a null task from ReadAsync"); 1223 1224 // Even though it's "Async", it's possible this read could complete synchronously or extremely quickly. 1225 // Check to see if it did, in which case we can also satisfy the libcurl request synchronously in this callback. 1226 if (asyncRead.IsCompleted) 1227 { 1228 multi.EventSourceTrace("Async read completed immediately", easy: easy); 1229 1230 // Get the amount of data read. 1231 int bytesRead = asyncRead.GetAwaiter().GetResult(); // will throw if read failed 1232 if (bytesRead == 0) 1233 { 1234 multi.EventSourceTrace("Read 0 bytes", easy: easy); 1235 return 0; 1236 } 1237 1238 // Copy as much as we can. 1239 int bytesToCopy = Math.Min(bytesRead, length); 1240 Debug.Assert(bytesToCopy > 0 && bytesToCopy <= sts.Buffer.Length, $"ReadAsync quickly returned an invalid result length: {bytesToCopy}"); 1241 Marshal.Copy(sts.Buffer, 0, buffer, bytesToCopy); 1242 multi.EventSourceTrace("Read {0} bytes", bytesToCopy, easy: easy); 1243 1244 // If we read more than we were able to copy, stash it away for the next read. 1245 if (bytesToCopy < bytesRead) 1246 { 1247 multi.EventSourceTrace("Storing {0} bytes for later", bytesRead - bytesToCopy, easy: easy); 1248 sts.SetTaskOffsetCount(asyncRead.AsTask(), bytesToCopy, bytesRead); 1249 } 1250 1251 // Return the number of bytes read. 1252 return (ulong)bytesToCopy; 1253 } 1254 1255 // Otherwise, the read completed asynchronously. Store the task, and hook up a continuation 1256 // such that the connection will be unpaused once the task completes. 1257 sts.SetTaskOffsetCount(asyncRead.AsTask(), 0, 0); 1258 sts.Task.ContinueWith((t, s) => 1259 { 1260 EasyRequest easyRef = (EasyRequest)s; 1261 easyRef._associatedMultiAgent.RequestUnpause(easyRef); 1262 }, easy, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); 1263 1264 // Then pause the connection. 1265 multi.EventSourceTrace("Pausing transfer from request stream", easy: easy); 1266 return Interop.Http.CURL_READFUNC_PAUSE; 1267 } 1268 1269 /// <summary> 1270 /// Given a completed task used to retrieve the content stream asynchronously, extracts the stream, 1271 /// stores it into <see cref="EasyRequest._requestContentStream"/>, and does an initial read on it. 1272 /// </summary> StoreRetrievedContentStreamAndReadAsync( Task<Stream> readAsStreamTask, EasyRequest easy, EasyRequest.SendTransferState sts, int length)1273 private static ValueTask<int> StoreRetrievedContentStreamAndReadAsync( 1274 Task<Stream> readAsStreamTask, EasyRequest easy, EasyRequest.SendTransferState sts, int length) 1275 { 1276 Debug.Assert(readAsStreamTask.IsCompleted, $"Expected {nameof(readAsStreamTask)} to be completed, got {readAsStreamTask.Status}"); 1277 try 1278 { 1279 MultiAgent multi = easy._associatedMultiAgent; 1280 multi.EventSourceTrace("Async operation completed: {0}", readAsStreamTask.Status, easy: easy); 1281 1282 // Get and store the resulting stream 1283 easy._requestContentStream = readAsStreamTask.GetAwaiter().GetResult(); 1284 multi.EventSourceTrace("Got stream: {0}", easy._requestContentStream.GetType(), easy: easy); 1285 1286 // If the stream is seekable, store its original position. We'll use this any time we need to seek 1287 // back to the "beginning", as it's possible the stream isn't at position 0. 1288 if (easy._requestContentStream.CanSeek) 1289 { 1290 long startingPos = easy._requestContentStream.Position; 1291 easy._requestContentStreamStartingPosition = startingPos; 1292 CurlHandler.EventSourceTrace("Stream starting position: {0}", startingPos, easy: easy); 1293 } 1294 1295 // Now that we have a stream, do the desired read 1296 multi.EventSourceTrace("Starting async read", easy: easy); 1297 return easy._requestContentStream.ReadAsync(new Memory<byte>(sts.Buffer, 0, Math.Min(sts.Buffer.Length, length)), easy._cancellationToken); 1298 } 1299 catch (OperationCanceledException oce) 1300 { 1301 return new ValueTask<int>(oce.CancellationToken.IsCancellationRequested ? 1302 Task.FromCanceled<int>(oce.CancellationToken) : 1303 Task.FromCanceled<int>(new CancellationToken(true))); 1304 } 1305 catch (Exception exc) 1306 { 1307 return new ValueTask<int>(Task.FromException<int>(exc)); 1308 } 1309 } 1310 1311 /// <summary>Callback invoked by libcurl to seek to a position within the request stream.</summary> CurlSeekCallback(IntPtr context, long offset, int origin)1312 private static Interop.Http.CurlSeekResult CurlSeekCallback(IntPtr context, long offset, int origin) 1313 { 1314 EasyRequest easy; 1315 if (TryGetEasyRequestFromGCHandle(context, out easy)) 1316 { 1317 CurlHandler.EventSourceTrace("Offset: {0}, Origin: {1}", offset, origin, 0, easy: easy); 1318 try 1319 { 1320 // If we don't have a stream yet, we can't seek. 1321 if (easy._requestContentStream == null) 1322 { 1323 CurlHandler.EventSourceTrace("No request stream exists yet. Can't seek", easy: easy); 1324 return Interop.Http.CurlSeekResult.CURL_SEEKFUNC_CANTSEEK; 1325 } 1326 1327 // If the stream is seekable, which is a very common case, everyone is happy. 1328 // Simply seek on the stream. 1329 if (easy._requestContentStream.CanSeek) 1330 { 1331 CurlHandler.EventSourceTrace("Seeking on the existing stream", easy: easy); 1332 SeekOrigin seek = (SeekOrigin)origin; 1333 if (seek == SeekOrigin.Begin) 1334 { 1335 Debug.Assert(easy._requestContentStreamStartingPosition.HasValue); 1336 easy._requestContentStream.Position = easy._requestContentStreamStartingPosition.GetValueOrDefault(); 1337 } 1338 else 1339 { 1340 easy._requestContentStream.Seek(offset, seek); 1341 } 1342 return Interop.Http.CurlSeekResult.CURL_SEEKFUNC_OK; 1343 } 1344 1345 // The stream isn't seekable. Now we start getting into shakier ground. 1346 // Most of the time the seek callback is used, it's because libcurl is rewinding 1347 // to the beginning of the stream due to a redirect, an auth challenge, etc. (other 1348 // cases where it might try to seek elsewhere would be, e.g., with a Range header). 1349 // In such cases, we can't seek, but we can simply re-read the stream from the content. 1350 // In most cases this will "just work." There are corner cases, however, where it'll 1351 // fail but we won't yet know it failed, e.g. if a StreamContent is used, ReadAsStreamAsync 1352 // will give us back a wrapper stream over the same original underlying stream and without 1353 // having changed its position (it's not seekable). At that point we'll think 1354 // we have a new stream, but when reading starts happening, it'll be at the existing 1355 // position, and we'll only end up sending part of the data (or none in the common case 1356 // where we'd already read ot the end). As a workaround for that, we can at least special case 1357 // the StreamContent type, for which we know this will be an issue. It won't help with other 1358 // corner -case contents like this, but for such contents, we would still end up failing the 1359 // request, just sooner. 1360 if (offset == 0 && origin == (int)SeekOrigin.Begin && 1361 !(easy._requestMessage.Content is StreamContent)) // avoid known problematic case 1362 { 1363 CurlHandler.EventSourceTrace("Removing the existing request stream, to be replaced on subsequent read", easy: easy); 1364 easy._requestContentStream = null; 1365 } 1366 1367 // Can't seek. Let libcurl know: it may still be able to recover. 1368 CurlHandler.EventSourceTrace("Can't seek", easy: easy); 1369 return Interop.Http.CurlSeekResult.CURL_SEEKFUNC_CANTSEEK; 1370 } 1371 catch (Exception ex) 1372 { 1373 easy.FailRequest(ex); // cleanup will be handled by main processing loop 1374 } 1375 } 1376 1377 // Something went wrong 1378 CurlHandler.EventSourceTrace("Seek failed", easy: easy); 1379 return Interop.Http.CurlSeekResult.CURL_SEEKFUNC_FAIL; 1380 } 1381 EventSourceTrace(string formatMessage, TArg0 arg0, EasyRequest easy = null, [CallerMemberName] string memberName = null)1382 private void EventSourceTrace<TArg0>(string formatMessage, TArg0 arg0, EasyRequest easy = null, [CallerMemberName] string memberName = null) 1383 { 1384 CurlHandler.EventSourceTrace(formatMessage, arg0, this, easy, memberName); 1385 } 1386 EventSourceTrace(string message, EasyRequest easy = null, [CallerMemberName] string memberName = null)1387 private void EventSourceTrace(string message, EasyRequest easy = null, [CallerMemberName] string memberName = null) 1388 { 1389 CurlHandler.EventSourceTrace(message, this, easy, memberName); 1390 } 1391 1392 /// <summary>Represents an active request currently being processed by the agent.</summary> 1393 private struct ActiveRequest 1394 { 1395 public StrongToWeakReference<EasyRequest> EasyWrapper; 1396 public Interop.Http.SafeCurlHandle EasyHandle; 1397 public CancellationTokenRegistration CancellationRegistration; 1398 } 1399 1400 /// <summary>Represents an incoming request to be processed by the agent.</summary> 1401 internal struct IncomingRequest 1402 { 1403 public IncomingRequestType Type; 1404 public EasyRequest Easy; 1405 } 1406 1407 /// <summary>The type of an incoming request to be processed by the agent.</summary> 1408 internal enum IncomingRequestType : byte 1409 { 1410 /// <summary>A new request that's never been submitted to an agent.</summary> 1411 New, 1412 /// <summary>A request to cancel a request previously submitted to the agent.</summary> 1413 Cancel, 1414 /// <summary>A request to unpause the connection associated with a request previously submitted to the agent.</summary> 1415 Unpause, 1416 /// <summary>A request to shutdown the agent and all active operations. No easy request is associated with this type.</summary> 1417 Shutdown 1418 } 1419 } 1420 1421 } 1422 } 1423