//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System.Diagnostics;
    using System.Runtime;
    using System.ServiceModel;
    using System.ServiceModel.Description;
    using System.Threading;

    /// <summary>
    /// Extension attached to an <see cref="IPeerNeighbor"/> that maintains a link
    /// utility index for the neighbor, periodically sends utility acknowledgements
    /// (<c>UtilityInfo</c> messages) to it, and monitors the number of pending sends
    /// so that a neighbor that does not drain its backlog can be pruned.
    /// </summary>
    class UtilityExtension : IExtension<IPeerNeighbor>
    {
        // Amount added to the utility index by one "useful" update.
        const uint linkUtilityIncrement = 128;

        // Upper bound of the utility index; Calculate asserts it is never exceeded.
        const uint maxLinkUtility = 4096;

        // Base period, in milliseconds, between two check point verifications.
        const int PruneIntervalMilliseconds = 10000;

        // Monitoring stops once the number of pending sends is at or below this value.
        const int MinimumPendingMessages = 8;

        // ReportCacheMiss asserts that it is only called with a miss distance above this value.
        public const int AcceptableMissDistance = 2;

        /// <summary>Callback invoked by the check point logic to prune a neighbor.</summary>
        public delegate void PruneNeighborCallback(IPeerNeighbor peer);

        /// <summary>Raised after a utility message from the neighbor has been applied.</summary>
        public event EventHandler UtilityInfoReceived;

        /// <summary>Raised after a utility message has been sent to the neighbor.</summary>
        public event EventHandler UtilityInfoSent;

        // Guards linkUtility, updateCount, outTotal, inTotal and inUseful.
        readonly object thisLock = new object();

        // Guards the check point state (isMonitoring, checkPointPendingSends,
        // expectedClearance, pruneNeighbor) and the prune timer.
        readonly object throttleLock = new object();

        // Runs AcknowledgeLoop; armed in Attach and re-armed by AcknowledgeLoop.
        readonly IOThreadTimer ackTimer;

        // Runs VerifyCheckPoint; armed by BeginCheckPoint and re-armed by VerifyCheckPoint.
        readonly IOThreadTimer pruneTimer;

        // PruneIntervalMilliseconds plus a pseudo-random jitter chosen in the constructor.
        readonly TimeSpan pruneInterval;

        uint linkUtility;
        uint updateCount;

        // Messages sent to the neighbor (OnMessageSent) that have not yet been
        // accounted for by a utility message received from it (ProcessLinkUtility).
        int outTotal;

        // Updates recorded through UpdateLinkUtility since the last utility message
        // was sent: total count and the subset flagged as useful.
        uint inTotal;
        uint inUseful;

        IPeerNeighbor owner;
        TypedMessageConverter messageConverter;

        // Sends started (OnMessageSent) and not yet completed (OnEndSend).
        int pendingSends = 0;

        // Check point state, see BeginCheckPoint / VerifyCheckPoint.
        int checkPointPendingSends = 0;
        bool isMonitoring = false;
        int expectedClearance;
        PruneNeighborCallback pruneNeighbor;

        /// <summary>
        /// Creates the extension with both timers unarmed. Instances are created by
        /// <see cref="OnNeighborConnected"/>.
        /// </summary>
        UtilityExtension()
        {
            ackTimer = new IOThreadTimer(new Action<object>(AcknowledgeLoop), null, false);
            pendingSends = 0;
            pruneTimer = new IOThreadTimer(new Action<object>(VerifyCheckPoint), null, false);

            // The process id seeds the jitter. Process is disposable, so release it
            // as soon as the id has been read instead of leaving it to the finalizer.
            int seed;
            using (Process currentProcess = Process.GetCurrentProcess())
            {
                seed = currentProcess.Id;
            }

            int jitter = new Random(seed).Next(PruneIntervalMilliseconds);
            pruneInterval = TimeSpan.FromMilliseconds(PruneIntervalMilliseconds + jitter);
        }

        /// <summary>
        /// True once at least 32 utility updates have been applied through Calculate.
        /// </summary>
        public bool IsAccurate
        {
            get { return updateCount >= 32; }
        }

        /// <summary>Current link utility index for the neighbor.</summary>
        public uint LinkUtility
        {
            get { return linkUtility; }
        }

        /// <summary>
        /// Converter between <c>UtilityInfo</c> and <see cref="Message"/>, created on first use.
        /// </summary>
        internal TypedMessageConverter MessageConverter
        {
            get
            {
                if (messageConverter == null)
                {
                    messageConverter = TypedMessageConverter.Create(typeof(UtilityInfo), PeerStrings.LinkUtilityAction);
                }

                return messageConverter;
            }
        }

        /// <summary>Lock object protecting the utility counters.</summary>
        public object ThisLock
        {
            get { return thisLock; }
        }

        /// <summary>Number of sends that have been started but not yet completed.</summary>
        public int PendingMessages
        {
            get { return this.pendingSends; }
        }

        /// <summary>Records the owning neighbor and starts the acknowledgement timer.</summary>
        public void Attach(IPeerNeighbor host)
        {
            this.owner = host;
            ackTimer.Set(PeerTransportConstants.AckTimeout);
        }

        /// <summary>Stops both timers and clears the owning neighbor.</summary>
        public void Detach(IPeerNeighbor host)
        {
            ackTimer.Cancel();
            owner = null;

            lock (throttleLock)
            {
                pruneTimer.Cancel();
            }
        }

        /// <summary>Adds a new <see cref="UtilityExtension"/> to the neighbor's extensions.</summary>
        public static void OnNeighborConnected(IPeerNeighbor neighbor)
        {
            Fx.Assert(neighbor != null, "Neighbor must have a value");
            neighbor.Extensions.Add(new UtilityExtension());
        }

        /// <summary>Removes the neighbor's <see cref="UtilityExtension"/>, if it has one.</summary>
        public static void OnNeighborClosed(IPeerNeighbor neighbor)
        {
            Fx.Assert(neighbor != null, "Neighbor must have a value");
            UtilityExtension ext = neighbor.Extensions.Find<UtilityExtension>();
            if (ext != null)
            {
                neighbor.Extensions.Remove(ext);
            }
        }

        /// <summary>Notifies the neighbor's extension, if any, that a message send has started.</summary>
        public static void OnMessageSent(IPeerNeighbor neighbor)
        {
            UtilityExtension ext = neighbor.Extensions.Find<UtilityExtension>();
            if (ext != null)
            {
                ext.OnMessageSent();
            }
        }

        // Counts one more outstanding message and one more pending send.
        void OnMessageSent()
        {
            lock (ThisLock)
            {
                outTotal++;
            }

            Interlocked.Increment(ref pendingSends);
        }

        /// <summary>
        /// Notifies the neighbor's extension that a send has completed. Ignored when the
        /// neighbor is disconnecting (or in a later state) or has no utility extension.
        /// </summary>
        public static void OnEndSend(IPeerNeighbor neighbor, FloodAsyncResult fresult)
        {
            if (neighbor.State >= PeerNeighborState.Disconnecting)
            {
                return;
            }

            UtilityExtension instance = neighbor.Utility;
            if (instance == null)
            {
                return;
            }

            instance.OnEndSend(fresult);
        }

        /// <summary>Counts one pending send as completed.</summary>
        public void OnEndSend(FloodAsyncResult fresult)
        {
            Interlocked.Decrement(ref pendingSends);
        }

        // Timer callback: flushes the accumulated acknowledgement and re-arms the
        // timer for as long as the extension is attached to a connected neighbor.
        void AcknowledgeLoop(object state)
        {
            IPeerNeighbor peer = owner;
            if (peer == null || !peer.IsConnected)
            {
                return;
            }

            FlushAcknowledge();

            // owner may have been cleared by Detach while the flush was running.
            if (owner != null)
            {
                ackTimer.Set(PeerTransportConstants.AckTimeout);
            }
        }

        /// <summary>
        /// Applies a utility message received from the neighbor to its extension, if any.
        /// </summary>
        public static void ProcessLinkUtility(IPeerNeighbor neighbor, UtilityInfo umessage)
        {
            Fx.Assert(neighbor != null, "Neighbor must have a value");
            UtilityExtension ext = neighbor.Extensions.Find<UtilityExtension>();
            if (ext != null)
            {
                ext.ProcessLinkUtility(umessage.Useful, umessage.Total);
            }
        }

        // Updates the link utility from the counts reported by the neighbor.
        // Throws InvalidOperationException when the reported values are inconsistent:
        // total above the acknowledgement window, useful above total, or total above
        // the number of messages still outstanding.
        void ProcessLinkUtility(uint useful, uint total)
        {
            lock (ThisLock)
            {
                if (total > PeerTransportConstants.AckWindow
                    || useful > total
                    || (uint)outTotal < total)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerLinkUtilityInvalidValues, useful, total)));
                }

                // VERIFY: within this range we are hoping that the order of
                // useful/useless messages does not matter much: all useful updates
                // are applied first, followed by the remaining useless ones.
                uint i;
                for (i = 0; i < useful; i++)
                {
                    this.linkUtility = Calculate(this.linkUtility, true);
                }

                for (; i < total; i++)
                {
                    this.linkUtility = Calculate(this.linkUtility, false);
                }

                outTotal -= (int)total;
            }

            // Raise through a local copy so that a handler being removed between the
            // null check and the invocation cannot cause a NullReferenceException.
            EventHandler handler = UtilityInfoReceived;
            if (handler != null)
            {
                handler(this, EventArgs.Empty);
            }
        }

        // Computes the next utility index: the current value is decayed to 31/32 and,
        // for a useful update, linkUtilityIncrement is added. Also counts the update
        // until IsAccurate becomes true.
        // Refer to graph maintenance white paper for explanation of the formula
        // used to compute utility index.
        uint Calculate(uint current, bool increase)
        {
            uint utility = current * 31 / 32;
            if (increase)
            {
                utility += linkUtilityIncrement;
            }

            if (utility > maxLinkUtility)
            {
                throw Fx.AssertAndThrow("Link utility should not exceed " + maxLinkUtility);
            }

            if (!IsAccurate)
            {
                ++updateCount;
            }

            return utility;
        }

        /// <summary>
        /// Records one update for the neighbor and returns its new link utility,
        /// or 0 when the neighbor has no utility extension.
        /// </summary>
        public static uint UpdateLinkUtility(IPeerNeighbor neighbor, bool useful)
        {
            Fx.Assert(neighbor != null, "Neighbor must have a value");
            uint linkUtility = 0;
            UtilityExtension ext = neighbor.Extensions.Find<UtilityExtension>();

            // ext can be null if the neighbor has been closed, for instance.
            if (ext != null)
            {
                linkUtility = ext.UpdateLinkUtility(useful);
            }

            return linkUtility;
        }

        /// <summary>
        /// Records one update, adjusts the link utility and flushes the acknowledgement
        /// once <c>PeerTransportConstants.AckWindow</c> updates have accumulated.
        /// </summary>
        /// <returns>The link utility computed by this call.</returns>
        public uint UpdateLinkUtility(bool useful)
        {
            uint updated;
            lock (ThisLock)
            {
                inTotal++;
                if (useful)
                {
                    inUseful++;
                }

                linkUtility = Calculate(linkUtility, useful);

                // Capture the value while the lock is held so that the result is the
                // one computed here and not a concurrent update.
                updated = linkUtility;

                if (inTotal == PeerTransportConstants.AckWindow)
                {
                    // Called with ThisLock held; the lock statement is reentrant, so
                    // the nested lock inside FlushAcknowledge is acquired without blocking.
                    FlushAcknowledge();
                }
            }

            return updated;
        }

        /// <summary>
        /// Sends the accumulated useful/total counts to the neighbor and resets them.
        /// Does nothing when no update has been recorded.
        /// </summary>
        public void FlushAcknowledge()
        {
            // Unlocked fast path; SendUtilityMessage also ignores a zero total.
            if (inTotal == 0)
            {
                return;
            }

            uint tempUseful;
            uint tempTotal;
            lock (ThisLock)
            {
                tempUseful = inUseful;
                tempTotal = inTotal;
                inUseful = 0;
                inTotal = 0;
            }

            SendUtilityMessage(tempUseful, tempTotal);
        }

        // State object handed to BeginSend so that the completion callback can close
        // the message and, on failure, restore the counts carried by the UtilityInfo.
        class AsyncUtilityState
        {
            public Message message;
            public UtilityInfo info;

            public AsyncUtilityState(Message message, UtilityInfo info)
            {
                this.message = message;
                this.info = info;
            }
        }

        // Sends a UtilityInfo carrying the given counts to the owning neighbor.
        // A send that completes synchronously is ended and its message closed here;
        // an asynchronous completion is handled by UtilityMessageSent.
        void SendUtilityMessage(uint useful, uint total)
        {
            IPeerNeighbor host = owner;
            if (host == null || !PeerNeighborStateHelper.IsConnected(host.State) || total == 0)
            {
                return;
            }

            UtilityInfo umessage = new UtilityInfo(useful, total);
            IAsyncResult result = null;
            Message message = MessageConverter.ToMessage(umessage, MessageVersion.Soap12WSAddressing10);
            bool fatal = false;
            try
            {
                result = host.BeginSend(message, Fx.ThunkCallback(new AsyncCallback(UtilityMessageSent)), new AsyncUtilityState(message, umessage));
                if (result.CompletedSynchronously)
                {
                    host.EndSend(result);
                    EventHandler handler = UtilityInfoSent;
                    if (handler != null)
                    {
                        handler(this, EventArgs.Empty);
                    }
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    fatal = true;
                    throw;
                }

                // HandleSendException returns null for the failures it absorbs.
                if (null != HandleSendException(host, e, umessage))
                {
                    throw;
                }

                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            finally
            {
                // The message is closed here unless the send is still in flight
                // (UtilityMessageSent closes it then) or a fatal exception is propagating.
                if (!fatal && (result == null || result.CompletedSynchronously))
                {
                    message.Close();
                }
            }
        }

        // Completion callback for an asynchronous BeginSend issued by SendUtilityMessage.
        void UtilityMessageSent(IAsyncResult result)
        {
            if (result == null || result.AsyncState == null)
            {
                return;
            }

            // A synchronous completion is ended, and its message closed, by SendUtilityMessage.
            if (result.CompletedSynchronously)
            {
                return;
            }

            AsyncUtilityState state = (AsyncUtilityState)result.AsyncState;
            Fx.Assert(state != null, "IAsyncResult.AsyncState does not contain AsyncUtilityState");
            Message message = state.message;

            IPeerNeighbor host = this.owner;
            if (host == null || !PeerNeighborStateHelper.IsConnected(host.State))
            {
                // The neighbor went away while the send was in flight. Nobody else
                // owns the message on the asynchronous path, so close it here
                // rather than leaking it.
                if (message != null)
                {
                    message.Close();
                }

                return;
            }

            UtilityInfo umessage = state.info;
            bool fatal = false;
            if (umessage == null)
            {
                throw Fx.AssertAndThrow("expecting a UtilityInfo message in the AsyncState!");
            }

            try
            {
                host.EndSend(result);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    fatal = true;
                    throw;
                }

                // HandleSendException returns null for the failures it absorbs.
                if (null != HandleSendException(host, e, umessage))
                {
                    throw;
                }

                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            finally
            {
                if (!fatal)
                {
                    Fx.Assert(!result.CompletedSynchronously, "result.CompletedSynchronously");
                    message.Close();
                }
            }

            EventHandler handler = UtilityInfoSent;
            if (handler != null)
            {
                handler(this, EventArgs.Empty);
            }
        }

        // Decides what to do with an exception raised while sending a utility message.
        // ObjectDisposedException, TimeoutException and CommunicationException are
        // absorbed: the counts that could not be sent are added back to the pending
        // acknowledgement and null is returned. Any other exception is returned so
        // that the caller rethrows it. A QuotaExceededException inner exception is
        // treated as an assertion failure.
        Exception HandleSendException(IPeerNeighbor host, Exception e, UtilityInfo umessage)
        {
            bool isExpectedFailure = e is ObjectDisposedException
                || e is TimeoutException
                || e is CommunicationException;
            if (!isExpectedFailure)
            {
                return e;
            }

            if (e.InnerException is QuotaExceededException)
            {
                throw Fx.AssertAndThrow("insufficient quota for sending messages!");
            }

            lock (ThisLock)
            {
                this.inTotal += umessage.Total;
                this.inUseful += umessage.Useful;
            }

            return null;
        }

        /// <summary>
        /// Reports a cache miss for a connected neighbor to its extension, if any.
        /// </summary>
        internal static void ReportCacheMiss(IPeerNeighbor neighbor, int missedBy)
        {
            Fx.Assert(missedBy > AcceptableMissDistance, "Call this method for cache misses ONLY!");
            Fx.Assert(neighbor != null, "Neighbor must have a value");

            if (!neighbor.IsConnected)
            {
                return;
            }

            UtilityExtension ext = neighbor.Extensions.Find<UtilityExtension>();
            if (ext != null)
            {
                ext.ReportCacheMiss(missedBy);
            }
        }

        // Applies one non-useful utility update per missed message.
        void ReportCacheMiss(int missedBy)
        {
            lock (ThisLock)
            {
                for (int i = 0; i < missedBy; i++)
                {
                    this.linkUtility = Calculate(this.linkUtility, false);
                }
            }
        }

        /// <summary>
        /// Starts monitoring the pending sends of the neighbor unless monitoring is
        /// already in progress: records the current backlog, expects half of it to be
        /// cleared by the next verification and arms the prune timer.
        /// </summary>
        /// <param name="pruneCallback">Invoked by the verification when the backlog has not cleared enough.</param>
        public void BeginCheckPoint(PruneNeighborCallback pruneCallback)
        {
            // Unlocked fast path; the flag is checked again under the lock.
            if (this.isMonitoring)
            {
                return;
            }

            lock (throttleLock)
            {
                if (this.isMonitoring)
                {
                    return;
                }

                this.checkPointPendingSends = this.pendingSends;
                this.pruneNeighbor = pruneCallback;
                this.expectedClearance = this.pendingSends / 2;
                this.isMonitoring = true;
                if (owner == null)
                {
                    return;
                }

                pruneTimer.Set(pruneInterval);
            }
        }

        // Timer callback comparing the current backlog with the last check point:
        //  - backlog at or below MinimumPendingMessages: monitoring stops;
        //  - backlog not reduced by more than the expected clearance: the prune callback runs;
        //  - otherwise a new check point is taken with half the expected clearance.
        void VerifyCheckPoint(object state)
        {
            IPeerNeighbor peer = owner;
            if (peer == null || !peer.IsConnected)
            {
                return;
            }

            int lclPendingSends;
            int lclCheckPointPendingSends;
            int lclExpectedClearance;
            PruneNeighborCallback lclPruneNeighbor;

            // Take one consistent snapshot of everything written under throttleLock.
            lock (throttleLock)
            {
                lclPendingSends = this.pendingSends;
                lclCheckPointPendingSends = this.checkPointPendingSends;
                lclExpectedClearance = this.expectedClearance;
                lclPruneNeighbor = this.pruneNeighbor;
            }

            if (lclPendingSends <= MinimumPendingMessages)
            {
                lock (throttleLock)
                {
                    isMonitoring = false;
                }
            }
            else if (lclPendingSends + lclExpectedClearance >= lclCheckPointPendingSends)
            {
                // Invoked outside the lock, as the callback is external code.
                if (lclPruneNeighbor != null)
                {
                    lclPruneNeighbor(peer);
                }
            }
            else
            {
                lock (throttleLock)
                {
                    if (owner == null)
                    {
                        return;
                    }

                    this.checkPointPendingSends = this.pendingSends;
                    this.expectedClearance = this.expectedClearance / 2;
                    pruneTimer.Set(pruneInterval);
                }
            }
        }
    }
}