1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 using System.Collections.Generic;
6 using System.Diagnostics;
7 using System.IO;
8 using System.Runtime.CompilerServices;
9 using System.Runtime.ExceptionServices;
10 using System.Runtime.InteropServices;
11 using System.Threading;
12 using System.Threading.Tasks;
13 using Microsoft.Win32.SafeHandles;
14 
15 using CURLcode = Interop.Http.CURLcode;
16 using CURLMcode = Interop.Http.CURLMcode;
17 using CURLINFO = Interop.Http.CURLINFO;
18 
19 namespace System.Net.Http
20 {
21     internal partial class CurlHandler : HttpMessageHandler
22     {
23         /// <summary>Provides a multi handle and the associated processing for all requests on the handle.</summary>
24         private sealed class MultiAgent : IDisposable
25         {
            /// <summary>
            /// Amount of time in milliseconds to keep a multiagent worker alive when there's no work to be done.
            /// Increasing this value makes it more likely that a worker will end up getting reused for subsequent
            /// requests, saving on the costs associated with spinning up a new worker, but at the same time it
            /// burns a thread for that period of time.
            /// </summary>
            private const int KeepAliveMilliseconds = 50;

            // Delegates wrapping the static libcurl callback methods.  Each delegate is created once, when the
            // type is initialized, and held in a static readonly field.
            // NOTE(review): presumably these are handed to native code when an easy handle is configured and are
            // kept in statics so the same instances stay alive for every request; the registration sites are not
            // in this part of the file -- confirm.
            private static readonly Interop.Http.ReadWriteCallback s_receiveHeadersCallback = CurlReceiveHeadersCallback;
            private static readonly Interop.Http.ReadWriteCallback s_sendCallback = CurlSendCallback;
            private static readonly Interop.Http.SeekCallback s_seekCallback = CurlSeekCallback;
            private static readonly Interop.Http.ReadWriteCallback s_receiveBodyCallback = CurlReceiveBodyCallback;
            private static readonly Interop.Http.DebugCallback s_debugCallback = CurlDebugFunction;

            /// <summary>CurlHandler that created this MultiAgent.  If null, this is a shared handler.</summary>
            /// <remarks>Consulted when the multi handle is configured, to read the per-server connection limit.</remarks>
            private readonly CurlHandler _creatingHandler;

            /// <summary>
            /// A collection of not-yet-processed incoming requests for work to be done
            /// by this multi agent.  This can include making new requests, canceling
            /// active requests, unpausing active requests, or shutting the agent down.
            /// Protected by a lock on <see cref="_incomingRequests"/>.
            /// </summary>
            private readonly Queue<IncomingRequest> _incomingRequests = new Queue<IncomingRequest>();

            /// <summary>
            /// Map of active operations, keyed by the pointer value of a GCHandle to a
            /// StrongToWeakReference{EasyRequest}.
            /// </summary>
            private readonly Dictionary<IntPtr, ActiveRequest> _activeOperations = new Dictionary<IntPtr, ActiveRequest>();

            /// <summary>
            /// Special file descriptor used to wake-up curl_multi_wait calls.  This is the read
            /// end of a pipe, with the write end written to when work is queued or when cancellation
            /// is requested. This is only valid while the worker is executing: it is created when a
            /// worker is scheduled and disposed and reset to null when that worker exits.
            /// </summary>
            private SafeFileHandle _wakeupRequestedPipeFd;

            /// <summary>
            /// Write end of the pipe connected to <see cref="_wakeupRequestedPipeFd"/>.
            /// This is only valid while the worker is executing; it shares the lifetime of the read end.
            /// </summary>
            private SafeFileHandle _requestWakeupPipeFd;

            /// <summary>
            /// Task for the currently running worker, or null if there is no current worker.
            /// Protected by a lock on <see cref="_incomingRequests"/>.
            /// </summary>
            private Task _runningWorker;

            /// <summary>
            /// Multi handle used to service all requests on this agent.  It's lazily
            /// created when it's first needed, so that it can utilize all of the settings
            /// from the associated handler, and it's kept open for the duration of this
            /// agent so that all of the resources it pools (connection pool, DNS cache, etc.)
            /// can be used for all requests on this agent.
            /// </summary>
            private Interop.Http.SafeCurlMultiHandle _multiHandle;

            /// <summary>
            /// Set when Dispose has been called.  Checked by an exiting worker to decide whether
            /// a replacement worker may be started for requests that are still queued.
            /// </summary>
            private bool _disposed;
84 
            /// <summary>Initializes the MultiAgent, recording the handler that created it.</summary>
            /// <param name="handler">The handler that created this agent, or null if it's shared.</param>
            /// <remarks>
            /// Nothing else is set up here: the multi handle, the wakeup pipe and the worker task are
            /// all created on demand by <see cref="EnsureWorkerIsRunning"/>.
            /// </remarks>
            public MultiAgent(CurlHandler handler)
            {
                _creatingHandler = handler;
            }
91 
            /// <summary>Disposes of the agent.</summary>
            /// <remarks>
            /// Marks the agent as disposed, asks a running worker (if any) to shut down, and disposes the
            /// multi handle if one was ever created.  A running worker holds its own reference on the multi
            /// handle (taken in <see cref="EnsureWorkerIsRunning"/> and released when the worker exits), so
            /// disposing the handle here does not pull it out from under that worker.
            /// </remarks>
            public void Dispose()
            {
                EventSourceTrace(null);
                // Flag first so that a worker that exits from now on does not start a replacement worker.
                _disposed = true;
                // The shutdown request is only enqueued when a worker exists to process it.
                QueueIfRunning(new IncomingRequest { Type = IncomingRequestType.Shutdown });
                // The handle is created lazily, so it may still be null if no request was ever queued.
                _multiHandle?.Dispose();
            }
100 
            /// <summary>Queues a request for the multi handle to process.</summary>
            /// <param name="request">The request to enqueue.</param>
            /// <remarks>
            /// Unlike <see cref="QueueIfRunning"/>, this starts a worker when none is currently running.
            /// </remarks>
            public void Queue(IncomingRequest request)
            {
                lock (_incomingRequests)
                {
                    // Add the request, then initiate processing.  The enqueue comes first because
                    // EnsureWorkerIsRunning inspects the queue count to decide whether an existing
                    // worker needs to be woken up.
                    _incomingRequests.Enqueue(request);
                    EnsureWorkerIsRunning();
                }
            }
111 
112             /// <summary>Queues a request for the multi handle to process, but only if there's already an active worker running.</summary>
QueueIfRunning(IncomingRequest request)113             public void QueueIfRunning(IncomingRequest request)
114             {
115                 lock (_incomingRequests)
116                 {
117                     if (_runningWorker != null)
118                     {
119                         _incomingRequests.Enqueue(request);
120                         if (_incomingRequests.Count == 1)
121                         {
122                             RequestWakeup();
123                         }
124                     }
125                 }
126             }
127 
128             /// <summary>Gets the ID of the currently running worker, or null if there isn't one.</summary>
129             internal int? RunningWorkerId => _runningWorker?.Id;
130 
131             /// <summary>Schedules the processing worker if one hasn't already been scheduled.</summary>
EnsureWorkerIsRunning()132             private void EnsureWorkerIsRunning()
133             {
134                 Debug.Assert(Monitor.IsEntered(_incomingRequests), "Needs to be called under _incomingRequests lock");
135 
136                 if (_runningWorker == null)
137                 {
138                     EventSourceTrace("MultiAgent worker queueing");
139 
140                     // Ensure we've created the multi handle for this agent.
141                     if (_multiHandle == null)
142                     {
143                         _multiHandle = CreateAndConfigureMultiHandle();
144                     }
145 
146                     // Create pipe used to forcefully wake up curl_multi_wait calls when something important changes.
147                     // This is created here so that the pipe is available immediately for subsequent queue calls to use.
148                     Debug.Assert(_wakeupRequestedPipeFd == null, "Read pipe should have been cleared");
149                     Debug.Assert(_requestWakeupPipeFd == null, "Write pipe should have been cleared");
150                     unsafe
151                     {
152                         int* fds = stackalloc int[2];
153                         Interop.CheckIo(Interop.Sys.Pipe(fds));
154                         _wakeupRequestedPipeFd = new SafeFileHandle((IntPtr)fds[Interop.Sys.ReadEndOfPipe], true);
155                         _requestWakeupPipeFd = new SafeFileHandle((IntPtr)fds[Interop.Sys.WriteEndOfPipe], true);
156                     }
157 
158                     // Create the processing task.  It's "DenyChildAttach" to avoid any surprises if
159                     // code happens to create attached tasks, and it's LongRunning because this thread
160                     // is likely going to sit around for a while in a wait loop (and the more requests
161                     // are concurrently issued to the same agent, the longer the thread will be around).
162                     _runningWorker = new Task(s => ((MultiAgent)s).WorkerBody(), this,
163                         CancellationToken.None, TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning);
164 
165                     // We want to avoid situations where a Dispose occurs while we're in the middle
166                     // of processing requests and causes us to tear out the multi handle while it's
167                     // in active use.  To avoid that, we add-ref it here, and release it at the end
168                     // of the worker loop.
169                     bool ignored = false;
170                     _multiHandle.DangerousAddRef(ref ignored);
171 
172                     // Kick off the processing task.  This is done after both setting _runningWorker
173                     // to non-null and add-refing the handle, both to avoid race conditions.  The worker
174                     // body needs to see _runningWorker as non-null and assumes that it's free to use
175                     // the multi handle, without fear of it having been disposed.
176                     _runningWorker.Start(TaskScheduler.Default);
177                 }
178                 else // _runningWorker != null
179                 {
180                     // The worker is already running.  If there are already queued requests, we're done.
181                     // However, if there aren't any queued requests, Process could be blocked inside of
182                     // curl_multi_wait, and we want to make sure it wakes up to see that there additional
183                     // requests waiting to be handled.  So we write to the wakeup pipe.
184                     Debug.Assert(_incomingRequests.Count >= 1, "We just queued a request, so the count should be at least 1");
185                     if (_incomingRequests.Count == 1)
186                     {
187                         RequestWakeup();
188                     }
189                 }
190             }
191 
192             /// <summary>Write a byte to the wakeup pipe.</summary>
RequestWakeup()193             private unsafe void RequestWakeup()
194             {
195                 EventSourceTrace(null);
196                 byte b = 1;
197                 Interop.CheckIo(Interop.Sys.Write(_requestWakeupPipeFd, &b, 1));
198             }
199 
200             /// <summary>Clears data from the wakeup pipe.</summary>
201             /// <remarks>
202             /// This must only be called when we know there's data to be read.
203             /// The MultiAgent could easily deadlock if it's called when there's no data in the pipe.
204             /// </remarks>
ReadFromWakeupPipeWhenKnownToContainData()205             private unsafe void ReadFromWakeupPipeWhenKnownToContainData()
206             {
207                 // It's possible but unlikely that there will be tons of extra data in the pipe,
208                 // more than we end up reading out here (it's unlikely because we only write a byte to the pipe when
209                 // transitioning from 0 to 1 incoming request).  In that unlikely event, the worst
210                 // case will be that the next one or more waits will wake up immediately, with each one
211                 // subsequently clearing out more of the pipe.
212                 const int ClearBufferSize = 64; // sufficiently large to clear the pipe in any normal case
213                 byte* clearBuf = stackalloc byte[ClearBufferSize];
214                 Interop.CheckIo(Interop.Sys.Read(_wakeupRequestedPipeFd, clearBuf, ClearBufferSize));
215             }
216 
217             /// <summary>Requests that libcurl unpause the connection associated with this request.</summary>
RequestUnpause(EasyRequest easy)218             internal void RequestUnpause(EasyRequest easy)
219             {
220                 EventSourceTrace(null, easy: easy);
221                 QueueIfRunning(new IncomingRequest { Easy = easy, Type = IncomingRequestType.Unpause });
222             }
223 
224             /// <summary>Requests that the request associated with the easy operation be canceled.</summary>
RequestCancel(EasyRequest easy)225             internal void RequestCancel(EasyRequest easy)
226             {
227                 EventSourceTrace(null, easy: easy);
228                 QueueIfRunning(new IncomingRequest { Easy = easy, Type = IncomingRequestType.Cancel });
229             }
230 
231             /// <summary>Creates and configures a new multi handle.</summary>
CreateAndConfigureMultiHandle()232             private Interop.Http.SafeCurlMultiHandle CreateAndConfigureMultiHandle()
233             {
234                 // Create the new handle
235                 Interop.Http.SafeCurlMultiHandle multiHandle = Interop.Http.MultiCreate();
236                 if (multiHandle.IsInvalid)
237                 {
238                     throw CreateHttpRequestException(new CurlException((int)CURLcode.CURLE_FAILED_INIT, isMulti: false));
239                 }
240 
241                 // In support of HTTP/2, enable HTTP/2 connections to be multiplexed if possible.
242                 // We must only do this if the version of libcurl being used supports HTTP/2 multiplexing.
243                 // Due to a change in a libcurl signature, if we try to make this call on an older libcurl,
244                 // we'll end up accidentally and unconditionally enabling HTTP 1.1 pipelining.
245                 if (s_supportsHttp2Multiplexing)
246                 {
247                     ThrowIfCURLMError(Interop.Http.MultiSetOptionLong(multiHandle,
248                         Interop.Http.CURLMoption.CURLMOPT_PIPELINING,
249                         (long)Interop.Http.CurlPipe.CURLPIPE_MULTIPLEX));
250                     EventSourceTrace("Set multiplexing on multi handle");
251                 }
252 
253                 // Configure max connections per host if it was changed from the default.  In shared mode,
254                 // this will be pulled from the handler that first created the agent; the setting from subsequent
255                 // handlers that use this will be ignored.
256                 if (_creatingHandler != null)
257                 {
258                     int maxConnections = _creatingHandler.MaxConnectionsPerServer;
259                     if (maxConnections < int.MaxValue) // int.MaxValue considered infinite, mapping to libcurl default of 0
260                     {
261                         CURLMcode code = Interop.Http.MultiSetOptionLong(multiHandle, Interop.Http.CURLMoption.CURLMOPT_MAX_HOST_CONNECTIONS, maxConnections);
262                         switch (code)
263                         {
264                             case CURLMcode.CURLM_OK:
265                                 EventSourceTrace("Set max host connections to {0}", maxConnections);
266                                 break;
267                             default:
268                                 // Treat failures as non-fatal in release; worst case is we employ more connections than desired.
269                                 EventSourceTrace("Setting CURLMOPT_MAX_HOST_CONNECTIONS failed: {0}. Ignoring option.", code);
270                                 break;
271                         }
272                     }
273                 }
274 
275                 return multiHandle;
276             }
277 
278             /// <summary>Thread work item entrypoint for a multiagent worker.</summary>
WorkerBody()279             private void WorkerBody()
280             {
281                 Debug.Assert(!Monitor.IsEntered(_incomingRequests), $"No locks should be held while invoking {nameof(WorkerBody)}");
282                 Debug.Assert(_runningWorker != null && _runningWorker.Id == Task.CurrentId, "This is the worker, so it must be running");
283 
284                 EventSourceTrace("MultiAgent worker running");
285                 try
286                 {
287                     try
288                     {
289                         // Do the actual processing
290                         WorkerBodyLoop();
291                     }
292                     finally
293                     {
294                         EventSourceTrace("MultiAgent worker shutting down");
295 
296                         // The multi handle's reference count was increased prior to launching
297                         // this processing task.  Release that reference; any Dispose operations
298                         // that occurred during the worker's processing will now be allowed to
299                         // proceed to clean up the multi handle.
300                         _multiHandle.DangerousRelease();
301 
302                         lock (_incomingRequests)
303                         {
304                             // Close our wakeup pipe (ignore close errors).
305                             // This is done while holding the lock to prevent
306                             // subsequent Queue calls to see an improperly configured
307                             // set of descriptors.
308                             _wakeupRequestedPipeFd.Dispose();
309                             _wakeupRequestedPipeFd = null;
310                             _requestWakeupPipeFd.Dispose();
311                             _requestWakeupPipeFd = null;
312 
313                             // In the time between we stopped processing and taking the lock,
314                             // more requests could have been added.  If they were,
315                             // kick off another processing loop.
316                             _runningWorker = null;
317                             if (_incomingRequests.Count > 0 && !_disposed)
318                             {
319                                 EnsureWorkerIsRunning();
320                             }
321                         }
322                     }
323                 }
324                 catch (Exception exc)
325                 {
326                     // Something went very wrong.  In general this should not happen.  The only time it might reasonably
327                     // happen is if CurlHandler is disposed of while it's actively processing, in which case we could
328                     // get an ObjectDisposedException.
329                     EventSourceTrace("Unexpected worker failure: {0}", exc);
330                     Debug.Assert(exc is ObjectDisposedException, $"Unexpected exception from processing loop: {exc}");
331 
332                     // At this point if there any queued requests but there's no worker,
333                     // those queued requests are potentially going to sit there waiting forever,
334                     // resulting in a hang. Instead, fail those requests.
335                     lock (_incomingRequests)
336                     {
337                         if (_runningWorker == null)
338                         {
339                             while (_incomingRequests.Count > 0)
340                             {
341                                 _incomingRequests.Dequeue().Easy.CleanupAndFailRequest(exc);
342                             }
343                         }
344                     }
345                 }
346             }
347 
            /// <summary>Main processing loop employed by the multiagent worker body.</summary>
            /// <remarks>
            /// Each iteration drains the incoming request queue, lets libcurl perform work, completes any
            /// finished requests, and then waits for more work.  When there are no active operations the
            /// worker lingers for up to <see cref="KeepAliveMilliseconds"/> waiting for a wakeup and returns
            /// if none arrives.  Any exception is rethrown to the caller after the operations that are
            /// still active have been cleaned up with that exception.
            /// </remarks>
            private void WorkerBodyLoop()
            {
                Debug.Assert(_wakeupRequestedPipeFd != null, "Should have a non-null pipe for wake ups");
                Debug.Assert(!_wakeupRequestedPipeFd.IsInvalid, "Should have a valid pipe for wake ups");
                Debug.Assert(!_wakeupRequestedPipeFd.IsClosed, "Should have an open pipe for wake ups");

                Debug.Assert(_multiHandle != null, "Should have a non-null multi handle");
                Debug.Assert(!_multiHandle.IsInvalid, "Should have a valid multi handle");
                Debug.Assert(!_multiHandle.IsClosed, "Should have an open multi handle");

                // Clear our active operations table.  This should already be clear, either because
                // all previous operations completed without unexpected exception, or in the case of an
                // unexpected exception we should have cleaned up gracefully anyway.  But just in case...
                Debug.Assert(_activeOperations.Count == 0, "We shouldn't have any active operations when starting processing.");
                _activeOperations.Clear();

                // Captured in the catch block so that the finally block can fail leftover operations
                // with the exception that terminated the loop (it stays null on a normal exit).
                Exception eventLoopError = null;
                try
                {
                    // Keep looping until there are no active operations and no new request arrives
                    // within the keep-alive window.
                    while (true)
                    {
                        // First handle any requests in the incoming requests queue.
                        // (This method is factored out mostly to keep this loop concise, but also partly
                        // to avoid keeping any references to EasyRequests rooted by the stack and thus
                        // preventing them from being GC'd and the response stream finalized.  That's mainly
                        // a concern for debug builds, where the JIT may extend a local's lifetime.  The same
                        // logic applies to some of the other helpers used later in this loop.)
                        HandleIncomingRequests();

                        // If we have no active operations, there's no work to do right now.
                        if (_activeOperations.Count == 0)
                        {
                            // Setting up a worker involves creating a task and a thread to process it, creating
                            // a new pipe, etc., which has non-trivial cost associated with it.  Thus, to avoid
                            // repeatedly spinning up and down these workers, we can keep this worker alive for
                            // a little while longer in case another request comes in within some reasonably small
                            // amount of time.  This helps with the case where a sequence of requests is made serially
                            // rather than in parallel, avoiding the likely case of spinning up a new worker for each.
                            Interop.Sys.PollEvents triggered;
                            Interop.Error pollRv = Interop.Sys.Poll(_wakeupRequestedPipeFd, Interop.Sys.PollEvents.POLLIN, KeepAliveMilliseconds, out triggered);
                            if (pollRv == Interop.Error.SUCCESS && (triggered & Interop.Sys.PollEvents.POLLIN) != 0)
                            {
                                // Another request came in while we were waiting. Clear the pipe and loop around to continue processing.
                                ReadFromWakeupPipeWhenKnownToContainData();
                                continue;
                            }

                            // We're done.  Exit the multiagent.  This path is also taken when the poll did not
                            // report success; the caller starts a new worker if requests are queued by then
                            // and the agent has not been disposed.
                            return;
                        }

                        // We have one or more active operations. Run any work that needs to be run.
                        PerformCurlWork();

                        // Complete and remove any requests that have finished being processed.
                        Interop.Http.CURLMSG message;
                        IntPtr easyHandle;
                        CURLcode result;
                        while (Interop.Http.MultiInfoRead(_multiHandle, out message, out easyHandle, out result))
                        {
                            HandleCurlMessage(message, easyHandle, result);
                        }

                        // If there are any active operations, wait for more things to do.
                        // (If the last one just completed, loop straight back to the keep-alive check above.)
                        if (_activeOperations.Count > 0)
                        {
                            WaitForWork();
                        }
                    }
                }
                catch (Exception exc)
                {
                    // Remember the failure for the cleanup below, then let it propagate to the caller.
                    eventLoopError = exc;
                    throw;
                }
                finally
                {
                    // There may still be active operations, if an unexpected exception occurred.
                    // Make sure to clean up any remaining operations, failing them and releasing their resources.
                    if (_activeOperations.Count > 0)
                    {
                        CleanUpRemainingActiveOperations(eventLoopError);
                    }
                }
            }
435 
            /// <summary>
            /// Drains the incoming requests queue, dequeuing each request and handling it according to its type.
            /// </summary>
            /// <remarks>
            /// The queue lock is held only while a request is being dequeued, never while it is being
            /// processed.  The method returns once the queue is observed to be empty.
            /// </remarks>
            private void HandleIncomingRequests()
            {
                Debug.Assert(!Monitor.IsEntered(_incomingRequests), "Incoming requests lock should only be held while accessing the queue");
                EventSourceTrace(null);

                while (true)
                {
                    // Get the next request
                    IncomingRequest request;
                    lock (_incomingRequests)
                    {
                        if (_incomingRequests.Count == 0)
                        {
                            return;
                        }

                        request = _incomingRequests.Dequeue();
                    }

                    // Process the request (outside of the lock, so that callers can keep queueing).
                    EasyRequest easy = request.Easy;
                    EventSourceTrace("Type: {0}", request.Type, easy: easy);
                    switch (request.Type)
                    {
                        case IncomingRequestType.New:
                            ActivateNewRequest(easy);
                            break;

                        case IncomingRequestType.Cancel:
                            Debug.Assert(easy._associatedMultiAgent == this, "Should only cancel associated easy requests");
                            FindFailAndCleanupActiveRequest(easy, new OperationCanceledException(easy._cancellationToken));
                            break;

                        case IncomingRequestType.Unpause:
                            Debug.Assert(easy._associatedMultiAgent == this, "Should only unpause associated easy requests");
                            // Nothing to unpause if the easy handle has already been closed.
                            if (!easy._easyHandle.IsClosed)
                            {
                                // These locals exist only to satisfy the out parameters of the lookup in the
                                // assert below; they are not used afterwards.  Debug.Assert is a conditional
                                // call, so the lookup itself only runs in builds that define DEBUG.
                                IntPtr gcHandlePtr;
                                ActiveRequest ar;
                                Debug.Assert(FindActiveRequest(easy, out gcHandlePtr, out ar), "Couldn't find active request for unpause");

                                try
                                {
                                    ThrowIfCURLEError(Interop.Http.EasyUnpause(easy._easyHandle));
                                }
                                catch (Exception exc)
                                {
                                    // A failed unpause fails just this request rather than the whole worker.
                                    FindFailAndCleanupActiveRequest(easy, exc);
                                }
                            }
                            break;

                        case IncomingRequestType.Shutdown:
                            // When we get a shutdown request, we want to stop all operations that haven't had
                            // their response message published.  Other operations may continue.
                            Debug.Assert(easy == null, "Expected null easy for a Shutdown request");
                            CleanUpRemainingActiveOperations(
                                new OperationCanceledException(SR.net_http_unix_handler_disposed),
                                onlyIfResponseMessageNotPublished: true);
                            break;

                        default:
                            // Unknown types are reported in debug builds and otherwise skipped.
                            Debug.Fail("Invalid request type: " + request.Type);
                            break;
                    }
                }
            }
506 
507             /// <summary>Tell libcurl to perform any available processing on the easy handles associated with this agent's multi handle.</summary>
PerformCurlWork()508             private void PerformCurlWork()
509             {
510                 CURLMcode performResult;
511                 EventSourceTrace("Ask libcurl to perform any available work...");
512                 while ((performResult = Interop.Http.MultiPerform(_multiHandle)) == CURLMcode.CURLM_CALL_MULTI_PERFORM) ;
513                 EventSourceTrace("...done performing work: {0}", performResult);
514                 ThrowIfCURLMError(performResult);
515             }
516 
            /// <summary>
            /// Tell libcurl to block waiting for work to be ready to handle.  It'll return when there's work to be
            /// performed, when a timeout has occurred, or when new requests have entered our incoming requests queue.
            /// </summary>
            /// <remarks>
            /// When the wait reports that the wakeup pipe is readable, the pipe is drained here so that the
            /// next wait does not return immediately because of the same wakeup.  A failing result code from
            /// the wait is passed to ThrowIfCURLMError.
            /// </remarks>
            private void WaitForWork()
            {
                // Ask libcurl to wait for more things to do.  We pass in our wakeup-requested pipe handle so that libcurl
                // will wait on that file descriptor as well and wake up if an incoming request arrived into our queue.
                bool isWakeupRequestedPipeActive;
                bool isTimeout;
                ThrowIfCURLMError(Interop.Http.MultiWait(_multiHandle, _wakeupRequestedPipeFd, out isWakeupRequestedPipeActive, out isTimeout));

                if (isWakeupRequestedPipeActive)
                {
                    // We woke up (at least in part) because a wake-up was requested.
                    // Read the data out of the pipe to clear it.  This is safe from blocking because
                    // the wait has just told us that the pipe has data.
                    Debug.Assert(!isTimeout, $"Should not have timed out when {nameof(isWakeupRequestedPipeActive)} is true");
                    EventSourceTrace("Wait wake-up");
                    ReadFromWakeupPipeWhenKnownToContainData();
                }

                if (isTimeout)
                {
                    // A timeout needs no action here beyond tracing; the caller loops around regardless.
                    EventSourceTrace("Wait timeout");
                }

                // PERF NOTE: curl_multi_wait uses poll (assuming it's available), which is O(N) in terms of the number of fds
                // being waited on. If this ends up being a scalability bottleneck, we can look into using the curl_multi_socket_*
                // APIs, which would let us switch to using epoll by being notified when sockets file descriptors are added or
                // removed and configuring the epoll context with EPOLL_CTL_ADD/DEL, which at the expense of a lot of additional
                // complexity would let us turn the O(N) operation into an O(1) operation.  The additional complexity would come
                // not only in the form of additional callbacks and managing the socket collection, but also in the form of timer
                // management, which is necessary when using the curl_multi_socket_* APIs and which we avoid by using just
                // curl_multi_wait/perform.
            }
552 
553             /// <summary>Handle a libcurl message received as part of processing work.  This should signal a completed operation.</summary>
HandleCurlMessage(Interop.Http.CURLMSG message, IntPtr easyHandle, CURLcode result)554             private void HandleCurlMessage(Interop.Http.CURLMSG message, IntPtr easyHandle, CURLcode result)
555             {
556                 if (message != Interop.Http.CURLMSG.CURLMSG_DONE)
557                 {
558                     Debug.Fail($"CURLMSG_DONE is supposed to be the only message type, but got {message}");
559                     EventSourceTrace("Unexpected CURLMSG: {0}", message);
560                     return;
561                 }
562 
563                 // Get the GCHandle pointer from the easy handle's state
564                 IntPtr gcHandlePtr;
565                 CURLcode getInfoResult = Interop.Http.EasyGetInfoPointer(easyHandle, CURLINFO.CURLINFO_PRIVATE, out gcHandlePtr);
566                 Debug.Assert(getInfoResult == CURLcode.CURLE_OK, $"Failed to get info on a completing easy handle: {getInfoResult}");
567                 if (getInfoResult == CURLcode.CURLE_OK)
568                 {
569                     // Use the GCHandle to look up the associated ActiveRequest
570                     ActiveRequest completedOperation;
571                     bool gotActiveOp = _activeOperations.TryGetValue(gcHandlePtr, out completedOperation);
572                     Debug.Assert(gotActiveOp, "Expected to find GCHandle ptr in active operations table");
573 
574                     // Deactivate the easy handle and finish all processing related to the request
575                     DeactivateActiveRequest(completedOperation, gcHandlePtr);
576                     FinishRequest(completedOperation.EasyWrapper, result);
577                 }
578             }
579 
580             /// <summary>When shutting down the multi agent worker, ensure any active operations are forcibly completed.</summary>
581             /// <param name="error">The error to use to complete any remaining operations.</param>
582             /// <param name="onlyIfResponseMessageNotPublished">
583             /// true if the only active operations that should be canceled and cleaned up are those which have not
584             /// yet had their response message published. false if all active operations should be canceled regardless
585             /// of where they are in processing.
586             /// </param>
CleanUpRemainingActiveOperations(Exception error, bool onlyIfResponseMessageNotPublished = false)587             private void CleanUpRemainingActiveOperations(Exception error, bool onlyIfResponseMessageNotPublished = false)
588             {
589                 EventSourceTrace("Shutting down {0} active operations.", _activeOperations.Count);
590                 try
591                 {
592                     // Copy the operations to a tmp array so that we don't try to modify the dictionary while enumerating it
593                     var activeOps = new KeyValuePair<IntPtr, ActiveRequest>[_activeOperations.Count];
594                     ((IDictionary<IntPtr, ActiveRequest>)_activeOperations).CopyTo(activeOps, 0);
595 
596                     // Fail all active ops.
597                     Exception lastError = null;
598                     foreach (KeyValuePair<IntPtr, ActiveRequest> pair in activeOps)
599                     {
600                         try
601                         {
602                             IntPtr failingOperationGcHandle = pair.Key;
603                             ActiveRequest failingActiveRequest = pair.Value;
604                             EasyRequest easy = failingActiveRequest.EasyWrapper.Target; // may be null if the EasyRequest was already collected
605                             if (!onlyIfResponseMessageNotPublished || (easy != null && !easy.Task.IsCompleted))
606                             {
607                                 // Deactivate the request, removing it from the multi handle and allowing it to be cleaned up
608                                 DeactivateActiveRequest(failingActiveRequest, failingOperationGcHandle);
609 
610                                 // Complete the operation's task and clean up any of its resources, if it still exists.
611                                 easy?.CleanupAndFailRequest(CreateHttpRequestException(error));
612                             }
613                         }
614                         catch (Exception e)
615                         {
616                             // We don't want a spurious failure while cleaning up one request to prevent us from trying
617                             // to clean up the rest of them.
618                             lastError = e;
619                         }
620                     }
621 
622                     // Now propagate any failure that may have occurred while cleaning up
623                     if (lastError != null)
624                     {
625                         ExceptionDispatchInfo.Throw(lastError);
626                     }
627                 }
628                 finally
629                 {
630                     if (!onlyIfResponseMessageNotPublished)
631                     {
632                         // Ensure the table is now cleared.
633                         _activeOperations.Clear();
634                     }
635                 }
636             }
637 
638             /// <summary>
639             /// Activates the request represented by the EasyRequest.  This includes creating the libcurl easy handle,
640             /// configuring it, and associating it with the multi handle so that it may be processed.
641             /// </summary>
ActivateNewRequest(EasyRequest easy)642             private void ActivateNewRequest(EasyRequest easy)
643             {
644                 Debug.Assert(easy != null, "We should never get a null request");
645                 Debug.Assert(easy._associatedMultiAgent == this, "Request should be associated with this agent");
646 
647                 // If cancellation has been requested, complete the request proactively
648                 if (easy._cancellationToken.IsCancellationRequested)
649                 {
650                     easy.CleanupAndFailRequest(new OperationCanceledException(easy._cancellationToken));
651                     return;
652                 }
653 
654                 // We need to create a GCHandle that we can pass to libcurl to let it keep associated managed
655                 // state alive and help us to determine which state corresponds to the particular request.  However,
656                 // having a GCHandle that keeps an EasyRequest alive will prevent finalization of anything to do with
657                 // that EasyRequest, which means we could end up in a situation where code creates and then drops a
658                 // request, but then libcurl ends up keeping the state alive (until the reuest/response eventually times
659                 // out, assuming the timeout wasn't set to infinite).  To address this, we create a GCHandle to a wrapper
660                 // object.  At first, that wrapper object wraps a strong reference to the EasyRequest, since until a
661                 // response comes back, the caller doesn't actually have a reference to anything related to the request.
662                 // Then once a response comes back and the caller is responsible for keeping the request/response alive,
663                 // we replace the wrapped state with a weak reference to the EasyRequest.  That way, if the user then
664                 // drops the response, we can allow it to be finalized and not keep it alive indefinitely by the
665                 // native reference.  Finalization of the response stream will cause all of the relevant state to be
666                 // closed, including closing out the native easy session and then free'ing this GCHandle.
667                 easy._selfStrongToWeakReference = new StrongToWeakReference<EasyRequest>(easy); // store wrapper onto the easy so that it can transition it to weak and then lose the ref
668                 GCHandle gcHandle = GCHandle.Alloc(easy._selfStrongToWeakReference);
669                 IntPtr gcHandlePtr = GCHandle.ToIntPtr(gcHandle);
670 
671                 // Configure the easy request and add it to the multi handle.
672                 bool addedRef = false;
673                 try
674                 {
675                     easy.InitializeCurl();
676 
677                     easy.SetCurlOption(Interop.Http.CURLoption.CURLOPT_PRIVATE, gcHandlePtr);
678                     easy.SetCurlCallbacks(gcHandlePtr, s_receiveHeadersCallback, s_sendCallback, s_seekCallback, s_receiveBodyCallback, s_debugCallback);
679 
680                     // Make sure that as long as the easy handle is referenced by the multi handle that
681                     // it doesn't get finalized.  Doing so can lead to serious problems like seg faults,
682                     // for example if the multi handle is trying to access the easy handle on one thread
683                     // while it's being finalized on another.
684                     easy._easyHandle.DangerousAddRef(ref addedRef);
685 
686                     // Finally, register the easy handle with the multi handle
687                     ThrowIfCURLMError(Interop.Http.MultiAddHandle(_multiHandle, easy._easyHandle));
688                 }
689                 catch (Exception exc)
690                 {
691                     if (addedRef)
692                     {
693                         easy._easyHandle.DangerousRelease();
694                     }
695                     gcHandle.Free();
696                     easy.CleanupAndFailRequest(exc);
697                     return;
698                 }
699 
700                 // And if cancellation can be requested, hook up a cancellation callback.
701                 // This callback will put the easy request back into the queue, which will
702                 // ensure that a wake-up request has been issued.
703                 var cancellationReg = default(CancellationTokenRegistration);
704                 if (easy._cancellationToken.CanBeCanceled)
705                 {
706                     // To avoid keeping the EasyRequest rooted in the associated CancellationTokenSource,
707                     // the cancellation registration is given the wrapper rather than the object directly.
708                     cancellationReg = easy._cancellationToken.Register(s =>
709                     {
710                         var wrapper = (StrongToWeakReference<EasyRequest>)s;
711                         EasyRequest e = wrapper.Target; // may be null if already collected
712                         e?._associatedMultiAgent.RequestCancel(e);
713                     }, easy._selfStrongToWeakReference);
714                 }
715 
716                 // Finally, add it to our map.
717                 _activeOperations.Add(gcHandlePtr, new ActiveRequest
718                 {
719                     EasyWrapper = easy._selfStrongToWeakReference,
720                     EasyHandle = easy._easyHandle,
721                     CancellationRegistration = cancellationReg,
722                 });
723             }
724 
725             /// <summary>Extract the EasyRequest from the GCHandle pointer to it.</summary>
TryGetEasyRequestFromGCHandle(IntPtr gcHandlePtr, out EasyRequest easy)726             internal static bool TryGetEasyRequestFromGCHandle(IntPtr gcHandlePtr, out EasyRequest easy)
727             {
728                 // Get the EasyRequest from the context
729                 try
730                 {
731                     GCHandle handle = GCHandle.FromIntPtr(gcHandlePtr);
732                     easy = (handle.Target as StrongToWeakReference<EasyRequest>)?.Target;
733                     return easy != null;
734                 }
735                 catch (Exception e) when (e is InvalidCastException || e is InvalidOperationException)
736                 {
737                     Debug.Fail($"Error accessing GCHandle: {e}");
738                 }
739 
740                 easy = null;
741                 return false;
742             }
743 
744             /// <summary>
745             /// Corresponding to ActivateNewRequest, removes the active request from the multi handle, frees the GCHandle,
746             /// removes the request from our tracking table, and ensures cancellation has been unregistered.
747             /// </summary>
DeactivateActiveRequest(ActiveRequest activeRequest, IntPtr gcHandlePtr)748             private void DeactivateActiveRequest(ActiveRequest activeRequest, IntPtr gcHandlePtr)
749             {
750                 try
751                 {
752                     // Remove the operation from the multi handle so we can shut down the multi handle cleanly
753                     CURLMcode removeResult = Interop.Http.MultiRemoveHandle(_multiHandle, activeRequest.EasyHandle);
754                     Debug.Assert(removeResult == CURLMcode.CURLM_OK, "Failed to remove easy handle"); // ignore cleanup errors in release
755 
756                     // Release the associated GCHandle so that it's not kept alive forever
757                     if (gcHandlePtr != IntPtr.Zero)
758                     {
759                         try
760                         {
761                             GCHandle.FromIntPtr(gcHandlePtr).Free();
762                             bool removed = _activeOperations.Remove(gcHandlePtr);
763                             Debug.Assert(removed, "Expected GCHandle to still be referenced by active operations table");
764                         }
765                         catch (InvalidOperationException)
766                         {
767                             Debug.Fail("Couldn't get/free the GCHandle for an active operation while shutting down due to failure");
768                         }
769                     }
770 
771                     // Undo cancellation registration
772                     activeRequest.CancellationRegistration.Dispose();
773                 }
774                 finally
775                 {
776                     // We previously AddRef'd the easy handle to ensure that it wasn't finalized
777                     // while it was still registered with the multi handle.  Now that it's been removed,
778                     // we need to remove the reference.
779                     activeRequest.EasyHandle.DangerousRelease();
780                 }
781             }
782 
783             /// <summary>
784             /// Looks up an ActiveRequest in the active operations table by EasyRequest.  This is a linear operation
785             /// and should not be used on hot paths.
786             /// </summary>
FindActiveRequest(EasyRequest easy, out IntPtr gcHandlePtr, out ActiveRequest activeRequest)787             private bool FindActiveRequest(EasyRequest easy, out IntPtr gcHandlePtr, out ActiveRequest activeRequest)
788             {
789                 // We maintain an IntPtr=>ActiveRequest mapping, which makes it cheap to look-up by GCHandle ptr but
790                 // expensive to look up by EasyRequest.  If we find this becoming a bottleneck, we can add a reverse
791                 // map that stores the other direction as well.  It should only be used on slow paths, such as when
792                 // completing an operation due to failure.
793                 foreach (KeyValuePair<IntPtr, ActiveRequest> pair in _activeOperations)
794                 {
795                     if (pair.Value.EasyWrapper.Target == easy)
796                     {
797                         gcHandlePtr = pair.Key;
798                         activeRequest = pair.Value;
799                         return true;
800                     }
801                 }
802 
803                 gcHandlePtr = IntPtr.Zero;
804                 activeRequest = default(ActiveRequest);
805                 return false;
806             }
807 
808             /// <summary>
809             /// Finds in the active operations table the operation for the specified easy request,
810             /// and then assuming it's found, deactivates and fails it with the specified exception.
811             /// </summary>
FindFailAndCleanupActiveRequest(EasyRequest easy, Exception error)812             private void FindFailAndCleanupActiveRequest(EasyRequest easy, Exception error)
813             {
814                 EventSourceTrace("Error: {0}", error, easy: easy);
815 
816                 IntPtr gcHandlePtr;
817                 ActiveRequest activeRequest;
818                 if (FindActiveRequest(easy, out gcHandlePtr, out activeRequest))
819                 {
820                     DeactivateActiveRequest(activeRequest, gcHandlePtr);
821                     easy.CleanupAndFailRequest(error);
822                 }
823                 else
824                 {
825                     Debug.Assert(easy.Task.IsCompleted, "We should only not be able to find the request if it failed or we started to send back the response.");
826                 }
827             }
828 
829             /// <summary>Finishes the processing of a completed easy operation.</summary>
FinishRequest(StrongToWeakReference<EasyRequest> easyWrapper, CURLcode messageResult)830             private void FinishRequest(StrongToWeakReference<EasyRequest> easyWrapper, CURLcode messageResult)
831             {
832                 EasyRequest completedOperation = easyWrapper.Target;
833                 EventSourceTrace("Curl result: {0}", messageResult, easy: completedOperation);
834 
835                 if (completedOperation == null)
836                 {
837                     // Already collected; nothing more to do.
838                     return;
839                 }
840 
841                 if (completedOperation._responseMessage.StatusCode != HttpStatusCode.Unauthorized)
842                 {
843                     // If preauthentication is enabled, then we want to transfer credentials to the handler's credential cache.
844                     // That entails asking the easy operation which auth types are supported, and then giving that info to the
845                     // handler, which along with the request URI and its server credentials will populate the cache appropriately.
846                     if (completedOperation._handler.PreAuthenticate)
847                     {
848                         long authAvailable;
849                         if (Interop.Http.EasyGetInfoLong(completedOperation._easyHandle, CURLINFO.CURLINFO_HTTPAUTH_AVAIL, out authAvailable) == CURLcode.CURLE_OK)
850                         {
851                             completedOperation._handler.TransferCredentialsToCache(
852                                 completedOperation._requestMessage.RequestUri, (Interop.Http.CURLAUTH)authAvailable);
853                         }
854                         // Ignore errors: no need to fail for the sake of putting the credentials into the cache
855                     }
856                 }
857 
858                 // Complete or fail the request
859                 try
860                 {
861                     // At this point, we've completed processing the entire request, either due to error
862                     // or due to completing the entire response.
863                     completedOperation.Cleanup();
864 
865                     // libcurl will return CURLE_UNSUPPORTED_PROTOCOL if the url it tried to go to had an unsupported protocol.
866                     // This could be the original url provided or one provided in a Location header for a redirect.  Since
867                     // we vet the original url passed in, such an error here must be for a redirect, in which case we want to
868                     // ignore it and treat such failures as successes, to match the Windows behavior.
869                     if (messageResult != CURLcode.CURLE_UNSUPPORTED_PROTOCOL)
870                     {
871                         // libcurl will return CURLE_RECV_ERROR (56) if proxy authentication failed when connecting to a https server,
872                         // whereas it returns CURLE_OK for a http server proxy authentication failure. We ignore this curl behavior error,
873                         // and let the user rely on response message status code to match the Windows behavior.
874                         if (messageResult != CURLcode.CURLE_RECV_ERROR ||
875                                 completedOperation._responseMessage.StatusCode != HttpStatusCode.ProxyAuthenticationRequired)
876                         {
877                             ThrowIfCURLEError(messageResult);
878                         }
879                     }
880 
881                     // Make sure the response message is published, in case it wasn't already, and since we're done processing
882                     // everything to do with this request, make sure the response stream is marked complete as well.
883                     completedOperation.EnsureResponseMessagePublished();
884                     completedOperation._responseMessage.ResponseStream.SignalComplete();
885                 }
886                 catch (Exception exc)
887                 {
888                     completedOperation.FailRequest(exc);
889                 }
890             }
891 
892             /// <summary>Callback invoked by libcurl when debug information is available.</summary>
CurlDebugFunction(IntPtr curl, Interop.Http.CurlInfoType type, IntPtr data, ulong size, IntPtr context)893             private static void CurlDebugFunction(IntPtr curl, Interop.Http.CurlInfoType type, IntPtr data, ulong size, IntPtr context)
894             {
895                 EasyRequest easy;
896                 TryGetEasyRequestFromGCHandle(context, out easy);
897                 // If we're unable to get an associated request, we simply trace without it.
898 
899                 try
900                 {
901                     switch (type)
902                     {
903                         case Interop.Http.CurlInfoType.CURLINFO_TEXT:
904                         case Interop.Http.CurlInfoType.CURLINFO_HEADER_IN:
905                         case Interop.Http.CurlInfoType.CURLINFO_HEADER_OUT:
906                             string text = Marshal.PtrToStringAnsi(data, (int)size).Trim();
907                             if (text.Length > 0)
908                             {
909                                 CurlHandler.EventSourceTrace("{0}: {1}", type, text, 0, easy: easy);
910                             }
911                             break;
912 
913                         default:
914                             CurlHandler.EventSourceTrace("{0}: {1} bytes", type, size, 0, easy: easy);
915                             break;
916                     }
917                 }
918                 catch (Exception exc)
919                 {
920                     CurlHandler.EventSourceTrace("Error: {0}", exc, easy: easy);
921                 }
922             }
923 
924             /// <summary>Callback invoked by libcurl for each response header received.</summary>
CurlReceiveHeadersCallback(IntPtr buffer, ulong size, ulong nitems, IntPtr context)925             private static ulong CurlReceiveHeadersCallback(IntPtr buffer, ulong size, ulong nitems, IntPtr context)
926             {
927                 // The callback is invoked once per header; multi-line headers get merged into a single line.
928 
929                 size *= nitems;
930                 Debug.Assert(size <= Interop.Http.CURL_MAX_HTTP_HEADER, $"Expected header size <= {Interop.Http.CURL_MAX_HTTP_HEADER}, got {size}");
931 
932                 EasyRequest easy;
933                 if (TryGetEasyRequestFromGCHandle(context, out easy))
934                 {
935                     CurlHandler.EventSourceTrace("Size: {0}", size, easy: easy);
936                     try
937                     {
938                         if (size == 0)
939                         {
940                             return 0;
941                         }
942 
943                         // Make sure we've not yet published the response. This could happen with trailer headers,
944                         // in which case we just ignore them (we don't want to add them to the response headers at
945                         // this point, as it'd contribute to a race condition, both in terms of headers appearing
946                         // "randomly" and in terms of accessing a non-thread-safe data structure from this thread
947                         // while the consumer might be accessing / mutating it elsewhere.)
948                         if (easy.Task.IsCompleted)
949                         {
950                             CurlHandler.EventSourceTrace("Response already published. Ignoring headers.", easy: easy);
951                             return size;
952                         }
953 
954                         CurlResponseMessage response = easy._responseMessage;
955                         CurlResponseHeaderReader reader = new CurlResponseHeaderReader(buffer, size);
956 
957                         // Validate that we haven't received too much header data.
958                         // MaxResponseHeadersLength property is in units in K (1024) bytes.
959                         ulong headerBytesReceived = response._headerBytesReceived + size;
960                         if (headerBytesReceived > (ulong)(easy._handler.MaxResponseHeadersLength * 1024))
961                         {
962                             throw new HttpRequestException(
963                                 SR.Format(SR.net_http_response_headers_exceeded_length, easy._handler.MaxResponseHeadersLength));
964                         }
965                         response._headerBytesReceived = (uint)headerBytesReceived;
966 
967                         // Parse the header
968                         if (reader.ReadStatusLine(response))
969                         {
970                             CurlHandler.EventSourceTrace("Received status line", easy: easy);
971 
972                             // Clear the headers when the status line is received. This may happen multiple times if there are multiple response headers (like in redirection).
973                             response.Headers.Clear();
974                             response.Content.Headers.Clear();
975                             response._headerBytesReceived = (uint)size;
976 
977                             // Update the request message with the Uri
978                             easy.StoreLastEffectiveUri();
979                         }
980                         else
981                         {
982                             string headerName;
983                             string headerValue;
984 
985                             if (reader.ReadHeader(out headerName, out headerValue))
986                             {
987                                 if (!response.Headers.TryAddWithoutValidation(headerName, headerValue))
988                                 {
989                                     response.Content.Headers.TryAddWithoutValidation(headerName, headerValue);
990                                 }
991                                 else if ((int)response.StatusCode >= 300 && (int)response.StatusCode < 400 &&
992                                     easy._handler.AllowAutoRedirect &&
993                                     string.Equals(headerName, HttpKnownHeaderNames.Location, StringComparison.OrdinalIgnoreCase))
994                                 {
995                                     // A "Location" header field can mean different things for different status codes.  For 3xx status codes,
996                                     // it implies a redirect.  As such, if we got a 3xx status code and we support automatically redirecting,
997                                     // reconfigure the easy handle under the assumption that libcurl will redirect.  If it does redirect, we'll
998                                     // be prepared; if it doesn't (e.g. it doesn't treat some particular 3xx as a redirect, if we've reached
999                                     // our redirect limit, etc.), this will have been unnecessary work in reconfiguring the easy handle, but
1000                                     // nothing incorrect, as we'll tear down the handle once the request finishes, anyway, and all of the configuration
1001                                     // we're doing is about initiating a new request.
1002                                     easy.SetPossibleRedirectForLocationHeader(headerValue);
1003                                 }
1004                                 else if (string.Equals(headerName, HttpKnownHeaderNames.SetCookie, StringComparison.OrdinalIgnoreCase))
1005                                 {
1006                                     easy._handler.AddResponseCookies(easy, headerValue);
1007                                 }
1008                             }
1009                         }
1010 
1011                         return size;
1012                     }
1013                     catch (Exception ex)
1014                     {
1015                         easy.FailRequest(ex); // cleanup will be handled by main processing loop
1016                     }
1017                 }
1018 
1019                 // Returning a value other than size fails the callback and forces
1020                 // request completion with an error
1021                 CurlHandler.EventSourceTrace("Aborting request", easy: easy);
1022                 return size - 1;
1023             }
1024 
/// <summary>
/// Write callback registered with libcurl; invoked whenever a chunk of response body data is received.
/// </summary>
/// <param name="buffer">Pointer to the received data.</param>
/// <param name="size">Size of a single item; multiplied by <paramref name="nitems"/> to obtain the byte count.</param>
/// <param name="nitems">Number of items available at <paramref name="buffer"/>.</param>
/// <param name="context">Handle from which the associated <see cref="EasyRequest"/> is looked up.</param>
/// <returns>
/// The value produced by handing the data to the response stream (the amount transferred or a
/// pause request), or a value different from the byte count to make libcurl fail the transfer.
/// </returns>
private static ulong CurlReceiveBodyCallback(
    IntPtr buffer, ulong size, ulong nitems, IntPtr context)
{
    ulong byteCount = size * nitems;

    EasyRequest easy;
    if (TryGetEasyRequestFromGCHandle(context, out easy))
    {
        CurlHandler.EventSourceTrace("Size: {0}", byteCount, easy: easy);
        try
        {
            // Only hand data over while the request has neither been canceled nor failed.
            if (!easy.Task.IsCanceled && !easy.Task.IsFaulted)
            {
                // Publish the response message if an earlier callback hasn't done so already;
                // that is what makes the response stream reachable by consumers.
                easy.EnsureResponseMessagePublished();

                // The stream reports either the amount it accepted (all of what was offered)
                // or a request to pause the transfer.
                return easy._responseMessage.ResponseStream.TransferDataToResponseStream(buffer, (long)byteCount);
            }
        }
        catch (Exception ex)
        {
            // The main processing loop takes care of the cleanup.
            easy.FailRequest(ex);
        }
    }

    // Any return value that differs from the byte count makes libcurl treat the callback
    // as failed and complete the request with an error.
    CurlHandler.EventSourceTrace("Aborting request", easy: easy);
    if (byteCount > 0)
    {
        return byteCount - 1;
    }

    return 1;
}
1061 
/// <summary>
/// Read callback registered with libcurl; invoked when libcurl wants request body data to send.
/// </summary>
/// <param name="buffer">Destination buffer to fill with request data.</param>
/// <param name="size">Size of a single item; multiplied by <paramref name="nitems"/> to obtain the buffer capacity.</param>
/// <param name="nitems">Number of items that fit in <paramref name="buffer"/>.</param>
/// <param name="context">Handle from which the associated <see cref="EasyRequest"/> is looked up.</param>
/// <returns>
/// The result of transferring data from the request stream, 0 when the buffer has no capacity,
/// or <c>CURL_READFUNC_ABORT</c> when the request cannot be found or the transfer throws.
/// </returns>
private static ulong CurlSendCallback(IntPtr buffer, ulong size, ulong nitems, IntPtr context)
{
    int capacity = checked((int)(size * nitems));
    Debug.Assert(capacity <= MaxRequestBufferSize, $"length {capacity} should not be larger than RequestBufferSize {MaxRequestBufferSize}");

    EasyRequest easy;
    if (!TryGetEasyRequestFromGCHandle(context, out easy))
    {
        // Without a request there is nothing to read from.
        CurlHandler.EventSourceTrace("Aborting request", easy: easy);
        return Interop.Http.CURL_READFUNC_ABORT;
    }

    CurlHandler.EventSourceTrace("Size: {0}", capacity, easy: easy);

    if (capacity == 0)
    {
        return 0;
    }

    Debug.Assert(easy._requestMessage.Content != null, "We should only be in the send callback if we have request content");
    Debug.Assert(easy._associatedMultiAgent != null, "The request should be associated with a multi agent.");

    try
    {
        // Move data from the request's content stream into libcurl's buffer.
        return TransferDataFromRequestStream(buffer, capacity, easy);
    }
    catch (Exception ex)
    {
        // The main processing loop takes care of the cleanup.
        easy.FailRequest(ex);
    }

    // The transfer threw: tell libcurl to abort.
    CurlHandler.EventSourceTrace("Aborting request", easy: easy);
    return Interop.Http.CURL_READFUNC_ABORT;
}
1096 
/// <summary>
/// Transfers up to <paramref name="length"/> bytes from the <paramref name="easy"/>'s
/// request content (non-memory) stream to the buffer.
/// </summary>
/// <param name="buffer">Destination buffer supplied by libcurl.</param>
/// <param name="length">Maximum number of bytes that may be copied into <paramref name="buffer"/>.</param>
/// <param name="easy">The request whose content is being sent.</param>
/// <returns>
/// The number of bytes copied, 0 when the content stream reported end of data, or
/// <c>CURL_READFUNC_PAUSE</c> when a read is still outstanding and the transfer must wait for it.
/// </returns>
/// <remarks>
/// Exceptions from a failed or canceled read are rethrown to the caller when the read's
/// result is retrieved.
/// </remarks>
private static ulong TransferDataFromRequestStream(IntPtr buffer, int length, EasyRequest easy)
{
    CurlHandler.EventSourceTrace("Length: {0}", length, easy: easy);

    MultiAgent multi = easy._associatedMultiAgent;

    // First check to see whether there's any data available from a previous asynchronous read request.
    // If there is, the transfer state's Task field will be non-null, with its Result representing
    // the number of bytes read.  The Buffer will then contain all of that read data.  If the Count
    // is 0, then this is the first time we're checking that Task, and so we populate the Count
    // from that read result.  After that, we can transfer as much data remains between Offset and
    // Count.  Multiple callbacks may pull from that one read.

    EasyRequest.SendTransferState sts = easy._sendTransferState;
    if (sts != null)
    {
        // Is there a previous read that may still have data to be consumed?
        if (sts.Task != null)
        {
            if (!sts.Task.IsCompleted)
            {
                // We have a previous read that's not yet completed.  This should be quite rare, but it can
                // happen when we're unpaused prematurely, potentially due to the request still finishing
                // being sent as the server starts to send a response.  Since we still have the outstanding
                // read, we simply re-pause.  When the task completes (which could have happened immediately
                // after the check), the continuation we previously created will fire and queue an unpause.
                // Since all of this processing is single-threaded on the current thread, that unpause request
                // is guaranteed to happen after this re-pause.
                multi.EventSourceTrace("Re-pausing reading after a spurious un-pause", easy: easy);
                return Interop.Http.CURL_READFUNC_PAUSE;
            }

            // Determine how many bytes were read on the last asynchronous read.
            // If nothing was read, then we're done and can simply return 0 to indicate
            // the end of the stream.
            int bytesRead = sts.Task.GetAwaiter().GetResult(); // will throw if read failed
            Debug.Assert(bytesRead >= 0 && bytesRead <= sts.Buffer.Length, $"ReadAsync returned an invalid result length: {bytesRead}");
            if (bytesRead == 0)
            {
                sts.SetTaskOffsetCount(null, 0, 0);
                return 0;
            }

            // If Count is still 0, then this is the first time after the task completed
            // that we're examining the data: transfer the bytesRead to the Count.
            if (sts.Count == 0)
            {
                multi.EventSourceTrace("ReadAsync completed with bytes: {0}", bytesRead, easy: easy);
                sts.Count = bytesRead;
            }

            // Now Offset and Count are both accurate.  Determine how much data we can copy to libcurl...
            int availableData = sts.Count - sts.Offset;
            Debug.Assert(availableData > 0, "There must be some data still available.");

            // ... and copy as much of that as libcurl will allow.
            int bytesToCopy = Math.Min(availableData, length);
            Marshal.Copy(sts.Buffer, sts.Offset, buffer, bytesToCopy);
            multi.EventSourceTrace("Copied {0} bytes from request stream", bytesToCopy, easy: easy);

            // Update the offset.  If we've gone through all of the data, reset the state
            // so that the next time we're called back we'll do a new read.
            sts.Offset += bytesToCopy;
            Debug.Assert(sts.Offset <= sts.Count, "Offset should never exceed count");
            if (sts.Offset == sts.Count)
            {
                sts.SetTaskOffsetCount(null, 0, 0);
            }

            // Return the amount of data copied
            Debug.Assert(bytesToCopy > 0, "We should never return 0 bytes here.");
            return (ulong)bytesToCopy;
        }

        // sts was non-null but sts.Task was null, meaning there was no previous task/data
        // from which to satisfy any of this request.
    }
    else // sts == null
    {
        // Allocate a transfer state object to use for the remainder of this request.
        // The buffer is sized to the declared content length when that is known and
        // within bounds, and to MaxRequestBufferSize otherwise.
        Debug.Assert(easy._requestMessage.Content != null, "Content shouldn't be null, since we already got a content request stream");
        long bufferSize = easy._requestMessage.Content.Headers.ContentLength.GetValueOrDefault();
        if (bufferSize <= 0 || bufferSize > MaxRequestBufferSize)
        {
            bufferSize = MaxRequestBufferSize;
        }
        easy._sendTransferState = sts = new EasyRequest.SendTransferState((int)bufferSize);
    }

    Debug.Assert(sts != null, "By this point we should have a transfer object");
    Debug.Assert(sts.Task == null, "There shouldn't be a task now.");
    Debug.Assert(sts.Count == 0, "Count should be zero.");
    Debug.Assert(sts.Offset == 0, "Offset should be zero.");

    // If we get here, there was no previously read data available to copy.

    // Make sure we actually have a stream to read from.  This will be null if either
    // this is the first time we're reading it, or if the stream was reset as part
    // of curl trying to rewind.  Then do the read.
    ValueTask<int> asyncRead;
    if (easy._requestContentStream == null)
    {
        multi.EventSourceTrace("Calling ReadAsStreamAsync to get new request stream", easy: easy);
        Task<Stream> readAsStreamTask = easy._requestMessage.Content.ReadAsStreamAsync();
        if (readAsStreamTask.IsCompleted)
        {
            asyncRead = StoreRetrievedContentStreamAndReadAsync(readAsStreamTask, easy, sts, length);
        }
        else
        {
            // Continue from the task we already have rather than asking the content for
            // its stream a second time.  State is passed explicitly so the delegate
            // doesn't need to capture anything.
            asyncRead = new ValueTask<int>(readAsStreamTask.ContinueWith((t, s) =>
            {
                var stateAndRequest = (Tuple<int, EasyRequest.SendTransferState, EasyRequest>)s;
                return StoreRetrievedContentStreamAndReadAsync(t,
                    stateAndRequest.Item3, stateAndRequest.Item2, stateAndRequest.Item1).AsTask();
            }, Tuple.Create(length, sts, easy), CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap());
        }
    }
    else
    {
        multi.EventSourceTrace("Starting async read", easy: easy);
        asyncRead = easy._requestContentStream.ReadAsync(
           new Memory<byte>(sts.Buffer, 0, Math.Min(sts.Buffer.Length, length)), easy._cancellationToken);
    }

    // Even though it's "Async", it's possible this read could complete synchronously or extremely quickly.
    // Check to see if it did, in which case we can also satisfy the libcurl request synchronously in this callback.
    if (asyncRead.IsCompleted)
    {
        multi.EventSourceTrace("Async read completed immediately", easy: easy);

        // Get the amount of data read.  This consumes the ValueTask; it must not be used again.
        int bytesRead = asyncRead.GetAwaiter().GetResult(); // will throw if read failed
        if (bytesRead == 0)
        {
            multi.EventSourceTrace("Read 0 bytes", easy: easy);
            return 0;
        }

        // Copy as much as we can.
        int bytesToCopy = Math.Min(bytesRead, length);
        Debug.Assert(bytesToCopy > 0 && bytesToCopy <= sts.Buffer.Length, $"ReadAsync quickly returned an invalid result length: {bytesToCopy}");
        Marshal.Copy(sts.Buffer, 0, buffer, bytesToCopy);
        multi.EventSourceTrace("Read {0} bytes", bytesToCopy, easy: easy);

        // If we read more than we were able to copy, stash it away for the next read.
        if (bytesToCopy < bytesRead)
        {
            multi.EventSourceTrace("Storing {0} bytes for later", bytesRead - bytesToCopy, easy: easy);

            // The ValueTask's result has already been retrieved, so store a completed task
            // carrying that known result instead of converting the ValueTask a second time.
            sts.SetTaskOffsetCount(Task.FromResult(bytesRead), bytesToCopy, bytesRead);
        }

        // Return the number of bytes read.
        return (ulong)bytesToCopy;
    }

    // Otherwise, the read completed asynchronously.  Store the task, and hook up a continuation
    // such that the connection will be unpaused once the task completes.
    sts.SetTaskOffsetCount(asyncRead.AsTask(), 0, 0);
    sts.Task.ContinueWith((t, s) =>
    {
        EasyRequest easyRef = (EasyRequest)s;
        easyRef._associatedMultiAgent.RequestUnpause(easyRef);
    }, easy, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    // Then pause the connection.
    multi.EventSourceTrace("Pausing transfer from request stream", easy: easy);
    return Interop.Http.CURL_READFUNC_PAUSE;
}
1268 
/// <summary>
/// Takes the completed task that was used to obtain the request content stream, records the stream
/// in <see cref="EasyRequest._requestContentStream"/>, and issues the first read against it.
/// </summary>
/// <param name="readAsStreamTask">The already-completed task that yields the content stream.</param>
/// <param name="easy">The request that owns the content.</param>
/// <param name="sts">Transfer state whose buffer receives the data read.</param>
/// <param name="length">Upper bound on the number of bytes to read; also capped by the buffer's length.</param>
/// <returns>
/// The pending or completed read.  Failures are not thrown from this method: cancellation is
/// returned as a canceled task and any other exception as a faulted task.
/// </returns>
private static ValueTask<int> StoreRetrievedContentStreamAndReadAsync(
    Task<Stream> readAsStreamTask, EasyRequest easy, EasyRequest.SendTransferState sts, int length)
{
    Debug.Assert(readAsStreamTask.IsCompleted, $"Expected {nameof(readAsStreamTask)} to be completed, got {readAsStreamTask.Status}");
    try
    {
        MultiAgent agent = easy._associatedMultiAgent;
        agent.EventSourceTrace("Async operation completed: {0}", readAsStreamTask.Status, easy: easy);

        // Retrieve the stream (rethrowing if the task failed) and remember it on the request.
        Stream contentStream = readAsStreamTask.GetAwaiter().GetResult();
        easy._requestContentStream = contentStream;
        agent.EventSourceTrace("Got stream: {0}", contentStream.GetType(), easy: easy);

        // A seekable stream may not be positioned at 0.  Remember where it started so that
        // a later rewind to the "beginning" returns to this position.
        if (contentStream.CanSeek)
        {
            long initialPosition = contentStream.Position;
            easy._requestContentStreamStartingPosition = initialPosition;
            CurlHandler.EventSourceTrace("Stream starting position: {0}", initialPosition, easy: easy);
        }

        // With the stream in hand, perform the read that was asked for.
        agent.EventSourceTrace("Starting async read", easy: easy);
        int readLimit = Math.Min(sts.Buffer.Length, length);
        return contentStream.ReadAsync(new Memory<byte>(sts.Buffer, 0, readLimit), easy._cancellationToken);
    }
    catch (OperationCanceledException oce)
    {
        // A canceled task needs a token that is actually canceled; fall back to a
        // pre-canceled token when the exception's own token is not.
        CancellationToken canceledToken = oce.CancellationToken;
        if (!canceledToken.IsCancellationRequested)
        {
            canceledToken = new CancellationToken(true);
        }

        return new ValueTask<int>(Task.FromCanceled<int>(canceledToken));
    }
    catch (Exception exc)
    {
        return new ValueTask<int>(Task.FromException<int>(exc));
    }
}
1310 
/// <summary>Callback invoked by libcurl to seek to a position within the request stream.</summary>
/// <param name="context">Handle from which the associated <see cref="EasyRequest"/> is looked up.</param>
/// <param name="offset">Position to seek to, relative to <paramref name="origin"/>.</param>
/// <param name="origin">Seek origin, interpreted as a <see cref="SeekOrigin"/> value.</param>
/// <returns>
/// <c>CURL_SEEKFUNC_OK</c> when the seek was performed on a seekable stream,
/// <c>CURL_SEEKFUNC_CANTSEEK</c> when there is no stream yet or it isn't seekable, and
/// <c>CURL_SEEKFUNC_FAIL</c> when the request can't be found or the seek throws.
/// </returns>
private static Interop.Http.CurlSeekResult CurlSeekCallback(IntPtr context, long offset, int origin)
{
    EasyRequest easy;
    if (TryGetEasyRequestFromGCHandle(context, out easy))
    {
        CurlHandler.EventSourceTrace("Offset: {0}, Origin: {1}", offset, origin, 0, easy: easy);
        try
        {
            // If we don't have a stream yet, we can't seek.
            if (easy._requestContentStream == null)
            {
                CurlHandler.EventSourceTrace("No request stream exists yet. Can't seek", easy: easy);
                return Interop.Http.CurlSeekResult.CURL_SEEKFUNC_CANTSEEK;
            }

            // If the stream is seekable, which is a very common case, everyone is happy.
            // Simply seek on the stream.
            if (easy._requestContentStream.CanSeek)
            {
                CurlHandler.EventSourceTrace("Seeking on the existing stream", easy: easy);
                SeekOrigin seek = (SeekOrigin)origin;
                if (seek == SeekOrigin.Begin)
                {
                    // The logical beginning of the content is wherever the stream was positioned
                    // when we first obtained it, which is not necessarily 0.  Seek relative to that
                    // position, honoring the requested offset (for the usual rewind the offset is 0).
                    Debug.Assert(easy._requestContentStreamStartingPosition.HasValue);
                    easy._requestContentStream.Position =
                        easy._requestContentStreamStartingPosition.GetValueOrDefault() + offset;
                }
                else
                {
                    easy._requestContentStream.Seek(offset, seek);
                }
                return Interop.Http.CurlSeekResult.CURL_SEEKFUNC_OK;
            }

            // The stream isn't seekable.  Now we start getting into shakier ground.
            // Most of the time the seek callback is used, it's because libcurl is rewinding
            // to the beginning of the stream due to a redirect, an auth challenge, etc. (other
            // cases where it might try to seek elsewhere would be, e.g., with a Range header).
            // In such cases, we can't seek, but we can simply re-read the stream from the content.
            // In most cases this will "just work." There are corner cases, however, where it'll
            // fail but we won't yet know it failed, e.g. if a StreamContent is used, ReadAsStreamAsync
            // will give us back a wrapper stream over the same original underlying stream and without
            // having changed its position (it's not seekable).  At that point we'll think
            // we have a new stream, but when reading starts happening, it'll be at the existing
            // position, and we'll only end up sending part of the data (or none in the common case
            // where we'd already read to the end).  As a workaround for that, we can at least special case
            // the StreamContent type, for which we know this will be an issue.  It won't help with other
            // corner-case contents like this, but for such contents, we would still end up failing the
            // request, just sooner.
            if (offset == 0 && origin == (int)SeekOrigin.Begin &&
                !(easy._requestMessage.Content is StreamContent)) // avoid known problematic case
            {
                CurlHandler.EventSourceTrace("Removing the existing request stream, to be replaced on subsequent read", easy: easy);
                easy._requestContentStream = null;
            }

            // Can't seek.  Let libcurl know: it may still be able to recover.
            CurlHandler.EventSourceTrace("Can't seek", easy: easy);
            return Interop.Http.CurlSeekResult.CURL_SEEKFUNC_CANTSEEK;
        }
        catch (Exception ex)
        {
            easy.FailRequest(ex); // cleanup will be handled by main processing loop
        }
    }

    // Something went wrong
    CurlHandler.EventSourceTrace("Seek failed", easy: easy);
    return Interop.Http.CurlSeekResult.CURL_SEEKFUNC_FAIL;
}
1381 
/// <summary>
/// Traces a formatted message with a single argument, forwarding to the
/// <see cref="CurlHandler"/> tracing helper with this agent attached.
/// </summary>
/// <typeparam name="TArg0">Type of the format argument.</typeparam>
/// <param name="formatMessage">Format string for the message.</param>
/// <param name="arg0">Argument for the format string.</param>
/// <param name="easy">Request the message relates to, if any.</param>
/// <param name="memberName">Name of the calling member; supplied by the compiler when omitted.</param>
private void EventSourceTrace<TArg0>(string formatMessage, TArg0 arg0, EasyRequest easy = null, [CallerMemberName] string memberName = null) =>
    CurlHandler.EventSourceTrace(formatMessage, arg0, this, easy, memberName);
1386 
/// <summary>
/// Traces a message, forwarding to the <see cref="CurlHandler"/> tracing helper
/// with this agent attached.
/// </summary>
/// <param name="message">Message to trace.</param>
/// <param name="easy">Request the message relates to, if any.</param>
/// <param name="memberName">Name of the calling member; supplied by the compiler when omitted.</param>
private void EventSourceTrace(string message, EasyRequest easy = null, [CallerMemberName] string memberName = null) =>
    CurlHandler.EventSourceTrace(message, this, easy, memberName);
1391 
/// <summary>State the agent keeps for a request it is currently processing.</summary>
private struct ActiveRequest
{
    /// <summary>
    /// Reference wrapper around the <see cref="EasyRequest"/>.
    /// NOTE(review): the strong/weak switching policy lives in StrongToWeakReference and its callers, not here.
    /// </summary>
    public StrongToWeakReference<EasyRequest> EasyWrapper;

    /// <summary>The libcurl easy handle belonging to the request.</summary>
    public Interop.Http.SafeCurlHandle EasyHandle;

    /// <summary>
    /// Cancellation registration associated with the request.
    /// NOTE(review): presumably disposed when the request is removed from the agent; confirm at the use sites.
    /// </summary>
    public CancellationTokenRegistration CancellationRegistration;
}
1399 
/// <summary>A unit of work handed to the agent for processing.</summary>
internal struct IncomingRequest
{
    /// <summary>What the agent is being asked to do.</summary>
    public IncomingRequestType Type;

    /// <summary>
    /// The request the work applies to; no request is associated with
    /// <see cref="IncomingRequestType.Shutdown"/>.
    /// </summary>
    public EasyRequest Easy;
}
1406 
/// <summary>Kinds of work that can be submitted to the agent.</summary>
internal enum IncomingRequestType : byte
{
    /// <summary>A brand-new request that has not been submitted to an agent before.</summary>
    New = 0,

    /// <summary>Cancel a request that was previously submitted to the agent.</summary>
    Cancel = 1,

    /// <summary>Unpause the connection of a request that was previously submitted to the agent.</summary>
    Unpause = 2,

    /// <summary>Shut down the agent and all of its active operations.  Carries no easy request.</summary>
    Shutdown = 3
}
1419         }
1420 
1421     }
1422 }
1423