1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 //------------------------------------------------------------
4 namespace System.ServiceModel.Channels
5 {
6     using System.Collections.Generic;
7     using System.Diagnostics;
8     using System.Runtime;
9     using System.Runtime.Serialization;
10     using System.Security;
11     using System.Security.Permissions;
12     using System.ServiceModel;
13     using System.ServiceModel.Diagnostics;
14     using System.ServiceModel.Dispatcher;
15     using System.ServiceModel.Security;
16     using System.Threading;
17     using System.Xml;
18 
19     class PeerFlooder : PeerFlooderSimple
20     {
PeerFlooder(PeerNodeConfig config, PeerNeighborManager neighborManager)21         PeerFlooder(PeerNodeConfig config, PeerNeighborManager neighborManager) : base(config, neighborManager) { }
22 
CreateFlooder(PeerNodeConfig config, PeerNeighborManager neighborManager, IPeerNodeMessageHandling messageHandler)23         public static PeerFlooder CreateFlooder(PeerNodeConfig config, PeerNeighborManager neighborManager, IPeerNodeMessageHandling messageHandler)
24         {
25             PeerFlooder flooder = new PeerFlooder(config, neighborManager);
26             flooder.messageHandler = messageHandler;
27             return flooder;
28         }
29     }
30 
31     interface IFlooderForThrottle
32     {
OnThrottleReached()33         void OnThrottleReached();
OnThrottleReleased()34         void OnThrottleReleased();
35     }
36 
37     abstract class PeerFlooderBase<TFloodContract, TLinkContract> : IFlooderForThrottle, IPeerFlooderContract<TFloodContract, TLinkContract> where TFloodContract : Message
38     {
39         protected PeerNodeConfig config;
40         protected PeerNeighborManager neighborManager;
41         protected List<IPeerNeighbor> neighbors;
42         object thisLock = new object();
43 
44         internal IPeerNodeMessageHandling messageHandler;
45         internal PeerThrottleHelper quotaHelper;
46         long messageSequence;
47 
48         public event EventHandler ThrottleReached;
49         public event EventHandler SlowNeighborKilled;
50         public event EventHandler ThrottleReleased;
51         public EventHandler OnMessageSentHandler;
52 
53 
PeerFlooderBase(PeerNodeConfig config, PeerNeighborManager neighborManager)54         public PeerFlooderBase(PeerNodeConfig config, PeerNeighborManager neighborManager)
55         {
56             this.neighborManager = neighborManager;
57             this.neighbors = new List<IPeerNeighbor>();
58             this.config = config;
59             this.neighbors = this.neighborManager.GetConnectedNeighbors();
60             this.quotaHelper = new PeerThrottleHelper(this, this.config.MaxPendingOutgoingCalls);
61             OnMessageSentHandler = new EventHandler(OnMessageSent);
62         }
63 
PruneNeighborCallback(IPeerNeighbor peer)64         void PruneNeighborCallback(IPeerNeighbor peer)
65         {
66             lock (ThisLock)
67             {
68                 if (this.Neighbors.Count <= 1)
69                     return;
70                 if (DiagnosticUtility.ShouldTraceWarning)
71                 {
72                     string message = SR.GetString(SR.PeerThrottlePruning, this.config.MeshId);
73                     PeerThrottleTraceRecord record = new PeerThrottleTraceRecord(this.config.MeshId, message);
74                     TraceUtility.TraceEvent(TraceEventType.Warning, TraceCode.PeerFlooderReceiveMessageQuotaExceeded,
75                         SR.GetString(SR.TraceCodePeerFlooderReceiveMessageQuotaExceeded), record, this, null);
76                 }
77             }
78             try
79             {
80                 peer.Abort(PeerCloseReason.NodeTooSlow, PeerCloseInitiator.LocalNode);
81             }
82             catch (Exception e)
83             {
84                 if (Fx.IsFatal(e)) throw;
85                 if (null != CloseNeighborIfKnownException(neighborManager, e, peer))
86                 {
87                     throw;
88                 }
89                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
90             }
91         }
92 
IFlooderForThrottle.OnThrottleReached()93         void IFlooderForThrottle.OnThrottleReached()
94         {
95             if (DiagnosticUtility.ShouldTraceInformation)
96             {
97                 string message = SR.GetString(SR.PeerThrottleWaiting, this.config.MeshId);
98                 PeerThrottleTraceRecord record = new PeerThrottleTraceRecord(this.config.MeshId, message);
99                 TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerFlooderReceiveMessageQuotaExceeded,
100                     SR.GetString(SR.TraceCodePeerFlooderReceiveMessageQuotaExceeded), record, this, null);
101             }
102 
103             IPeerNeighbor peer = this.neighborManager.SlowestNeighbor();
104             if (peer == null)
105                 return;
106             UtilityExtension extension = peer.Utility;
107             if (peer.IsConnected && extension != null)
108             {
109                 if (extension.PendingMessages > PeerTransportConstants.MessageThreshold)
110                 {
111                     extension.BeginCheckPoint(new UtilityExtension.PruneNeighborCallback(PruneNeighborCallback));
112                 }
113                 else
114                 {
115                     Fx.Assert(false, "Neighbor is marked slow with messages " + extension.PendingMessages);
116                 }
117                 FireReachedEvent();
118             }
119         }
120 
IFlooderForThrottle.OnThrottleReleased()121         void IFlooderForThrottle.OnThrottleReleased()
122         {
123             FireDequeuedEvent();
124         }
125 
FireDequeuedEvent()126         public void FireDequeuedEvent() { FireEvent(ThrottleReleased); }
127 
FireReachedEvent()128         public void FireReachedEvent() { FireEvent(ThrottleReached); }
129 
FireKilledEvent()130         public void FireKilledEvent() { FireEvent(SlowNeighborKilled); }
131 
FireEvent(EventHandler handler)132         void FireEvent(EventHandler handler)
133         {
134             if (handler != null)
135                 handler(this, EventArgs.Empty);
136         }
137 
138         [PermissionSet(SecurityAction.Demand, Unrestricted = true), SecuritySafeCritical]
BeginFloodEncodedMessage(byte[] id, MessageBuffer encodedMessage, TimeSpan timeout, AsyncCallback callback, object state)139         public virtual IAsyncResult BeginFloodEncodedMessage(byte[] id, MessageBuffer encodedMessage, TimeSpan timeout, AsyncCallback callback, object state)
140         {
141             RecordOutgoingMessage(id);
142             SynchronizationContext syncContext = ThreadBehavior.GetCurrentSynchronizationContext();
143             SynchronizationContext.SetSynchronizationContext(null);
144 
145             if (neighbors.Count == 0)
146             {
147                 return new CompletedAsyncResult(callback, state);
148             }
149             try
150             {
151                 return FloodMessageToNeighbors(encodedMessage, timeout, callback, state, -1, null, null, OnMessageSentHandler);
152             }
153             finally
154             {
155                 SynchronizationContext.SetSynchronizationContext(syncContext);
156             }
157 
158         }
159 
BeginFloodReceivedMessage(IPeerNeighbor sender, MessageBuffer messageBuffer, TimeSpan timeout, AsyncCallback callback, object state, int index, MessageHeader hopHeader)160         protected virtual IAsyncResult BeginFloodReceivedMessage(IPeerNeighbor sender, MessageBuffer messageBuffer,
161             TimeSpan timeout, AsyncCallback callback, object state, int index, MessageHeader hopHeader)
162         {
163             quotaHelper.AcquireNoQueue();
164 
165             try
166             {
167                 return FloodMessageToNeighbors(messageBuffer, timeout, callback, state, index, hopHeader, sender, OnMessageSentHandler);
168             }
169             catch (Exception e)
170             {
171                 if (Fx.IsFatal(e)) throw;
172                 if (e is QuotaExceededException || (e is CommunicationException && e.InnerException is QuotaExceededException))
173                 {
174                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
175                     if (DiagnosticUtility.ShouldTraceError)
176                     {
177                         PeerFlooderTraceRecord record = new PeerFlooderTraceRecord(
178                                                             this.config.MeshId,
179                                                             sender.ListenAddress,
180                                                             e);
181                         TraceUtility.TraceEvent(
182                                     TraceEventType.Error,
183                                     TraceCode.PeerFlooderReceiveMessageQuotaExceeded,
184                                     SR.GetString(SR.TraceCodePeerFlooderReceiveMessageQuotaExceeded),
185                                     record,
186                                     this,
187                                     null);
188                     }
189                     return null;
190                 }
191                 throw;
192             }
193         }
194 
BeginSendHelper(IPeerNeighbor neighbor, TimeSpan timeout, Message message, FloodAsyncResult fresult)195         protected IAsyncResult BeginSendHelper(IPeerNeighbor neighbor, TimeSpan timeout, Message message, FloodAsyncResult fresult)
196         {
197             IAsyncResult result = null;
198             bool fatal = false;
199             try
200             {
201                 UtilityExtension.OnMessageSent(neighbor);
202                 result = neighbor.BeginSend(message, timeout, Fx.ThunkCallback(new AsyncCallback(fresult.OnSendComplete)), message);
203                 fresult.AddResult(result, neighbor);
204                 if (result.CompletedSynchronously)
205                 {
206                     neighbor.EndSend(result);
207                     UtilityExtension.OnEndSend(neighbor, fresult);
208                 }
209                 return result;
210             }
211             catch (Exception e)
212             {
213                 if (Fx.IsFatal(e))
214                 {
215                     fatal = true;
216                     throw;
217                 }
218                 if (null != CloseNeighborIfKnownException(neighborManager, e, neighbor))
219                 {
220                     fresult.MarkEnd(false);
221                     throw;
222                 }
223 
224                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
225                 return null;
226             }
227             finally
228             {
229                 if ((result == null || result.CompletedSynchronously) && !fatal)
230                     message.Close();
231 
232             }
233         }
234 
OnMessageSent(object sender, EventArgs args)235         public void OnMessageSent(object sender, EventArgs args)
236         {
237             quotaHelper.ItemDequeued();
238         }
239 
KillSlowNeighbor()240         void KillSlowNeighbor()
241         {
242             IPeerNeighbor neighbor = this.neighborManager.SlowestNeighbor();
243             if (neighbor != null)
244                 neighbor.Abort(PeerCloseReason.NodeTooSlow, PeerCloseInitiator.LocalNode);
245         }
246 
247 
FloodMessageToNeighbors(MessageBuffer messageBuffer, TimeSpan timeout, AsyncCallback callback, object state, int index, MessageHeader hopHeader, IPeerNeighbor except, EventHandler OnMessageSentCallback)248         protected virtual IAsyncResult FloodMessageToNeighbors(MessageBuffer messageBuffer,
249                                                                TimeSpan timeout, AsyncCallback callback, object state,
250                                                                int index, MessageHeader hopHeader, IPeerNeighbor except,
251                                                                EventHandler OnMessageSentCallback)
252         {
253             long temp = Interlocked.Increment(ref messageSequence);
254             FloodAsyncResult fresult = new FloodAsyncResult(this.neighborManager, timeout, callback, state);
255             fresult.OnMessageSent += OnMessageSentCallback;
256             List<IPeerNeighbor> neighbors = this.Neighbors;
257 
258             foreach (IPeerNeighbor neighbor in neighbors)
259             {
260                 if (neighbor.Equals(except))
261                     continue;
262                 // Don't do anything if the neighbor is not connected
263                 if (PeerNeighborStateHelper.IsConnected(neighbor.State))
264                 {
265                     Message fmessage = messageBuffer.CreateMessage();
266                     if (index != -1)
267                     {
268                         fmessage.Headers.ReplaceAt(index, hopHeader);
269                     }
270 
271                     // Don't do anything if the neighbor is not connected
272                     if (PeerNeighborStateHelper.IsConnected(neighbor.State))
273                     {
274                         BeginSendHelper(neighbor, timeout, fmessage, fresult);
275                     }
276                 }
277             }
278             fresult.MarkEnd(true);
279             return fresult;
280 
281         }
282 
Open()283         public void Open()
284         {
285             OnOpen();
286         }
287 
Close()288         public void Close()
289         {
290             OnClose();
291         }
292 
OnOpen()293         public abstract void OnOpen();
294 
OnClose()295         public abstract void OnClose();
296 
OnNeighborConnected(IPeerNeighbor neighbor)297         public virtual void OnNeighborConnected(IPeerNeighbor neighbor)
298         {
299             this.neighbors = this.neighborManager.GetConnectedNeighbors();
300         }
301 
OnNeighborClosed(IPeerNeighbor neighbor)302         public virtual void OnNeighborClosed(IPeerNeighbor neighbor)
303         {
304             this.neighbors = this.neighborManager.GetConnectedNeighbors();
305         }
306 
ProcessLinkUtility(IPeerNeighbor neighbor, TLinkContract utilityInfo)307         public abstract void ProcessLinkUtility(IPeerNeighbor neighbor, TLinkContract utilityInfo);
308 
ShouldProcess(TFloodContract floodInfo)309         public abstract bool ShouldProcess(TFloodContract floodInfo);
RecordOutgoingMessage(byte[] id)310         public abstract void RecordOutgoingMessage(byte[] id);
311 
UpdateHopCount(Message message, out MessageHeader hopHeader, out ulong currentValue)312         int UpdateHopCount(Message message, out MessageHeader hopHeader, out ulong currentValue)
313         {
314             int index = -1;
315             currentValue = PeerTransportConstants.MaxHopCount;
316             hopHeader = null;
317             try
318             {
319                 // If a message contains multiple Hopcounts with our name and namespace or the message can't deserialize to a ulong then ignore the HopCount
320                 index = message.Headers.FindHeader(PeerStrings.HopCountElementName, PeerStrings.HopCountElementNamespace);
321                 if (index != -1)
322                 {
323                     currentValue = PeerMessageHelpers.GetHeaderULong(message.Headers, index);
324                     hopHeader = MessageHeader.CreateHeader(PeerStrings.HopCountElementName, PeerStrings.HopCountElementNamespace, --currentValue, false);
325                 }
326             }
327             catch (MessageHeaderException e)
328             {
329                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
330             }
331             catch (CommunicationException e)
332             {
333                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
334             }
335             catch (SerializationException e)
336             {
337                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
338             }
339             catch (XmlException e)
340             {
341                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
342             }
343             Fx.Assert((index == -1) || (hopHeader != null), "Could not successfully create new HopCount Header!");
344             return index;
345         }
346 
OnFloodedMessage(IPeerNeighbor neighbor, TFloodContract floodInfo, AsyncCallback callback, object state)347         public virtual IAsyncResult OnFloodedMessage(IPeerNeighbor neighbor, TFloodContract floodInfo, AsyncCallback callback, object state)
348         {
349             bool process = false;
350             MessageBuffer messageBuffer = null;
351             Message message = null;
352             Uri via;
353             Uri to;
354             int index = 0;
355             ulong remainingHops = PeerTransportConstants.MaxHopCount;
356             MessageHeader hopHeader = null;
357             bool fatal = false;
358             PeerMessageProperty peerProperty = null;
359             IAsyncResult result = null;
360 
361             try
362             {
363                 peerProperty = (PeerMessageProperty)floodInfo.Properties[PeerStrings.PeerProperty];
364                 if (!peerProperty.MessageVerified)
365                 {
366                     if (peerProperty.CacheMiss > UtilityExtension.AcceptableMissDistance)
367                     {
368                         UtilityExtension.ReportCacheMiss(neighbor, peerProperty.CacheMiss);
369                     }
370                     result = new CompletedAsyncResult(callback, state);
371                 }
372                 else
373                 {
374                     process = true;
375                     messageBuffer = floodInfo.CreateBufferedCopy((int)this.config.MaxReceivedMessageSize);
376                     message = messageBuffer.CreateMessage();
377                     via = peerProperty.PeerVia;
378                     to = peerProperty.PeerTo;
379                     message.Headers.To = message.Properties.Via = via;
380 
381                     index = UpdateHopCount(message, out hopHeader, out remainingHops);
382 
383                     PeerMessagePropagation propagateFlags = PeerMessagePropagation.LocalAndRemote;
384                     if (peerProperty.SkipLocalChannels)
385                         propagateFlags = PeerMessagePropagation.Remote;
386                     else if (messageHandler.HasMessagePropagation)
387                     {
388                         using (Message filterMessage = messageBuffer.CreateMessage())
389                         {
390                             propagateFlags = messageHandler.DetermineMessagePropagation(filterMessage, PeerMessageOrigination.Remote);
391                         }
392                     }
393 
394                     if ((propagateFlags & PeerMessagePropagation.Remote) != 0)
395                     {
396                         if (remainingHops == 0)
397                             propagateFlags &= ~PeerMessagePropagation.Remote;
398                     }
399                     if ((propagateFlags & PeerMessagePropagation.Remote) != 0)
400                     {
401                         result = BeginFloodReceivedMessage(neighbor, messageBuffer, PeerTransportConstants.ForwardTimeout, callback, state, index, hopHeader);
402                     }
403                     else
404                     {
405                         result = new CompletedAsyncResult(callback, state);
406                     }
407                     if ((propagateFlags & PeerMessagePropagation.Local) != 0)
408                     {
409                         messageHandler.HandleIncomingMessage(messageBuffer, propagateFlags, index, hopHeader, via, to);
410                     }
411                 }
412                 UtilityExtension.UpdateLinkUtility(neighbor, process);
413             }
414             catch (Exception e)
415             {
416                 if (Fx.IsFatal(e))
417                 {
418                     fatal = true;
419                     throw;
420                 }
421                 if (null != CloseNeighborIfKnownException(neighborManager, e, neighbor))
422                 {
423                     throw;
424                 }
425                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
426             }
427             finally
428             {
429                 if (!fatal)
430                 {
431                     if (message != null)
432                         message.Close();
433                     if (messageBuffer != null)
434                         messageBuffer.Close();
435                 }
436             }
437             return result;
438         }
439 
EndFloodMessage(IAsyncResult result)440         public virtual void EndFloodMessage(IAsyncResult result)
441         {
442             if (result is CompletedAsyncResult)
443             {
444                 CompletedAsyncResult.End(result);
445                 return;
446             }
447             FloodAsyncResult fresult = result as FloodAsyncResult;
448             Fx.Assert(fresult != null, "Invalid AsyncResult type in EndFloodResult");
449             fresult.End();
450 
451         }
452 
453         protected long MaxReceivedMessageSize
454         {
455             get { return config.MaxReceivedMessageSize; }
456         }
457 
458         protected MessageEncoder MessageEncoder
459         {
460             get { return config.MessageEncoder; }
461         }
462 
463         protected object ThisLock
464         {
465             get { return this.thisLock; }
466         }
467 
468         protected List<IPeerNeighbor> Neighbors
469         {
470             get { return this.neighbors; }
471         }
472 
473         // Guaranteed not to throw anything other than fatal exceptions
CloseNeighborIfKnownException(PeerNeighborManager neighborManager, Exception exception, IPeerNeighbor peer)474         static internal Exception CloseNeighborIfKnownException(PeerNeighborManager neighborManager, Exception exception, IPeerNeighbor peer)
475         {
476             try
477             {
478                 //ignore this one since the channel is already closed.
479                 if (exception is ObjectDisposedException)
480                     return null;
481                 else if (
482                     (exception is CommunicationException && !(exception.InnerException is QuotaExceededException))
483                     || (exception is TimeoutException)
484                     || (exception is InvalidOperationException)
485                     || (exception is MessageSecurityException)
486                 )
487                 {
488                     //is this the right close reason?
489                     neighborManager.CloseNeighbor(peer, PeerCloseReason.InternalFailure, PeerCloseInitiator.LocalNode, exception);
490                     return null;
491                 }
492                 else
493                 {
494                     //exception that we dont know or cant act on.
495                     //we will throw this exception to the user.
496                     return exception;
497                 }
498             }
499             catch (Exception e)
500             {
501                 if (Fx.IsFatal(e)) throw;
502                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
503                 return e;
504             }
505         }
506 
EndFloodEncodedMessage(IAsyncResult result)507         public static void EndFloodEncodedMessage(IAsyncResult result)
508         {
509             CompletedAsyncResult cresult = result as CompletedAsyncResult;
510             if (cresult != null)
511                 CompletedAsyncResult.End(result);
512             else
513             {
514                 FloodAsyncResult fresult = result as FloodAsyncResult;
515                 if (fresult == null)
516                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("result", SR.GetString(SR.InvalidAsyncResult));
517                 fresult.End();
518             }
519         }
520 
EndFloodReceivedMessage(IAsyncResult result)521         public void EndFloodReceivedMessage(IAsyncResult result)
522         {
523             FloodAsyncResult fresult = result as FloodAsyncResult;
524             Fx.Assert(fresult != null, "Invalid FloodAsyncResult instance during EndFloodReceivedMessage");
525         }
526 
527 
528         public class PeerThrottleHelper
529         {
530             int outgoingEnqueuedCount = 0;
531             int outgoingQuota = 128;
532             IFlooderForThrottle flooder;
533 
534 
PeerThrottleHelper(IFlooderForThrottle flooder, int outgoingLimit)535             public PeerThrottleHelper(IFlooderForThrottle flooder, int outgoingLimit)
536             {
537                 this.outgoingQuota = outgoingLimit;
538                 this.flooder = flooder;
539             }
540 
ItemDequeued()541             public void ItemDequeued()
542             {
543                 Interlocked.Decrement(ref outgoingEnqueuedCount);
544             }
545 
AcquireNoQueue()546             public void AcquireNoQueue()
547             {
548                 int value = Interlocked.Increment(ref outgoingEnqueuedCount);
549                 if (value >= outgoingQuota)
550                 {
551                     flooder.OnThrottleReached();
552                 }
553             }
554         }
555     }
556 
557     class PeerFlooderSimple : PeerFlooderBase<Message, UtilityInfo>
558     {
559         ListManager messageIds;
560         const uint MaxBuckets = 5;
561 
PeerFlooderSimple(PeerNodeConfig config, PeerNeighborManager neighborManager)562         internal PeerFlooderSimple(PeerNodeConfig config, PeerNeighborManager neighborManager)
563             : base(config, neighborManager)
564         {
565             //we want a message id cache that holds message ids for atmost 5 mins.
566             this.messageIds = new ListManager(MaxBuckets);
567         }
568 
ShouldProcess(Message message)569         public override bool ShouldProcess(Message message)
570         {
571             return message.Properties.ContainsKey(PeerStrings.MessageVerified);
572         }
IsNotSeenBefore(Message message, out byte[] id, out int cacheHit)573         public bool IsNotSeenBefore(Message message, out byte[] id, out int cacheHit)
574         {
575             cacheHit = -1;
576             id = PeerNodeImplementation.DefaultId;
577             if (message is SecurityVerifiedMessage)
578             {
579                 id = (message as SecurityVerifiedMessage).PrimarySignatureValue;
580 
581             }
582             else
583             {
584                 System.Xml.UniqueId messageId = PeerMessageHelpers.GetHeaderUniqueId(message.Headers, PeerStrings.MessageId, PeerStrings.Namespace);
585                 if (messageId == null)
586                     return false;
587                 if (messageId.IsGuid)
588                 {
589                     id = new byte[16];
590                     messageId.TryGetGuid(id, 0);
591                 }
592                 else
593                     return false;
594             }
595             cacheHit = messageIds.AddForLookup(id);
596             if (cacheHit == -1)
597             {
598                 return true;
599             }
600             return false;
601 
602         }
603 
RecordOutgoingMessage(byte[] id)604         public override void RecordOutgoingMessage(byte[] id)
605         {
606             this.messageIds.AddForFlood(id);
607         }
608 
OnOpen()609         public override void OnOpen()
610         {
611         }
612 
OnClose()613         public override void OnClose()
614         {
615             this.messageIds.Close();
616         }
617 
618 
OnFloodedMessage(IPeerNeighbor neighbor, Message floodInfo, AsyncCallback callback, object state)619         public override IAsyncResult OnFloodedMessage(IPeerNeighbor neighbor, Message floodInfo, AsyncCallback callback, object state)
620         {
621             return base.OnFloodedMessage(neighbor, floodInfo, callback, state);
622         }
623 
EndFloodMessage(IAsyncResult result)624         public override void EndFloodMessage(IAsyncResult result)
625         {
626             base.EndFloodMessage(result);
627 
628         }
629 
ProcessLinkUtility(IPeerNeighbor neighbor, UtilityInfo utilityInfo)630         public override void ProcessLinkUtility(IPeerNeighbor neighbor, UtilityInfo utilityInfo)
631         {
632             if (!PeerNeighborStateHelper.IsConnected(neighbor.State))
633             {
634                 neighbor.Abort(PeerCloseReason.InvalidNeighbor, PeerCloseInitiator.LocalNode);
635                 return;
636             }
637 
638             try
639             {
640                 UtilityExtension.ProcessLinkUtility(neighbor, utilityInfo);
641             }
642             catch (Exception e)
643             {
644                 if (Fx.IsFatal(e)) throw;
645                 if (null != CloseNeighborIfKnownException(neighborManager, e, neighbor))
646                 {
647                     throw;
648                 }
649                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
650             }
651         }
652 
653         class ListManager
654         {
655             uint active;            //current bucket.
656             readonly uint buckets;
657 
658             // Double-checked locking pattern requires volatile for read/write synchronization
659             volatile bool disposed = false;
660             IOThreadTimer messagePruningTimer;
661             //we service the hashtables every one minute
662             static readonly int PruningTimout = 60 * 1000;
663             static readonly int InitialCount = 1000;
664             Dictionary<byte[], bool>[] tables;
665             //Hashtable[] tables;
666             object thisLock;
667             static InMemoryNonceCache.NonceCacheImpl.NonceKeyComparer keyComparer = new InMemoryNonceCache.NonceCacheImpl.NonceKeyComparer();
668             const int NotFound = -1;
669             //creating this ListManager with n implies that the entries will be available for n minutes atmost.
670             //in the n+1 minute, the timer message handler will kick in to clear older messages.
671             //every minute, the
ListManager(uint buckets)672             public ListManager(uint buckets)
673             {
674                 if (!(buckets > 1))
675                 {
676                     throw Fx.AssertAndThrow("ListManager should be used atleast with 2 buckets");
677                 }
678                 this.buckets = buckets;
679                 tables = new Dictionary<byte[], bool>[buckets];
680 
681                 for (uint i = 0; i < buckets; i++)
682                 {
683                     tables[i] = NewCache(InitialCount);
684                 }
685                 //create a timer and kickit off for 1 minute
686                 messagePruningTimer = new IOThreadTimer(new Action<object>(OnTimeout), null, false);
687                 messagePruningTimer.Set(PruningTimout);
688                 this.active = 0;
689                 this.disposed = false;
690                 this.thisLock = new object();
691             }
692 
693             object ThisLock
694             {
695                 get
696                 {
697                     return thisLock;
698                 }
699             }
700 
AddForLookup(byte[] key)701             public int AddForLookup(byte[] key)
702             {
703                 int table = NotFound;
704                 if (disposed)
705                 {
706                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerFlooderDisposed)));
707                 }
708 
709                 lock (ThisLock)
710                 {
711                     if ((table = Contains(key)) == NotFound)
712                     {
713                         tables[active].Add(key, false);
714                     }
715                     return table;
716                 }
717             }
718 
AddForFlood(byte[] key)719             public bool AddForFlood(byte[] key)
720             {
721                 if (disposed)
722                 {
723                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerFlooderDisposed)));
724                 }
725 
726                 lock (ThisLock)
727                 {
728                     if (UpdateFloodEntry(key))
729                     {
730                         return true;
731                     }
732                     else
733                     {
734                         return false;
735                     }
736                 }
737             }
738 
Close()739             internal void Close()
740             {
741                 lock (ThisLock)
742                 {
743                     if (disposed)
744                         return;
745                     messagePruningTimer.Cancel();
746                     messagePruningTimer = null;
747                     tables = null;
748                     disposed = true;
749                 }
750             }
751 
752             //it does not use locks and expects the caller to hold the lock.
UpdateFloodEntry(byte[] key)753             internal bool UpdateFloodEntry(byte[] key)
754             {
755                 bool flooded = false;
756                 //check if the message is present in any of the buckets.
757                 //assumption is that a hit is likely in the current or most recent bucket.
758                 //we start looking in the current active table and then in the previous and then backwards ...
759                 for (uint i = buckets; i > 0; i--)
760                 {
761                     if (tables[(active + i) % buckets].TryGetValue(key, out flooded))
762                     {
763                         if (!flooded)
764                         {
765                             tables[(active + i) % buckets][key] = true;
766                             return true;
767                         }
768                         else
769                             return false;
770                     }
771                 }
772                 tables[active].Add(key, true);
773                 return true;
774             }
775 
776             //it does not use locks and expects the caller to hold the lock.
Contains(byte[] key)777             internal int Contains(byte[] key)
778             {
779                 int cache = NotFound;
780                 uint i = 0;
781                 //check if the message is present in any of the buckets.
782                 //assumption is that a hit is likely in the current or most recent bucket.
783                 //we start looking in the current active table and then in the previous and then backwards ...
784                 for (i = buckets; i > 0; i--)
785                 {
786                     if (tables[(active + i) % buckets].ContainsKey(key))
787                         cache = (int)i;
788                 }
789                 if (cache < 0)
790                     return cache;
791                 cache = (int)((active + buckets - i) % buckets);
792                 return cache;
793             }
794 
OnTimeout(object state)795             void OnTimeout(object state)
796             {
797                 if (disposed)
798                     return;
799                 lock (ThisLock)
800                 {
801                     if (disposed)
802                         return;
803                     active = (active + 1) % (buckets);
804                     tables[active] = NewCache(tables[active].Count);
805                     messagePruningTimer.Set(PruningTimout);
806                 }
807             }
808 
NewCache(int capacity)809             Dictionary<byte[], bool> NewCache(int capacity)
810             {
811                 return new Dictionary<byte[], bool>(capacity, keyComparer);
812             }
813         }
814     }
815 
816 
817     // this class should contain a collection of IAsyncResults returned from neighbor.BeginSend
818     // and complete once all sends have completed
819     class FloodAsyncResult : AsyncResult
820     {
821         bool doneAdding = false;
822         Exception exception;
823         PeerNeighborManager pnm;
824 
825         // Double-checked locking pattern requires volatile for read/write synchronization
826         volatile bool isCompleted = false;
827         //async results who signaled completion but we have not called EndSend.
828         List<IAsyncResult> pending = new List<IAsyncResult>();
829         Dictionary<IAsyncResult, IPeerNeighbor> results = new Dictionary<IAsyncResult, IPeerNeighbor>();
830         bool shouldCallComplete = false;
831         object thisLock = new object();
832         TimeoutHelper timeoutHelper;
833         bool offNode = false;
834         public event EventHandler OnMessageSent;
835 
836 
FloodAsyncResult(PeerNeighborManager owner, TimeSpan timeout, AsyncCallback callback, object state)837         public FloodAsyncResult(PeerNeighborManager owner, TimeSpan timeout, AsyncCallback callback, object state)
838             : base(callback, state)
839         {
840             this.pnm = owner;
841             this.timeoutHelper = new TimeoutHelper(timeout);
842         }
843 
844         object ThisLock
845         {
846             get
847             {
848                 return thisLock;
849             }
850         }
851 
AddResult(IAsyncResult result, IPeerNeighbor neighbor)852         public void AddResult(IAsyncResult result, IPeerNeighbor neighbor)
853         {
854             lock (ThisLock)
855             {
856                 this.results.Add(result, neighbor);
857             }
858         }
859 
860         //user wants to end business. This method is called as a result of EndSend on the flooder.
861         //internal methods do not call this. we are asserting that this method should not be called in case of failed BeginX
End()862         public void End()
863         {
864             if (!(this.doneAdding && this.shouldCallComplete))
865             {
866                 throw Fx.AssertAndThrow("Unexpected end!");
867             }
868             if (this.isCompleted)
869             {
870                 return;
871             }
872 
873             //simply wait on the base's event handle
874             bool completed = TimeoutHelper.WaitOne(this.AsyncWaitHandle, this.timeoutHelper.RemainingTime());
875             if (!completed)
876             {
877                 // a time out occurred - if mo message went off node then tell AsyncResult to throw.
878                 if (!offNode)
879                 {
880                     try
881                     {
882                         throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException());
883                     }
884                     catch (Exception e)
885                     {
886                         if (Fx.IsFatal(e)) throw;
887                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
888                         this.exception = e;
889                     }
890                 }
891                 //otherwise trace that the timeout was not sufficient for complete send
892                 lock (ThisLock)
893                 {
894                     if (this.isCompleted)
895                         return;
896                     this.isCompleted = true;
897                 }
898                 CompleteOp(false);
899             }
900             AsyncResult.End<FloodAsyncResult>(this);
901         }
902 
903         //this method marks the end of BeginX by the flooder.
904         //if there were errors during BeginX, this method may be prematurely called
905         //in this case, our only job is to call EndX on successful BeginX calls. we do not report back to caller in this case.
906         //base.Complete will not be called and End() will not be called. User has already received exception during BeginX
907         //if there was no exception during BeginX, excep param is null. In this case, we call base.Complete upon the last EndX
MarkEnd(bool success)908         public void MarkEnd(bool success)
909         {
910             bool callComplete = false;
911             try
912             {
913                 lock (this.ThisLock)
914                 {
915                     foreach (IAsyncResult result in pending)
916                     {
917                         OnSendComplete(result);
918                     }
919                     pending.Clear();
920                     this.doneAdding = true;
921                     this.shouldCallComplete = success; //only call base.Complete if there is no error during BeginX
922                     if (this.results.Count == 0)
923                     {
924                         this.isCompleted = true;
925                         callComplete = true;
926                     }
927                 }
928             }
929             finally
930             {
931                 if (callComplete)
932                 {
933                     CompleteOp(true);
934                 }
935             }
936 
937         }
938 
939 
940         //this is the callback routine for async completion on channel BeginSend() operations.
941         //if we are done, simply return. This can happen if user called sync EndX.
942         //if the flooder is still processing BeginSend(), then we probably cant complete. In this case, add the result to pending and return
943         //main thread will flush the pending completions in MarkEnd().
944         //otherwise, call EndX on the result and remove it from results.
945         //if this is the last invoke, signal user using base.Complete AND isCompleted=true
OnSendComplete(IAsyncResult result)946         internal void OnSendComplete(IAsyncResult result)
947         {
948             bool callComplete = false;
949             IPeerNeighbor neighbor = null;
950             bool fatal = false;
951             if (isCompleted)
952                 return;
953             Message message = (Message)result.AsyncState;
954 
955             //wait until flooder had a chance to call all outgoing channels and give us Async results.
956             lock (ThisLock)
957             {
958                 if (isCompleted)
959                     return;
960 
961                 if (!this.results.TryGetValue(result, out neighbor))
962                 {
963                     if (!doneAdding)
964                         this.pending.Add(result);
965                     else
966                     {
967                         throw Fx.AssertAndThrow("IAsyncResult is un-accounted for.");
968                     }
969                     return;
970                 }
971                 this.results.Remove(result);
972 
973                 try
974                 {
975                     //try doing this only if the async result is marked !CompletedSynchronously.
976                     if (!result.CompletedSynchronously)
977                     {
978                         neighbor.EndSend(result);
979                         offNode = true;
980                         UtilityExtension.OnEndSend(neighbor, this);
981                     }
982                 }
983                 catch (Exception e)
984                 {
985                     if (Fx.IsFatal(e))
986                     {
987                         fatal = true;
988                         throw;
989                     }
990 
991                     Exception temp = PeerFlooder.CloseNeighborIfKnownException(pnm, e, neighbor);
992                     //we want to return the very first exception to the user.
993                     if (temp != null && this.doneAdding && !this.shouldCallComplete)
994                         throw;
995                     if (this.exception == null)
996                     {
997                         this.exception = temp;
998                     }
999                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1000                 }
1001                 finally
1002                 {
1003                     if (message != null && !result.CompletedSynchronously && !fatal)
1004                         message.Close();
1005                 }
1006                 //dont want to call Complete from the lock.
1007                 //we just decide if this thread should call complete and call outside the lock.
1008                 if (this.results.Count == 0 && this.doneAdding && this.shouldCallComplete)
1009                 {
1010                     this.isCompleted = true;
1011                     callComplete = true;
1012                 }
1013             }
1014             //if we are done with callbacks and beginx calls,
1015             if (callComplete && this.shouldCallComplete)
1016             {
1017                 CompleteOp(false);
1018             }
1019         }
1020 
CompleteOp(bool sync)1021         void CompleteOp(bool sync)
1022         {
1023             //call the callback upon finish
1024             OnMessageSent(this, EventArgs.Empty);
1025             base.Complete(sync, this.exception);
1026         }
1027 
1028     }
1029 }
1030