1 //------------------------------------------------------------ 2 // Copyright (c) Microsoft Corporation. All rights reserved. 3 //------------------------------------------------------------ 4 namespace System.ServiceModel.Channels 5 { 6 using System.Collections.Generic; 7 using System.Diagnostics; 8 using System.Runtime; 9 using System.Runtime.Serialization; 10 using System.Security; 11 using System.Security.Permissions; 12 using System.ServiceModel; 13 using System.ServiceModel.Diagnostics; 14 using System.ServiceModel.Dispatcher; 15 using System.ServiceModel.Security; 16 using System.Threading; 17 using System.Xml; 18 19 class PeerFlooder : PeerFlooderSimple 20 { PeerFlooder(PeerNodeConfig config, PeerNeighborManager neighborManager)21 PeerFlooder(PeerNodeConfig config, PeerNeighborManager neighborManager) : base(config, neighborManager) { } 22 CreateFlooder(PeerNodeConfig config, PeerNeighborManager neighborManager, IPeerNodeMessageHandling messageHandler)23 public static PeerFlooder CreateFlooder(PeerNodeConfig config, PeerNeighborManager neighborManager, IPeerNodeMessageHandling messageHandler) 24 { 25 PeerFlooder flooder = new PeerFlooder(config, neighborManager); 26 flooder.messageHandler = messageHandler; 27 return flooder; 28 } 29 } 30 31 interface IFlooderForThrottle 32 { OnThrottleReached()33 void OnThrottleReached(); OnThrottleReleased()34 void OnThrottleReleased(); 35 } 36 37 abstract class PeerFlooderBase<TFloodContract, TLinkContract> : IFlooderForThrottle, IPeerFlooderContract<TFloodContract, TLinkContract> where TFloodContract : Message 38 { 39 protected PeerNodeConfig config; 40 protected PeerNeighborManager neighborManager; 41 protected List<IPeerNeighbor> neighbors; 42 object thisLock = new object(); 43 44 internal IPeerNodeMessageHandling messageHandler; 45 internal PeerThrottleHelper quotaHelper; 46 long messageSequence; 47 48 public event EventHandler ThrottleReached; 49 public event EventHandler SlowNeighborKilled; 50 public event EventHandler ThrottleReleased; 51 public EventHandler OnMessageSentHandler; 52 53 PeerFlooderBase(PeerNodeConfig config, PeerNeighborManager neighborManager)54 public PeerFlooderBase(PeerNodeConfig config, PeerNeighborManager neighborManager) 55 { 56 this.neighborManager = neighborManager; 57 this.neighbors = new List<IPeerNeighbor>(); 58 this.config = config; 59 this.neighbors = this.neighborManager.GetConnectedNeighbors(); 60 this.quotaHelper = new PeerThrottleHelper(this, this.config.MaxPendingOutgoingCalls); 61 OnMessageSentHandler = new EventHandler(OnMessageSent); 62 } 63 PruneNeighborCallback(IPeerNeighbor peer)64 void PruneNeighborCallback(IPeerNeighbor peer) 65 { 66 lock (ThisLock) 67 { 68 if (this.Neighbors.Count <= 1) 69 return; 70 if (DiagnosticUtility.ShouldTraceWarning) 71 { 72 string message = SR.GetString(SR.PeerThrottlePruning, this.config.MeshId); 73 PeerThrottleTraceRecord record = new PeerThrottleTraceRecord(this.config.MeshId, message); 74 TraceUtility.TraceEvent(TraceEventType.Warning, TraceCode.PeerFlooderReceiveMessageQuotaExceeded, 75 SR.GetString(SR.TraceCodePeerFlooderReceiveMessageQuotaExceeded), record, this, null); 76 } 77 } 78 try 79 { 80 peer.Abort(PeerCloseReason.NodeTooSlow, PeerCloseInitiator.LocalNode); 81 } 82 catch (Exception e) 83 { 84 if (Fx.IsFatal(e)) throw; 85 if (null != CloseNeighborIfKnownException(neighborManager, e, peer)) 86 { 87 throw; 88 } 89 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 90 } 91 } 92 IFlooderForThrottle.OnThrottleReached()93 void IFlooderForThrottle.OnThrottleReached() 94 { 95 if (DiagnosticUtility.ShouldTraceInformation) 96 { 97 string message = SR.GetString(SR.PeerThrottleWaiting, this.config.MeshId); 98 PeerThrottleTraceRecord record = new PeerThrottleTraceRecord(this.config.MeshId, message); 99 TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerFlooderReceiveMessageQuotaExceeded, 100 SR.GetString(SR.TraceCodePeerFlooderReceiveMessageQuotaExceeded), record, this, null); 101 } 102 103 IPeerNeighbor peer = this.neighborManager.SlowestNeighbor(); 104 if (peer == null) 105 return; 106 UtilityExtension extension = peer.Utility; 107 if (peer.IsConnected && extension != null) 108 { 109 if (extension.PendingMessages > PeerTransportConstants.MessageThreshold) 110 { 111 extension.BeginCheckPoint(new UtilityExtension.PruneNeighborCallback(PruneNeighborCallback)); 112 } 113 else 114 { 115 Fx.Assert(false, "Neighbor is marked slow with messages " + extension.PendingMessages); 116 } 117 FireReachedEvent(); 118 } 119 } 120 IFlooderForThrottle.OnThrottleReleased()121 void IFlooderForThrottle.OnThrottleReleased() 122 { 123 FireDequeuedEvent(); 124 } 125 FireDequeuedEvent()126 public void FireDequeuedEvent() { FireEvent(ThrottleReleased); } 127 FireReachedEvent()128 public void FireReachedEvent() { FireEvent(ThrottleReached); } 129 FireKilledEvent()130 public void FireKilledEvent() { FireEvent(SlowNeighborKilled); } 131 FireEvent(EventHandler handler)132 void FireEvent(EventHandler handler) 133 { 134 if (handler != null) 135 handler(this, EventArgs.Empty); 136 } 137 138 [PermissionSet(SecurityAction.Demand, Unrestricted = true), SecuritySafeCritical] BeginFloodEncodedMessage(byte[] id, MessageBuffer encodedMessage, TimeSpan timeout, AsyncCallback callback, object state)139 public virtual IAsyncResult BeginFloodEncodedMessage(byte[] id, MessageBuffer encodedMessage, TimeSpan timeout, AsyncCallback callback, object state) 140 { 141 RecordOutgoingMessage(id); 142 SynchronizationContext syncContext = ThreadBehavior.GetCurrentSynchronizationContext(); 143 SynchronizationContext.SetSynchronizationContext(null); 144 145 if (neighbors.Count == 0) 146 { 147 return new CompletedAsyncResult(callback, state); 148 } 149 try 150 { 151 return FloodMessageToNeighbors(encodedMessage, timeout, callback, state, -1, null, null, OnMessageSentHandler); 152 } 153 finally 154 { 155 SynchronizationContext.SetSynchronizationContext(syncContext); 156 } 157 158 } 159 BeginFloodReceivedMessage(IPeerNeighbor sender, MessageBuffer messageBuffer, TimeSpan timeout, AsyncCallback callback, object state, int index, MessageHeader hopHeader)160 protected virtual IAsyncResult BeginFloodReceivedMessage(IPeerNeighbor sender, MessageBuffer messageBuffer, 161 TimeSpan timeout, AsyncCallback callback, object state, int index, MessageHeader hopHeader) 162 { 163 quotaHelper.AcquireNoQueue(); 164 165 try 166 { 167 return FloodMessageToNeighbors(messageBuffer, timeout, callback, state, index, hopHeader, sender, OnMessageSentHandler); 168 } 169 catch (Exception e) 170 { 171 if (Fx.IsFatal(e)) throw; 172 if (e is QuotaExceededException || (e is CommunicationException && e.InnerException is QuotaExceededException)) 173 { 174 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 175 if (DiagnosticUtility.ShouldTraceError) 176 { 177 PeerFlooderTraceRecord record = new PeerFlooderTraceRecord( 178 this.config.MeshId, 179 sender.ListenAddress, 180 e); 181 TraceUtility.TraceEvent( 182 TraceEventType.Error, 183 TraceCode.PeerFlooderReceiveMessageQuotaExceeded, 184 SR.GetString(SR.TraceCodePeerFlooderReceiveMessageQuotaExceeded), 185 record, 186 this, 187 null); 188 } 189 return null; 190 } 191 throw; 192 } 193 } 194 BeginSendHelper(IPeerNeighbor neighbor, TimeSpan timeout, Message message, FloodAsyncResult fresult)195 protected IAsyncResult BeginSendHelper(IPeerNeighbor neighbor, TimeSpan timeout, Message message, FloodAsyncResult fresult) 196 { 197 IAsyncResult result = null; 198 bool fatal = false; 199 try 200 { 201 UtilityExtension.OnMessageSent(neighbor); 202 result = neighbor.BeginSend(message, timeout, Fx.ThunkCallback(new AsyncCallback(fresult.OnSendComplete)), message); 203 fresult.AddResult(result, neighbor); 204 if (result.CompletedSynchronously) 205 { 206 neighbor.EndSend(result); 207 UtilityExtension.OnEndSend(neighbor, fresult); 208 } 209 return result; 210 } 211 catch (Exception e) 212 { 213 if (Fx.IsFatal(e)) 214 { 215 fatal = true; 216 throw; 217 } 218 if (null != CloseNeighborIfKnownException(neighborManager, e, neighbor)) 219 { 220 fresult.MarkEnd(false); 221 throw; 222 } 223 224 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 225 return null; 226 } 227 finally 228 { 229 if ((result == null || result.CompletedSynchronously) && !fatal) 230 message.Close(); 231 232 } 233 } 234 OnMessageSent(object sender, EventArgs args)235 public void OnMessageSent(object sender, EventArgs args) 236 { 237 quotaHelper.ItemDequeued(); 238 } 239 KillSlowNeighbor()240 void KillSlowNeighbor() 241 { 242 IPeerNeighbor neighbor = this.neighborManager.SlowestNeighbor(); 243 if (neighbor != null) 244 neighbor.Abort(PeerCloseReason.NodeTooSlow, PeerCloseInitiator.LocalNode); 245 } 246 247 FloodMessageToNeighbors(MessageBuffer messageBuffer, TimeSpan timeout, AsyncCallback callback, object state, int index, MessageHeader hopHeader, IPeerNeighbor except, EventHandler OnMessageSentCallback)248 protected virtual IAsyncResult FloodMessageToNeighbors(MessageBuffer messageBuffer, 249 TimeSpan timeout, AsyncCallback callback, object state, 250 int index, MessageHeader hopHeader, IPeerNeighbor except, 251 EventHandler OnMessageSentCallback) 252 { 253 long temp = Interlocked.Increment(ref messageSequence); 254 FloodAsyncResult fresult = new FloodAsyncResult(this.neighborManager, timeout, callback, state); 255 fresult.OnMessageSent += OnMessageSentCallback; 256 List<IPeerNeighbor> neighbors = this.Neighbors; 257 258 foreach (IPeerNeighbor neighbor in neighbors) 259 { 260 if (neighbor.Equals(except)) 261 continue; 262 // Don't do anything if the neighbor is not connected 263 if (PeerNeighborStateHelper.IsConnected(neighbor.State)) 264 { 265 Message fmessage = messageBuffer.CreateMessage(); 266 if (index != -1) 267 { 268 fmessage.Headers.ReplaceAt(index, hopHeader); 269 } 270 271 // Don't do anything if the neighbor is not connected 272 if (PeerNeighborStateHelper.IsConnected(neighbor.State)) 273 { 274 BeginSendHelper(neighbor, timeout, fmessage, fresult); 275 } 276 } 277 } 278 fresult.MarkEnd(true); 279 return fresult; 280 281 } 282 Open()283 public void Open() 284 { 285 OnOpen(); 286 } 287 Close()288 public void Close() 289 { 290 OnClose(); 291 } 292 OnOpen()293 public abstract void OnOpen(); 294 OnClose()295 public abstract void OnClose(); 296 OnNeighborConnected(IPeerNeighbor neighbor)297 public virtual void OnNeighborConnected(IPeerNeighbor neighbor) 298 { 299 this.neighbors = this.neighborManager.GetConnectedNeighbors(); 300 } 301 OnNeighborClosed(IPeerNeighbor neighbor)302 public virtual void OnNeighborClosed(IPeerNeighbor neighbor) 303 { 304 this.neighbors = this.neighborManager.GetConnectedNeighbors(); 305 } 306 ProcessLinkUtility(IPeerNeighbor neighbor, TLinkContract utilityInfo)307 public abstract void ProcessLinkUtility(IPeerNeighbor neighbor, TLinkContract utilityInfo); 308 ShouldProcess(TFloodContract floodInfo)309 public abstract bool ShouldProcess(TFloodContract floodInfo); RecordOutgoingMessage(byte[] id)310 public abstract void RecordOutgoingMessage(byte[] id); 311 UpdateHopCount(Message message, out MessageHeader hopHeader, out ulong currentValue)312 int UpdateHopCount(Message message, out MessageHeader hopHeader, out ulong currentValue) 313 { 314 int index = -1; 315 currentValue = PeerTransportConstants.MaxHopCount; 316 hopHeader = null; 317 try 318 { 319 // If a message contains multiple Hopcounts with our name and namespace or the message can't deserialize to a ulong then ignore the HopCount 320 index = message.Headers.FindHeader(PeerStrings.HopCountElementName, PeerStrings.HopCountElementNamespace); 321 if (index != -1) 322 { 323 currentValue = PeerMessageHelpers.GetHeaderULong(message.Headers, index); 324 hopHeader = MessageHeader.CreateHeader(PeerStrings.HopCountElementName, PeerStrings.HopCountElementNamespace, --currentValue, false); 325 } 326 } 327 catch (MessageHeaderException e) 328 { 329 DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning); 330 } 331 catch (CommunicationException e) 332 { 333 DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning); 334 } 335 catch (SerializationException e) 336 { 337 DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning); 338 } 339 catch (XmlException e) 340 { 341 DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning); 342 } 343 Fx.Assert((index == -1) || (hopHeader != null), "Could not successfully create new HopCount Header!"); 344 return index; 345 } 346 OnFloodedMessage(IPeerNeighbor neighbor, TFloodContract floodInfo, AsyncCallback callback, object state)347 public virtual IAsyncResult OnFloodedMessage(IPeerNeighbor neighbor, TFloodContract floodInfo, AsyncCallback callback, object state) 348 { 349 bool process = false; 350 MessageBuffer messageBuffer = null; 351 Message message = null; 352 Uri via; 353 Uri to; 354 int index = 0; 355 ulong remainingHops = PeerTransportConstants.MaxHopCount; 356 MessageHeader hopHeader = null; 357 bool fatal = false; 358 PeerMessageProperty peerProperty = null; 359 IAsyncResult result = null; 360 361 try 362 { 363 peerProperty = (PeerMessageProperty)floodInfo.Properties[PeerStrings.PeerProperty]; 364 if (!peerProperty.MessageVerified) 365 { 366 if (peerProperty.CacheMiss > UtilityExtension.AcceptableMissDistance) 367 { 368 UtilityExtension.ReportCacheMiss(neighbor, peerProperty.CacheMiss); 369 } 370 result = new CompletedAsyncResult(callback, state); 371 } 372 else 373 { 374 process = true; 375 messageBuffer = floodInfo.CreateBufferedCopy((int)this.config.MaxReceivedMessageSize); 376 message = messageBuffer.CreateMessage(); 377 via = peerProperty.PeerVia; 378 to = peerProperty.PeerTo; 379 message.Headers.To = message.Properties.Via = via; 380 381 index = UpdateHopCount(message, out hopHeader, out remainingHops); 382 383 PeerMessagePropagation propagateFlags = PeerMessagePropagation.LocalAndRemote; 384 if (peerProperty.SkipLocalChannels) 385 propagateFlags = PeerMessagePropagation.Remote; 386 else if (messageHandler.HasMessagePropagation) 387 { 388 using (Message filterMessage = messageBuffer.CreateMessage()) 389 { 390 propagateFlags = messageHandler.DetermineMessagePropagation(filterMessage, PeerMessageOrigination.Remote); 391 } 392 } 393 394 if ((propagateFlags & PeerMessagePropagation.Remote) != 0) 395 { 396 if (remainingHops == 0) 397 propagateFlags &= ~PeerMessagePropagation.Remote; 398 } 399 if ((propagateFlags & PeerMessagePropagation.Remote) != 0) 400 { 401 result = BeginFloodReceivedMessage(neighbor, messageBuffer, PeerTransportConstants.ForwardTimeout, callback, state, index, hopHeader); 402 } 403 else 404 { 405 result = new CompletedAsyncResult(callback, state); 406 } 407 if ((propagateFlags & PeerMessagePropagation.Local) != 0) 408 { 409 messageHandler.HandleIncomingMessage(messageBuffer, propagateFlags, index, hopHeader, via, to); 410 } 411 } 412 UtilityExtension.UpdateLinkUtility(neighbor, process); 413 } 414 catch (Exception e) 415 { 416 if (Fx.IsFatal(e)) 417 { 418 fatal = true; 419 throw; 420 } 421 if (null != CloseNeighborIfKnownException(neighborManager, e, neighbor)) 422 { 423 throw; 424 } 425 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 426 } 427 finally 428 { 429 if (!fatal) 430 { 431 if (message != null) 432 message.Close(); 433 if (messageBuffer != null) 434 messageBuffer.Close(); 435 } 436 } 437 return result; 438 } 439 EndFloodMessage(IAsyncResult result)440 public virtual void EndFloodMessage(IAsyncResult result) 441 { 442 if (result is CompletedAsyncResult) 443 { 444 CompletedAsyncResult.End(result); 445 return; 446 } 447 FloodAsyncResult fresult = result as FloodAsyncResult; 448 Fx.Assert(fresult != null, "Invalid AsyncResult type in EndFloodResult"); 449 fresult.End(); 450 451 } 452 453 protected long MaxReceivedMessageSize 454 { 455 get { return config.MaxReceivedMessageSize; } 456 } 457 458 protected MessageEncoder MessageEncoder 459 { 460 get { return config.MessageEncoder; } 461 } 462 463 protected object ThisLock 464 { 465 get { return this.thisLock; } 466 } 467 468 protected List<IPeerNeighbor> Neighbors 469 { 470 get { return this.neighbors; } 471 } 472 473 // Guaranteed not to throw anything other than fatal exceptions CloseNeighborIfKnownException(PeerNeighborManager neighborManager, Exception exception, IPeerNeighbor peer)474 static internal Exception CloseNeighborIfKnownException(PeerNeighborManager neighborManager, Exception exception, IPeerNeighbor peer) 475 { 476 try 477 { 478 //ignore this one since the channel is already closed. 479 if (exception is ObjectDisposedException) 480 return null; 481 else if ( 482 (exception is CommunicationException && !(exception.InnerException is QuotaExceededException)) 483 || (exception is TimeoutException) 484 || (exception is InvalidOperationException) 485 || (exception is MessageSecurityException) 486 ) 487 { 488 //is this the right close reason? 489 neighborManager.CloseNeighbor(peer, PeerCloseReason.InternalFailure, PeerCloseInitiator.LocalNode, exception); 490 return null; 491 } 492 else 493 { 494 //exception that we dont know or cant act on. 495 //we will throw this exception to the user. 496 return exception; 497 } 498 } 499 catch (Exception e) 500 { 501 if (Fx.IsFatal(e)) throw; 502 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 503 return e; 504 } 505 } 506 EndFloodEncodedMessage(IAsyncResult result)507 public static void EndFloodEncodedMessage(IAsyncResult result) 508 { 509 CompletedAsyncResult cresult = result as CompletedAsyncResult; 510 if (cresult != null) 511 CompletedAsyncResult.End(result); 512 else 513 { 514 FloodAsyncResult fresult = result as FloodAsyncResult; 515 if (fresult == null) 516 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("result", SR.GetString(SR.InvalidAsyncResult)); 517 fresult.End(); 518 } 519 } 520 EndFloodReceivedMessage(IAsyncResult result)521 public void EndFloodReceivedMessage(IAsyncResult result) 522 { 523 FloodAsyncResult fresult = result as FloodAsyncResult; 524 Fx.Assert(fresult != null, "Invalid FloodAsyncResult instance during EndFloodReceivedMessage"); 525 } 526 527 528 public class PeerThrottleHelper 529 { 530 int outgoingEnqueuedCount = 0; 531 int outgoingQuota = 128; 532 IFlooderForThrottle flooder; 533 534 PeerThrottleHelper(IFlooderForThrottle flooder, int outgoingLimit)535 public PeerThrottleHelper(IFlooderForThrottle flooder, int outgoingLimit) 536 { 537 this.outgoingQuota = outgoingLimit; 538 this.flooder = flooder; 539 } 540 ItemDequeued()541 public void ItemDequeued() 542 { 543 Interlocked.Decrement(ref outgoingEnqueuedCount); 544 } 545 AcquireNoQueue()546 public void AcquireNoQueue() 547 { 548 int value = Interlocked.Increment(ref outgoingEnqueuedCount); 549 if (value >= outgoingQuota) 550 { 551 flooder.OnThrottleReached(); 552 } 553 } 554 } 555 } 556 557 class PeerFlooderSimple : PeerFlooderBase<Message, UtilityInfo> 558 { 559 ListManager messageIds; 560 const uint MaxBuckets = 5; 561 PeerFlooderSimple(PeerNodeConfig config, PeerNeighborManager neighborManager)562 internal PeerFlooderSimple(PeerNodeConfig config, PeerNeighborManager neighborManager) 563 : base(config, neighborManager) 564 { 565 //we want a message id cache that holds message ids for atmost 5 mins. 566 this.messageIds = new ListManager(MaxBuckets); 567 } 568 ShouldProcess(Message message)569 public override bool ShouldProcess(Message message) 570 { 571 return message.Properties.ContainsKey(PeerStrings.MessageVerified); 572 } IsNotSeenBefore(Message message, out byte[] id, out int cacheHit)573 public bool IsNotSeenBefore(Message message, out byte[] id, out int cacheHit) 574 { 575 cacheHit = -1; 576 id = PeerNodeImplementation.DefaultId; 577 if (message is SecurityVerifiedMessage) 578 { 579 id = (message as SecurityVerifiedMessage).PrimarySignatureValue; 580 581 } 582 else 583 { 584 System.Xml.UniqueId messageId = PeerMessageHelpers.GetHeaderUniqueId(message.Headers, PeerStrings.MessageId, PeerStrings.Namespace); 585 if (messageId == null) 586 return false; 587 if (messageId.IsGuid) 588 { 589 id = new byte[16]; 590 messageId.TryGetGuid(id, 0); 591 } 592 else 593 return false; 594 } 595 cacheHit = messageIds.AddForLookup(id); 596 if (cacheHit == -1) 597 { 598 return true; 599 } 600 return false; 601 602 } 603 RecordOutgoingMessage(byte[] id)604 public override void RecordOutgoingMessage(byte[] id) 605 { 606 this.messageIds.AddForFlood(id); 607 } 608 OnOpen()609 public override void OnOpen() 610 { 611 } 612 OnClose()613 public override void OnClose() 614 { 615 this.messageIds.Close(); 616 } 617 618 OnFloodedMessage(IPeerNeighbor neighbor, Message floodInfo, AsyncCallback callback, object state)619 public override IAsyncResult OnFloodedMessage(IPeerNeighbor neighbor, Message floodInfo, AsyncCallback callback, object state) 620 { 621 return base.OnFloodedMessage(neighbor, floodInfo, callback, state); 622 } 623 EndFloodMessage(IAsyncResult result)624 public override void EndFloodMessage(IAsyncResult result) 625 { 626 base.EndFloodMessage(result); 627 628 } 629 ProcessLinkUtility(IPeerNeighbor neighbor, UtilityInfo utilityInfo)630 public override void ProcessLinkUtility(IPeerNeighbor neighbor, UtilityInfo utilityInfo) 631 { 632 if (!PeerNeighborStateHelper.IsConnected(neighbor.State)) 633 { 634 neighbor.Abort(PeerCloseReason.InvalidNeighbor, PeerCloseInitiator.LocalNode); 635 return; 636 } 637 638 try 639 { 640 UtilityExtension.ProcessLinkUtility(neighbor, utilityInfo); 641 } 642 catch (Exception e) 643 { 644 if (Fx.IsFatal(e)) throw; 645 if (null != CloseNeighborIfKnownException(neighborManager, e, neighbor)) 646 { 647 throw; 648 } 649 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 650 } 651 } 652 653 class ListManager 654 { 655 uint active; //current bucket. 656 readonly uint buckets; 657 658 // Double-checked locking pattern requires volatile for read/write synchronization 659 volatile bool disposed = false; 660 IOThreadTimer messagePruningTimer; 661 //we service the hashtables every one minute 662 static readonly int PruningTimout = 60 * 1000; 663 static readonly int InitialCount = 1000; 664 Dictionary<byte[], bool>[] tables; 665 //Hashtable[] tables; 666 object thisLock; 667 static InMemoryNonceCache.NonceCacheImpl.NonceKeyComparer keyComparer = new InMemoryNonceCache.NonceCacheImpl.NonceKeyComparer(); 668 const int NotFound = -1; 669 //creating this ListManager with n implies that the entries will be available for n minutes atmost. 670 //in the n+1 minute, the timer message handler will kick in to clear older messages. 671 //every minute, the ListManager(uint buckets)672 public ListManager(uint buckets) 673 { 674 if (!(buckets > 1)) 675 { 676 throw Fx.AssertAndThrow("ListManager should be used atleast with 2 buckets"); 677 } 678 this.buckets = buckets; 679 tables = new Dictionary<byte[], bool>[buckets]; 680 681 for (uint i = 0; i < buckets; i++) 682 { 683 tables[i] = NewCache(InitialCount); 684 } 685 //create a timer and kickit off for 1 minute 686 messagePruningTimer = new IOThreadTimer(new Action<object>(OnTimeout), null, false); 687 messagePruningTimer.Set(PruningTimout); 688 this.active = 0; 689 this.disposed = false; 690 this.thisLock = new object(); 691 } 692 693 object ThisLock 694 { 695 get 696 { 697 return thisLock; 698 } 699 } 700 AddForLookup(byte[] key)701 public int AddForLookup(byte[] key) 702 { 703 int table = NotFound; 704 if (disposed) 705 { 706 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerFlooderDisposed))); 707 } 708 709 lock (ThisLock) 710 { 711 if ((table = Contains(key)) == NotFound) 712 { 713 tables[active].Add(key, false); 714 } 715 return table; 716 } 717 } 718 AddForFlood(byte[] key)719 public bool AddForFlood(byte[] key) 720 { 721 if (disposed) 722 { 723 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerFlooderDisposed))); 724 } 725 726 lock (ThisLock) 727 { 728 if (UpdateFloodEntry(key)) 729 { 730 return true; 731 } 732 else 733 { 734 return false; 735 } 736 } 737 } 738 Close()739 internal void Close() 740 { 741 lock (ThisLock) 742 { 743 if (disposed) 744 return; 745 messagePruningTimer.Cancel(); 746 messagePruningTimer = null; 747 tables = null; 748 disposed = true; 749 } 750 } 751 752 //it does not use locks and expects the caller to hold the lock. UpdateFloodEntry(byte[] key)753 internal bool UpdateFloodEntry(byte[] key) 754 { 755 bool flooded = false; 756 //check if the message is present in any of the buckets. 757 //assumption is that a hit is likely in the current or most recent bucket. 758 //we start looking in the current active table and then in the previous and then backwards ... 759 for (uint i = buckets; i > 0; i--) 760 { 761 if (tables[(active + i) % buckets].TryGetValue(key, out flooded)) 762 { 763 if (!flooded) 764 { 765 tables[(active + i) % buckets][key] = true; 766 return true; 767 } 768 else 769 return false; 770 } 771 } 772 tables[active].Add(key, true); 773 return true; 774 } 775 776 //it does not use locks and expects the caller to hold the lock. Contains(byte[] key)777 internal int Contains(byte[] key) 778 { 779 int cache = NotFound; 780 uint i = 0; 781 //check if the message is present in any of the buckets. 782 //assumption is that a hit is likely in the current or most recent bucket. 783 //we start looking in the current active table and then in the previous and then backwards ... 784 for (i = buckets; i > 0; i--) 785 { 786 if (tables[(active + i) % buckets].ContainsKey(key)) 787 cache = (int)i; 788 } 789 if (cache < 0) 790 return cache; 791 cache = (int)((active + buckets - i) % buckets); 792 return cache; 793 } 794 OnTimeout(object state)795 void OnTimeout(object state) 796 { 797 if (disposed) 798 return; 799 lock (ThisLock) 800 { 801 if (disposed) 802 return; 803 active = (active + 1) % (buckets); 804 tables[active] = NewCache(tables[active].Count); 805 messagePruningTimer.Set(PruningTimout); 806 } 807 } 808 NewCache(int capacity)809 Dictionary<byte[], bool> NewCache(int capacity) 810 { 811 return new Dictionary<byte[], bool>(capacity, keyComparer); 812 } 813 } 814 } 815 816 817 // this class should contain a collection of IAsyncResults returned from neighbor.BeginSend 818 // and complete once all sends have completed 819 class FloodAsyncResult : AsyncResult 820 { 821 bool doneAdding = false; 822 Exception exception; 823 PeerNeighborManager pnm; 824 825 // Double-checked locking pattern requires volatile for read/write synchronization 826 volatile bool isCompleted = false; 827 //async results who signaled completion but we have not called EndSend. 828 List<IAsyncResult> pending = new List<IAsyncResult>(); 829 Dictionary<IAsyncResult, IPeerNeighbor> results = new Dictionary<IAsyncResult, IPeerNeighbor>(); 830 bool shouldCallComplete = false; 831 object thisLock = new object(); 832 TimeoutHelper timeoutHelper; 833 bool offNode = false; 834 public event EventHandler OnMessageSent; 835 836 FloodAsyncResult(PeerNeighborManager owner, TimeSpan timeout, AsyncCallback callback, object state)837 public FloodAsyncResult(PeerNeighborManager owner, TimeSpan timeout, AsyncCallback callback, object state) 838 : base(callback, state) 839 { 840 this.pnm = owner; 841 this.timeoutHelper = new TimeoutHelper(timeout); 842 } 843 844 object ThisLock 845 { 846 get 847 { 848 return thisLock; 849 } 850 } 851 AddResult(IAsyncResult result, IPeerNeighbor neighbor)852 public void AddResult(IAsyncResult result, IPeerNeighbor neighbor) 853 { 854 lock (ThisLock) 855 { 856 this.results.Add(result, neighbor); 857 } 858 } 859 860 //user wants to end business. This method is called as a result of EndSend on the flooder. 861 //internal methods do not call this. we are asserting that this method should not be called in case of failed BeginX End()862 public void End() 863 { 864 if (!(this.doneAdding && this.shouldCallComplete)) 865 { 866 throw Fx.AssertAndThrow("Unexpected end!"); 867 } 868 if (this.isCompleted) 869 { 870 return; 871 } 872 873 //simply wait on the base's event handle 874 bool completed = TimeoutHelper.WaitOne(this.AsyncWaitHandle, this.timeoutHelper.RemainingTime()); 875 if (!completed) 876 { 877 // a time out occurred - if mo message went off node then tell AsyncResult to throw. 878 if (!offNode) 879 { 880 try 881 { 882 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException()); 883 } 884 catch (Exception e) 885 { 886 if (Fx.IsFatal(e)) throw; 887 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 888 this.exception = e; 889 } 890 } 891 //otherwise trace that the timeout was not sufficient for complete send 892 lock (ThisLock) 893 { 894 if (this.isCompleted) 895 return; 896 this.isCompleted = true; 897 } 898 CompleteOp(false); 899 } 900 AsyncResult.End<FloodAsyncResult>(this); 901 } 902 903 //this method marks the end of BeginX by the flooder. 904 //if there were errors during BeginX, this method may be prematurely called 905 //in this case, our only job is to call EndX on successful BeginX calls. we do not report back to caller in this case. 906 //base.Complete will not be called and End() will not be called. User has already received exception during BeginX 907 //if there was no exception during BeginX, excep param is null. In this case, we call base.Complete upon the last EndX MarkEnd(bool success)908 public void MarkEnd(bool success) 909 { 910 bool callComplete = false; 911 try 912 { 913 lock (this.ThisLock) 914 { 915 foreach (IAsyncResult result in pending) 916 { 917 OnSendComplete(result); 918 } 919 pending.Clear(); 920 this.doneAdding = true; 921 this.shouldCallComplete = success; //only call base.Complete if there is no error during BeginX 922 if (this.results.Count == 0) 923 { 924 this.isCompleted = true; 925 callComplete = true; 926 } 927 } 928 } 929 finally 930 { 931 if (callComplete) 932 { 933 CompleteOp(true); 934 } 935 } 936 937 } 938 939 940 //this is the callback routine for async completion on channel BeginSend() operations. 941 //if we are done, simply return. This can happen if user called sync EndX. 942 //if the flooder is still processing BeginSend(), then we probably cant complete. In this case, add the result to pending and return 943 //main thread will flush the pending completions in MarkEnd(). 944 //otherwise, call EndX on the result and remove it from results. 945 //if this is the last invoke, signal user using base.Complete AND isCompleted=true OnSendComplete(IAsyncResult result)946 internal void OnSendComplete(IAsyncResult result) 947 { 948 bool callComplete = false; 949 IPeerNeighbor neighbor = null; 950 bool fatal = false; 951 if (isCompleted) 952 return; 953 Message message = (Message)result.AsyncState; 954 955 //wait until flooder had a chance to call all outgoing channels and give us Async results. 956 lock (ThisLock) 957 { 958 if (isCompleted) 959 return; 960 961 if (!this.results.TryGetValue(result, out neighbor)) 962 { 963 if (!doneAdding) 964 this.pending.Add(result); 965 else 966 { 967 throw Fx.AssertAndThrow("IAsyncResult is un-accounted for."); 968 } 969 return; 970 } 971 this.results.Remove(result); 972 973 try 974 { 975 //try doing this only if the async result is marked !CompletedSynchronously. 976 if (!result.CompletedSynchronously) 977 { 978 neighbor.EndSend(result); 979 offNode = true; 980 UtilityExtension.OnEndSend(neighbor, this); 981 } 982 } 983 catch (Exception e) 984 { 985 if (Fx.IsFatal(e)) 986 { 987 fatal = true; 988 throw; 989 } 990 991 Exception temp = PeerFlooder.CloseNeighborIfKnownException(pnm, e, neighbor); 992 //we want to return the very first exception to the user. 993 if (temp != null && this.doneAdding && !this.shouldCallComplete) 994 throw; 995 if (this.exception == null) 996 { 997 this.exception = temp; 998 } 999 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1000 } 1001 finally 1002 { 1003 if (message != null && !result.CompletedSynchronously && !fatal) 1004 message.Close(); 1005 } 1006 //dont want to call Complete from the lock. 1007 //we just decide if this thread should call complete and call outside the lock. 1008 if (this.results.Count == 0 && this.doneAdding && this.shouldCallComplete) 1009 { 1010 this.isCompleted = true; 1011 callComplete = true; 1012 } 1013 } 1014 //if we are done with callbacks and beginx calls, 1015 if (callComplete && this.shouldCallComplete) 1016 { 1017 CompleteOp(false); 1018 } 1019 } 1020 CompleteOp(bool sync)1021 void CompleteOp(bool sync) 1022 { 1023 //call the callback upon finish 1024 OnMessageSent(this, EventArgs.Empty); 1025 base.Complete(sync, this.exception); 1026 } 1027 1028 } 1029 } 1030